missionary

J 2024-12-30T15:18:19.197739Z

Hi guys! Is it allowed to do this?

(def task
  (m/sp
    ...
    (completable-future-wrapper #(...) #(...))
    ...))
I use m/memo a lot and it’s a convenient function, but sometimes I don’t want the result of a process. I just want to fire and forget it.

xificurC 2024-12-30T15:36:04.868239Z

completable-future-wrapper returns a missionary task? It is allowed, but you lose what you get from task composition, e.g. if the wrapper m/sp dies it won't cancel this rogue task

xificurC 2024-12-30T15:37:46.318209Z

I'd look at compel, join, race to keep supervision

J 2024-12-30T21:24:53.383499Z

In my mind, completable-future-wrapper is a wrapper that launches a CompletableFuture with success and failure callbacks. I use join a lot too, but here it doesn't behave as I expected. For example:

(def task
  (m/join
    {}
    (m/via m/blk (Thread/sleep 1000) (prn "Hello")
    (m/sp "World")))
Here, World is returned after 1s (and Hello is printed at the same time). I'm looking for the behaviour where World is returned without waiting 1s.

xificurC 2024-12-30T21:54:38.833049Z

hard to move forward without more context. I'd slice the tasks up in this particular case

(m/join {}
  (m/via m/blk (Thread/sleep 1000) (prn "Hello"))
  (m/sp "World"))

J 2024-12-30T22:11:27.715349Z

Sorry for the lack of context. I have a function that returns a missionary task, which I use in a ring async handler. This task returns a ring response, and sometimes it should also make an HTTP call:

(defn ?task
  [ring-request]
  (m/sp
     ... do some computation ...
     (when should-make-http-call?
        (m/? (?make-http-call ...)))
     {:status 204}))
Here the ring response is returned only after ?make-http-call completes. Ideally, I want to fire the HTTP call without waiting for the response.

leonoel 2024-12-31T09:17:21.834059Z

If you don't need supervision, it's OK to just run effects. Just make sure to not block the thread

leonoel 2024-12-31T09:19:12.978849Z

Call the task as a function, pass two no-op callbacks

👍 1
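A minimal sketch of the fire-and-forget approach suggested above, assuming a missionary task is a function of a success callback and a failure callback that returns a cancel function. `fire-and-forget!` and the stubbed `?make-http-call` (a sleep standing in for the real request) are hypothetical names used only for illustration.

```clojure
(require '[missionary.core :as m])

;; records the payloads of the "HTTP calls" that have completed
(def calls (atom []))

;; Hypothetical stand-in for ?make-http-call: sleeps instead of doing real I/O.
(defn ?make-http-call [payload]
  (m/sp
    (m/? (m/sleep 200))
    (swap! calls conj payload)))

(defn fire-and-forget!
  "Starts task with two no-op callbacks, so its result and its failure
  are both discarded. Returns the task's cancel function."
  [task]
  (task (fn [_]) (fn [_])))

(defn ?task [ring-request]
  (m/sp
    ;; the call is started but not awaited (no m/? around it)
    (when (:notify? ring-request)
      (fire-and-forget! (?make-http-call ring-request)))
    {:status 204}))
```

The handler task completes with the response right away and the stubbed call finishes later on its own. Nothing supervises it, so cancelling the handler does not cancel the call, and failures are dropped unless the second callback logs them.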
Ben Sless 2024-12-30T05:43:04.231269Z

What's the correct way to produce a discrete flow from a process which receives a callback and invokes it for each incoming message? In my particular case it's a web socket client which receives a message stream

Dustin Getz (Hyperfiddle) 2025-01-06T11:49:03.813919Z

well, whether the flow is coming out of an atom watch, or m/seed, or underneath a switching if, or a datastructure containing many flows, or a flow returning function, or some other flow of flows, i think it is all the same to missionary

Dustin Getz (Hyperfiddle) 2025-01-02T21:28:46.130719Z

i think you can e.g. put a missionary flow into an atom and watch/join it all at runtime

Dustin Getz (Hyperfiddle) 2025-01-02T21:29:37.772569Z

and if that works, then a lot of fancy stuff can probably work too

Hendrik 2024-12-30T08:32:44.340359Z

Did you look at https://cljdoc.org/d/missionary/missionary/b.42/api/missionary.core#observe ?

Ben Sless 2024-12-30T08:35:32.464129Z

yes, but I'm not sure how to get the discrete flow "out" of my constructor. It's as trivial as

(defn create
  ^WebSocketClient
  [& {:keys [uri
           on-open
           on-close
           on-message
           on-error]
    :or {on-open noop
         on-close noop
         on-message println
         on-error println}}]
  (proxy [WebSocketClient] [uri]
    (onOpen [handshake-data]
      (on-open handshake-data))
    (onClose [code reason remote?]
      (on-close code reason remote?))
    (onMessage [msg]
      (on-message msg))
    (onError [ex]
      (on-error ex))))
observe returns the flow, but what am I going to observe in this case?

Hendrik 2024-12-30T08:47:19.841469Z

cannot test at the moment, but something like this should work:

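(let [cb (fn [s]
           (let [ws (create {:on-open #(s %)
                             :on-message #(s %)})]
             ;; the returned thunk runs when the flow is cancelled;
             ;; close the connection there (.onClose is the event handler,
             ;; this assumes the client exposes .close)
             #(.close ws)))]
  (m/observe cb))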

👀 1
Ben Sless 2024-12-30T08:56:57.180639Z

Will it block the callback thread if I don't relieve the stream?

Hendrik 2024-12-30T09:04:10.387119Z

from the docs:

On initialization, the process calls the subject with a fresh callback. Passing a value to the callback makes the process ready to transfer this value. While a transfer is pending, the callback blocks the calling thread until the transfer is complete. If the host platform doesn't support blocking, the callback throws an error instead

Ben Sless 2024-12-30T09:37:55.658019Z

That was my understanding from reading the docstring, just wanted to make sure

👍 1
leonoel 2024-12-30T09:38:06.917949Z

it will block the callback thread yes

leonoel 2024-12-30T09:41:04.936969Z

if blocking is not an option, you have to consume fast (e.g. m/relieve, m/buffer)

leonoel 2024-12-30T09:42:27.953299Z

The mbx / forever combo is just an infinite buffer
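For reference, the mbx / forever combo might look like the sketch below. `forever` is not part of missionary; it is a small helper assumed here, and `inbox`, `messages` and `first-three` are illustrative names. Posting to a mailbox never blocks the caller, which is also why the queue can grow without bound.

```clojure
(require '[missionary.core :as m])

(defn forever
  "Discrete flow that runs task over and over, emitting each result."
  [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

;; a mailbox is a 1-arity function to post and a task to fetch
(def inbox (m/mbx))

;; flow of everything posted to the mailbox, in posting order
(def messages (forever inbox))

;; in the websocket client this would be e.g. :on-message inbox
(inbox "a")
(inbox "b")
(inbox "c")

;; JVM only: m/? outside of m/sp blocks the calling thread
(def first-three
  (m/? (m/reduce conj [] (m/eduction (take 3) messages))))
```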

Ben Sless 2024-12-30T09:44:15.407139Z

Is there a sliding buffer or do I have to implement one myself?

leonoel 2024-12-30T09:55:23.311699Z

there is no sliding buffer like in core.async, if you have a compelling use case please share it
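One way to approximate a sliding buffer with the existing operators, sketched under the assumption that receiving small batches instead of single messages is acceptable: wrap each message in a vector and let m/relieve merge overflowed batches while keeping only the newest n items. `sliding` and `sliding-batches` are hypothetical helpers, not missionary functions.

```clojure
(require '[missionary.core :as m])

(defn sliding
  "Reducing function for m/relieve: concatenates two batches (vectors)
  and keeps only the n most recent items."
  [n]
  (fn [older newer]
    (let [all (into older newer)]
      (subvec all (max 0 (- (count all) n))))))

(defn sliding-batches
  "Turns a discrete flow of messages into a flow of vectors holding at most
  n messages. When the consumer is slow, the oldest messages are dropped."
  [n flow]
  (m/relieve (sliding n) (m/eduction (map vector) flow)))

;; the reducing function on its own
((sliding 2) [1 2] [3]) ;; => [2 3]
```

A consumer that keeps up sees one-element vectors; a consumer that falls behind sees the last n messages on its next transfer, and memory use stays bounded.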

Ben Sless 2024-12-30T10:07:46.827729Z

I'll zoom out a bit and focus on what I'm trying to achieve - given I have an active connection to the websocket stream, I want to interactively develop flows on top of it. They can error, I may want to try several things. My thinking was the first step would be creating a flow I can subscribe to to get the data stream which wouldn't blow up memory on one hand, and would drop values if I don't process them fast enough. I expect to process them fast enough in the end but during development I care less. What's the correct workflow here? With core.async I'd use a channel with sliding buffer, pub and sub/unsub my experiments

Ben Sless 2024-12-31T06:37:34.686799Z

Would appreciate some directions or advice

Hendrik 2024-12-31T07:34:31.650049Z

Your idea to create a flow as near to the websocket as possible sounds sane, because missionary gives you the building blocks to deal with backpressure in a simple way. How you deal with backpressure depends on your problem. Your problem description is still very abstract …

Hendrik 2025-01-06T07:46:17.794789Z

"i think you can e.g. put a missionary flow into an atom and watch/join it all at runtime"
Interesting approach. I have not thought about that yet. Are there any missionary primitives to support that out of the box, or would one have to write it? I guess it would be roughly like this?
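(defn make-dynamic-flow [flow-atom]
  (let [flow-of-flows (m/watch flow-atom)] ; flow of the atom's successive values, each itself a flow
    (m/ap
      ;; the values are already flows, so no m/seed is needed:
      ;; ?< switches to the newest inner flow, ?> forwards its values
      (m/?> (m/?< flow-of-flows)))))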

Ben Sless 2024-12-31T08:03:52.557909Z

That's why I rephrased the problem slightly - I want to create a flow from a web socket stream and develop interactively on it. Since my code might error and I'll be working at the REPL I don't want to affect the parent flow by making it shut down or blocking its callback thread. What's the best way to set it up and develop interactively at the REPL with that data source? Is there a way to "build out" a flow?

Hendrik 2024-12-31T09:16:07.224969Z

ah ok. so you want to change parts of the process dynamically with the repl? If so, I think that this cannot be easily done. missionary is mostly functions and a started process is basically a big opaque lambda. I guess that it is much easier to use some kind of reload workflow where you shut down the old missionary process and start a new one. what is your use case for keeping the parent flow alive?

Hendrik 2024-12-31T09:19:38.106219Z

btw. Electric uses missionary very dynamically and it changes the missionary process during runtime. However, it only can do this because the Electric compiler knows all branches at compile time (aka static knowledge). If you change the electric program, then this results in a recompilation

xificurC 2024-12-31T09:24:26.726859Z

I'd start with m/observe + m/relieve + m/stream to get a flow of websocket messages that can have multiple consumers. You can change the backpressure strategy to m/buffer or something else later

✍️ 1
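The m/observe + m/relieve + m/stream combination can be sketched as follows. The websocket is replaced by a hypothetical single-slot listener (`listener`, `subscribe!`) so the example is self-contained; with a real client, `subscribe!` would open the connection and return a thunk that closes it.

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for the websocket client: one listener slot.
(def listener (atom nil))

(defn subscribe!
  "Registers cb as the listener, returns a thunk that removes it."
  [cb]
  (reset! listener cb)
  #(reset! listener nil))

(def messages
  (->> (m/observe subscribe!)           ; flow fed by the callback
       (m/relieve (fn [_ newer] newer)) ; never block the producer, keep the latest
       (m/stream)))                     ; share one subscription among consumers

;; one experimental consumer; more can be started and cancelled independently
(def seen (atom []))

(def cancel
  ((m/reduce (fn [_ msg] (swap! seen conj msg)) nil messages)
   (fn [_])    ; success callback, unused here
   (fn [_])))  ; failure callback, also receives the cancellation error

;; the "socket" delivers a message, then the consumer is stopped
(@listener "hello")
(cancel)
```

Swapping the m/relieve line for m/buffer, or for a different reducing function, changes the backpressure strategy without touching the consumers.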
Ben Sless 2024-12-30T07:48:58.196599Z

Mix of mbx and forever?

Ben Sless 2024-12-30T07:49:08.845509Z

I can't block the callback thread