#core-async
2020-07-23
fiddlerwoaroof20:07:07

Is there a transducer like partition-all that times out if there are elements waiting in a batch?

fiddlerwoaroof20:07:51

for example, I have a core-async pipeline listening to a channel, and I want to process the elements in batches of 10, unless I have some elements queued up and 500ms have passed?

noisesmith20:07:02

so batching by count / timeout, whichever comes first?

fiddlerwoaroof20:07:14

yeah, that would work too

fiddlerwoaroof20:07:24

I was thinking timeout between consecutive items on the channel

noisesmith20:07:46

I can imagine how to write this (using a timeout chan plus fresh collection as loop variable), and I think it would be small / simple, but I don't know of it existing

fiddlerwoaroof20:07:57

Cool, I hate re-inventing the wheel for simple utilities like this: I always think there's some library with all the utilities I want

fiddlerwoaroof20:07:42

It's a bit tricky, because I want a transducer: so I think you'd have to do some kind of stateful transducer that occasionally fires the reducing function without a new element coming in, or something like that

noisesmith20:07:00

the natural thing would be to wrap the original channel with an alt / alts over that channel and a timeout, then creating a new timeout on each complete chunk

noisesmith20:07:31

but that would consume and wrap your channel, unlike a transducer which is wrapped by the channel
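A minimal sketch of the wrapper noisesmith describes, with a hypothetical name `batch-ch` (not from the discussion): a go-loop that alts! between the source channel and a timeout, emitting a batch when it reaches `n` items or when `ms` milliseconds elapse, and starting a fresh timeout after each complete chunk:

```clojure
(require '[clojure.core.async :as a])

(defn batch-ch
  "Returns a channel of batches read from `in`: each batch has at most
  `n` items, and a non-empty partial batch is flushed after `ms` ms."
  [in n ms]
  (let [out (a/chan)]
    (a/go-loop [batch [] timer (a/timeout ms)]
      (let [[v port] (a/alts! [in timer])]
        (cond
          ;; source closed: flush any pending items, then close out
          (and (= port in) (nil? v))
          (do (when (seq batch) (a/>! out batch))
              (a/close! out))

          ;; timeout fired: flush a non-empty batch, start a new timer
          (= port timer)
          (do (when (seq batch) (a/>! out batch))
              (recur [] (a/timeout ms)))

          ;; got an item: emit when the batch is full
          :else
          (let [batch (conj batch v)]
            (if (= n (count batch))
              (do (a/>! out batch)
                  (recur [] (a/timeout ms)))
              (recur batch timer))))))
    out))
```

As noted above, this consumes the original channel and hands you a new one, so it composes differently from a transducer.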

fiddlerwoaroof20:07:10

Yeah, I have an ETL pipeline that basically works by providing a kafka topic, a target channel and a transducer

fiddlerwoaroof20:07:32

I want to batch up the messages from kafka in one particular case, without rewriting everything

hiredman20:07:47

it isn't possible to do that with a transducer

noisesmith21:07:30

thanks - I was thinking that would be provable, but was stumped on the proof

hiredman21:07:10

a reducing function gets two arguments, and a transducer (a transformation of a reducing function) might cause it to close over something, so call it three arguments: state, accum, value

hiredman21:07:23

in a purely functional context the reducing function can only do things with those 3 arguments, and until it is done the thread doing it is blocked

hiredman21:07:50

(purely functional isn't exactly what I want here, I mean a context where the return value is important)

hiredman21:07:27

for core.async channels the return value isn't really important because the reducing function is mutating something

hiredman21:07:17

erm, actually sorry, the reducing function would be four arguments, not three: original reducing function, state, accum, value

hiredman21:07:54

so a transducer on a channel could stick those three things in a queue somewhere to be run at another time

hiredman21:07:46

but that would be very unsafe because you would be mutating the internals of a channel without the channel's lock being held

hiredman21:07:21

so in the pure model it is not possible, because the transducer/reducing function has to yield control to the actual reducing code; and practically, in core.async, where you might use another thread to retain control after returning, it is unsafe because of the locks
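hiredman's point can be seen in the shape of a count-based stateful transducer — a rough sketch of what partition-all does, under a hypothetical name `batch-xf`. Each arity runs only when the pipeline calls it, either with an input or at completion; there is no arity a clock can invoke, which is why time-based flushing cannot live here:

```clojure
(defn batch-xf
  "Stateful transducer emitting vectors of at most n items."
  [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ([acc]                       ; completion: flush the remainder
         (let [b @buf]
           (vreset! buf [])
           (rf (if (seq b) (rf acc b) acc))))
        ([acc x]                     ; step: only runs when input arrives
         (vswap! buf conj x)
         (if (= n (count @buf))
           (let [b @buf]
             (vreset! buf [])
             (rf acc b))
           acc))))))

(into [] (batch-xf 2) [1 2 3 4 5])  ;; => [[1 2] [3 4] [5]]
```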

fiddlerwoaroof21:07:38

Ok, so I think that's what I was wondering, it's unsafe to call the reducing function asynchronously?

fiddlerwoaroof21:07:10

e.g.

([coll next]
  (future (Thread/sleep 10)
          (rf coll (inc next)))
  (rf coll next))

fiddlerwoaroof21:07:39

In this case, line 3 could potentially cause some issue with the channel's locks?

ghadi21:07:29

if you are looking for a pulsed+batching process, it's very straightforward

fiddlerwoaroof21:07:32

That's a weird abstraction leak, to me 🙂

ghadi21:07:52

that would still be broken whether it was core.async or not

ghadi21:07:13

the return value of a reducing function generally needs to be used
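Concretely: a reducing function like conj returns the next accumulator, and each call must feed the following one. The `future` in the snippet above discards that return value, so whatever it builds is lost:

```clojure
;; threading the return value through each call works:
(-> [] (conj 1) (conj 2))  ;; => [1 2]

;; discarding the return value, as the snippet does, loses the element:
(let [acc []]
  (conj acc 1)   ; return value dropped
  (conj acc 2))  ;; => [2] — the 1 is gone
```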

fiddlerwoaroof21:07:14

Oh yeah, because coll is wrong

fiddlerwoaroof21:07:03

I know that way 🙂

fiddlerwoaroof21:07:31

I already have a transducer and I wanted to add a step to batch the incoming messages to the transducer

ghadi21:07:10

as mentioned before, not possible to factor in timing delays in a transducer

fiddlerwoaroof21:07:54

Yeah, I guess it makes sense