#core-async
2023-02-03
diego.videco00:02:56

I’ve got a program that uses a series of channels to process data and pass it from one to the next. My problem is that once the buffer of one of the channels fills up, the whole program blocks. I am not entirely sure what I am doing wrong: I would expect that if a channel filled up, its producer would be temporarily blocked while the consumer of the full channel kept processing, passing data to the next channel, and emptying its queue. The core of my code looks like this:

(defn send-cycles-to-next-channel
  [chan-map writer-chan cycle]
  (let [chan (get chan-map (count cycle))]
    (cond
      (cycle-complete? cycle) (a/>!! writer-chan (str (readble-cycle cycle) "\n"))

      chan (a/>!! chan cycle)
      :else (taoensso.timbre/error "No channel for this cycle" cycle))))

(defn make-processor [cps]
  (let [graph (-> cps :graphs :simple)
        chans (map (fn [i] [(inc i) (a/chan 5000)])
                   ;; make one channel for each scale step
                   (range (count (:scale cps))))
        chan-map (into {} chans)
        ;; last step is to write to a file
        writer-chan (a/chan 5000)]
    ;; writer loop
    (a/go-loop [line (a/<! writer-chan)]
      (when line
        (taoensso.timbre/info "Writing" line)
        (spit "test.txt" line :append true)
        (recur (a/<! writer-chan))))
    ;; channel loops
    (doseq [[_chan-index chan] chans]
      (a/go-loop [cycle (a/<! chan)]
        (when cycle
          (let [next-cycles (->> cycle
                                 (add-nodes-to-cycle graph)
                                 filter-partial-cycles)]
            (taoensso.timbre/info "More cycles")
            (doseq [cycle next-cycles]
              ;; put cycle on next channel or write to file if completed
              (send-cycles-to-next-channel chan-map writer-chan cycle)))
          (recur (a/<! chan)))))
    chan-map))

hiredman00:02:22

a guess would be you have a cycle leading to a deadlock

hiredman00:02:42

a fixed size buffer like core.async uses can easily lead to deadlocks when the output of a process is fed back into the input of the same process
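
A minimal sketch of that failure mode, under stated assumptions: `steps-until-stuck` is a hypothetical helper (not from the thread) that simulates a single process taking one item from a fixed-size channel and putting `fan-out` items back on the same channel. It uses the non-blocking `a/offer!` and `a/poll!` so the simulation can report where a real put would have to wait instead of actually hanging.

```clojure
(require '[clojure.core.async :as a])

(defn steps-until-stuck
  "Hypothetical helper: simulates a process whose output feeds its own input.
  Returns how many items were processed before a put could not complete
  immediately, or max-steps if that never happened."
  [buffer-size fan-out max-steps]
  (let [ch (a/chan buffer-size)]
    (a/offer! ch :seed)
    (loop [processed 0]
      (if (and (< processed max-steps)
               ;; take one item, as the consumer side of the process would
               (some? (a/poll! ch))
               ;; put fan-out items back; offer! is not true when the buffer is full
               (every? true? (repeatedly fan-out #(a/offer! ch :item))))
        (recur (inc processed))
        processed))))
```

With a fan-out above 1 the buffer eventually fills, and in a real loop the only consumer would then be the process that is stuck putting.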

phronmophobic00:02:27

There are a couple of issues I see. I'm not sure they're causing your deadlock, but fixing them might help you make some progress:
• send-cycles-to-next-channel uses blocking ops (a/>!!), but is called from within a go loop.
• There's I/O (i.e. spit) inside a go loop.
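
One possible way to address the second point, sketched under assumptions: `start-writer` and its `path` parameter are hypothetical names, not part of the original code. The idea is to drain the writer channel on a dedicated thread with `a/thread` and the blocking take `a/<!!`, so file I/O never runs on the go block thread pool.

```clojure
(require '[clojure.core.async :as a])

(defn start-writer
  "Hypothetical helper: appends every line taken from writer-chan to the file
  at path, on a real thread. Returns a channel that closes once writer-chan
  has been closed and drained."
  [writer-chan path]
  (a/thread
    (loop []
      ;; <!! blocks this thread, which is fine outside a go block
      (when-some [line (a/<!! writer-chan)]
        (spit path line :append true)
        (recur)))))
```

Taking from the returned channel, for example with `(a/<!! (start-writer writer-chan "test.txt"))`, waits until everything has been written.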

diego.videco00:02:39

@U0NCTKEV8 might be possible, although I think I am avoiding that, that’s why I make a channel for each step in the process

hiredman00:02:02

sure, it was just a guess based on seeing the word "cycle"

hiredman00:02:18

the use of blocking ops in a go block is almost certainly it

diego.videco00:02:30

it’s an algorithm for getting subcycles of a graph

hiredman00:02:24

using the blocking ops on the go threadpool will hang up those threads, meaning go blocks can't be run anymore

diego.videco00:02:08

Oh, if I use non-blocking ops then I get this error:

Exception in thread "async-dispatch-65" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
What’s a good way to avoid that?

hiredman00:02:27

depends on why you're getting that error

hiredman00:02:48

my guess is you wrapped the body of send-cycles-to-next-channel in async/go

diego.videco00:02:48

Probably some processes are slower than others

hiredman00:02:02

to get access to the parking version of the ops

diego.videco00:02:17

Did this:

(defn send-cycles-to-next-channel
  [chan-map writer-chan archi-factors cycle]
  (let [chan (get chan-map (count cycle))]
    (cond
      (cycle-complete? cycle) (a/go (a/>! writer-chan (str (readble-cycle archi-factors cycle) "\n")))

      chan (a/go (a/>! chan cycle))
      :else (taoensso.timbre/error "No channel for this cycle" cycle))))

hiredman00:02:06

that means send-cycles-to-next-channel is now asynchronous: it can, and likely does, return before those writes to channels are complete

hiredman00:02:35

so you may be building up a large backlog of channel ops without waiting for them to complete
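
A small sketch of how such a backlog hits the limit named in the error above. `puts-until-rejected` is a hypothetical helper; it uses `a/put!`, which queues a pending put and returns immediately, much like wrapping each `a/>!` in its own `a/go` without waiting on it. It assumes assertions are enabled (the Clojure default).

```clojure
(require '[clojure.core.async :as a])

(defn puts-until-rejected
  "Hypothetical helper: issues fire-and-forget puts on an unbuffered channel
  that nobody reads from. Returns how many puts were accepted before
  core.async refused to queue another one (or limit, if never refused)."
  [limit]
  (let [ch (a/chan)]
    (loop [n 0]
      (if (and (< n limit)
               ;; every accepted put sits in the channel's pending-put queue
               (try (a/put! ch n)
                    true
                    (catch AssertionError _ false)))
        (recur (inc n))
        n))))
```

Waiting for each put to complete before issuing the next one keeps that queue from growing.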

phronmophobic00:02:11

I'm not exactly sure what you're trying to do (a picture might help), so it's hard to give advice on what you should do. I might try:

(defn send-cycles-to-next-channel
  [chan-map writer-chan archi-factors cycle]
  (a/go
    (let [chan (get chan-map (count cycle))]
      (cond
        (cycle-complete? cycle) (a/>! writer-chan (str (readble-cycle archi-factors cycle) "\n"))

        chan (a/>! chan cycle)
        :else (taoensso.timbre/error "No channel for this cycle" cycle)))))
and then call it like:
(a/<! (send-cycles-to-next-channel chan-map writer-chan archi-factors cycle))))
note: I wrote this outside my editor so the formatting and parens might be wrong.
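
For reference, the same pattern as a self-contained sketch that can be run on its own. `send-to-next`, `start-stage` and `expand` are hypothetical stand-ins for `send-cycles-to-next-channel`, the channel loops and the graph functions. The sender returns the channel of its go block, and the stage takes from that channel with the parking `a/<!` before sending the next item, so a full output buffer parks the stage instead of blocking a thread or piling up pending puts.

```clojure
(require '[clojure.core.async :as a])

(defn send-to-next
  "Hypothetical stand-in for the sender: returns the go block's channel,
  which yields a value once the put has completed."
  [out item]
  (a/go (a/>! out item)))

(defn start-stage
  "Hypothetical stage: takes items from in, expands each one with expand,
  and waits for every put on out to finish before continuing.
  Closes out once in is closed and drained."
  [in out expand]
  (a/go-loop []
    (if-some [item (a/<! in)]
      (do
        (doseq [x (expand item)]
          ;; parks this go block until the put has gone through
          (a/<! (send-to-next out x)))
        (recur))
      (a/close! out))))
```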

diego.videco00:02:31

Thanks, I’ll try that tomorrow. I am producing a list of subcycles of a highly symmetrical cyclic graph. The idea is to take all paths in the graph as they branch. The list looks like this:

A.B.C - C.D.E - A.B.C
A.B.C - C.D.E - D.E.F - B.D.E - A.B.C
A.B.C - C.D.E - D.E.F - B.D.E - A.D.E - A.B.C
...
A.B.C is the name of the node. This is a reduced view of the graph (many paths are missing)

diego.videco00:02:48

it’s a musical scale (if you are interested)

diego.videco02:02:10

Fantastic, it seems to be working. Thank you both, @U0NCTKEV8 @U7RJTCH6J. As a thank-you, here’s a piece made with this scale/tuning. It explores a couple of cycles from the graph above, which is what inspired me to make a list of all possible cycles (there are far more than I expected). https://anaphoria.bandcamp.com/album/the-creation-of-the-worlds
