core-async

2026-04-03T15:51:18.652229Z

What is the behavior of pipeline-blocking w.r.t. stateful transducers? I'm currently trying to use one with it, and it seems to be calling the finishing arity of the transducer more often than expected.

ghadi 2026-04-03T15:59:48.192929Z

you cannot use pipeline with stateful transducers

dpsutton 2026-04-03T16:01:56.321299Z

interesting. even something like (take 10)?

seancorfield 2026-04-03T16:24:39.666989Z

@ghadi Is that because you can't guarantee all the invocations would happen on the same thread?

2026-04-03T16:32:08.862909Z

It looks like the implementation doesn't even try, it treats each new element as a unique transducing context

ghadi 2026-04-03T17:13:15.540679Z

why look in the impl when you can look in the docstring

ghadi 2026-04-03T17:13:47.842589Z

> Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
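To illustrate the docstring's point, here is a minimal sketch in plain Clojure (no core.async) that models "one transduction per element" by running the transducer over each input as its own single-element collection. The helper name `per-element` is made up for illustration. It is not how pipeline is implemented, only a model of the behavior the docstring describes.

```clojure
;; Hypothetical helper: run xf over each element as its own transduction,
;; so any state inside xf starts fresh (and the completing arity is called)
;; once per element.
(defn per-element [xf coll]
  (into [] (mapcat #(into [] xf [%])) coll))

;; One transduction across all elements: state is shared.
(into [] (partition-all 3) (range 7))
;; => [[0 1 2] [3 4 5] [6]]

;; One transduction per element: every element becomes its own batch.
(per-element (partition-all 3) (range 7))
;; => [[0] [1] [2] [3] [4] [5] [6]]

;; (take 10) never reaches its limit, because each element gets a fresh counter.
(count (per-element (take 10) (range 100)))
;; => 100
```

This also matches the observation that the finishing arity runs far more often than expected: it runs once per input rather than once per stream.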

ghadi 2026-04-03T17:16:17.007169Z

I have a variant of pipe somewhere that takes any transducer, including stateful ones

ghadi 2026-04-03T17:17:48.417459Z

you could also make another channel with the transducer on it and pipe to it, but if your transducer can block you must use a variant of pipe that uses threads rather than go blocks, else you screw up the go pool
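A rough sketch of that suggestion, assuming a recent core.async (for `to-chan!`). `thread-pipe` is a hypothetical name, not a library function and not ghadi's variant. It mirrors the shape of `a/pipe` but does its takes and puts with the blocking operations on a dedicated thread, so the puts that feed the transducer channel happen off the go pool.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical thread-based analogue of a/pipe: copies values from `from`
;; to `to` on a dedicated thread, closing `to` when `from` is exhausted.
(defn thread-pipe [from to]
  (a/thread
    (loop []
      (let [v (a/<!! from)]
        (if (nil? v)
          (a/close! to)
          (when (a/>!! to v)   ; >!! returns false once `to` is closed
            (recur))))))
  to)

(def result
  (let [in      (a/to-chan! (range 7))
        ;; One channel, one transduction: partition-all keeps its state
        ;; across elements and is flushed once, when the channel closes.
        batched (a/chan 4 (partition-all 3))]
    (thread-pipe in batched)
    (a/<!! (a/into [] batched))))
;; result => [[0 1 2] [3 4 5] [6]]
```

Here `partition-all` stands in for any stateful transducer. A transducer that really blocks would go in the same place.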

ghadi 2026-04-03T17:18:43.351299Z

it all depends on what you're trying to achieve

ghadi 2026-04-03T17:19:47.440949Z

@suskeyhose what need initially led you to pipeline-blocking?

2026-04-03T17:33:19.482229Z

Trying to do a blocking operation that uploads some data to an S3 bucket to populate a Redshift database. Specifically, I made a stateful transducer that batches values based on partition keys, then used pipeline-blocking to take the batches and upload them to Redshift. The output was the raw batch data, which then went into an aggregation with a/reduce to determine some aggregated information about the data uploaded.

2026-04-03T17:34:57.559049Z

I suppose I could split this so that the batching is done on a single channel before pipeline-blocking, rather than being handled inside the transducer passed to pipeline-blocking

2026-04-03T17:36:07.256419Z

Actually yeah, that makes sense.
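A minimal sketch of that split, under stated assumptions: `upload!` is a hypothetical stand-in for the real blocking upload, the `:k` key plays the role of the partition key, and `partition-by` stands in for the custom batching transducer. The stateful batching sits on one channel, and pipeline-blocking only receives a stateless `map`.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for the blocking upload; it only simulates latency
;; and returns the batch unchanged.
(defn upload! [batch]
  (Thread/sleep 10)
  batch)

(def uploaded
  (let [in      (a/to-chan! [{:k :a :v 1} {:k :a :v 2} {:k :b :v 3}])
        ;; Stateful batching lives on this one channel...
        batches (a/chan 8 (partition-by :k))
        out     (a/chan 8)]
    ;; a/pipe uses a go block, which is fine here because batching never blocks.
    (a/pipe in batches)
    ;; ...and pipeline-blocking only sees a stateless transducer.
    (a/pipeline-blocking 4 out (map upload!) batches)
    (a/<!! (a/into [] out))))
;; uploaded => [[{:k :a, :v 1} {:k :a, :v 2}] [{:k :b, :v 3}]]
```

The `out` channel could then feed an `a/reduce` for the aggregation step.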

Franco Gasperino 2026-04-09T00:23:54.343529Z

It can be constructed using two pipelines communicating through a stateful loop/recur that handles state between them:

in | pipeline | out -> in | loop/recur (stateful) | out -> in | pipeline | out

Not a simple pipeline as provided by the library, but the model can be built.
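One way that shape might look, as a sketch only. `batch-stage` is a hypothetical helper, and fixed-size batching stands in for whatever state the middle stage really needs. The two outer stages are ordinary pipelines with stateless transducers.

```clojure
(require '[clojure.core.async :as a])

;; Stateful middle stage: collects values into batches of n using loop state,
;; flushing any partial batch and closing `out` when `in` closes.
(defn batch-stage [in out n]
  (a/go-loop [batch []]
    (if-some [v (a/<! in)]
      (let [batch (conj batch v)]
        (if (= n (count batch))
          (do (a/>! out batch) (recur []))
          (recur batch)))
      (do (when (seq batch) (a/>! out batch))
          (a/close! out)))))

(def sums
  (let [in      (a/to-chan! (range 7))
        stage-1 (a/chan 8)
        stage-2 (a/chan 8)
        out     (a/chan 8)]
    (a/pipeline 2 stage-1 (map inc) in)                       ; stateless
    (batch-stage stage-1 stage-2 3)                           ; stateful
    (a/pipeline-blocking 2 out (map #(reduce + %)) stage-2)   ; stateless
    (a/<!! (a/into [] out))))
;; sums => [6 15 7]
```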

2026-04-14T17:38:34.053449Z

Yeah, what I ended up settling on was putting the stateful transducer on the output channel itself rather than passing it to pipeline-blocking, which produced the behavior I wanted.
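A small sketch of that arrangement, with `partition-all` standing in for the real stateful transducer and `(map inc)` standing in for the real per-item work (both are assumptions for illustration). The pipeline preserves input order and closes the output channel when the input is exhausted, at which point the channel's transducer is completed once and flushes its last batch.

```clojure
(require '[clojure.core.async :as a])

(def batches
  (let [in  (a/to-chan! (range 7))
        ;; The stateful transducer is attached to the output channel,
        ;; so it runs once across everything the pipeline emits.
        out (a/chan 8 (partition-all 3))]
    ;; pipeline-blocking itself only gets a stateless transducer.
    (a/pipeline-blocking 4 out (map inc) in)
    (a/<!! (a/into [] out))))
;; batches => [[1 2 3] [4 5 6] [7]]
```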

2026-04-03T15:58:59.500509Z

Ah, I see; it appears that each input message is handled as an entirely separate transduction