This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-02-27
Channels
- # announcements (8)
- # architecture (3)
- # aws (18)
- # beginners (96)
- # bristol-clojurians (3)
- # calva (15)
- # cider (7)
- # clj-kondo (8)
- # clojure (135)
- # clojure-denmark (1)
- # clojure-dev (14)
- # clojure-europe (37)
- # clojure-italy (9)
- # clojure-nl (14)
- # clojure-sanfrancisco (1)
- # clojure-spec (1)
- # clojure-uk (54)
- # clojurescript (27)
- # core-async (243)
- # cursive (28)
- # data-science (6)
- # datomic (33)
- # fulcro (25)
- # graalvm (24)
- # hoplon (2)
- # instaparse (12)
- # jackdaw (1)
- # java (21)
- # juxt (12)
- # meander (10)
- # nyc (4)
- # off-topic (6)
- # om (3)
- # pathom (17)
- # perun (1)
- # re-frame (29)
- # reitit (4)
- # rum (3)
- # shadow-cljs (119)
- # spacemacs (31)
- # xtdb (14)
Do you remember my mysterious issue about async getting stuck? I promised to write here if I learned something new:
(defonce queue (chan 100))
(defonce queue-warehouse (chan 100))

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
(def worker-warehouse
  (doall
    (repeat 3
            (go-loop []
              (let [job (<! queue-warehouse)]
                (l/debug "run warehouse job" job)
                (when-not (nil? job)
                  (<! (thread
                        (try
                          (job)
                          (catch Throwable ex
                            (l/error ex)))))
                  (recur)))))))
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
And while everything is stuck I see:
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
so it means the task was added correctly and the workers are not closed. Still, I have no idea why this is happening, but I have learned the queues and workers didn't crash and are still running
so it looks like the workers are running, based on clojure.core.async.impl.protocols/closed?
and new tasks are added, based on (l/debug "async: add" f "to" ch ": " (>!! ch f))
but these things are never run anyway
the only explanation I have is that something in worker
starts doing an endless job, which doesn't make sense
ok, I have a crazy idea… I am curious what would happen if the SOAP API (a third-party system) kept the connection open endlessly because of a bug on their side. Is there any timeout? Probably not this, but worth checking
my guess is your jobs are adding things to the work queue, which is a very easy way to lock things up
your workers are waiting for existing jobs to finish before pulling from the work queue again, and your jobs are waiting for space in the work queue to add items before exiting
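That failure mode in miniature, as a hedged sketch rather than the original code (the queue is shrunk to size 1 so the lock-up is immediate; `q`, `worker`, and `self-feeding-job` are illustrative names). Do not actually invoke `self-feeding-job` in a REPL you care about:

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop <! >!! thread]])

;; A queue with room for a single pending job.
(def q (chan 1))

;; One worker: take a job, run it on a real thread, loop.
(def worker
  (go-loop []
    (when-some [job (<! q)]
      (<! (thread (job)))
      (recur))))

;; A job that enqueues two more jobs with the blocking >!!.
;; The first put fits the buffer; the second parks this thread:
;; the worker can't take another item until this job returns,
;; and this job can't return until the worker takes an item.
(defn self-feeding-job []
  (>!! q (fn []))
  (>!! q (fn [])))
```

With a buffer of 100 the same wiring survives until enough jobs-that-enqueue-jobs pile up, which is why it can run fine for weeks before freezing.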
@serioga
> what calls add-job
?
What do you mean? Exactly what is in the code. You are probably asking about the (>!! ch f)
part.
> do you add jobs to worker
or worker-warehouse
?
both
> do you log activity inside your jobs?
> enter/exit etc?
yes and there is silence
> why worker-warehouse
is not defonce?
I was testing something and forgot to change it back before pasting
async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
it gets 2 parameters; the first is the channel. I add jobs to both queues with this
«What do you mean? Exactly what is in the code.» In your code this function is declared but not used
the other very common thing is you've been messing around in a repl for while, and who knows what state your repl is in
«yes and there is silence» so there are no jobs which are started but not finished? they just don't start?
> the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in
The bug is in production.
> so there are no jobs which are started but not finished?
No jobs start or finish; silence in the logs. The logs show only adding to the queue
because it is an easy mistake to make when you are trying to build some kind of work queue thing in core.async instead of using executors directly
still, I am adding to the queue, and I am sure of it because I see it in the logs. The workers still exist, because I see that in the logs too.
worker
has to be frozen for some reason in something endless, or there is a bug in the modules which I use
because they are waiting for space in the queue, but there won't be space in the queue until the job exits, since the worker go-loops are what consume from the queue
literally you don't use loops? or do you mean your jobs don't have a recursive structure
the actual structure of your jobs doesn't matter, it only matters if they ever call add-job
> because they are waiting for space in the queue, but there won't be space in the queue until the job exits so the worker go loops consume from the queue
No. As I said, the logs clearly show the job is added and is not waiting for space in the queue
«unless endless API query can exist» If you are not sure, then it's better to add timeout channels
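A hedged sketch of that suggestion: run the blocking call on a thread and race it against a timeout channel with alts!! (`call-with-timeout` and the millisecond budget are illustrative names, not from the original code):

```clojure
(require '[clojure.core.async :as a :refer [thread timeout alts!!]])

(defn call-with-timeout
  "Runs (f) on a core.async thread; returns its result, or ::timed-out
  if it does not finish within ms milliseconds."
  [f ms]
  (let [result   (thread (f))
        [v port] (alts!! [result (timeout ms)])]
    (if (= port result) v ::timed-out)))
```

Note the abandoned call keeps running on its thread; this only unblocks the caller, so it prevents a stuck worker but does not close a leaked connection.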
so to make this even clearer: I added 2 queues instead of 1 because of this bug. Before, there was only 1 queue and the issue was the same.
> «do you call add-job in your jobs?»
Now I add jobs from queue to queue-warehouse; before I didn't do this and the issue was the same
> Making top level def go routines is an antipattern
What is the right pattern for this case then? @ghadi
executors by default have an unbounded queue which will break your feedback loop (but unbounded queues can introduce other problems)
you could do something like setting the buffer size on the channel to Long/MAX_VALUE
As I mentioned before, I added queue-warehouse
to split things up for easier debugging and to speed up the process. But before, it was working with only one queue
«I don’t think the buffer size is the issue» Well, anyway, you should decide the behaviour when there is no room in the queue chan, such as a dropping or sliding buffer
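For reference, the two lossy buffer policies mentioned, in a small runnable sketch (the helper name is illustrative):

```clojure
(require '[clojure.core.async :as a])

;; dropping-buffer: when full, new puts are discarded immediately;
;; sliding-buffer: when full, the OLDEST buffered item is dropped.
(defn first-two-after-three-puts [buf]
  (let [c (a/chan buf)]
    (a/>!! c 1) (a/>!! c 2) (a/>!! c 3)  ; never blocks: the buffer policy decides
    [(a/<!! c) (a/<!! c)]))

;; (first-two-after-three-puts (a/dropping-buffer 2)) ;=> [1 2]  (3 was dropped)
;; (first-two-after-three-puts (a/sliding-buffer 2))  ;=> [2 3]  (1 slid out)
```

Either policy trades lost jobs for a guarantee that producers never block, which may or may not be acceptable for a sync pipeline.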
Let's assume this gets stuck because of the buffer size. Even then, from time to time a worker will finish its job and a new job can come in.
(defn sync-warehouse []
  (let [shop-ids (keys conf/shops)
        products-reservations (->> (pmap #(vector % (orders/atomstore-orders-reservations->products-reservation %)) shop-ids)
                                   (into {}))
        products-from-orders-not-synchronized-yet (->> (pmap #(vector % (orders/atomstore-orders-not-synchronized-yet->products-reservation %)) shop-ids)
                                                       (into {}))
        products-reservations+not-synchronized-orders (reduce (fn [m shop-id]
                                                                (assoc m shop-id
                                                                       (->> ((juxt products-reservations products-from-orders-not-synchronized-yet) shop-id)
                                                                            (apply merge-with +))))
                                                              {} shop-ids)
        stock-change (orders/products-reservations->stock-change-for-shop products-reservations+not-synchronized-orders)
        products (products/wfirma->products :bk)
        update-stock (fn [shop-id]
                       (->> products
                            (map (fn [product]
                                   (if-let [reservation (get-in stock-change [shop-id (:code product)])]
                                     (update product :quantity #(- % reservation))
                                     product)))))
        stock-round-down (fn [stock]
                           (map (fn [product]
                                  (update product :quantity #(Math/floor %)))
                                stock))]
    (add-job queue-warehouse #(products/products->atomstore :kh (update-stock :kh) {:prices? true
                                                                                    :stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :yms (update-stock :yms) {:stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :hurt (stock-round-down (update-stock :hurt)) {:stock? true}))
    (add-job queue-warehouse #(warehouse/update-cache-supplier-orders products products-reservations+not-synchronized-orders))))
(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))
so my guess is queue-worker
is stuck in something endless… But I don't see how. Also, I don't see any CPU overuse.
@kwladyka «And while everything stuck I see:» do you see it every 30 minutes after everything is stuck?
> all of which points to a deadlock waiting on your queues
Where, in your opinion, could the deadlock be?
Nothing blocks queue-warehouse;
things go in only one direction
https://clojurians.slack.com/archives/C05423W6H/p1582830301101200 and jobs don't run?
20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@1ebb3fc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@40a18579 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@28d752b5 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@1b54dc3e to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@36c12338 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@5f1cb6ba to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@141dd6dc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@3b5a82d3 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@65462cf9 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@2ed18f00 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
I don't know, I don't have all your code, and I don't really want to read it, but the situation you are describing matches one
look, just remove the buffers from your channels and run your tests and see if things lock up
@hiredman I know it looks like that, but I don't see a logical possibility of it. Especially since there was also previously only 1 queue, which cron was adding to
I think there is no other way to solve this than to add more debug logging to determine the exact line where it stops
@kwladyka
well, I feel like >!!
is blocked inside your thread: too many jobs for a fixed-size chan
my suggestions:
1. replace >!!
with the non-blocking put!
2. add logging in add-job
before calling >!!
(right now you will see nothing if >!!
is stuck)
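The two suggestions together might look like this sketch (put! hands the value off without blocking the caller and throws past 1024 pending puts rather than parking; `debug` is a stand-in for the l/debug logger in the thread):

```clojure
(require '[clojure.core.async :as a])

(defn debug [& args]              ; stand-in for l/debug
  (apply println args))

(defn add-job [ch f]
  (debug "async: about to add" f "to" ch)   ; log BEFORE the put
  ;; put! never blocks the caller; the callback reports whether the
  ;; channel accepted the value (false only when ch is closed).
  (a/put! ch f (fn [accepted?] (debug "async: add" f "accepted?" accepted?))))
```

This makes a stuck >!! impossible, at the cost of deferring the back-pressure problem to the pending-puts limit.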
(def queue (java.util.concurrent.Executors/newFixedThreadPool 3))

(defn add-job [queue f]
  (.execute queue f))
replaces all the core.async code above
> well, I feel like >!! is blocked inside your thread: too many jobs for a fixed-size chan
> my suggestions:
let's assume this can be true. So what about:
1. we see in the logs that new jobs are added to the queue
2. even if queue
-> queue-warehouse
has to wait because of the buffer size, from time to time worker-warehouse
will finish a job, so a new job will be added.
Right? Am I missing something?
like your analysis is based on the code you are looking at, are you sure it is exactly the same as the code in production?
the logs you shared, as already mentioned, only show add-job being called with the same channel over and over again
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
is it about current implementation or future one? https://clojurians.slack.com/archives/C05423W6H/p1582831471121700
hmm, I have to check this, but I found this somewhere as a solution for making X workers
1. We don't see failed attempts to add jobs in the logs for the current implementation.
2. worker-warehouse
cannot finish any job; it waits to add a job to the channel, which is not consumed anymore.
https://clojurians.slack.com/archives/C05423W6H/p1582831471121700
Ah! Now I understand why testing queue-warehouse did not work in parallel https://clojurians.slack.com/archives/C05423W6H/p1582831795128100
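For context, the parallelism bug: repeat evaluates its argument once and repeats that single value, so (repeat 3 (go-loop …)) starts one go-loop and lists its channel three times, while repeatedly calls a thunk n times. A small illustration, with an atom as a side-effect counter:

```clojure
(def calls (atom 0))

;; repeat: the expression is evaluated ONCE, before repeat runs,
;; and that single result is repeated three times.
(def via-repeat (doall (repeat 3 (swap! calls inc))))          ; => (1 1 1)

;; repeatedly: the thunk is called three separate times.
(def via-repeatedly (doall (repeatedly 3 #(swap! calls inc)))) ; => (2 3 4)
```

So a pool of three real workers would be (doall (repeatedly 3 #(go-loop …))).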
I will fix this 3-workers thing, add more debugging, etc. Thanks for the help. I know this was a hard discussion, but the issue is also very hard to figure out.
@kwladyka just make these two simple changes and roll queue-warehouse back (as it's not relevant) https://clojurians.slack.com/archives/C05423W6H/p1582831143118000
sure, I will also try decreasing the buffer size. I just need to add more things to debug this
I don't know. To detect some magic 😉 For example, if with buffer 10 it always gets stuck after a week, that will mean something. I don't think this is the case, but I have to experiment.
my opinion: using put!
you eliminate the possibility of blocking;
adding logging is "just in case"
«and stop logging I/O inside a go block» generally that is correct, but I don't worry about it so much 🙂 I configure async loggers instead
«asking to hit the 1024 pending puts limit» fine. Then he will need to think about back-pressure, but I doubt it matters in this particular case
I think the architecture of this solution is itself problematic. I'm not ready to offer architectural changes quickly.
architecture is the most important part of getting these little concurrent machines to work well
where does new data come from? where does it go? what happens when data is produced faster than it can be consumed? etc. etc.
if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?
> if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?
I can't
This video helps a lot in understanding internals: https://vimeo.com/100518968
(racey, for the same reason the closed?
method is not public API -- it's almost guaranteed to be racey)
whatever is responsible for creating workers should pass their input channels as arguments to those go routines, along with maybe a logging channel
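A sketch of that shape, with hypothetical names (`start-worker`, `start-system`); the log channel uses a dropping buffer so logging can never block a worker:

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop <! thread]])

(defn start-worker
  "Worker receives its input and log channels as arguments instead of
  closing over top-level defs. Returns the go-loop's channel."
  [in log]
  (go-loop []
    (when-some [job (<! in)]
      (a/put! log [:run job])                 ; non-blocking log
      (<! (thread
            (try (job)
                 (catch Throwable ex
                   (a/put! log [:error ex])))))
      (recur))))

(defn start-system
  "Owns the channels and wires up three independent workers."
  []
  (let [in  (chan 100)
        log (chan (a/dropping-buffer 1000))]
    {:in      in
     :log     log
     :workers (doall (repeatedly 3 #(start-worker in log)))}))
```

Because the system owner creates the channels, tests and restarts can build a fresh system instead of relying on top-level defonce state.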
(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
hmm, what is wrong with this?
so the best practice is to launch a worker
each time a job is run? And each time create a channel for this purpose?
Or only launch workers once during initialization?
@kwladyka you should read and understand the pmax
example https://stuartsierra.com/2013/12/08/parallel-processing-with-core-async
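Related to the pmax pattern: newer core.async (1.2+, for onto-chan!) also ships pipeline-blocking for exactly the "N threads draining a channel" shape; a sketch with an illustrative squaring transform:

```clojure
(require '[clojure.core.async :as a])

(defn square-all
  "Runs (* x x) over xs on 3 real threads via pipeline-blocking,
  preserving input order; returns the results as a vector."
  [xs]
  (let [in  (a/chan 100)
        out (a/chan 100)]
    ;; pipeline-blocking closes out once in is closed and drained.
    (a/pipeline-blocking 3 out (map (fn [x] (* x x))) in)
    (a/onto-chan! in xs)          ; puts xs, then closes in
    (a/<!! (a/into [] out))))
```

Unlike the hand-rolled go-loops above, the worker count and back-pressure are handled by the library.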
I did not mean to change the internal state, only to inspect it, so you know if the buffer is full, the number of pending puts, etc.