how can one use missionary to stream values from the callback of a blocking function?
(defn blocking-fn [cb]
  (Thread/sleep 100)
  (cb 1)
  (Thread/sleep 100)
  (cb 2))
(m/? (m/via m/blk (blocking-fn println)))
; => 1
; => 2
(m/? (m/reduce println (m/observe (fn [!] (! nil) (m/? (m/via m/blk (blocking-fn !)))))))
; Execution error (Error) at missionary.impl.Observe/1 (Observe.java:70).
; Can't process event - observer is not ready.
so is running the task like this idiomatic?
(m/observe
  (fn [!]
    ((m/via m/blk (blocking-fn !))
     (fn [_] (! ::done))
     (fn [_] (! ::fail)))))
it's fine. For this problem the idiomatic solution would be the loop-amb pattern
interesting. hmm, I don’t understand how loop-amb can work for callback-based blocking functions
is there an example somewhere?
note that in the real-world use case the blocking function (or task) invokes the callback multiple times while streaming new values
fwiw, I’m happy to use the existing version, but I’m curious to see what a more elegant solution would look like
In your case it has no impact, because there is only one writer thread and the pipeline consumes all values immediately (thanks to m/reduce)
the loop-amb pattern would look like this (untested)
you cannot use m/? inside the function passed to m/observe. I think you should use a mailbox as an intermediary to communicate between the thread and the flow
hmm, a mailbox looks like it returns a task when called with 2 arguments. I’m looking for a flow to continuously stream values
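A minimal, untested sketch of how the mailbox suggestion and the loop-amb pattern could fit together: the blocking function posts into an unbounded mailbox from a m/blk thread, and a loop inside m/ap turns the mailbox's "take" task into a flow. callback->flow and the ::done sentinel are hypothetical names introduced here for illustration, not part of missionary, and this is not necessarily the version that was meant above.

```clojure
(require '[missionary.core :as m])

(defn blocking-fn [cb]
  (Thread/sleep 100)
  (cb 1)
  (Thread/sleep 100)
  (cb 2))

(defn callback->flow
  "Hypothetical helper: turns a blocking, callback-based function f
  into a discrete flow of the values passed to the callback."
  [f]
  (m/ap
    (let [mbx (m/mbx)]
      (m/amb=
        ;; Branch 1: run f on the blocking executor. Calling the mailbox
        ;; with 1 argument posts a value, so it can serve as the callback.
        ;; When f returns, post a sentinel; this branch emits nothing.
        (do (m/? (m/via m/blk (f mbx) (mbx ::done)))
            (m/amb))
        ;; Branch 2: loop-amb. Take one value from the mailbox, emit it,
        ;; then recur until the sentinel shows up.
        (loop []
          (let [x (m/? mbx)]
            (if (= ::done x)
              (m/amb)
              (m/amb x (recur)))))))))

;; Collect everything the callback received.
(def result
  (m/? (m/reduce conj [] (callback->flow blocking-fn))))
```

Because the mailbox is unbounded, a fast producer never hits the "observer is not ready" error, at the cost of buffering values in memory when the consumer is slower.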
hmm this seems like a roundabout solution
if blocking-fn was instead async the observe would just work, correct?
I have this solution in search of a problem: https://github.com/leonoel/missionary/issues/93. Your example could look like this
(m/observe
  (fn [!]
    ((m/via m/blk (blocking-fn !))
     (fn [_] (! :done))
     (fn [_] (! :fail)))))
describe "just work". I think (m/? (m/reduce #(prn %2) nil (m/observe (fn [!] (blocking-fn !) #())))) should run, blocking the current thread
it's not correct to block the thread calling the m/observe constructor
I see
thank you!
ultimately this works
(m/?
  (m/reduce
    (fn [x y]
      (println :x x :y y)
      (case y
        (::done ::fail) (reduced x)
        (conj x y)))
    []
    (m/observe
      (fn [!]
        ((m/via m/blk (blocking-fn !))
         (fn [_] (! ::done))
         (fn [_] (! ::fail)))))))
I thought it was not idiomatic to start a task process inside the observe
as long as you return the cancellation handler, that's fine
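To make the point about the cancellation handler concrete, here is a small sketch under stated assumptions. Running a task by calling it with success and failure continuations returns a cancel function, and that return value is what m/observe uses as its cleanup thunk. callback-flow is a hypothetical name, and ending the stream with take-while inside m/eduction is just one possible alternative to returning reduced from the reducing function.

```clojure
(require '[missionary.core :as m])

(defn blocking-fn [cb]
  (Thread/sleep 100)
  (cb 1)
  (Thread/sleep 100)
  (cb 2))

(defn callback-flow
  "Hypothetical helper: wraps a blocking, callback-based function f
  in m/observe without blocking the observe constructor."
  [f]
  (m/observe
    (fn [!]
      ;; Starting the task returns immediately with its cancel function.
      ;; Being the last expression, it becomes the cleanup thunk that
      ;; m/observe calls when the flow is cancelled.
      ((m/via m/blk (f !))
       (fn [_] (! ::done))
       (fn [_] (! ::fail))))))

;; take-while stops at the first sentinel, which terminates the pipeline
;; and cancels the observe flow (and, through it, the task).
(def result
  (m/? (m/reduce conj []
         (m/eduction (take-while (complement #{::done ::fail}))
                     (callback-flow blocking-fn)))))
```

If the consumer stops early for some other reason, the same cleanup thunk cancels the m/via task, which interrupts the thread running the blocking function.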
the problem is, currently the producer thread can be faster than the consumer, hence the issue
can you provide more context to motivate the fix?
yes, the general use case is an HTTP client streaming a response (in this case Anthropic’s Claude API, but it could also be ChatGPT). I’m looking to stream the response as new tokens come in and render them immediately in the UI.
example code
@leonoel just tried b.38, the following is still not working:
Blocking in the observe constructor is still forbidden. If you need to block you have to do it in another thread, e.g. with m/via
The b.38 fix just blocks the callback instead of throwing the backpressure error