This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-02-03
I’ve got a program that uses a series of channels to process data and pass it from one to the next. My problem is that once the buffer of one of the channels fills up, the whole program blocks. I am not entirely sure what I am doing wrong: I would expect that if a channel filled up, its producer would be temporarily blocked while the consumer of the full channel kept processing, passing data on to the next channel and emptying its queue. The core of my code looks like this:
(defn send-cycles-to-next-channel
  [chan-map writer-chan cycle]
  (let [chan (get chan-map (count cycle))]
    (cond
      (cycle-complete? cycle) (a/>!! writer-chan (str (readble-cycle cycle) "\n"))
      chan (a/>!! chan cycle)
      :else (taoensso.timbre/error "No channel for this cycle" cycle))))
(defn make-processor [cps]
  (let [graph (-> cps :graphs :simple)
        chans (map (fn [i] [(inc i) (a/chan 5000)])
                   ;; make one channel for each scale step
                   (range (count (:scale cps))))
        chan-map (into {} chans)
        ;; last step is to write to a file
        writer-chan (a/chan 5000)]
    ;; writer loop
    (a/go-loop [line (a/<! writer-chan)]
      (when line
        (taoensso.timbre/info "Writing" line)
        (spit "test.txt" line :append true)
        (recur (a/<! writer-chan))))
    ;; channel loops
    (doseq [[_chan-index chan] chans]
      (a/go-loop [cycle (a/<! chan)]
        (when cycle
          (let [next-cycles (->> cycle
                                 (add-nodes-to-cycle graph)
                                 filter-partial-cycles)]
            (taoensso.timbre/info "More cycles")
            (doseq [cycle next-cycles]
              ;; put cycle on next channel or write to file if completed
              (send-cycles-to-next-channel chan-map writer-chan cycle)))
          (recur (a/<! chan)))))
    chan-map))
a fixed size buffer like core.async uses can easily lead to deadlocks when the output of a process is fed back into the input of the same process
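A minimal sketch of that failure mode, assuming a toy channel called `loop-chan` (a hypothetical name, not from the code above) that one process both reads from and writes back to. It uses the non-blocking `offer!` and `poll!` so the situation can be inspected without actually hanging:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical toy setup: a single channel that one process both consumes
;; from and produces to, with room for only 2 buffered items.
(def loop-chan (a/chan 2))

;; Fill the buffer.
(a/offer! loop-chan :a)
(a/offer! loop-chan :b)

;; The process takes one item and wants to emit two follow-up items.
(def taken      (a/poll! loop-chan))        ; frees exactly one slot
(def first-put  (a/offer! loop-chan :a1))   ; fits in the freed slot
(def second-put (a/offer! loop-chan :a2))   ; buffer is full again

;; offer! returns true when the value was accepted and nil otherwise.
;; With >! or >!! the second put would wait for a consumer, but the only
;; consumer is the very process that is stuck putting.
(println {:taken taken :first-put first-put :second-put second-put})
```

Whether this applies to the pipeline above depends on whether any step ever feeds a channel it also reads from.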
There are a couple of issues I see. I'm not sure they're causing your deadlock, but fixing them might help make some progress:
• send-cycles-to-next-channel is using blocking ops (a/>!!), but is called from within a go loop.
• I/O (i.e. spit) inside a go loop.
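One way to address both points, sketched under assumptions: `start-writer` and `demo` are hypothetical names, and the temp file stands in for "test.txt". The file writing moves to `a/thread`, where blocking ops such as `<!!` and `spit` are fine, while the go block only uses the parking put `>!`:

```clojure
(require '[clojure.core.async :as a])

(defn start-writer
  "Drains writer-chan on a dedicated thread and appends each line to path.
  Returns a channel that closes once writer-chan is closed and drained."
  [writer-chan path]
  (a/thread
    (loop []
      (when-some [line (a/<!! writer-chan)]
        (spit path line :append true)
        (recur)))))

(defn demo
  "Writes three lines through a small channel and returns the file content."
  []
  (let [path        (.getPath (doto (java.io.File/createTempFile "cycles" ".txt")
                                (.deleteOnExit)))
        writer-chan (a/chan 2)
        done        (start-writer writer-chan path)]
    ;; Inside a go block, use the parking put >! rather than >!!.
    (a/<!! (a/go
             (doseq [line ["one\n" "two\n" "three\n"]]
               (a/>! writer-chan line))
             (a/close! writer-chan)))
    ;; Wait for the writer thread to finish before reading the file back.
    (a/<!! done)
    (slurp path)))
```

The blocking takes at the top level of `demo` are only there so the example can be run from a REPL or a plain thread.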
@U0NCTKEV8 might be possible, although I think I am avoiding that, that’s why I make a channel for each step in the process
it’s an algorithm for getting subcycles of a graph
using the blocking ops on the go threadpool will hang up those threads, meaning go blocks can't be run anymore
Oh, if I use non blocking ops then I get this error:
Exception in thread "async-dispatch-65" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
What’s a good way to avoid that?
Probably some processes are slower than others
Did this:
(defn send-cycles-to-next-channel
  [chan-map writer-chan archi-factors cycle]
  (let [chan (get chan-map (count cycle))]
    (cond
      (cycle-complete? cycle) (a/go (a/>! writer-chan (str (readble-cycle archi-factors cycle) "\n")))
      chan (a/go (a/>! chan cycle))
      :else (taoensso.timbre/error "No channel for this cycle" cycle))))
that means send-cycles-to-next-channel is now asynchronous: it can, and likely does, return before those writes to the channels are complete
so you may be building up a large backlog of channel ops without waiting for them to complete
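A small sketch of how that backlog runs into the limit from the error message. `count-accepted-puts` is a hypothetical helper; it uses `a/put!` instead of `(a/go (a/>! ...))` so that the `AssertionError` surfaces on the calling thread, on the assumption that both forms queue a pending put in the same way:

```clojure
(require '[clojure.core.async :as a])

(defn count-accepted-puts
  "Fires up to n un-awaited puts at a channel nobody reads from and returns
  how many were queued before core.async refused the next one."
  [n]
  (let [c (a/chan)]                        ; unbuffered, no consumer
    (loop [i 0]
      (if (= i n)
        i
        (if (try
              (a/put! c i)                 ; queues a pending put, returns at once
              true
              (catch AssertionError _ false))
          (recur (inc i))
          i)))))

;; Asking for more puts than the pending-put limit stops early.
(println (count-accepted-puts 2000))
```

A bigger buffer only delays this; something has to wait for the puts to complete.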
I'm not exactly sure what you're trying to do (a picture might help), so it's hard to give advice on what you should do. I might try:
(defn send-cycles-to-next-channel
  [chan-map writer-chan archi-factors cycle]
  (a/go (let [chan (get chan-map (count cycle))]
          (cond
            (cycle-complete? cycle) (a/>! writer-chan (str (readble-cycle archi-factors cycle) "\n"))
            chan (a/>! chan cycle)
            :else (taoensso.timbre/error "No channel for this cycle" cycle)))))
and then call it like:
(a/<! (send-cycles-to-next-channel chan-map writer-chan archi-factors cycle))
note: I wrote this outside my editor, so the formatting and parens might be wrong.
what @U0NCTKEV8 said
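For reference, a self-contained toy version of that pattern. `send-item` and `run-pipeline` are hypothetical stand-ins, not the real cycle code: the sender returns its go block's channel, and the producer parks on it before sending the next item, so it never has more than one put in flight:

```clojure
(require '[clojure.core.async :as a])

(defn send-item
  "Hypothetical stand-in for send-cycles-to-next-channel: puts item on out
  and returns the go block's channel, which yields once the put is done."
  [out item]
  (a/go (a/>! out item)))

(defn run-pipeline
  "Pushes n numbers through a channel with a 1-slot buffer and returns
  them as a vector, in order."
  [n]
  (let [out    (a/chan 1)
        result (a/into [] out)]            ; collects until out is closed
    (a/go
      (doseq [i (range n)]
        ;; Park until this put has completed before starting the next one.
        (a/<! (send-item out i)))
      (a/close! out))
    (a/<!! result)))

(println (count (run-pipeline 5000)))
```

Here far more than 1024 items pass through a one-slot buffer, because the producer is slowed down to the pace of the consumer.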
Thanks, I’ll try that tomorrow. I am producing a list of subcycles of a highly symmetrical cyclic graph. The idea is to take all paths in the graph as they branch. The list looks like this:
A.B.C - C.D.E - A.B.C
A.B.C - C.D.E - D.E.F - B.D.E - A.B.C
A.B.C - C.D.E - D.E.F - B.D.E - A.D.E - A.B.C
...
A.B.C is the name of the node.
This is a reduced view of the graph (many paths are missing)
Fantastic, seems to be working, thank you both @U0NCTKEV8 @U7RJTCH6J. As a thank you, here’s a piece made with this scale/tuning. It explores a couple of cycles from the graph above, which is what inspired me to make a list of all possible cycles (there are far more than I expected). https://anaphoria.bandcamp.com/album/the-creation-of-the-worlds