Hi guys! It’s allowed to do this:
(def task
(m/sp
...
(completable-future-wrapper #(...) #(...))
...))
I use m/memo a lot and it’s a convenient function but sometimes I dont’t want the result of a process. Just fire and forget it.completable-future-wrapper returns a missionary task? It is allowed, but you lose what you get from task composition, e.g. if the wrapper m/sp dies it won't cancel this rogue task
I'd look at compel, join, race to keep supervision
In my mind completable-future-wrapper it's a wrapper that launch a CompletableFuture with a success and failure callback.
I use join a lot too but here it's not that expected. For example:
(def task
(m/join
{}
(m/via m/blk (Thread/sleep 1000) (prn "Hello")
(m/sp "World")))
Here, World is returned after 1s (and Hello is printed at the same time).
I'm looking for the behaviour or World is printed without wait 1s.hard to move forward without more context. I'd slice the tasks up in this particular case
(m/join {}
(m/via m/blk (Thread/sleep 1000) (prn "Hello"))
(m/sp "World"))Sorry for the lack of context. So I have a function that return a missionary task that I use in a ring async handler. This task return a ring response and sometimes the task should make an http call:
(defn ?task
[ring-request]
(m/sp
... do some computation ...
(when should-make-http-call?
(m/? (?make-http-call ...))
{:status 204}))
Here the ring response is returned after the end of the ?make-http-call. Ideally, I want fire the http call without waiting for the response.If you don't need supervision, it's OK to just run effects. Just make sure to not block the thread
Call the task as a function, pass two no-ops callbacks
What's the correct way to produce a discrete flow from a process which receives a callback and invokes it for each incoming message? In my particular case it's a web socket client which receives a message stream
well, whether the flow is coming out of an atom watch, or m/seed, or underneath a switching if, or a datastructure containing many flows, or a flow returning function, or some other flow of flows, i think it is all the same to missionary
i think you can e.g. put a missionary flow into an atom and watch/join it all at runtime
and if that works, then a lot of fancy stuff can probably work too
Did you look at https://cljdoc.org/d/missionary/missionary/b.42/api/missionary.core#observe ?
yes, but I'm not sure how to get the discrete flow "out" of my constructor. It's as trivial as
(defn create
^WebSocketClient
[& {:keys [uri
on-open
on-close
on-message
on-error]
:or {on-open noop
on-close noop
on-message println
on-error println}}]
(proxy [WebSocketClient] [uri]
(onOpen [handshake-data]
(on-open handshake-data))
(onClose [code reason remote?]
(on-close code reason remote?))
(onMessage [msg]
(on-message msg))
(onError [ex]
(on-error ex))))
observe returns the flow, but what am I going to observe in this case?cannot test at the moment, but something like this should work:
(let [cb (fn [s]
(let [ws (create {:on-open #(s %)
:on-message #(s %)})]
#(.onClose ws))
)]
(m/observe cb))Will it block the callback thread if I don't relieve the stream?
from the docs:
On initialization, the process calls the subject with a fresh callback. Passing a value to the callback makes the process ready to transfer this value. While a transfer is pending, the callback blocks the calling thread until the transfer is complete. If the host platform doesn't support blocking, the callback throws an error instead
That was my understanding from reading the docstring, just wanted to make sure
it will block the callback thread yes
if blocking is not an option, you have to consume fast (e.g. m/relieve, m/buffer)
The mbx / forever combo is just an infinite buffer
Is there a sliding buffer or do I have to implement one myself?
there is no sliding buffer like in core.async, if you have a compelling use case please share it
I'll zoom out a bit and focus on what I'm trying to achieve - given I have an active connection to the websocket stream, I want to interactively develop flows on top of it. They can error, I may want to try several things. My thinking was the first step would be creating a flow I can subscribe to to get the data stream which wouldn't blow up memory on one hand, and would drop values if I don't process them fast enough. I expect to process them fast enough in the end but during development I care less. What's the correct workflow here? With core.async I'd use a channel with sliding buffer, pub and sub/unsub my experiments
Would appreciate some directions or advice
Your idea to create a flow as near to the websocket as possible sounds sane because missionary gives you the building blocks to deal with backpressure in a simple way.How you deal with backpressure depends on your problem.your problem description is still very abstract …
i think you can e.g. put a missionary flow into an atom and watch/join it all at runtime
interesting approach. I have not thought about that, yet.
Are there any missionary primitives to support that out of the box or would one have to write it. I guess it would be roughly like this?
(defn make-dynamic-flow [flow-atom]
(let [flow-of-flows (e/watch flow-atom)]
(m/ap
(m/?> (m/seed (m/?> (m/seed flow-flows)))))))That's why I rephrased the problem slightly - I want to create a flow from a web socket stream and develop interactively on it. Since my code might error and I'll be working at the REPL I don't want to affect the parent flow by making it shut down or blocking its callback thread. What's the best way to set it up and develop interactively at the REPL with that data source? Is there a way to "build out" a flow?
ah ok. so you want to change parts of the process dynamically with the repl? If so, I think that this cannot be easily done. missionary is mostly functions and a started process is basically a big opaque lambda. I guess that it is much easier to use some kind of reload workflow where you shutdown the old missionary process and start a new one. what is your usecase of keeping the parent flow alive?
btw. Electric uses missionary very dynamically and it changes the missionary process during runtime. However, it only can do this because the Electric compiler knows all branches at compile time (aka static knowledge). If you change the electric program, then this results in a recompilation
I'd start with m/observe + m/relieve + m/stream to get a flow of websocket messages that can have multiple consumers. You can change the backpressure strategy to m/buffer or something else later
Mix of mbx and forever?
I can't block the callback thread