#core-async
2019-12-30
kwladyka09:12:53

Can you look at my question https://clojurians.slack.com/archives/C03S1KBA2/p1577696618239500 here? Am I using core.async correctly, or is it the wrong pattern for this? When should I use core.async and when not? Is it possible to solve my issue with core.async?

kwladyka09:12:15

^ the discussion is in the thread of this message

dpsutton09:12:32

does it need to be async rather than a thread pool?

kwladyka09:12:07

well, in general this was made to avoid the situation where processing runs happen at the same time, because that causes issues. So it is a queue.

👍 4
kwladyka09:12:32

anyway, I would like to learn more about this topic

kwladyka09:12:32

so is it the wrong pattern for core.async?

dpsutton09:12:37

not wrong to my eyes. I just saw a queue taking runnable functions, which is essentially a simpler version of a thread pool
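For reference, a minimal sketch of that "simpler thread pool" idea in Clojure, using only the JDK's java.util.concurrent (names are illustrative):

(import '(java.util.concurrent Executors))

;; a single-threaded executor is itself a queue of runnable functions:
;; jobs submitted here run one at a time, in submission order
(def single-worker (Executors/newSingleThreadExecutor))

(.submit single-worker ^Runnable #(println "job 1"))
(.submit single-worker ^Runnable #(println "job 2"))

The ^Runnable hint disambiguates submit, since Clojure fns implement both Runnable and Callable.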

kwladyka09:12:07

how can I fix my issue using async?

vemv10:12:50

could it be one of these two?
* the go-loop works fine, but whoever is enqueueing has a problem
* some other (unrelated) piece of code uses core.async in such a way that it causes trouble for core.async's internal thread pool

also, the JMX suggestion you received is on point. Similarly you can run YourKit in prod/staging and connect to it from your machine. Tools like YourKit tell you exactly where/how your threads are stalling

kwladyka10:12:52

We can assume the go-loop overflows after some time, so debugging is not really needed. But the question is: how to fix it?

vemv10:12:30

> We can assume go-loop overflow after some time

What makes you believe so? Your go-loop consumes work in an orderly way thanks to its <!. There's no such thing as consumption overflow (there is the notion of production overflow)
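To make "production overflow" concrete, a sketch (the channel here is hypothetical): core.async allows at most 1024 pending puts per channel, so a producer that keeps enqueueing while nothing consumes eventually throws:

(require '[clojure.core.async :as a])

(let [c (a/chan)]       ;; unbuffered, and nothing ever takes from it
  (dotimes [_ 1025]
    (a/put! c :job)))   ;; the put past the 1024 limit throws an AssertionError

A consumer looping on <! cannot overflow in this sense; at worst it parks and waits.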

kwladyka10:12:24

hmm that is what @delaguardo suggested

kwladyka10:12:05

but yes, I also don't understand how it overflows

kwladyka10:12:26

unless it is about a stack overflow from recur?

vemv10:12:32

He seems to talk about the case of using #'worker as a channel. Are you even doing that?

kwladyka10:12:29

I think not

vemv10:12:37

no, if recur compiles it's tail-recursive and doesn't accrete a stack
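A quick REPL check of that claim, as a sketch: recur reuses the current frame, so even a very deep loop completes without a StackOverflowError:

(loop [n 1000000]
  (when (pos? n)
    (recur (dec n))))   ;; returns nil; the stack never grows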

kwladyka10:12:13

> * go-loop works fine, but whoever is enqueueing has a problem

What do you mean by this point then?

kwladyka10:12:03

> He seems to talk about the case of using #'worker as a channel. Are you even doing that?

I don't, and he said it is an issue because this channel keeps growing and growing

kwladyka10:12:30

but I think it never even receives 1 value, because it is an endless loop with recur

kwladyka10:12:38

but maybe I'm missing something

vemv10:12:17

> What do you mean by this point then?

Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work.

Also, using put! can be dangerous because it disregards queue sizes. Do you have any calls to put!?
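To illustrate the difference, a hedged sketch: >!! (and >! inside a go block) exerts backpressure on the producer, while put! returns immediately and only counts against the 1024-pending-put cap mentioned earlier:

(require '[clojure.core.async :as a])

(def c (a/chan 10))   ;; hypothetical channel with a buffer of 10 jobs

(a/>!! c :job)        ;; blocks the caller while the buffer is full and nobody takes
(a/put! c :job)       ;; never blocks; unchecked use can pile up pending puts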

kwladyka10:12:13

> Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work

it works, because I see in the logs that it goes further and finishes adding things to the queue. I have a message at the end to debug it.

kwladyka10:12:22

I don’t use put!

kwladyka10:12:47

So adding to the queue doesn't get stuck

kwladyka10:12:55

there are only 3 possibilities:

kwladyka10:12:23

1. the channel is closed for some magic reason
2. the worker died
3. magic issues with memory, or something overflowing

kwladyka10:12:54

but the strangest thing is that both workers stop doing jobs at the same time, at least I think that's what happens

vemv10:12:54

Possibility 2 can be checked with (<!! worker) in the REPL, right? If it blocks, then it hasn't died
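A variant of that check which cannot hang the REPL, sketched with a timeout:

(require '[clojure.core.async :as a])

(let [[v ch] (a/alts!! [worker (a/timeout 1000)])]
  (if (= ch worker)
    (println "go-loop exited, returned:" v)       ;; the loop body finished
    (println "go-loop still running: take timed out")))

This assumes worker is the channel returned by go-loop, which only delivers a value once the loop exits.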

vemv10:12:38

...it could be many things. thread uses an unbounded (but cached) thread pool which might make the OS upset.

kwladyka10:12:48

not really because I can’t reproduce it on localhost

kwladyka10:12:14

it happens after 2-4 weeks on production only

kwladyka10:12:24

something like that

kwladyka10:12:09

but because both workers stop doing jobs at the same time… something probably overflows somehow

kwladyka10:12:54

but I think it is not the go-loop, because it never returns even 1 value

vemv10:12:14

go-loop, as its name implies, is made for looping, so... I doubt it. It'd be extremely fragile if it didn't support infinite loops

fmjrey15:12:24

a/thread threads are daemon threads, so the JVM does not wait for them when exiting. Maybe something else is making your JVM terminate....

fmjrey15:12:24

forget what I said, your loop is not in an a/thread

kwladyka09:12:52

the issue is (I guess) that worker and worker-warehouse overflow their capacity after 2-4 weeks

kwladyka09:12:02

because nothing reads from them; these two are the only workers, and they work hard 🙂

kwladyka09:12:19

but the output doesn't matter

kwladyka09:12:40

and all in all what is the best solution for my case?

vemv10:12:26

I'd use something like AWS SQS for never losing work and having a greater degree of observability

kwladyka10:12:26

I can lose work, it is designed to not be a problem. I would like to keep it in the app to decrease complexity

kwladyka10:12:39

using a third-party queue system increases complexity, which I don't need

vemv10:12:24

> I can lose work

That's an interesting requirement. I'd say, in that case, you should be able to use whatever you feel most comfortable with. Could be agents, futures, core.async, j.u.concurrent, claypoole... obviously they have different demands when something goes wrong (i.e., many people will recommend j.u.c, but if something goes wrong, will you pick up a Java book? Many devs won't)

kwladyka10:12:35

> That's an interesting requirement

The processing is resistant to a crash at any moment

kwladyka10:12:21

But for other reasons, it can process only 1 thing at a time

kwladyka10:12:37

because of a limitation of a third-party system

kwladyka10:12:50

so this is why the app needs a queue

vemv10:12:43

a thread pool of size 1 also works like that

kwladyka10:12:29

maybe, but will switching to a thread pool (which I haven't used so far) solve anything?

kwladyka10:12:46

it doesn't look like it is directly a core.async issue

kwladyka10:12:52

well, I have no idea what the issue is

kwladyka10:12:01

it doesn’t make sense 😛

vemv10:12:17

if you are comfortable with it, it's more observable. Say it's a thread pool of size 1, backed by a queue of max size 1000: if something goes wrong you can fetch the queue size, its contents, etc. Likewise you can interact with the thread pool itself (check the javadocs). Also the error handling will be more straightforward. Less magic overall (vs. core.async)
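A sketch of that setup via java.util.concurrent interop (names are illustrative):

(import '(java.util.concurrent ThreadPoolExecutor TimeUnit LinkedBlockingQueue))

;; one worker thread, backed by a bounded queue of 1000 jobs
(def pool (ThreadPoolExecutor. 1 1 0 TimeUnit/MILLISECONDS
                               (LinkedBlockingQueue. 1000)))

(.execute pool ^Runnable #(println "job"))

;; the observability described above:
(.size (.getQueue pool))          ;; pending jobs
(.getActiveCount pool)            ;; jobs currently executing
(.getCompletedTaskCount pool)     ;; jobs finished so far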

kwladyka10:12:46

I am running a simple job 100 000 times to see if it will get stuck on localhost

👍 4
kwladyka10:12:54

so far it looks like it works

kwladyka10:12:48

ok, it finished 100 000 times. It is probably not about overflowing any channels.

kwladyka10:12:06

ech, maybe run a REPL on production temporarily…

kwladyka10:12:21

to connect to it during an issue and debug then

kwladyka10:12:44

but from the logs I am sure both workers stop doing new jobs at the same time, which probably should tell me something, but it doesn't

kwladyka10:12:53

Does it tell you anything?

delaguardo10:12:53

make sure your channels (queues and workers) are open: (clojure.core.async.impl.protocols/closed? chan). And be careful with that function, because it is intended to be an “implementation detail” and should not be part of your code base

👍 4
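A hedged sketch of running that check from the REPL, assuming queue is the jobs channel:

(require '[clojure.core.async.impl.protocols :as impl])

(impl/closed? queue)   ;; => false while the channel is still open

As noted, this protocol lives in an impl namespace: fine for a debugging session, not for production code paths.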
kwladyka11:12:19

I can't recreate it on localhost, so the only thing I can do is push it to production and wait 2-4 weeks

kwladyka11:12:46

these are the things I already added to the logs today, to be sure

kwladyka11:12:41

Do you know how to run (clojure.core.async.impl.protocols/closed? chan) on production? I get:

No implementation of method: :closed? of protocol: #'clojure.core.async.impl.protocols/Channel found for class: nil

while it works on localhost in the REPL and even compiles

kwladyka11:12:17

@delaguardo ^ do you know how to run it?

delaguardo11:12:25

you are trying to call it with nil as an argument, but it expects a channel

kwladyka11:12:42

but it is not true 😕

delaguardo11:12:48

I'm guessing here, based on the error message)

kwladyka11:12:22

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))

(defonce worker-warehouse
  (dotimes [x 3]
    (go-loop []
      (let [job (<! queue-warehouse)]
        (l/debug "run warehouse job" job)
        (when-not (nil? job)
          (<! (thread
                (try
                  (job)
                  (catch Throwable ex
                    (l/error ex)))))
          (recur))))))

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (clojure.core.async.impl.protocols/closed? worker-warehouse)))

delaguardo11:12:58

but worker-warehouse will be nil, won't it?

delaguardo11:12:11

there is a dotimes call inside (and dotimes returns nil)

kwladyka11:12:29

ah… I am dumb. true

delaguardo11:12:34

there is also a strange (when-not (nil? job)). The only way to get nil from a channel is to consume from a closed and exhausted channel
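A quick demonstration of that rule, as a sketch:

(require '[clojure.core.async :as a])

(let [c (a/chan 1)]
  (a/>!! c :x)
  (a/close! c)
  [(a/<!! c) (a/<!! c)])   ;; => [:x nil]; the buffered value drains first, then nil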

kwladyka11:12:21

yes, it is added as a good habit, to not pass nil on for processing in such a case

kwladyka11:12:53

but it should never be true

kwladyka11:12:02

I mean false in this case

delaguardo11:12:12

but after consuming everything, your code goes into the recur branch and will try to consume from the already-closed channel, then goes to the same branch again

delaguardo11:12:19

but this is a big assumption, I mean the statement that it will never be true

delaguardo11:12:49

ah, sorry, my bad

kwladyka11:12:51

If it ever is true, then the worker will stop itself, because of the closed channel

kwladyka11:12:10

but I never close the channel, so ….. ok good 🙂

kwladyka11:12:25

the funny thing is it started to happen after some update, maybe a new version of core.async, or who knows. It wasn't like that from the beginning

didibus11:12:38

Like I said in the other thread, your go-loop is pretty weird. I would rewrite it so it logs on nil and stops looping, since when a channel take returns nil, it means the channel has closed. At least that way you'd know if the issue you're encountering is due to the channel closing

kwladyka11:12:06

it stops looping on nil already

didibus11:12:42

(defonce worker
  (go-loop []
    (if-let [job (<! queue)]
      (do
        (l/debug "run job" job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur))
      (l/debug "channel closed"))))

didibus11:12:07

Oh, the recur is inside the when

didibus11:12:29

Ok, well do you know if that's the case?

kwladyka11:12:00

why is this issue happening? I have no idea. I added extra debug logging and I have to wait a few weeks

didibus11:12:29

I meant, do you know if it exited the go-loop?

kwladyka11:12:15

I think it doesn't, but next time I will have proof in the logs

kwladyka11:12:08

I have 0 calls to close! on this channel in my code

kwladyka11:12:19

it could only close itself, caused by some magic bug in core.async

kwladyka11:12:29

But I updated core.async now as well, so we will see

kwladyka11:12:28

But this issue is probably the most challenging I have ever had in Clojure

kwladyka11:12:36

at least one of them

didibus11:12:02

Hum, well the other thing I can think of is a race condition

didibus11:12:07

I mean a deadlock

didibus11:12:39

Like one job waiting on another

kwladyka11:12:22

doesn't happen, I am sure from the logs

kwladyka11:12:55

this is the interesting point

didibus11:12:37

Well, that could be a deadlock

didibus11:12:57

They'd both stop simultaneously if they deadlocked each other

kwladyka11:12:05

no, because I can clearly see from the logs that the functions don't block

kwladyka11:12:21

and these fns don't depend on each other in any way

didibus11:12:36

So what do you mean then by “they both stop at the same time”?

kwladyka11:12:10

I have logs from

(l/debug "run job" job)

and I see both workers stop running new jobs at the same time

kwladyka11:12:23

it is not like one worker stops running jobs and the second one later

kwladyka11:12:36

the issue happens for both of them at the same time

didibus11:12:55

If I were you though, I'd just do this instead:

(defonce worker
  (thread
    (loop [job (<!! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (try
          (job)
          (catch Throwable ex
            (l/error ex)))
        (recur (<!! queue))))))

kwladyka12:12:48

what is the advantage of this fn?

didibus12:12:32

Well, you want to process one job per worker at a time. This doesn't create a new thread for every job; it just reuses the same thread and runs one job after another on it.

didibus12:12:24

Also, it isn't as much at risk of issues where you've blocked the parking thread pool.

didibus12:12:54

Since each worker gets their own thread

delaguardo11:12:52

(defn worker [process-fn]
  (let [in (chan)
        out (chan)]
    (go-loop []
      (let [msg (<! in)]
        (>! out (process-fn msg)))
      (recur))
    [in out]))

consider something like this: a worker function that returns in and out channels connected by process-fn. It will give you more control over the created channels

delaguardo12:12:18

this also allows you to restart the worker on some condition
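Hypothetical usage of the worker fn above. Note that out must be consumed: an unbuffered >! parks until someone takes, so the loop stalls (rather than growing) if nobody reads:

(require '[clojure.core.async :refer [>!! <!!]])

(let [[in out] (worker (fn [job] (job)))]
  (>!! in #(+ 1 2))
  (<!! out))   ;; => 3

Restarting on some condition could then mean dropping these channels and calling worker again for a fresh pair.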

kwladyka12:12:45

it looks like the out channel will be full at some moment

kwladyka12:12:05

and it will get stuck

delaguardo12:12:51

why is that? I'm not restricting the chan with any buffer size

kwladyka12:12:46

I know, but I guess there is a memory limit or some other limit. It can't be endless

didibus12:12:19

So, if you stop seeing "run job" for both at the same time, it could be the queues are empty and your puts are failing, it could be they are deadlocked, or it could be the channels have closed. That's all I can think of

kwladyka12:12:51

yes, today I added debug logging to see if the queue and workers are open / closed

kwladyka12:12:22

and for the fn that adds to the channel

kwladyka12:12:33

so I will know these 3 things next time

didibus12:12:15

Is there any other thing in your app that would keep it alive if those loops exited?

kwladyka12:12:24

not sure what you mean?

delaguardo12:12:44

small question, about the code above:

(loop [job (<! queue)]
   ...
   (job))

does it mean that you are passing functions through channels?

didibus12:12:31

Well, your process would exit if it stopped looping. Normally an application starts and closes when it runs out of instructions to run, unless it has a main loop of some sort. In your case, your go-loops seem to be what does that. So if there's no other infinite loop, your program should shut down if the channels were closed and returned nil, which would have exited the loops.

delaguardo12:12:42

is it possible for those functions to throw checked exceptions? they are not runtime exceptions, so they will not be caught by try ... Throwable

delaguardo12:12:28

something like ClassNotFoundException

kwladyka12:12:50

the app wouldn't exit if the workers stopped working

delaguardo12:12:36

those exceptions will be thrown inside the thread, so your worker will still be there

kwladyka12:12:13

What do you mean?

(try
  (job)
  (catch Throwable ex
    (l/error ex)))

is this not exactly that?

didibus12:12:24

There's another possibility I'm thinking of. I'm not sure what happens to the channel returned by thread in that exception case

kwladyka12:12:38

there are 0 errors in the logs

kwladyka12:12:02

*for this case

didibus12:12:03

Hum, ok. And do all (job) calls return a non-nil value?

delaguardo12:12:23

if (job) throws a checked exception, it will not be caught by your try/catch block

kwladyka12:12:30

whatever (job) returns, it doesn't matter

kwladyka12:12:44

hmm I have to read about this

kwladyka12:12:25

but it is not the case, because I don't see

(l/debug "run job" job)

didibus12:12:40

Throwable does catch everything
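A quick REPL check, as a sketch: a checked exception like IOException is still a Throwable, so the catch clause sees it:

(try
  (throw (java.io.IOException. "checked"))
  (catch Throwable ex
    (.getMessage ex)))   ;; => "checked"

Checked vs. unchecked is a Java compile-time distinction only; it does not change what catch matches at runtime.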

kwladyka12:12:17

I think we won't solve it today. We have to wait 2-4 weeks for new logs…

kwladyka12:12:50

I can post the solution here if I find it out

didibus12:12:02

Ya, you could try and wait and see if others have ideas, but it seems to be a very rare occurrence, so it will be hard

didibus12:12:14

And you're sure none of the jobs use core.async inside them?

didibus12:12:05

What does the producer code look like?

kwladyka12:12:16

(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))
I have a super simple fn to add jobs

didibus12:12:22

Maybe there's an issue on their side

didibus12:12:10

Hum... And what triggers sync-auto? Maybe that's what stops working after a while?

kwladyka12:12:35

once per 30 minutes

didibus12:12:04

And those (slack/send-info "> > > Sync auto start add to queue") logs keep showing?

kwladyka12:12:07

it is working, because I see the logs “Sync auto start add to queue” and “Sync auto added tasks to queue”

didibus12:12:30

Hum... ya that's pretty strange

didibus12:12:50

I'll be curious to know if you ever find out the problem

kwladyka12:12:58

I appreciate your engagement with the topic, but probably we can only wait

kwladyka12:12:13

heh, me too, give me a few months :troll:

didibus12:12:28

Good Luck!

fmjrey16:12:27

My guess would be that something in (job) blocks the thread, meaning the loop never completes