At the end of the docstring for pub it says:
Note that if buf-fns are used then each topic is handled
asynchronously, i.e. if a channel is subscribed to more than one
topic it should not expect them to be interleaved identically with
the source.
I have a case where I DO want output to be interleaved identically with the source. I am not using buffer functions or any buffers at all, but I'm finding that the output is still not interleaved the same way as the input:
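; assumes (require '[clojure.core.async :as a])
(let [input-ch (a/chan)
      pub (a/pub input-ch :topic)
      ch1 (a/chan)
      _ (a/sub pub :A ch1)
      _ (a/sub pub :B ch1)
      messages
      (for [x (range 1000)]
        {:seq x :topic (rand-nth [:A :B])})
      result (a/chan)]
  ; collect everything from ch1, then report each [received expected] mismatch
  (a/go
    (let [all (a/<! (a/into [] ch1))]
      (a/>! result
            (keep identity
                  (map
                   (fn [x y] (when (not= x y) [x y]))
                   (map :seq all)
                   (range))))))
  ; feed the messages in order, then close the subscriber channel
  (a/go
    (doseq [msg messages]
      (a/>! input-ch msg))
    (a/close! ch1))
  (a/<!! result))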
What's the reason for this?
Because different threads are handling it asynchronously.
But I understand your question: if pub publishes unbuffered to a topic mult, and mult delivers to its taps in parallel and synchronously, then you are expecting order to be preserved.
Each mult has its own go loop, so there is an additional channel hop there, and that hop introduces the asynchronicity.
Ok yeah, I see now, I think. From the pub source code: since each of the mult chans at (1) is read independently by its own go loop (one per mult), pub may take several values from its input before the earlier ones have been fully delivered downstream. If there were just a single mult, the >! at (1) would park until that mult had finished its final write for the previous value.
; from pub
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (doseq [m (vals @mults)]
        (close! (muxch* m)))
      (let [topic (topic-fn val)
            m (get @mults topic)]
        (when m
          (when-not (>! (muxch* m) val) ; (1)
            (swap! mults dissoc topic)))
        (recur)))))
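If source order matters more than per-topic fan-out, one way around the extra hop is to skip pub for that subscriber and filter on the topic through a single channel. The following is only a sketch under that assumption, not something pub provides: `sub-ordered` is a hypothetical helper name, and the third topic `:C` is added just so that the filter has something to drop.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper (not part of core.async): forwards only the messages
;; whose topic is in `topics`, through one channel hop, so source order is kept.
(defn sub-ordered
  [input-ch topic-fn topics]
  ;; A channel with a transducer must have a buffer; size 1 is enough here.
  (a/pipe input-ch
          (a/chan 1 (filter (comp (set topics) topic-fn)))))

(def messages
  (vec (for [x (range 1000)]
         {:seq x :topic (rand-nth [:A :B :C])})))

(def received
  (let [input-ch (a/chan)
        out      (sub-ordered input-ch :topic [:A :B])]
    ;; Feed the messages in order, then close the input so `out` closes too.
    (a/go
      (doseq [msg messages]
        (a/>! input-ch msg))
      (a/close! input-ch))
    ;; Block until every forwarded message has been collected.
    (a/<!! (a/into [] out))))
```

Because every message passes through the same channel, `received` holds the `:A` and `:B` messages in the order they were put on `input-ch`. The trade-off is that each such subscriber consumes the input channel itself, so this does not combine with other consumers of the same channel unless a single mult is placed in front of it.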
Thanks for the help