2021-08-02
I've read all available materials, including tests, and I've learned how to do very complex things, but the simplest things are still not clear to me.
I haven't dealt with RxJava, but I use RxJS with beicon, so basically the same thing.
Anyway, how can I start a process/stream running in the background? The simplest example from beicon:
(def a (atom 1))
(def stream (rx/from-atom a))
(rx/sub! stream #(println "v:" %))
(swap! a inc)
;; ==> v: 2
All the examples I found in the missionary documentation that use a flow end with (m/? (m/reduce ..., which blocks the main thread.
if you want to refer to a running flow (instead of the recipe), the tool you need is the reactor
I've seen in tests how reactor is more or less used, but I don't know how to apply it.
reactor is a kind of supervisor for a dynamic DAG, you run it in your app entrypoint, it takes a boot function
inside the boot function, you can call signal! and stream! (resp. for continuous and discrete flows), it will run the flow and return another flow that is a subscription to the running process
(def a (atom 1))
(def app
  (m/reactor
    (let [>a (m/signal! (m/watch a))]
      (m/stream! (m/ap (println "v:" (m/?> >a)))))))
(def cancel (app prn prn)) ;; "v: 1"
(swap! a inc) ;; "v: 2"
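For comparison, a task can also be started without blocking and without a reactor: a missionary task is a function of a success callback and a failure callback that returns a canceller, which is what (app prn prn) relies on above. The following is only a minimal sketch, assuming missionary is on the classpath and required as m; the name print-task is hypothetical and used for illustration.

```clojure
(require '[missionary.core :as m])

(def a (atom 1))

;; A task that consumes the watch flow and prints each value it sees.
;; Nothing runs yet: this is still only the recipe.
(def print-task
  (m/reduce (fn [_ v] (println "v:" v)) nil (m/watch a)))

;; Calling the task with success and failure callbacks starts it
;; and returns immediately with a canceller.
(def cancel (print-task prn prn)) ;; prints "v: 1"

(swap! a inc) ;; prints "v: 2"

;; Stop watching the atom; one of the two callbacks is then invoked.
(cancel)
```

Unlike the reactor version, the flow here has a single consumer, so this covers only the "run in the background" part of the question, not sharing a running flow between several subscribers.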
I am a journalist by education, and while I can grasp many things conceptually through examples, I am quite deficient in understanding terminology.
m/watch is rx/from-atom, returns a continuous flow
m/signal! runs the flow and assigns an identity to it, i.e. subscribe instead of re-run
m/stream! is the same, but this time it's discrete because you want the result of side-effects
If I remember correctly, hot observables push data constantly, regardless of whether there is a subscriber, while cold ones push only if there is a subscriber
here the approach is slightly different but the insight is close: by default you work on the recipe, it's pure functional programming and nothing happens until you run the full pipeline. It works fine until you want to share some part of the computation with multiple subscribers. At this point it's not a tree anymore, it's a DAG, and it comes with more challenges
glitch-free propagation is one of them, Rx doesn't support that, which is why it's often dismissed as not being proper FRP
Here is another example, a diamond shape typical of DAGs:
(def a (atom 1))
(def app
  (m/reactor
    (let [>a (m/signal! (m/watch a))
          >b (m/signal! (m/latest + >a >a))]
      (m/stream! (m/ap (println "v:" (m/?> >b)))))))
(def cancel (app prn prn)) ;; "v: 2"
(swap! a inc) ;; "v: 4"
A glitch-free engine writes 2 then 4, Rx would write 2, 3, then 4
@leonoel Is it possible to do a similar thing but using externally declared flows?
(def a_ (atom 0))
(def >a (m/ap (m/?> (m/signal! (m/watch a_)))))
(def >b (m/ap (m/?> (m/signal! (m/latest + >a >a)))))
(def >c (m/ap (m/?> (m/signal! (m/latest + >a >a)))))
(def >d (m/ap (m/?> (m/signal! (m/latest + >b >c)))))
(let [app (m/reactor (m/stream! (m/ap (println :d (m/?> (m/signal! >d))))))]
  (def dispose (app prn prn)))
(swap! a_ inc)
(dispose)
This is a concrete problem, not an imaginary one. Let's say I want to replicate reagent/re-frame.
I have one state atom, a flow from it, and many subscriptions returning flows that operate on previously returned flows. It's probably possible, but I'm running out of ideas.
What I mean is whether it is possible, in the given example, for the last element to be recalculated only once.
Here is another example
(def a_ (atom 0))
(defn graph [>input]
  (let [>a (m/signal! >input)
        >b (m/signal! (m/latest + >a >a))
        >c (m/signal! (m/latest + >a >a))
        >d (m/signal! (m/latest + >b >c))]
    >d))
(def app
  (m/reactor
    (m/stream! (m/ap (println "v:" (m/?> (graph (m/watch a_))))))))
(def cancel (app prn prn))
(swap! a_ inc)
Here is a working example. I can't help much without more context, a few remarks though:
• (m/ap (m/?> flow)) is the same thing as flow, if you want to defer evaluation, just use functions
• (m/latest identity flow) is the same thing as flow
(defn spawn! [on-done! on-fail! on-change! ctor >db]
  ((m/reactor (m/stream! (m/ap (on-change! (m/?> (ctor (m/signal! >db))))))) on-done! on-fail!))
(defn cursor [k >f]
  (->> >f
       (m/eduction (map k) (dedupe))
       (m/relieve {})))
(def >div (partial m/latest (partial vector :div)))
(defn component [>db]
  (let [>first-name (m/signal! (cursor :first-name >db))
        >last-name (m/signal! (cursor :last-name >db))
        >full-name (m/signal! (m/latest #(str %1 " " %2) >first-name >last-name))
        >email (m/signal! (m/latest #(str % "@.") >full-name))]
    (m/signal!
      (>div
        (>div >first-name)
        (>div >last-name)
        (>div >email)))))
(comment
  (def db_ (atom {:first-name "grzegorz"
                  :last-name "brzęczyszczykiewicz"}))
  ;; in the component initialization logic
  (let [tmp (useState nil)
        state (aget tmp 0)
        cancel (spawn!
                 (fn [_] (println "component terminated"))
                 (fn [e] (println "component failed" e))
                 (aget tmp 1) component (m/watch db_))]
    ;; TODO register cancel function on unmount callback
    ;; TODO use state variable somehow, it will react when db_ changes
    )
  )
But thanks for giving me your attention, I'm going to try to rewrite the tutorials from RxJS using missionary and it will probably click in my head at some point.
I saw that https://cljdoc.org/d/missionary/missionary/b.20/api/missionary.core#gather is deprecated but the docstring doesn’t mention what it’s replaced with. Is there a replacement?
(m/gather f g h)
(m/ap (m/?> (m/amb= f g h)))
gather was written before ?=, which is more powerful so I see no point having both
Ah interesting, I’ll have a look at that then
BTW, I have a few notes here that I took while exploring missionary! https://gatheround.slab.com/public/posts/9mpig8sv
I'm breaking through slowly but can't figure out how to use publisher & subscribe, @leonoel would you have a moment to give a hint?
can I create one flow, and in many places in the code connect subscribers that will respond to specific topics?
there's no equivalent to core.async's pub/sub currently, but depending on your use case it should be possible to emulate it
publisher and subscribe are for interop with reactive streams, it's not really about the pub/sub pattern
I'm actually looking forward to hyperfiddle.photon, because there it will probably be possible to see all the ways in which missionary can be used