missionary

denik 2024-04-19T12:31:35.034969Z

how can one use missionary to stream values from the callback of a blocking function?

(defn blocking-fn [cb]
  (Thread/sleep 100)
  (cb 1)
  (Thread/sleep 100)
  (cb 2)
  )

(m/? (m/via m/blk (blocking-fn println)))                   
; => 1
; => 2

(m/? (m/reduce println (m/observe (fn [!] (! nil) (m/? (m/via m/blk (blocking-fn !)))))))
; Execution error (Error) at missionary.impl.Observe/1 (Observe.java:70).
; Can't process event - observer is not ready.

denik 2024-04-21T20:17:41.798949Z

so running the task like this is idiomatic:

(fn [!]
        ((m/via m/blk (blocking-fn !))
         (fn [_] (! ::done))
         (fn [_] (! ::fail)))))

leonoel 2024-04-21T20:39:38.484589Z

it's fine. For this problem the idiomatic solution would be the loop-amb pattern

denik 2024-04-21T23:53:40.376319Z

interesting. hmm, I don’t understand how loop-amb can work for callback based blocking functions

denik 2024-04-21T23:53:48.209689Z

is there an example somewhere?

denik 2024-04-21T23:55:10.645919Z

note that in the real world usecase the blocking function (or task) invokes the callback multiple times while streaming new values

denik 2024-04-21T23:56:33.720209Z

fwiw, I’m happy to use the existing version. but I’m curious to see what a more elegant solution would look like

leonoel 2024-04-20T08:34:58.009699Z

In your case it's not impacting because there is only one writer thread and the pipeline consumes all values immediately (due to m/reduce)

👌 1
leonoel 2024-04-22T14:49:29.933099Z

the loop-amb pattern would look like this (untested)

Hendrik 2024-04-19T13:28:25.488149Z

you cannot use m/? inside the function, which is passed to observer. I think, that you should use a mailbox as intermediate to communicate between the thread and the flow

denik 2024-04-19T13:50:14.012699Z

hmm, mailbox looks like it returns a task on 2 arity. I’m looking for a flow to continuously stream values

denik 2024-04-19T14:07:08.587469Z

hmm this seems like a roundabout solution

denik 2024-04-19T14:07:38.908419Z

if blocking-fn was instead async the observe would just work, correct?

leonoel 2024-04-19T14:11:51.640029Z

I have this solution in search for a problem https://github.com/leonoel/missionary/issues/93 Your example could look like this

(m/observe
  (fn [!]
    ((m/via m/blk (blocking-fn !))
     (fn [_] (! :done))
     (fn [_] (! :fail)))))

xificurC 2024-04-19T14:12:37.333179Z

describe "just work", I think (m/? (m/reduce #(prn %2) nil (m/observe (fn [!] (blocking-fn !) #()) should run, blocking the current thread

leonoel 2024-04-19T14:13:29.611719Z

it's not correct to block the thread calling the m/observe constructor

xificurC 2024-04-19T14:13:41.539049Z

I see

denik 2024-04-19T14:22:39.618489Z

thank you!

denik 2024-04-19T14:22:44.861649Z

ultimately this works

(m/?
  (m/reduce
    (fn [x y]
      (println :x x :y y)
      (case y
        (::done ::fail) (reduced x)
        (conj x y)))
    []
    (m/observe
      (fn [!]
        ((m/via m/blk (blocking-fn !))
         (fn [_] (! ::done))
         (fn [_] (! ::fail)))))))

denik 2024-04-19T14:23:04.380539Z

I thought it was not idiomatic to start a task process inside the observe

leonoel 2024-04-19T14:23:54.933849Z

as long as you return the cancellation handler, that's fine

leonoel 2024-04-19T14:24:38.546149Z

the problem is, currently the producer thread can be faster than the consumer, hence the issue

leonoel 2024-04-19T14:29:36.494399Z

can you provide more context to motivate the fix ?

denik 2024-04-19T14:38:41.682209Z

yes, the general use case is and http-client streaming a response (in this case an Anthropic’s Claude API but it could also be chatGPT). I’m looking to stream the response as new tokens come in and render them immediately in the UI.

denik 2024-04-19T14:42:21.257349Z

example code

denik 2024-04-19T18:13:07.664219Z

@leonoel just tired b.38, the following is still not working:

leonoel 2024-04-19T19:01:57.011479Z

Blocking in the observe constructor is still forbidden. If you need to block you have to do it in another thread, e.g. with m/via

leonoel 2024-04-19T19:03:54.416179Z

The b.38 fix just blocks the callback instead of throwing the backpressure error

👌 1