missionary

Lovro 2026-04-10T22:06:32.592449Z

@leonoel i ran into an issue that we https://clojurians.slack.com/archives/CL85MBPEF/p1752396898983449?thread_ts=1752314760.885599&cid=CL85MBPEF, namely how to reduce over a flow with a task-producing reducing function. the point is to model a sequence of tasks where each task depends on the previous one's execution, something like acc_{n+1} = exec(rf(acc_n, x_n)) where x_i are the elements of the flow. i've outlined 3 different implementations of a hypothetical task-reduce https://github.com/hokomo/missionary-lab/blob/master/src/missionary_lab/reduce.clj, only one of which is actually correct (the imperative one that we had mentioned). do you have an idea why task-reduce2 deadlocks on the rdv's "give"? why does the same not happen in your async-state-machine example? poll and feedback are as in your example except that feedback returns the value instead of nil. i feel like i'm definitely missing something simple

leonoel 2026-04-12T12:29:49.709819Z

The second works fine until end of stream is reached. When the seed terminates, zip cancels the poll, then the feedback process hangs because the rdv put never resolves. Using a mailbox solves the problem :

(defn feedback [f & args]
  (m/ap
    (let [m (m/mbx)
          x (m/?> (apply f (poll m) args))]
      (m x)
      x)))
Backpressure with cycles is very tricky

Lovro 2026-04-12T14:37:04.407099Z

ah, i think i was mixing up flow termination vs. cancellation a bit. for some reason i thought that feedback would terminate "immediately" (via the termination of seed, then zip, then (apply f (poll rdv) args)), but termination is a graceful operation and therefore feedback still has to run its last branch until the end, which contains the problematic (m/? (rdv x). that clears it up, thanks! tricky indeed! 🙂

leonoel 2026-04-12T14:44:21.925959Z

I still haven't figured out what should be the correct behavior of feedback wrt backpressure