core-async

2024-11-25T15:52:41.821369Z

At the end of the doc for pub it says

Note that if buf-fns are used then each topic is handled
  asynchronously, i.e. if a channel is subscribed to more than one
  topic it should not expect them to be interleaved identically with
  the source.
I have a case where I DO want output to be interleaved identically from the source and am not using buffer functions or any buffers at all but I'm finding that output is still not interleaved the same way input is
(let [input-ch (a/chan)
      pub   (a/pub input-ch :topic)
      ch1      (a/chan)
      _        (a/sub pub :A ch1)
      _        (a/sub pub :B ch1)
      messages
      (for [x (range 1000)]
        {:seq x :topic (rand-nth [:A :B])})
      result (a/chan)]
  (a/go
    (let [all (a/! result
        (keep identity
          (map
            (fn [x y] (when (not= x y) [x y]))
            (map :seq all)
            (range))))))
  (a/go
    (doseq [msg messages]
      (a/>! input-ch msg))
    (a/close! ch1))
  (a/
what's the reason for this?

Alex Miller (Clojure team) 2024-11-25T16:23:09.142969Z

because different threads are handling it asynchronously

Alex Miller (Clojure team) 2024-11-25T16:36:40.852939Z

but I understand your question - if pub publishes unbuffered to a topic mult and mult delivers in parallel synchronously, then you are expecting order

Alex Miller (Clojure team) 2024-11-25T16:41:56.280179Z

each mult having its own go loop and there is an additional channel hop there and that introduces the asynchronicity

2024-11-25T17:08:38.942209Z

Ok yeah I see now, I think. From the pub source code, since each of these mult chan's at (1) are being read from independently by separate threads (one per mult), the pub input may be read multiple times before the downstream processing is fully handled. If it were just a single mult, the >! at (1) would park until the final write of the mult was handled.

; from pub
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (doseq [m (vals @mults)]
        (close! (muxch* m)))
      (let [topic (topic-fn val)
            m     (get @mults topic)]
        (when m
          (when-not (>! (muxch* m) val) ; (1)
            (swap! mults dissoc topic)))
        (recur)))))
Thanks for the help