#core-async
2024-01-28
yedi 02:01:34

I'm using core.async to manage a data stream. I need to dispatch incoming datums to one or more "collector" channels. I have a coordinator (a go-loop) that pulls incoming maps off a data channel and determines which collector channels it should put the datum on. It also needs to manage the creation of those channels, as well as creating the collector go-loop that collects the datums into a chunk. Each collector should only live for some configured max collection time. So far, I've implemented it such that the collectors manage the timeout and put a value on the main data channel that the coordinator pulls from. See pseudo-ish code below:

(defn create-collector! [collect-ch data-ch]
  (let [timeout-ch (a/timeout max-collection-time)]
    (a/go-loop []
      (let [msg (a/<! (a/alts! [collect-ch timeout-ch]))]
        (cond
          ...
          (a/>! data-ch {:timedout collect-ch})

          :else
          (collect-datum! msg))))))

(defn coordinator [data-ch]
  (a/go-loop [collector-state {}]
    (let [datum (a/<! data-ch)]
      (if-let [timedout (:timedout datum)]
        (do
          (a/>! timedout :end)
          (recur (remove-timedout-collector collector-state timedout)))
        (let [{:keys [chs-to-put chs-to-create new-collector-state]} (partition-datum collector-state datum)]
          (doseq [c chs-to-put]
            (a/>! c datum))
          (doseq [c chs-to-create]
            (create-collector! c data-ch)
            (a/>! c datum))
          (recur new-collector-state))))))

But now I'm wondering if this is giving the collectors too much responsibility. Or is it bad practice to have a consumer put messages onto a channel used upstream? Would it make more sense for the coordinator to manage the timeouts for each collector? That would mean the coordinator creates the timeout channels for the collectors it creates (or has them returned by create-collector!) and would need to alts! on the data-ch plus all the timeout channels it's keeping track of. I'm wondering which approach feels cleanest. I kinda feel like none of these options is perfect, and maybe I'm missing a better way to manage this kind of workflow in core.async.

hiredman 03:01:59

Seems fine, but the alts! usage is not correct

hiredman 03:01:47

alts! doesn't return a channel to take from; it returns a pair of the value the op returned and the channel the op succeeded on ([val port])

hiredman 03:01:42

Actually, I forget: there is alts! and alt!. One of them takes a collection of ops and returns a pair like I said; the other is like cond over channel ops. The cond-like version might be what you want
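
For reference, in core.async it is alts! that takes a vector of ports and returns a [val port] pair, and alt! that is the cond-like macro. Here is a minimal sketch of the same "take or time out" logic written both ways. It uses the blocking variants alts!! and alt!! so it can be tried at a REPL outside a go block; inside a go block the parking versions alts! and alt! are used the same way. The function names take-with-alts and take-with-alt are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

(defn take-with-alts
  "Takes from ch, or returns :timed-out after timeout-ms.
  alts!! returns a [val port] pair, so the port tells us which op completed."
  [ch timeout-ms]
  (let [t        (a/timeout timeout-ms)
        [v port] (a/alts!! [ch t])]
    (if (= port t)
      :timed-out
      v)))

(defn take-with-alt
  "Same behavior using the cond-like alt!! macro: each clause pairs a
  channel op with a result expression."
  [ch timeout-ms]
  (a/alt!!
    ch                     ([v] v)
    (a/timeout timeout-ms) :timed-out))
```

In both versions a nil result means ch was closed, since nil can never be put on a channel.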

hiredman 03:01:13

I would consider not sending :end to indicate a shutdown, but close the channel instead

yedi 03:01:01

ah, ok thank you

hiredman 03:01:19

Actually, yeah, I think it does make more sense for the coordinator to manage the timeouts and just close the input channel to the collector instead of that kind of ping pong back and forth
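
A rough sketch of what a coordinator that owns the timeouts could look like, not a drop-in replacement for the code in the question. Assumptions made for illustration: each datum carries a hypothetical :key that picks its collector (standing in for partition-datum), and start-collector! is a caller-supplied function that receives the collector's input channel and reads from it until it is closed. One detail worth handling: a/timeout may hand back the same channel for deadlines that fall close together, so each timeout channel is mapped to a set of keys rather than a single key.

```clojure
(require '[clojure.core.async :as a])

(defn coordinator
  "Routes datums from data-ch to one collector per (:key datum).
  The coordinator owns every timeout and closes a collector's input
  channel when its time is up, or when data-ch itself is closed."
  [data-ch max-collection-ms start-collector!]
  (a/go-loop [collectors {}   ; key -> collector input channel
              timeouts   {}]  ; timeout channel -> set of keys it covers
    (let [[v port] (a/alts! (into [data-ch] (keys timeouts)))]
      (cond
        ;; A timeout fired: close the collectors it covers and forget them.
        (contains? timeouts port)
        (let [ks (timeouts port)]
          (doseq [k ks]
            (a/close! (collectors k)))
          (recur (apply dissoc collectors ks) (dissoc timeouts port)))

        ;; data-ch was closed: shut everything down and stop looping.
        (nil? v)
        (run! a/close! (vals collectors))

        ;; A datum arrived: route it, creating the collector if needed.
        :else
        (let [k (:key v)]
          (if-let [c (collectors k)]
            (do (a/>! c v)
                (recur collectors timeouts))
            (let [c (a/chan)
                  t (a/timeout max-collection-ms)]
              (start-collector! c)
              (a/>! c v)
              (recur (assoc collectors k c)
                     (update timeouts t (fnil conj #{}) k)))))))))
```

With this shape the collectors never write upstream; they only see their input channel close.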

yedi 03:01:56

"I would consider not sending :end to indicate a shutdown, but close the channel instead"

In the real implementation, there's a finalization step that the collector has, so sending the :end would trigger that. Unless there's a better way to denote the collection ending? I guess takes would return nil, and I could just not recur after seeing a nil msg. Yeah, I'll go with this approach
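
A small sketch of the collector under that approach: since nil can never be put on a channel, a nil take unambiguously means the input channel was closed, which makes it a natural trigger for the finalization step. The names start-collector! and finalize! are hypothetical; finalize! stands in for whatever the real finalization does.

```clojure
(require '[clojure.core.async :as a])

(defn start-collector!
  "Collects datums from in-ch into a vector. When in-ch is closed and
  drained, the take returns nil, the loop stops recurring, and finalize!
  is called once with the collected chunk."
  [in-ch finalize!]
  (a/go-loop [chunk []]
    (if-some [datum (a/<! in-ch)]
      (recur (conj chunk datum))
      (finalize! chunk))))
```

Values still buffered when the channel is closed are delivered before the nil, so nothing already accepted by the channel is lost.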