What is the behavior of pipeline-blocking w.r.t. stateful transducers? I'm currently trying to use one with it, and it seems to be calling the completion arity of the transducer more often than expected.
you cannot use pipeline with stateful transducers
interesting. even something like (take 10)?
@ghadi Is that because you can't guarantee all the invocations would happen on the same thread?
It looks like the implementation doesn't even try, it treats each new element as a unique transducing context
why look in the impl when you can look in the docstring
> Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
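To make the docstring concrete, here is a small sketch comparing the same stateful transducer passed to pipeline versus attached to a single channel. The helper names `via-pipeline` and `via-channel` are made up for illustration, and `a/to-chan!` assumes a recent core.async release.

```clojure
(require '[clojure.core.async :as a])

(defn via-pipeline
  "Hypothetical helper: runs coll through a/pipeline with xf and collects the output."
  [xf coll]
  (let [out (a/chan 8)]
    (a/pipeline 1 out xf (a/to-chan! coll))
    (a/<!! (a/into [] out))))

(defn via-channel
  "Hypothetical helper: runs coll through one channel carrying xf,
  so the transducer's state spans all elements."
  [xf coll]
  (let [out (a/chan 8 xf)]
    (a/pipe (a/to-chan! coll) out)
    (a/<!! (a/into [] out))))

;; Each element is its own transduction, so partition-all is completed
;; (and flushed) after every single input:
(via-pipeline (partition-all 3) (range 6))
;; => [[0] [1] [2] [3] [4] [5]]

;; One transduction for the whole stream:
(via-channel (partition-all 3) (range 6))
;; => [[0 1 2] [3 4 5]]
```

The same reasoning applies to (take 10): inside pipeline it only ever sees one element per transduction, so it never gets a chance to cut the stream off.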
I have a variant of pipe somewhere that takes any transducer, including stateful ones
you could also make another channel with the transducer on it and pipe to it. but if your transducer can block, you must use a variant of pipe that uses threads rather than go blocks, else you tie up the go thread pool
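A rough sketch of what such a thread-based pipe might look like. `pipe-blocking` and `batched-via-thread-pipe` are hypothetical names, not part of core.async; the function mirrors a/pipe but moves values with `<!!` and `>!!` on an `a/thread` thread instead of inside a go block.

```clojure
(require '[clojure.core.async :as a])

(defn pipe-blocking
  "Hypothetical variant of a/pipe that copies from `from` to `to` on a
  dedicated thread. Closes `to` when `from` closes; stops if `to` is closed."
  [from to]
  (a/thread
    (loop []
      (let [v (a/<!! from)]
        (if (nil? v)
          (a/close! to)
          (when (a/>!! to v)
            (recur))))))
  to)

(defn batched-via-thread-pipe
  "Pipes coll into a channel carrying a stateful transducer and collects the result."
  [n coll]
  (let [to (a/chan 1 (partition-all n))]
    (pipe-blocking (a/to-chan! coll) to)
    (a/<!! (a/into [] to))))

(batched-via-thread-pipe 2 (range 5))
;; => [[0 1] [2 3] [4]]
```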
it all depends on what you're trying to achieve
@suskeyhose what need initially led you to pipeline-blocking?
Trying to do a blocking operation that uploads data to an S3 bucket to populate a Redshift database. Specifically, I made a stateful transducer that batches values by partition key, then used pipeline-blocking to take the batches and upload them to Redshift. The output was the raw batch data, which then went into an aggregation with a/reduce to compute summary information about the uploaded data.
I suppose I could split this so that the batching is done on a single channel before pipeline-blocking, rather than inside the transducer passed to pipeline-blocking
Actually yeah, that makes sense.
It can be constructed using 2 pipelines communicating with a stateful loop/recur handling state between the two:
in | pipeline | out -> in | loop/recur (stateful) | out -> in | pipeline | out
Not a simple pipeline as provided by the library, but the model can be built.
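One way that layout could be sketched, assuming a simple fixed-size batching step as the stateful middle stage. `batch-stage` and `two-pipelines` are hypothetical names, and the `inc` and sum steps are placeholders for real work.

```clojure
(require '[clojure.core.async :as a])

(defn batch-stage
  "Hypothetical stateful middle stage: groups values from `in` into vectors
  of up to n, flushing any partial batch when `in` closes."
  [n in out]
  (a/go-loop [batch []]
    (let [v (a/<! in)]
      (cond
        (nil? v) (do (when (seq batch) (a/>! out batch))
                     (a/close! out))
        (= (inc (count batch)) n) (do (a/>! out (conj batch v))
                                      (recur []))
        :else (recur (conj batch v))))))

(defn two-pipelines
  "in | pipeline | mid-in -> loop/recur | mid-out -> pipeline-blocking | out"
  [coll]
  (let [in      (a/to-chan! coll)
        mid-in  (a/chan 4)
        mid-out (a/chan 4)
        out     (a/chan 4)]
    (a/pipeline 2 mid-in (map inc) in)                        ; stateless, parallel
    (batch-stage 3 mid-in mid-out)                            ; stateful, single loop
    (a/pipeline-blocking 2 out (map #(reduce + %)) mid-out)   ; stateless, parallel
    (a/<!! (a/into [] out))))

(two-pipelines (range 7))
;; => [6 15 7]
```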
Yeah, what I ended up settling on was putting the stateful transducer on the output channel itself rather than passing it to pipeline-blocking, which produced the behavior I wanted.
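A minimal sketch of that arrangement, with a placeholder standing in for the real upload. `upload!` and `upload-then-batch` are hypothetical names; only the stateless step goes to pipeline-blocking, while the stateful transducer sits on the output channel and sees every result in order.

```clojure
(require '[clojure.core.async :as a])

(defn upload!
  "Hypothetical stand-in for a blocking upload; returns its input unchanged."
  [x]
  (Thread/sleep 5)
  x)

(defn upload-then-batch
  "Uploads each value in parallel, then batches results on the output channel."
  [n coll]
  (let [in  (a/to-chan! coll)
        out (a/chan 1 (partition-all n))]   ; stateful xf lives on the channel
    (a/pipeline-blocking 4 out (map upload!) in)
    (a/<!! (a/into [] out))))

(upload-then-batch 3 (range 7))
;; => [[0 1 2] [3 4 5] [6]]
```

pipeline-blocking preserves input order and closes the output channel when the input closes, which is what triggers the final flush of the partial batch.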
Ah, I see; it appears that each input message is handled as an entirely separate transduction