Fork me on GitHub
#missionary
<
2021-08-02
>
ribelo07:08:03

I've read all available materials, including tests, and I've learned how to do very complex things, but the simplest things are still not clear to me. I haven't dealt with RxJava, but I use RxJS with beicon, so basically the same thing. Anyway, how can I start a process/stream running in the background? The simplest example from beicon.

(def a (atom 1))

(def stream (rx/from-atom a))

(rx/sub! stream #(println "v:" %))
(swap! a inc)
;; ==> v: 2

ribelo07:08:50

all examples found in the missionary documentation that use flow ends with (m/? (m/reduce ..., which blocks the main thread.

leonoel08:08:53

if you want to refer to a running flow (instead of the recipe), the tool you need is the reactor

leonoel08:08:23

there's still no tutorial for it, so let me explain

ribelo08:08:02

I've seen in tests how reactor is more or less used, but I don't know how to apply it.

leonoel08:08:18

reactor is a kind of supervisor for a dynamic dag, you run it in your app entrypoint, it takes a boot function

leonoel08:08:41

inside the boot function, you can call signal! and stream!(resp. for continuous and discrete flows), it will run the flow and return another flow that is a subscription to the running process

ribelo08:08:23

in this case, an example will probably say more than 1000 words ; )

ribelo08:08:53

if it's not a problem, of course

leonoel08:08:39

(def a (atom 1))
(def app
  (m/reactor
    (let [>a (m/signal! (m/watch a))]
      (m/stream! (m/ap (println "v:" (m/?> >a)))))))
(def cancel (app prn prn))                                  ;; "v: 1"
(swap! a inc)                                               ;; "v: 2"

ribelo08:08:10

I am a journalist by education, and while I can grasp many things conceptually through examples, I am quite deficient in understanding terminology.

ribelo08:08:08

Okay, I get it!

leonoel08:08:10

m/watch is rx/from-atom, returns continuous flow m/signal! runs the flow and assigns an identity to it, ie subscribe instead of re-run m/stream! is the same, but this time it's discrete because you want the result of side-effects

leonoel08:08:42

are you familiar with the hot/cold distinction in Rx ?

ribelo08:08:07

If I remember correctly, hot observables push data constantly, regardless of whether there is a subscriber, cold, only if there is a subscriber

leonoel08:08:18

here the approach is slightly different but the insight is close : by default you work on the recipe, it's pure functional programming and nothing happens until you run the full pipeline. It works fine until you want to share some part of the computation to multiple subscribers. At this point it's not a tree anymore, it's a dag, and it comes with more challenges

leonoel08:08:41

glitch-free propagation is one of them, Rx doesn't support that which is why it's often dismissed as proper FRP

leonoel08:08:59

Here is another example, a diamond shape typical of DAGs :

(def a (atom 1))
(def app
  (m/reactor
    (let [>a (m/signal! (m/watch a))
          >b (m/signal! (m/latest + >a >a))]
      (m/stream! (m/ap (println "v:" (m/?> >b)))))))
(def cancel (app prn prn))                                  ;; "v: 2"
(swap! a inc)                                               ;; "v: 4"
A glitch-free engine writes 2 then 4, Rx would write 2, 3, then 4

ribelo23:08:09

@leonoel Is it possible to do similar thing but using externally declared flows?

(def a_ (atom 0))

(def >a (m/ap (m/?> (m/signal! (m/watch a_)))))

(def >b (m/ap (m/?> (m/signal! (m/latest + >a >a)))))

(def >c (m/ap (m/?> (m/signal! (m/latest + >a >a)))))

(def >d (m/ap (m/?> (m/signal! (m/latest + >b >c)))))

(let [app (m/reactor (m/stream! (m/ap (println :d (m/?> (m/signal! >d))))))]
  (def dispose (app prn prn)))

(swap! a_ inc)
(dispose)

ribelo23:08:25

; prints
:d 0
:d 1
:d 2
:d 3
:d 4

ribelo23:08:47

This is a concrete problem, not an imaginary one. Let's say I want to replicate a reagent/re-frame. I have one state atom, a flow from it, and many subscriptions returning flows that operate on previous returned flows. It's probably possible, but I'm running out of ideas.

leonoel07:08:16

I don't fully understand the problem

leonoel07:08:50

could you point me to a reagent example ?

ribelo09:08:46

What I mean is whether it is possible that in the given example the last element is recalculated once.

leonoel09:08:24

Here is another example

(def a_ (atom 0))

(defn graph [>input]
  (let [>a (m/signal! >input)
        >b (m/signal! (m/latest + >a >a))
        >c (m/signal! (m/latest + >a >a))
        >d (m/signal! (m/latest + >b >c))]
    >d))

(def app
  (m/reactor
    (m/stream! (m/ap (println "v:" (m/?> (graph (m/watch a_))))))))

(def cancel (app prn prn))

(swap! a_ inc)

leonoel09:08:28

what I don't understand is why do you want to declare externally

ribelo09:08:12

therefore, because I would want one flow to be used several times by others.

ribelo09:08:44

I would like to do something like a reagent make-reaction

ribelo09:08:59

and finally I would like to connect the React Hook to the Missionary Flow

ribelo10:08:51

proof of concept written quickly.

ribelo10:08:22

there is a significant possibility that what I am trying to do is senseless. 🙃

leonoel16:08:30

Here is a working example. I can't help much without more context, a few remarks though : • (m/ap (m/?> flow)) is the same thing as flow, if you want to defer evaluation, just use functions • (m/latest identity flow) is the same thing as flow

(defn spawn! [on-done! on-fail! on-change! ctor >db]
  ((m/reactor (m/stream! (m/ap (on-change! (m/?> (ctor (m/signal! >db))))))) on-done! on-fail!))

(defn cursor [k >f]
  (->> >f
    (m/eduction (map k) (dedupe))
    (m/relieve {})))

(def >div (partial m/latest (partial vector :div)))

(defn component [>db]
  (let [>first-name (m/signal! (cursor :first-name >db))
        >last-name  (m/signal! (cursor :last-name >db))
        >full-name  (m/signal! (m/latest #(str %1 " " %2) >first-name >last-name))
        >email      (m/signal! (m/latest #(str % "@.") >full-name))]
    (m/signal!
      (>div
        (>div >first-name)
        (>div >last-name)
        (>div >email)))))

(comment

  (def db_ (atom {:first-name "grzegorz"
                  :last-name  "brzęczyszczykiewicz"}))

  ;; in the component initialization logic
  (let [tmp (useState nil)
        state (aget tmp 0)
        cancel (spawn!
                 (fn [_] (println "component terminated"))
                 (fn [e] (println "component failed" e))
                 (aget tmp 1) component (m/watch db_))]
    ;; TODO register cancel function on unmount callback
    ;; TODO use state variable somehow, it will react when db_ changes

    )
  )

ribelo17:08:48

Thanks! It's great. Thanks for being patient with me 😉

ribelo08:08:17

I need to wrap my head around this

ribelo08:08:49

But thanks for giving me your attention, I'm going to try to rewrite the tutorials from RxJS using missionary and it will probably click in my head at some point.

👍 2
martinklepsch10:08:22

I saw that https://cljdoc.org/d/missionary/missionary/b.20/api/missionary.core#gather is deprecated but the docstring doesn’t mention what it’s replaced with. Is there a replacement?

leonoel10:08:32

(m/gather f g h)
(m/ap (m/?> (m/amb= f g h)))
gather was written before ?=, which is more powerful so I see no point having both

martinklepsch10:08:38

Ah interesting, I’ll have a look at that then

martinklepsch10:08:57

BTW, I have a few notes here that I took while exploring missionary! https://gatheround.slab.com/public/posts/9mpig8sv

👀 4
ribelo17:08:03

I'm breaking through slowly but can't figure out how to use publisher & subscribe, @leonoel would you have a moment to give a hint?

ribelo18:08:36

Is missionary pub/sub the same as pub/sub with core.async?

ribelo18:08:21

can I create one flow, and in many places in the code connect subscribers who will respond on specific topics?

leonoel18:08:54

there's no equivalent to core.async's pub/sub currently, but depending on your use case it should be possible to emulate it

leonoel18:08:07

is this for learning purpose or do you have an actual problem to solve ?

ribelo18:08:33

learning purpose

ribelo18:08:24

before I do anything, I try to understand every function I find in the documentation

leonoel19:08:29

publisher and subscribe are for interop with reactive streams, it's not really about the pub/sub pattern

ribelo19:08:24

which means I can just ignore it

ribelo19:08:35

I think I've figured out the rest

ribelo19:08:21

I'm actually looking forward to hyperfiddle.photon, because there it will probably be possible to see all the ways in which missionary can be used

ribelo19:08:54

thanks leonoel!

👍 2
leonoel19:08:41

photon is a really cool project, stay tuned !

ribelo19:08:43

I think everyone in the clojure community dreams of kicking react out of the ecosystem. An elm like framework in pure clojure could be a killer feature

ribelo23:08:36

I have an additional problem with the correct use of observer with js promise, can you give me an example?