Fork me on GitHub
#core-async
<
2021-06-09
>
Faiz Halde17:06:00

Probably a question that’s already solved. We wanted to make use of transducers with core async channels. The reason for using transducers is because the pipeline recipe can be leveraged in several situations. Here is the transducer flow

(comp
 (map pre-fn)
 (filter some-pred)
 (buffer ms N)
 ;; don't know how to implement this stateful buffer
 ;; the transducer should buffer inputs and emit when ms millisecond has passed or N elems are accumulated, whichever comes first
 (map (post-fn)))
Also, we wanted to make sure each step here continues reading from upstream only after the entire pipe is processed till the end. i.e. if (map post-fn) is currently executing, then (map pre-fn) should not consume from its upstream channel. Think of this like a pull based model (reactive streams). While this is not a major blocker, I’d be curious how to do this with core async For now, we couldn’t do the entire pipeline using transducers so the buffer logic is temporarily built using
(defn buffer
  [in ms N]
  (let [out (chan)]
    (go
      (loop [b [] timer nil]
        (if timer
          (alt!
            timer (do (>! out b)
                      (recur [] nil))
            in ([v]
                (let [new-buffer (conj b v)]
                  (if (= N (count new-buffer))
                    (do
                      (>! out new-buffer)
                      (recur [] nil))
                    (recur new-buffer timer)))))
          (recur [(<! in)] (timeout ms)))))
    out))
and the two ends are connected manually something like this
(let [source (chan (comp (map pre-fn) (filter some-pred)))
      buffering (buffer source ms N)]
  (loop []
    (post-fn (<! buffering))
    (recur)))

Alex Miller (Clojure team)17:06:08

transducers are a pull-based model, and core.async channels can have a transducer running on them. unclear to me what that doesn't satisfy?

Faiz Halde18:06:04

ok sorry, i was under the assumption that each step in the transducer would run concurrently. i.e once the map mapped and sent it to the filter step, the map step would read from the input channel whilst the previous element is processed by the filter. should've ran some experiments

Faiz Halde18:06:02

every element goes through the entire transducer steps before the next element gets consumed

Faiz Halde18:06:39

so the only thing that remains is how to implement a buffer transducer

hiredman18:06:34

strictly speaking, you cannot

Faiz Halde18:06:38

does a transducer with timer make any sense? coming from rxjava world it breaks back pressure and had a few issues

Faiz Halde18:06:38

https://clojurians.slack.com/archives/C05423W6H/p1623261874016800?thread_ts=1623261548.015700&amp;cid=C05423W6H hmm. that's fine. akka streams solves this for us. just wondered if at all it was possible to do so with core async before bringing in a new framework

hiredman18:06:55

the way you solve it with core.async is something like your buffer fn

hiredman18:06:49

the limitation is with something inherit to the model of transducers, outside of transducers you can do whatever

Faiz Halde19:06:44

ya actually I'd rather not go for a new framework just to be able to write a linear pipeline 😛

Faiz Halde19:06:43

so anyway what's the issue with buffer? where does it go wrong? i mean we have stateful transducers. won't this be a stateful transducer? or are you saying the timer makes the transducer non deterministic and that's the problem

hiredman19:06:07

A transducer is a a transformation of a step function (a function like you would pass to reduce)

hiredman19:06:32

And the step function interface/contract does something, then yields control back to the caller

hiredman19:06:00

It can't hang around and wait and see

🙌 2
jjttjj00:06:45

What about something like https://github.com/cgrand/xforms/blob/62375212a8604daad631c9024e9dbe1db4ec276b/src/net/cgrand/xforms.cljc#L585 where time is determined from the item being passed? You could also use reductions in that library to build up a vector until one of your two conditions is met

Faiz Halde05:06:40

^ hard to grasp my head around this. “where time is determined from the item being passed” — if nothing comes, then who triggers the time :thinking_face: e.g. N=5 and TS=1000ms, we buffered 3 elements so far and nothing comes in now and TS passed as per the last 3 element is below 1000ms

Faiz Halde05:06:57

i could be wrong. the xforms library had already gone above my head because of my weak understanding of the transducers mental model

jjttjj10:06:04

I just meant if are processing maps that each have a :timestamp field you can use those to measure time and do the batching

jjttjj10:06:01

But that's true you then won't get batches completing until a new thing is processed

jjttjj10:06:15

But I guess that's illustrative of the limitations of transducers talked about earlier in the thread, you can't do anything that's not directly in response to an input

Faiz Halde06:06:00

i was inspired by flink’s checkpointing mechanism where it periodically sends barriers into the stream. i did the same thing at the source level by emitting flush events periodically (timeout) alongside messages. with that, I was able to use partition-by with a stateful function 🙂 ^ thanks to your suggestion I was able to make this insight. the solution still needs a bit of furbishing

Faiz Halde06:06:18

and i’m still testing it ^ let me know if this has any gotchas

hiredman18:06:07

the time based batching in the middle