Missionary Util library
https://github.com/clojure-quant/quanta-market/blob/main/src/quanta/market/util.clj
I do not understand the purpose of wrap-blk and split-seq-flow
Split-seq-flow transforms a flow of seqs to a flow of values. There are many streaming APIs that send out batches of updates that in reality just represent a stream of updates. I had issues of not understanding m/amb well which resulted in producing unwanted nils on the flow when unbatching. So split-seq-flow should be named unbatch-flow.
wrap-blk because it is shorter syntax for a task, and with less chance to fuck up something. I added it because missionary docs give example of how to wrap a blocking function, and I many times I was confused how to do it for a task.
split-seq-flow is seq -> flow and is approximately equivalent to m/seed
wrap-blk is task -> task and doesn't change the meaning of its input, m/via and m/? cancel each other
> split-seq-flow transforms (m/seed [ [1 2] [3 4] [5]]) to (m/seed [1 2 3 4 5])
That's not what your implementation does. Also, what you describe can be just #(m/eduction cat %)
> offload a task to a separate thread
It doesn't really mean anything. A task process can run on many threads successively, depending on how it's defined, but generally operators are written in such a way that it doesn't matter (i.e. no blocking/expensive operations). It's not possible to redefine on which thread a task process should be run after it's defined. The best you can do is to decide on which thread it's spawned, but again you normally don't need to do that.
> (m/via m/cpu (+ 1 1)) offloads the addition to a separate thread
Yes.
> So if I want to offload a task don't I have to write (m/via m/cpu (m/? task)) ?
No. If the task has blocking/expensive function calls inside then these calls must be wrapped right off the bat with m/via and then be composed, e.g. with m/sp
> And the result of m/via is a task, no?
Yes.
@leonoel split-seq-flow transforms (m/seed [ [1 2] [3 4] [5]]) to (m/seed [1 2 3 4 5]). I have seem a lot of data publishers that send out a stream of updates, but to reduce the number of messages sent on the wire they send updates grouped together. For the application logic this grouping needs to be undone. The best name for this function is perhaps unbatch-flow? I don't see where it relates to m/seed, as it is essentially a flow transformation operation.
@leonoel So in regards to wrap-blk .. what you say means that I would have to rewrite wrap-blk as a macro if I wanted to have some kind of shortcut to offload a task to a separate thread?
I guess I still dont get it how to use (m/via m/cpu body). I understand that (m/via m/cpu (+ 1 1)) offloads the addition to a separate thread. Or in other words m/via expects a blocking expression as body. So if I want to offload a task don't I have to write (m/via m/cpu (m/? task)) ?
And the result of m/via is a task, no?
I want to share here my namespace of Missionary Utility functions.
start! stop! I use a lot to start and stop a task; make repl driven development easier.
Start-flow-logger is handy to spit all messages from a flow to a file. Also useful for repl development.
Rlock and with-lock is a reentrantlock wrapper that works I think identically to m/sem and m/holding.
Flow-Sender I use for Services that Provide a flow that can be consumed.
This are all very simple functions/macros. And I dont know if they are 100% okay. But they help me in using missionary in my project.
I need to work around a weird behavior if missionary: when I m/? await a task all works fine. But when I use the same task in a flow .. then awaiting the task does not happen.
@leonoel this issue got completely resolved by using reentrantlock.
@leonoel thanks for your help and Support- Highly appreciated! On the m/holding issue I believe that it should be possible to detect errors like this somehow in the m/sp and m/ap macro. I know how hard it is to write macros and I realize what has been achieved in this library already; so I completely get it that not all stupidity of the user can be safeguarded. It Was just a weird behavior that when I wrote the task and tested the task it was working fine. And when I used this task in a flow I got the fuckup. This would not have happened with synchronus clojure code. That all.
And thanks for this library! It's a very interesting approach, whose potential implications are huge in many areas! Missionary really could be a missionary for writing very different codebases.
I agree it's possible in principle to detect indirect calls to ? from sp and ap, and I also agree it's a serious inconvenience. However I do not think it's possible with the current design of cloroutine. That being said, a major rewrite is required by https://github.com/leonoel/missionary/issues/109 so it will eventually happen and we could address this issue in the new design, as well as the sci support.
(defn place-order-near-market "returns a missionary task that returns a places a limit-order near the last trade whose limit is diff percentage better than the last trade trade price received using feed :feed for order :asset :side" [{:keys [qm pm]} order-feed-diff] (let [order-create-t (limit-order-near-market qm order-feed-diff) ; order-place-t ; cannot use the let trick here. ] (m/sp (let [order (m/? order-create-t)] (warn "will place order: " order) (m/? (p/order-create! pm order))))))
The above is the fn that returns the task. From what I have seen in my Experiments with missionary my error would get away if I could move (p/order-create! pm order) into the let binding outside of m/sp. My issue is that when I run this task from inside a m/ap then the task returns nil. The line (warn ) Logs the order. But the value returned to the m/ap immediately returns nil.
(defn create-entry-f [{:keys [qm pm] :as env} {:keys [account qty feed diff] :as robot-opts} signal-f] (assert qm "entry-robot needs :qm") (assert pm "entry-robot needs :pm") (assert account "entry-robot needs :account") (assert qty "entry-robot needs :qty") (assert feed "entry-robot needs :feed") (assert diff "entry-robot needs :diff") (m/ap (let [signal (m/?> signal-f) {:keys [asset side]} signal order-feed-diff (merge robot-opts signal) place-order-t (place-order-near-market env order-feed-diff) ;place-order-t (wrap-blk place-order-t) ; this does not change anything. order (m/? place-order-t)] (warn "order placed: " order) {:signal signal :order-feed-diff order-feed-diff :order order})))
This is the m/ap returning fn. (warn "order placed" order) immediately logs nil.
And the Output of the flow returns {:order nil :signal :good :order-feed-diff :good}
:good means all ok with this value.
(m/? (place-order-near-market {:qm qm :pm pm} {:asset "BTCUSDT" :side :buy :account :rene/test4 :qty 0.001 :feed :bybit :diff 0.1} ))
The above test using the task works fine.
(def signal-flow-sender (flow-sender)) (defn send-signal [signal] (warn "new signal: " signal) ((:send signal-flow-sender) signal)) (def robot (start-entry-robot {:qm qm :pm pm} {:account :rene/test4 :qty 0.001 :feed :bybit :diff 0.001} (:flow signal-flow-sender) ".data/robot-entry.txt")) (comment (send-signal {:asset "BTCUSDT" :side :buy}) (send-signal {:asset "BTCUSDT" :side :sell})
This is the test of the m/ap that does not work as it should.
Thevtask that gets awaited from the m/ap dies get run correctly. However the m/? does not wait for the task to finish. Instead it immediately returns nil. And the started task continues running until it finishes.
I had similar issue with missionary.. and They went away when I moved creation of tasks to let binding outside of m/sp. My issue is in this case this approach does not work as I need the result of the first task to create and await the second task.
@leonoel I still have the code of "m/holding being called from an m/sp block" that is somewhere in the chain of flows that are being used. Could this be the culprit as well here?