#core-async
2020-02-27
kwladyka10:02:57

Do you remember my magic issue with core.async getting stuck? I promised to write here if I learned anything new:

(defonce queue (chan 100))
(defonce queue-warehouse (chan 100))

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))

(def worker-warehouse
  (doall
    (repeat 3
      (go-loop []
        (let [job (<! queue-warehouse)]
          (l/debug "run warehouse job" job)
          (when-not (nil? job)
            (<! (thread
                  (try
                    (job)
                    (catch Throwable ex
                      (l/error ex)))))
            (recur)))))))

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
And while everything is stuck I see:
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
so it means the task was added correctly and workers are not closed.

kwladyka10:02:29

but for some reason all the async things stop running at the same moment

kwladyka10:02:19

still I have no idea why this is happening, but I have learned that the queues and workers didn't crash and are still running

kwladyka10:02:13

I can only observe it is happening after about 3 weeks of running the app

kwladyka10:02:22

so this is super hard to debug

kwladyka10:02:39

so… like last time I have still no idea WTF

kwladyka10:02:14

> (l/debug "run job" job) and (l/debug "run warehouse job" job) don't appear in the logs

kwladyka10:02:12

so it looks like the workers are running (because of clojure.core.async.impl.protocols/closed?) and new tasks are added (because of (l/debug "async: add" f "to" ch ": " (>!! ch f))), but these things are never actually run

kwladyka10:02:57

How is it possible?

kwladyka10:02:45

I can’t find any logic here

kwladyka10:02:08

The only suspicious thing is the time it takes for everything to get stuck

kwladyka10:02:55

the only explanation I have is that something in a worker starts running a job endlessly, which doesn't make sense

kwladyka10:02:15

because there are no loops there

kwladyka10:02:15

ok, I have a crazy idea… I am curious what will happen if the SOAP API (a third-party system) keeps the connection open endlessly because of a bug on their side. Is there any timeout? Probably not this, but worth checking

serioga18:02:02

@kwladyka what calls add-job ?

serioga18:02:04

do you add jobs to worker or worker-warehouse ? why both?

serioga18:02:58

do you log activity inside your jobs? enter/exit etc?

serioga18:02:43

why is worker-warehouse not defonce?

hiredman18:02:23

my guess is your jobs are adding things to the work queue, which is a very easy way to lock things up

hiredman18:02:10

your workers are waiting for existing jobs to finish before pulling from the work queue again, and your jobs are waiting for space in the work queue to add items before exiting
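For illustration, a minimal sketch of the feedback loop hiredman describes (not the original production code; a tiny buffer and simplified names are assumed so the lock-up shows quickly):

(require '[clojure.core.async :refer [chan go-loop <! >!! thread]])

(def q (chan 1))                ; small buffer so it sticks fast

(def worker
  (go-loop []
    (when-let [job (<! q)]
      (<! (thread (job)))       ; wait for the job before taking the next one
      (recur))))

(defn add-job [f]
  (>!! q f))                    ; blocks when the buffer is full

;; each job enqueues more jobs: once the buffer fills, the running job blocks
;; in >!!, the worker blocks waiting for that job, and nothing ever drains
(defn job []
  (add-job job)
  (add-job job))

(comment
  (add-job job))                ; after a few puts everything is stuck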

kwladyka18:02:05

@serioga
> what calls add-job ?
What do you mean? Exactly what is in the code. You are probably asking about the (>!! ch f) part.
> do you add jobs to worker or worker-warehouse ? why both?
both
> do you log activity inside your jobs? enter/exit etc?
yes, and there is silence
> why worker-warehouse is not defonce?
I was testing something and forgot to change it back before pasting

kwladyka18:02:05

@hiredman, no, because I get clear feedback from add-job that the job was added

kwladyka18:02:46

async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true

hiredman18:02:00

but are you publishing to queue or warehouse-queue from the job?

kwladyka18:02:40

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
it takes 2 parameters, the first is the channel. I use it to add to both queues

serioga18:02:57

«What do you mean? Exactly what is in the code.»
In your code this function is declared but not used

hiredman18:02:09

the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in

hiredman18:02:16

I mean from the job

hiredman18:02:24

like when f is run, does it call add-job

hiredman18:02:41

are your jobs recursive

kwladyka18:02:58

@serioga oh sorry I misread

serioga18:02:05

«yes, and there is silence»
so there are no jobs which are started but not finished? they just don't start?

kwladyka18:02:23

> the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in
the bug is in production

hiredman18:02:44

oh, then what I said the first time almost certainly

hiredman18:02:55

you have a feedback loop

kwladyka18:02:10

Why do you think so?

kwladyka18:02:19

> so there are no jobs which are started but not finished?
no job starts or finishes, there is silence in the logs. The logs show only adding to the queue

hiredman18:02:27

because it is an easy mistake to make when you are trying to build some kind of work queue thing in core.async instead of using executors directly

hiredman18:02:55

but it can be overlooked in testing if your buffer sizes are large enough

kwladyka18:02:18

still, I am adding to the queue and I am sure because I see this in the logs. The workers still exist, because I see this in the logs too.

kwladyka18:02:02

but I don’t see (l/debug "run job" job) or any other logs

kwladyka18:02:08

so nothing is run anymore

kwladyka18:02:19

I have only one explanation for that

hiredman18:02:22

because you have jobs running that cannot complete

kwladyka18:02:47

the worker has to be frozen in something endless for some reason, or there is a bug in the modules which I use

hiredman18:02:07

because they are waiting for space in the queue, but there won't be space in the queue until the job exits and the worker go-loop consumes from the queue again

kwladyka18:02:16

but because I don’t use loops in my jobs… that is strange

kwladyka18:02:40

unless an endless API query can exist

kwladyka18:02:46

or something like that

hiredman18:02:49

literally you don't use loops? or do you mean your jobs don't have a recursive structure

hiredman18:02:15

the actual structure of your jobs doesn't matter, it only matters whether they ever call add-job

kwladyka18:02:15

> because they are waiting for space in the queue, but there won't be space in the queue until the job exits and the worker go-loop consumes from the queue again
no. As I said, the logs clearly show the job is added and not waiting for space in the queue

kwladyka18:02:08

I think I don't use loops in these jobs and there is no recursive structure

hiredman18:02:08

"And while everything stuck I see:"

hiredman18:02:13

is everything stuck or not?

kwladyka18:02:20

It is mainly API queries and data processing in between

serioga18:02:23

«unless an endless API query can exist» If you are not sure, then it's better to add timeout channels
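For example, a sketch of bounding a job with a timeout channel (the 5-minute limit is an assumption, and job / l/error refer to the snippets above; note the underlying call keeps running in its thread, you merely stop waiting for it):

(require '[clojure.core.async :refer [thread timeout alts!!]])

(let [result   (thread (job))
      [v port] (alts!! [result (timeout (* 5 60 1000))])]
  (if (= port result)
    v
    (l/error "job timed out" job)))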

hiredman18:02:33

do you call add-job in your jobs?

ghadi18:02:44

Making top-level def go routines is an antipattern

kwladyka18:02:47

everything is stuck except adding new items to the queue

hiredman18:02:56

to which queue?

serioga18:02:21

«do you call add-job in your jobs?» this is top secret, who calls it and how 🙂

kwladyka18:02:47

so, to make this even more clear: I added 2 queues instead of 1 because of this bug. Before, there was only 1 queue and the issue was the same.

kwladyka18:02:58

The app adds to the queue once per 30 minutes

hiredman18:02:07

definitely what I am saying

hiredman18:02:20

which is why adding another queue just sort of works around it

hiredman18:02:45

it has the same effect as increasing the buffer size of the channel

kwladyka18:02:45

> «do you call add-job in your jobs?»
now I add to queue-warehouse from jobs in queue; before I didn't do this and the issue was the same

hiredman18:02:14

you have the same feedback loop, just slightly larger

hiredman18:02:14

get rid of this code, use an executor directly

serioga18:02:24

«Before there was only 1 queue» Which one? Warehouse?

kwladyka18:02:35

> Making top-level def go routines is an antipattern
What is the right pattern for this case then? @ghadi

kwladyka18:02:51

> «Before there was only 1 queue»
> Which one? Warehouse?
no, queue

ghadi18:02:13

Call a function passing it the input chan, that fn returns a goroutine

hiredman18:02:13

executors by default have an unbounded queue which will break your feedback loop (but unbounded queues can introduce other problems)

serioga18:02:50

«no, queue» so you don't need parallel execution for your jobs?

hiredman18:02:29

you could do something like setting the buffer size on the channel to Long/MAX_VALUE

hiredman18:02:39

but like, just use an executor

kwladyka18:02:50

As I mentioned before, I added queue-warehouse to split things up for easier debugging and to speed up the process. But before that it was working with only one queue

kwladyka18:02:12

I don’t think the buffer size is the issue

hiredman18:02:28

and that is why you don't see the issue in testing

kwladyka18:02:40

I did a test running this 100000000 times and it didn't get stuck

hiredman18:02:44

you have a large enough buffer to avoid filling the channel in tests

hiredman18:02:03

just do (chan) without a buffer in your tests

serioga18:02:12

«I don't think the buffer size is the issue» Well, anyway you should decide the behaviour for when there is no room in the queue chan, e.g. a dropping or sliding buffer
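For example (a sketch only; which policy fits depends on whether it is acceptable to drop new jobs or old ones):

(require '[clojure.core.async :refer [chan dropping-buffer sliding-buffer]])

;; drops new puts when full, so the producer never blocks
(defonce queue (chan (dropping-buffer 100)))

;; or: keeps the newest items and silently drops the oldest
(defonce queue-warehouse (chan (sliding-buffer 100)))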

kwladyka18:02:51

let's assume this gets stuck because of the buffer size. Even then, from time to time a worker will finish a job and a new job can come in.

kwladyka18:02:10

but these things don't happen

serioga18:02:13

@kwladyka do you have any other core.async operations in your project?

kwladyka18:02:28

@serioga no, I use core.async only here

hiredman18:02:33

if the channel ever fills up then it will stop

hiredman18:02:59

it is like feedback between a speaker and a mic

hiredman18:02:17

below some volume you won't get feedback

kwladyka18:02:23

not really, because here things go in only one direction: cron -> queue -> queue-warehouse

kwladyka18:02:38

and before queue-warehouse didn’t exist

hiredman18:02:52

do you ever call add-job from within a job

serioga18:02:12

«queue -> queue-warehouse» I don't see this in your code

kwladyka18:02:15

as I mentioned before, only from queue jobs to add to queue-warehouse

kwladyka18:02:34

> I don't see this in your code
I know, this is in a different part

kwladyka18:02:54

(defn sync-warehouse []
  (let [shop-ids (keys conf/shops)
        products-reservations (->> (pmap #(vector % (orders/atomstore-orders-reservations->products-reservation %)) shop-ids)
                                   (into {}))
        products-from-orders-not-synchronized-yet (->> (pmap #(vector % (orders/atomstore-orders-not-synchronized-yet->products-reservation %)) shop-ids)
                                                       (into {}))
        products-reservations+not-synchronized-orders (reduce (fn [m shop-id]
                                                                (assoc m shop-id
                                                                         (->> ((juxt products-reservations products-from-orders-not-synchronized-yet) shop-id)
                                                                              (apply merge-with +))))
                                                              {} shop-ids)
        stock-change (orders/products-reservations->stock-change-for-shop products-reservations+not-synchronized-orders)
        products (products/wfirma->products :bk)
        update-stock (fn [shop-id]
                       (->> products
                            (map (fn [product]
                                   (if-let [reservation (get-in stock-change [shop-id (:code product)])]
                                     (update product :quantity #(- % reservation))
                                     product)))))
        stock-round-down (fn [stock]
                           (map (fn [product]
                                  (update product :quantity #(Math/floor %)))
                                stock))]
    (add-job queue-warehouse
             #(products/products->atomstore :kh (update-stock :kh) {:prices? true :stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :yms (update-stock :yms) {:stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :hurt (stock-round-down (update-stock :hurt)) {:stock? true}))
    (add-job queue-warehouse #(warehouse/update-cache-supplier-orders products products-reservations+not-synchronized-orders))))

(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))

kwladyka19:02:29

there is no other place with add-job

kwladyka19:02:16

so my guess is that queue-worker is stuck in something endless… But I don't see how. Also, I don't see any CPU overuse.

kwladyka19:02:27

or memory issues

kwladyka19:02:34

which could tell me there is an endless-loop issue

serioga19:02:47

@kwladyka «And while everything is stuck I see:» you see it every 30 minutes after everything is stuck?

kwladyka19:02:47

uh, I lost track of which of my replies you are referring to

serioga19:02:03

Where sync-auto is called from? 🙂

kwladyka19:02:31

every 30 minutes by some kind of cron solution

hiredman19:02:32

all of which points to a deadlock waiting on your queues

kwladyka19:02:28

> all of which points to a deadlock waiting on your queues
Where, in your opinion, can the deadlock be? Nothing blocks queue-warehouse; things go in only 1 direction

kwladyka19:02:01

@serioga yes, I see this every 30 minutes

kwladyka19:02:41

20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@1ebb3fc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@40a18579 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@28d752b5 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@1b54dc3e to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@36c12338 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@5f1cb6ba to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@141dd6dc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@3b5a82d3 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@65462cf9 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@2ed18f00 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)

kwladyka19:02:26

yes and jobs don’t start

serioga19:02:02

so, as @hiredman said, you call add-job from inside your jobs

kwladyka19:02:34

yes, as I mentioned, now I call add-job from queue to queue-warehouse

kwladyka19:02:56

and before I didn’t have queue-warehouse

kwladyka19:02:04

the process was running in 1 shot

kwladyka19:02:47

only cron was adding to queue every 30 minutes

kwladyka19:02:54

nothing more

kwladyka19:02:31

@hiredman can you point out where you see the possibility of a deadlock? I don't see it.

serioga19:02:14

well, only ManyToManyChannel@7a4a107e is alive in the log, it looks like it is queue

hiredman19:02:14

I don't know, I don't have all your code, and I don't really want to read it, but the situation you are describing matches one

kwladyka19:02:40

> well, only ManyToManyChannel@7a4a107e is alive in the log, it looks like it is queue

kwladyka19:02:56

yes, it makes sense, because jobs are added to queue-warehouse only from queue

hiredman19:02:29

look, just remove the buffers from your channels and run your tests and see if things lock up

kwladyka19:02:43

@hiredman I know it looks like that, but I don't see a logical possibility of it. Especially since there was also only 1 queue, to which the cron was adding

kwladyka19:02:53

I was trying this

kwladyka19:02:05

I was running this even 1000000 times to see the result

hiredman19:02:58

when logic and reality fight, reality always wins

🔥 4
kwladyka19:02:54

I think there is no other way to solve this than to add more debugging to determine the exact line where it stops

kwladyka19:02:13

At least I can’t find another solution

kwladyka19:02:32

this is the most mysterious bug I have ever had

hiredman19:02:15

or you could just use an executor 🙂

serioga19:02:03

@kwladyka well, I feel like >!! is blocked inside your thread: too many jobs for a fixed-size chan
my suggestions:
1. replace >!! with the non-blocking put!
2. add logging in add-job before calling >!! (right now you will see nothing if >!! is stuck)
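A sketch of suggestion 2 (suggestion 1 is debated below): log before the potentially blocking >!! so a stuck put is still visible:

(defn add-job [ch f]
  (l/debug "async: adding" f "to" ch)   ; logged even if the following >!! blocks
  (let [ok (>!! ch f)]
    (l/debug "async: added" f "to" ch ":" ok)
    ok))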

hiredman19:02:56

(def queue (java.util.concurrent.Executors/newFixedThreadPool 3))

(defn add-job [queue f]
  (.execute queue f))
replaces all the core.async code above

hiredman19:02:28

(well, you'll need to wrap f to get the logging)
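One possible way to wrap f for the logging (a sketch reusing the l/debug and l/error calls from the snippets above, not a tested drop-in):

(defn add-job [^java.util.concurrent.ExecutorService queue f]
  (.execute queue
            (fn []
              (l/debug "run job" f)
              (try
                (f)
                (catch Throwable ex
                  (l/error ex))))))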

kwladyka19:02:31

> well, I feel like >!! is blocked inside your thread: too many jobs for a fixed-size chan
> my suggestions:
let's assume this can be true. So what about:
1. we see in the logs that new jobs are added to queue
2. even if queue -> queue-warehouse waits because of the buffer size, from time to time a worker-warehouse will finish its job, so the new job will be added. Right? Do I miss something?

hiredman19:02:02

like your analysis is based on the code you are looking at, are you sure it is exactly the same as the code in production?

hiredman19:02:29

the logs you shared, as already mentioned, only show add-job being called with the same channel over and over again

hiredman19:02:42

which is not the behavior you'd get from the code you shared

kwladyka19:02:02

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))

hiredman19:02:26

because add-job, in the code you shared, is called for both queues

hiredman19:02:39

so you should get logging where 'ch' is two different channels

kwladyka19:02:42

@serioga it works like that currently

kwladyka19:02:36

sync-auto adds to queue. Jobs in queue add to queue-warehouse. Everything is correct.

hiredman19:02:55

also you only have 1 go loop consuming from queue-warehouse

hiredman19:02:05

you just repeat the channel returned 3 times

kwladyka19:02:24

which makes 3 workers out of this

hiredman19:02:54

(repeat 3 x) makes a seq of the value of x 3 times

hiredman19:02:05

it doesn't evaluate x three times

hiredman19:02:29

user=> (repeat 3 (println "foo"))
foo
(nil nil nil)
user=>

kwladyka19:02:41

hmm I have to check this, but I found this somewhere as a solution to make X workers

hiredman19:02:42

there is nothing to check, it is empirically the case

kwladyka19:02:17

damn… you are right here

didibus06:03:45

you want repeatedly
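A sketch of that fix (assuming the rest of the original code stays the same): repeatedly calls the thunk 3 times, so 3 separate go-loops are created instead of one go-loop value repeated 3 times:

(defonce worker-warehouse
  (doall
    (repeatedly 3
      (fn []
        (go-loop []
          (let [job (<! queue-warehouse)]
            (l/debug "run warehouse job" job)
            (when-not (nil? job)
              (<! (thread
                    (try
                      (job)
                      (catch Throwable ex
                        (l/error ex)))))
              (recur))))))))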

kwladyka07:03:14

yes, I fixed it

serioga19:02:08

1. We don't see failed attempts to add a job in the logs for the current implementation.
2. worker-warehouse cannot finish any job, it waits on adding a job to a channel which is not consumed anymore. https://clojurians.slack.com/archives/C05423W6H/p1582831471121700

kwladyka19:02:57

at least I think it is

serioga19:02:11

Ah! Now I understand why testing queue-warehouse did not work in parallel https://clojurians.slack.com/archives/C05423W6H/p1582831795128100

serioga19:02:04

tricky bug 🙂

serioga19:02:26

but initially there was a single queue anyway...

kwladyka19:02:30

yes, not everything you find on Google works like it should 😉

kwladyka19:02:46

and even with 1 warehouse-worker this should be fine

kwladyka19:02:50

only a little slower

kwladyka19:02:09

but nice catch @hiredman 👍

kwladyka19:02:27

I don’t know how I missed that

hiredman19:02:48

but that is an example of the kind of human factor thing

serioga19:02:21

I'd return to the implementation without queue-warehouse

ghadi19:02:23

you need to make a diagram, not code

ghadi19:02:36

no one wants to read code

ghadi19:02:48

s/no one/I/

kwladyka19:02:33

I will fix this 3-workers thing, add more debugging, etc. Thanks for the help. I know this was a hard discussion, but the issue is also very hard to figure out.

serioga19:02:41

@kwladyka just make these two simple changes and remove queue-warehouse again (as it is not relevant) https://clojurians.slack.com/archives/C05423W6H/p1582831143118000

kwladyka19:02:14

sure, I will also try decreasing the buffer size. I just need to add more things to debug this

kwladyka19:02:27

last time I thought maybe the worker crashed in some way, but it looks like it didn't

kwladyka19:02:30

I don't know. To determine some magic 😉 For example, if with a buffer of 10 it always gets stuck after a week, that will mean something. I don't think this is the case, but I have to experiment.

serioga19:02:17

programming is not magic; better to think longer but decide logically

kwladyka19:02:33

I have limited time to solve this issue

ghadi19:02:37

need to get rid of those top level processes

kwladyka19:02:40

but I will do my best

ghadi19:02:48

and stop logging I/O inside a go block

ghadi19:02:15

pass in a log channel (with dropping buffer) to which workers can send log entries

ghadi19:02:23

or use a thread and not care

serioga19:02:33

my opinion: using put! you eliminate the possibility of blocking; adding logging is "just in case"

kwladyka19:02:25

thanks for tips, I will tell you if I will find the solution

hiredman19:02:29

using put! is terrible

serioga19:02:38

«and stop logging I/O inside a go block» generally it is correct, but I don't bother that much 🙂 I configure async loggers instead

hiredman19:02:01

asking to hit the 1024 pending puts limit

💯 4
serioga19:02:03

«asking to hit the 1024 pending puts limit» fine. Then he will need to think about back-pressure, but I doubt it in this particular case

ghadi19:02:35

it's incorrect/problematic advice

serioga19:02:21

I think the architecture of this solution is problematic in itself. I'm not ready to offer architectural changes quickly.

serioga19:02:01

The function put! exists and should be used when it makes sense

ghadi20:02:42

right, it doesn't make sense here @serioga

ghadi20:02:25

architecture is the most important part of getting these little concurrent machines to work well

ghadi20:02:02

where does new data come from? where does it go? what happens when the data is produced faster than it can be consumed? etc. etc.

fmjrey20:02:33

if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?

ghadi20:02:28

go routines will not show up in thread stack dumps except when they're active

ghadi20:02:44

if they're waiting on a channel op they're not actually running on a thread

ghadi20:02:22

async/thread will show up

fmjrey20:02:29

you can also inspect channels and see their internal state

kwladyka20:02:24

> you can also inspect channels and see their internal state
How?

kwladyka20:02:25

> if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?
I can't

fmjrey20:02:26

This video helps a lot in understanding internals: https://vimeo.com/100518968

ghadi20:02:02

as one of the authors of core.async, I would avoid breaking into the channel

ghadi20:02:01

you need to clarify intent, develop hypotheses, and validate with experiments

ghadi20:02:25

observing the impl details of channels is inherently racey

kwladyka20:02:31

> I would avoid breaking into the channel
What do you mean?

ghadi20:02:51

I think @fmjrey was suggesting looking inside the channel

kwladyka20:02:09

oh, I thought you were writing to me

ghadi20:02:13

(racey, for the same reason the closed? method is not public API -- it's almost guaranteed to be racey)

kwladyka20:02:15

about something I said

ghadi20:02:17

I am writing to you too

ghadi20:02:54

whatever is responsible for creating workers should pass their input channels as arguments to those go routines, along with maybe a logging channel

ghadi20:02:16

(defn launch-worker [in cancel log]
  (go ....))
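
A fuller sketch of that shape (the cancel channel is omitted and the job-running body is copied from the earlier worker; this is not ghadi's actual code): the caller creates the channels and launches the workers, so nothing starts as a side effect of loading the namespace.

(require '[clojure.core.async :refer [chan dropping-buffer go-loop <! >! >!! thread]])

(defn launch-worker [in log]
  (go-loop []
    (when-let [job (<! in)]
      (>! log [:run job])
      (<! (thread
            (try
              (job)
              (catch Throwable ex
                (>!! log [:error ex])))))
      (recur))))

(defn start-system []
  (let [queue (chan 100)
        log   (chan (dropping-buffer 100))]   ; dropping buffer so logging never blocks
    {:queue   queue
     :log     log
     :workers (doall (repeatedly 3 #(launch-worker queue log)))}))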

kwladyka20:02:34

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
hmm what is wrong with this?

ghadi20:02:55

that's a top level side effect that starts a process when the namespace loads

kwladyka20:02:42

so is the best practice to launch-worker each time a job is run? And each time to create a channel for this purpose? Or to launch the workers only once during initialization?

ghadi20:02:28

you still want to launch your workers once, I mean don't do it with global defs

👍 4
kwladyka20:02:04

ok thanks, I need to take a break. It is too much for me today 🙂

fmjrey20:02:04

I did not mean to change the internal state, only to inspect it so you know if the buffer is full, the number of pending puts, etc.

ghadi20:02:03

still a bad idea

ghadi20:02:30

observing concurrent devices changes the rhythm of the program

ghadi20:02:51

just like stopping the world in a debugger might not show you races that happen commonly without the debugger

fmjrey20:02:47

you don't need to stop execution, just wait for the deadlock state and then inspect