This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2019-12-30
Channels
- # adventofcode (7)
- # announcements (9)
- # babashka (20)
- # beginners (182)
- # calva (9)
- # cider (20)
- # circleci (3)
- # clj-kondo (1)
- # clojure (269)
- # clojure-europe (2)
- # clojure-india (1)
- # clojure-italy (6)
- # clojure-nl (5)
- # clojure-uk (50)
- # clojurescript (56)
- # code-reviews (3)
- # core-async (174)
- # datomic (4)
- # duct (1)
- # emacs (3)
- # events (1)
- # fulcro (31)
- # graalvm (10)
- # graphql (8)
- # jobs (1)
- # joker (11)
- # juxt (7)
- # luminus (2)
- # malli (4)
- # off-topic (2)
- # overtone (1)
- # pathom (2)
- # re-frame (24)
- # shadow-cljs (42)
- # sql (1)
- # tools-deps (10)
Can you look at my question https://clojurians.slack.com/archives/C03S1KBA2/p1577696618239500 here? Am I using async correctly, or is it not the right pattern for async? When should I use async, and when not? Is it possible to solve my issue with async?
well, in general this was made to avoid the situation where processing runs at the same time, because that causes issues. So it is a queue.
not wrong to my eyes. I just saw a queue taking runnable functions, which is essentially a simpler version of a thread pool
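For readers following along, that "queue of runnable functions consumed by one worker" idea can be sketched without core.async at all, using plain java.util.concurrent (all names here are illustrative, not from the original code):

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; A queue of zero-argument functions (thunks) -- the jobs to run.
(def jobs (LinkedBlockingQueue.))
(def results (atom []))

;; Enqueue two jobs before starting the worker.
(.put jobs #(swap! results conj 1))
(.put jobs #(swap! results conj 2))

;; One consumer thread drains the queue, one job at a time --
;; effectively a thread pool of size 1.
(def worker-thread
  (doto (Thread.
         (fn []
           (loop []
             ;; poll with a timeout so the demo thread can exit
             ;; once the queue stays empty
             (when-let [job (.poll jobs 500 TimeUnit/MILLISECONDS)]
               (job)
               (recur)))))
    (.start)))

(.join worker-thread)
;; @results => [1 2], the jobs ran in FIFO order
```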
could it be one of these two?
* go-loop works fine, but whoever is enqueueing has a problem
* some other (unrelated) piece of code uses core.async in such a way that it causes trouble to c.a's internal thread pool
also, the JMX suggestion you received is on point. Similarly, you can run YourKit in prod/staging and connect to it from your machine.
Tools like YourKit tell you exactly where/how your threads are stalling
We can assume go-loop overflow after some time, so debugging is not really needed. But the question is how to fix it?
> We can assume go-loop overflow after some time
What makes you believe so? Your go-loop consumes work in an orderly way thanks to its <!. There's no such thing as consumption overflow (there is the notion of production overflow)
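To make the "production overflow" point concrete, here is a minimal sketch using a plain j.u.c bounded queue rather than a core.async buffer (queue size chosen arbitrarily for illustration):

```clojure
(import 'java.util.concurrent.ArrayBlockingQueue)

;; A bounded queue of capacity 1: it is the producer,
;; not the consumer, that can overflow it.
(def q (ArrayBlockingQueue. 1))

(def first-offer  (.offer q :job-1))  ;; accepted, queue is now full
(def second-offer (.offer q :job-2))  ;; rejected, queue is full
;; first-offer => true, second-offer => false
;; (.put q :job-2) would instead block the producer until space
;; frees up -- the same backpressure a fixed core.async buffer
;; applies to >!
```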
hmm that is what @delaguardo suggested
He seems to talk about the case of using #'worker as a channel. Are you even doing that?
> go-loop works fine, but whoever is enqueueing has a problem
What do you mean by this point then?
> He seems to talk about the case of using #'worker as a channel. Are you even doing that?
I don't, and he said it is an issue because this channel keeps growing and growing
> What do you mean by this point then?
Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work
Also, using put! can be dangerous because it disregards queue sizes. Do you have any calls to put!?
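For context on why put! can bite, here is a sketch (it assumes org.clojure/core.async is on the classpath): put! never blocks, it just enqueues a pending put, and core.async only complains once 1024 puts are pending on one channel:

```clojure
(require '[clojure.core.async :refer [chan put!]])

;; Unbuffered channel with no consumer.
(def c (chan))

;; Each put! succeeds immediately by parking the value as a
;; pending put -- no backpressure is applied to the caller.
(dotimes [_ 1024]
  (put! c :job))

;; The 1025th pending put trips core.async's internal limit,
;; which surfaces as an AssertionError.
(def result
  (try
    (put! c :job)
    :accepted
    (catch AssertionError e
      :pending-put-limit-hit)))
;; result => :pending-put-limit-hit
```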
> Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work
it works, because I can see in the logs that it gets past the enqueueing and finishes adding things to the queue. I have a debug message at the end to check it.
1. the channel is closed for some magic reason 2. the worker died 3. some magic memory or overflow issue
but the strangest thing is both workers stop doing jobs at the same time - at least I think that's what happens
...it could be many things. thread uses an unbounded (but cached) thread pool, which might make the OS upset.
but because both workers stop doing jobs at the same time... probably something overflows somehow
go-loop, as its name implies, is made for looping, so... I doubt it. It'd be extremely fragile if it didn't support infinite loops
a/thread threads are daemon threads, so the JVM does not wait for them when exiting. Maybe something else is making your JVM terminate...
the issue is (I guess) that worker and worker-warehouse overflow capacity after 2-4 weeks
I'd use something like AWS SQS for never losing work and having a greater degree of observability
I can lose work, it is designed to not be a problem. I would like to keep it in the app to decrease complexity
> I can lose work
That's an interesting requirement, I'd say. In that case you should be able to use whatever you feel most comfortable with: agents, futures, core.async, j.u.concurrent, claypoole... Obviously they make different demands when something goes wrong (i.e., many people will recommend j.u.c, but if something goes wrong, will you pick up a Java book? Many devs won't)
> That's an interesting requirement
The processing is resistant to a crash at any moment
if you are comfortable with it, it's more observable. Say it's a thread pool with thread size 1, backed by a queue of max size 1000: if something goes wrong, you can fetch the queue size, its contents, etc. Likewise you might interact with the thread pool itself (check the javadocs). Also the error handling will be more straightforward. Less magic overall (vs. core.async)
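That suggestion might look like the following in Clojure interop (sizes and sleep times are illustrative, not a recommendation):

```clojure
(import '(java.util.concurrent ThreadPoolExecutor TimeUnit ArrayBlockingQueue))

;; One worker thread backed by a bounded queue of 1000 jobs.
(def job-queue (ArrayBlockingQueue. 1000))

(def pool
  (ThreadPoolExecutor. 1 1                ;; core and max pool size: 1
                       0 TimeUnit/SECONDS ;; keep-alive, unused at fixed size
                       job-queue))

(.execute pool (fn [] (Thread/sleep 50)))
(.execute pool (fn [] (Thread/sleep 50)))

;; Everything is inspectable, unlike a go-loop's hidden state:
(.size job-queue)              ;; jobs still waiting
(.getActiveCount pool)         ;; threads currently running a job
(.getCompletedTaskCount pool)  ;; jobs finished so far

(.shutdown pool)
(.awaitTermination pool 5 TimeUnit/SECONDS)
;; after termination, (.getCompletedTaskCount pool) => 2
```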
but from the logs I am sure both workers stop doing new jobs at the same time, which probably should tell me something, but it doesn't
make sure your channels (queues and workers) are open:
(clojure.core.async.impl.protocols/closed? chan)
and be careful with that function, because it is intended as an implementation detail and should not be part of your code base
I can't recreate it on localhost, so the only thing I can do is push it to production and wait 2-4 weeks
(clojure.core.async.impl.protocols/closed? chan)
do you know how to run it in production?
No implementation of method: :closed? of protocol: #'clojure.core.async.impl.protocols/Channel found for class: nil
while it works on localhost in the REPL and even compiles with it
@delaguardo ^ do you know how to run it?
you are trying to call it with nil as an argument, but it expects a channel
I'm guessing here, based on the error message
(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
(defonce worker-warehouse
  (dotimes [x 3]
    (go-loop []
      (let [job (<! queue-warehouse)]
        (l/debug "run warehouse job" job)
        (when-not (nil? job)
          (<! (thread
                (try
                  (job)
                  (catch Throwable ex
                    (l/error ex)))))
          (recur))))))
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (clojure.core.async.impl.protocols/closed? worker-warehouse)))
but worker-warehouse should be nil, shouldn't it?
there is a dotimes call inside
there is also a strange (when-not (nil? job
The only way to get nil from a channel is to consume from a closed and exhausted channel
but after consuming everything, your code goes into the recur branch and will try to consume from the already-closed channel, then will get to the same branch again
but this is a big assumption - I mean the statement that it will never be true
ah, sorry, my bad
misreading
the funny thing is it started to happen after some update - maybe a new version of async, or who knows. It wasn't like that from the beginning
Like I said in the other thread, your go-loop is pretty weird. I would rewrite it so it logs on nil and stops looping, since when a channel take returns nil, it means the channel has closed. At least that way you'd know whether the issue you're encountering is due to the channel closing
(defonce worker
  (go-loop []
    (if-let [job (<! queue)]
      (do
        (l/debug "run job" job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur))
      (l/debug "channel closed"))))
why is this issue happening? I have no idea. I added extra debug logging and I have to wait a few weeks
I have logs
(l/debug "run job" job)
and I see both workers stop running new jobs at the same time
If I were you though, I'd just do this instead:
(defonce worker
  (thread
    (loop [job (<!! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (try
          (job)
          (catch Throwable ex
            (l/error ex)))
        (recur (<!! queue))))))
Well, since you want to process one job per worker at a time: this doesn't create a new thread for every job, it just reuses the same thread and runs one job after another on it. Also, it isn't as much at risk of issues where you've blocked the parking threadpool.
(defn worker [process-fn]
  (let [in (chan)
        out (chan)]
    (go-loop []
      (let [msg (<! in)]
        (>! out (process-fn msg)))
      (recur))
    [in out]))
consider something like this: a worker function that returns in and out queues connected by process-fn. It gives you more control over the created channels, and it also allows you to restart the worker on some condition
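A hedged usage sketch of that worker shape (it assumes core.async on the classpath; inc stands in for a real process-fn):

```clojure
(require '[clojure.core.async :refer [chan go-loop <! >! >!! <!!]])

;; The worker shape from the message above: in/out channels
;; connected by process-fn inside a go-loop.
(defn worker [process-fn]
  (let [in  (chan)
        out (chan)]
    (go-loop []
      (let [msg (<! in)]
        (>! out (process-fn msg)))
      (recur))
    [in out]))

;; Feed one message in and read the processed result back out.
(def answer
  (let [[in out] (worker inc)]
    (>!! in 41)
    (<!! out)))
;; answer => 42
```

One caveat to this shape: when in closes, msg becomes nil and process-fn must tolerate that, since the go-loop recurs unconditionally.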
why is that? I'm not restricting the chan with any buffer-size
So, if you stop seeing "run job" at the same time, it could be that the queues are empty and your puts are failing, it could be that they are deadlocked, or it could be that the channels have closed. That's all I can think of
small question - from the code above:
(loop [job (<! queue)]
  ...
  (job))
does it mean that you are passing functions through channels?
Well, your process would exit if it stopped looping. Normally an application starts and then closes when it runs out of instructions to run, unless it has a main loop of some sort. In your case, your go-loop seems to be what does it. So if there's no other infinite loop, your program should have shut down if the channels had closed and returned nil, which would have exited the loops.
is it possible for those functions to throw checked exceptions? they are not runtime exceptions, so they will not be caught by try ... Throwable
something like ClassNotFoundException
@delaguardo which function?
those exceptions will be thrown in the thread where your worker runs (job)
What do you mean?
(try
  (job)
  (catch Throwable ex
    (l/error ex)))
isn't it exactly this?
There's another possibility I'm thinking of. I'm not sure what happens to the channel returned by thread in that exception case
if (job) throws a checked exception, it will not be caught by your try/catch block
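For what it's worth, the checked/unchecked distinction is enforced only by the Java compiler; at runtime every checked exception is still a subclass of Throwable, so a plain-Clojure catch Throwable does trap them, as a quick REPL check shows:

```clojure
;; java.io.IOException is a checked exception in Java source,
;; but on the JVM it is just another subclass of Throwable.
(def outcome
  (try
    (throw (java.io.IOException. "boom"))
    (catch Throwable ex
      :caught)))
;; outcome => :caught
```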
Ya, you could wait and see if others have ideas, but it seems to be a very rare occurrence, so it will be hard
(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))
I have a super simple fn to add jobs