Fork me on GitHub
#core-async
<
2021-06-25
>
Franco Gasperino20:06:53

I'm looking to express the following: • A sequence of channels are expected to have a single keyword value to indicate if the prodcuer attached to the channel has started (:started) or encountered an error (:error) • All channels should be read. • A timeout should be applied to the read of all channels as a sequence to prevent deadlocks from an unresponsive producer. I've come up with the following, and am looking for a quick yea/nay on the idiomatic style.

(def c1 (async/chan 1))
  (def c2 (async/chan 1))
  (def c3 (async/chan 1))

  (doseq [c [c1 c2 c3]]
    (async/put! c :started))

  (defn started? [& responses]
    (every? #(= :started %) responses))

  (def c4 (async/map started? [c1 c2 c3]))
  
  (let [t (async/timeout 5000)
        [v p] (async/alts!! [c4 t])]
    (cond 
      (= p t) "Timed out!"
      (true? v) "All channels started!"
      (false? v) "One or more channels failed to start!"
      :default "Unknown case"))

Franco Gasperino17:06:17

Odd that map would act that way, as map returns a channel which (i would think) could be composed as such. If i execute my example without the doseq put! operation on c1 c2 c3, it does in fact time out at the repl.

Franco Gasperino17:06:28

the same timeout behavior if i put! on c1 c2, but not c3

Franco Gasperino17:06:06

a put! on all 3 channels results in "all channels started!"

hiredman18:06:04

The thing to understand is map is starting a go block which is basically copying from the input to the output. And your timeout has no effect on that process

hiredman18:06:23

It sits there trying to copy until the cows come home

hiredman18:06:38

In particular for map, leaving the map process sitting around doing that copying likely is not a big deal

hiredman18:06:23

If I recall the output channel is unbuffered and the map process will just wedge writing to it until it gets gc'ed

hiredman18:06:18

But in general leaving that kind of process sitting around can cause things to behave unexpectedly (if the channel is buffered, or channels aren't being used as one shot value conveyors)

hiredman18:06:23

At the very least I think you'd be better off using something like merge over map, because mapping is ordered and you don't car about order

Franco Gasperino15:06:57

acknowledged. thanks

hiredman21:06:04

if you use map like that, it will hang around reading from the channels regardless of the timeout

hiredman21:06:26

you need to pass all the channels together to alt

hiredman21:06:51

(let [t (async/timeout 5000)]
  (loop [channels [c1 c2 c3]]
    (if (seq channels)
      (let [[v p] (async/alts!! (conj channels t))]
        (cond (= p t)
              "Timed out!"
              (= v :started)
              (recur (remove #{p} channels))
              :else
              "One or more channels failed to start!"))
      "All channels started!")))