#missionary
2022-07-05
Richie19:07:28

I'm still trying to understand "switch" vs "concat", especially https://clojurians.slack.com/archives/CL85MBPEF/p1656877751239169?thread_ts=1656873299.996749&cid=CL85MBPEF

((->> (m/ap (try
              (let [x (m/?>             ; first
                       (m/seed (range)))]
                (m/?<                   ;second
                 (m/seed (range 3)))
                x)
              (catch Cancelled _)))
      (m/eduction (take 20))
      (m/reduce conj))
 prn prn)
;; ?< ?< => [nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil]
;; ?< ?> => #object[TypeError TypeError: Cannot set properties of undefined (setting 'parent')]
;; ?> ?> => [0 0 0 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 6 6]
;; ?> ?< => [0 0 0 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 6 6]
The first concat is on a flow that never ends. I think maybe this isn't an example of what Dustin was talking about, since he said "continuous" and this is discrete. I didn't find docs on "concatMap", but I did look at https://reactivex.io/documentation/operators/flatmap.html, which says: "FlatMap merges the emissions of these Observables, so that they may interleave". Does "concatMap" merge emissions such that they don't interleave, and is that why it would need the observable to terminate?

Richie19:07:15

((->> (m/ap (try
              (let [x (m/?>             ; first
                       (m/seed (range 3)))]
                (m/?>                   ;second
                 (m/seed (range)))
                x)
              (catch Cancelled _)))
      (m/eduction (take 20))
      (m/reduce conj))
 prn prn)
;; ?< ?< => [nil nil 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2]
;; ?< ?> => #object[TypeError TypeError: Cannot set properties of undefined (setting 'parent')]
;; ?> ?> => [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
;; ?> ?< => [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
This makes sense. If I concat the second flow and it doesn't terminate then I can't progress the first flow.
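
The concat behaviour described above can be checked with two finite flows. This is a minimal sketch that assumes only `m/seed`, `m/ap`, `m/?>` and `m/reduce` as already used in this thread; the namespace and the name `pairs` are hypothetical.

```clojure
(ns example.concat
  (:require [missionary.core :as m]))

;; Hypothetical name: a task that collects every [x y] pair.
;; Both forks use ?> (concat), so the inner flow must terminate
;; before the outer flow is allowed to produce its next value.
(def pairs
  (->> (m/ap (let [x (m/?> (m/seed [1 2]))      ; outer flow
                   y (m/?> (m/seed [:a :b]))]   ; inner flow, finite
               [x y]))
       (m/reduce conj)))

;; Run the task with success and failure callbacks, as in the snippets above.
(pairs prn prn)
;; prints [[1 :a] [1 :b] [2 :a] [2 :b]]
```

Each outer value is held until the inner flow finishes, which matches the `?> ?>` results above: with an infinite inner flow, `x` stays at 0.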

Richie19:07:42

Why do I get an error if I switch and then concat?

leonoel19:07:01

it may be a bug

leonoel19:07:27

which version ?

Richie20:07:29

"b.27-SNAPSHOT" cljs

Richie20:07:37

I can pull a new snapshot if you want.

leonoel20:07:28

I do reproduce it, it is a bug indeed

Richie20:07:05

Ok, cool. Thanks.

leonoel20:07:09

the result should be the same as ?< ?<

Richie20:07:03

Ok, thanks.

leonoel20:07:36

with @U09K620SG’s switch it would be [2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2]

Richie22:07:53

Why does this fail? The commented code gives the commented error. No error with the uncommented code.

Richie22:07:49

Oh, does signal! not return the flow?

Dustin Getz23:07:12

it does and you can, it requires a reactor in dynamic scope

Dustin Getz23:07:46

i don't see the error, but on mobile and the code doesn't format well

Richie23:07:24

(let [count (atom 0)
      db (atom {})]
  ((m/reactor
    (m/stream!
     (m/ap
      (try
        (let [count (:count (m/?< (m/watch db)))]
          (when count
            (println "current count:" (m/?< count))))
        (catch Cancelled _))))
    (swap! db assoc :count (m/signal! (m/watch count))))
   prn prn))
;; #object[Error Error: Subscription failure : not in publisher context.]

Richie23:07:43

Here is the code with the error and more newlines.

Dustin Getz23:07:23

sorry i meant i don't see a mistake

Dustin Getz00:07:37

this is subtle, i think the swap! may be initially running before the reactor is booted? the reactor call probably evaluates the body to a flow (IO action /thunk) and then boots the reactor around that flow. so the unmanaged swap is a side effect and it happens too soon

Dustin Getz00:07:26

move the swap into a m/ap block, or move the m/signal! into the m/ap block

Dustin Getz00:07:33

it's also a code smell to swap so close to system startup, we are trying to do functional programming here so the swap should probably be entirely external to the system (the system can respond through watches on the atom). or the swap can be managed in an m/ap block like the println - but i think that would be uncommon, there is usually a way to eliminate the atom
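
One way to read the suggestion above is to keep the `swap!` outside the reactor entirely and let the reactor only watch the atom. This is a rough sketch under that assumption, using the reactor-era calls that appear in this thread (`m/reactor`, `m/stream!`, `m/signal!`, `m/watch`); the names `!count` and `cancel-reactor` are hypothetical, and it has not been checked against any particular snapshot.

```clojure
(ns example.external-swap
  (:require [missionary.core :as m])
  (:import missionary.Cancelled))

;; Hypothetical name: state owned by code outside the reactor.
(def !count (atom 0))

;; The boot function only declares the stream. No swap! happens during boot.
;; Running the reactor task returns a function that cancels it.
(def cancel-reactor
  ((m/reactor
     (m/stream!
       (m/ap
         (try
           ;; ?< switches to the latest value of the signal over the atom
           (println "current count:" (m/?< (m/signal! (m/watch !count))))
           (catch Cancelled _)))))
   prn prn))

;; External code mutates the atom; the reactor reacts through the watch.
(swap! !count inc)
```

Calling `(cancel-reactor)` would shut the reactor down when it is no longer needed.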

Richie00:07:40

https://github.com/rgkirch/missionary-help/blob/c461071266678069b42bdba23f8d63d4b2aa5de8/src/missionary_help/core.cljs#L200 what I'm really trying to do. I'm swapping into mutable state in my reactor boot function. Dom attributes and properties that consume a flow watch the mutable state for a flow by :id. Dom event listeners e.g. :on-click swap! a flow into the mutable state under :id. It's my first attempt at wiring things together. However smelly it is, haha, it should work...

Richie01:07:20

Thank you for your input. I'll try and figure out if what you described is what's giving me trouble.

leonoel08:07:43

There are two errors here.

1. The error you get means the subscription doesn't happen as a side-effect of a node reaction. It is not allowed because the reactor must know the subscriber node to register the subscription in the DAG. In this case, the subscription is a side-effect of the swap!, which is performed directly in the reactor boot function, so the reactor has no way to bind it to the stream. To fix this, wrap the db watch in a signal. The reactor now has a subscription registered from the stream to the db, so whenever a swap! happens the reaction will be bound to the stream.

(let [count (atom 0)
      db (atom {})]
  ((m/reactor
     (m/stream!
       (m/ap
         (try
           (let [count (:count (m/?< (m/signal! (m/watch db))))]
             (when count
               (println "current count:" (m/?< count))))
           (catch Cancelled _))))
     (swap! db assoc :count (m/signal! (m/watch count))))
   prn prn))
However, if you run this, the reactor complains about cyclic dependencies. This is the second error.

2. The subscription is in the wrong direction. The reactor enforces the DAG structure by maintaining a total order on nodes and forbidding subscriptions inconsistent with it. In this case, the subscription is forbidden because the signal was created after the stream in the boot function. One possible fix is to declare the signal before the stream:
(let [count (atom 0)
      db (atom {})]
  ((m/reactor
     (let [<count (m/signal! (m/watch count))]
       (m/stream!
         (m/ap
           (try
             (let [count (:count (m/?< (m/signal! (m/watch db))))]
               (when count
                 (println "current count:" (m/?< count))))
             (catch Cancelled _))))
       (swap! db assoc :count <count)))
   prn prn))
It is also possible to declare the signal in reaction to the db change. The reactor then considers it a child of the stream and sees it as inferior in the total order, allowing further subscriptions:
(let [count (atom 0)
      db (atom {})]
  ((m/reactor
     (m/stream!
       (m/ap
         (try
           (let [count (:count (m/?< (m/signal! (m/watch db))))]
             (when count
               (println "current count:" (m/?< (m/signal! count)))))
           (catch Cancelled _))))
     (swap! db assoc :count (m/watch count)))
   prn prn))
