This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-07-05
Channels
- # announcements (1)
- # asami (21)
- # aws (19)
- # babashka (37)
- # beginners (38)
- # clj-kondo (7)
- # clj-otel (8)
- # clojure (29)
- # clojure-europe (54)
- # clojure-nl (3)
- # clojure-spec (2)
- # clojure-uk (2)
- # clojurescript (15)
- # conjure (1)
- # data-science (1)
- # datomic (21)
- # emacs (6)
- # events (3)
- # figwheel-main (1)
- # gratitude (13)
- # holy-lambda (11)
- # joyride (6)
- # klipse (3)
- # malli (14)
- # missionary (26)
- # nbb (31)
- # omni-trace (2)
- # pathom (3)
- # reagent (1)
- # reitit (1)
- # releases (1)
- # shadow-cljs (24)
- # sql (27)
- # tools-deps (4)
- # vim (21)
I'm still trying to understand "switch" vs "concat", especially https://clojurians.slack.com/archives/CL85MBPEF/p1656877751239169?thread_ts=1656873299.996749&cid=CL85MBPEF.
((->> (m/ap (try
(let [x (m/?> ; first
(m/seed (range)))]
(m/?< ;second
(m/seed (range 3)))
x)
(catch Cancelled _)))
(m/eduction (take 20))
(m/reduce conj))
prn prn)
;; ?< ?< => [nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil]
;; ?< ?> => #object[TypeError TypeError: Cannot set properties of undefined (setting 'parent')]
;; ?> ?> => [0 0 0 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 6 6]
;; ?> ?< => [0 0 0 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 6 6]
The first concat is on a flow that never ends. I think maybe this isn't an example of what Dustin was talking about since he said "continuous" and this is discrete. I didn't find docs on "concatMap" but I did look at "https://reactivex.io/documentation/operators/flatmap.html". "FlatMap merges the emissions of these Observables, so that they may interleave" Does "concatMap" merge emissions such that they don't interleave and that's why it would need the observable to terminate?((->> (m/ap (try
(let [x (m/?> ; first
(m/seed (range 3)))]
(m/?> ;second
(m/seed (range)))
x)
(catch Cancelled _)))
(m/eduction (take 20))
(m/reduce conj))
prn prn)
;; ?< ?< => [nil nil 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2]
;; ?< ?> => #object[TypeError TypeError: Cannot set properties of undefined (setting 'parent')]
;; ?> ?> => [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
;; ?> ?< => [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
This makes sense. If I concat the second flow and it doesn't terminate then I can't progress the first flow.with @U09K620SG’s switch it would be [2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2]
Why does this fail? The commented code gives the commented error. No error with the uncommented code.
I saw https://gist.github.com/leonoel/c5e32b65ec7b6ab4b5b45772082a3d85#file-pump-clj-L121 which made me think that I could throw it in wherever.
it does and you can, it requires a reactor in dynamic scope
i don't see the error, but on mobile and the code doesn't format well
(let [count (atom 0)
db (atom {})]
((m/reactor
(m/stream!
(m/ap
(try
(let [count (:count (m/?< (m/watch db)))]
(when count
(println "current count:" (m/?< count))))
(catch Cancelled _))))
(swap! db assoc :count (m/signal! (m/watch count))))
prn prn))
;; #object[Error Error: Subscription failure : not in publisher context.]
sorry i meant i don't see a mistake
this is subtle, i think the swap! may be initially running before the reactor is booted? the reactor call probably evaluates the body to a flow (IO action /thunk) and then boots the reactor around that flow. so the unmanaged swap is a side effect and it happens too soon
move the swap into a m/ap block, or move the m/signal! into the m/ap block
it's also a code smell to swap so close to system startup, we are trying to do functional programming here so the swap should probably be entirely external to the system (the system can respond through watches on the atom). or the swap can be managed in an m/ap block like the println - but i think that would be uncommon, there is usually a way to eliminate the atom
https://github.com/rgkirch/missionary-help/blob/c461071266678069b42bdba23f8d63d4b2aa5de8/src/missionary_help/core.cljs#L200 what I'm really trying to do. I'm swapping into mutable state in my reactor boot function. Dom attributes and properties that consume a flow watch the mutable state for a flow by :id. Dom event listeners e.g. :on-click swap! a flow into the mutable state under :id. It's my first attempt at wiring things together. However smelly it is, haha, it should work...
Thank you for your input. I'll try and figure out if what you described is what's giving me trouble.
There are two errors here.
1. The error you get means the subscription doesn't happen as a side-effect of a node reaction. It is not allowed because the reactor must know the subscriber node to register the subscription in the dag. In this case, the subscription is a side-effect of the swap!
, which is performed directly in the reactor boot function so the reactor has no way to bind it to the stream. To fix this, wrap the db watch in a signal. The reactor now has a subscription registered from the stream to the db, so whenever a swap! happens the reaction will be bound to the stream.
(let [count (atom 0)
db (atom {})]
((m/reactor
(m/stream!
(m/ap
(try
(let [count (:count (m/?< (m/signal! (m/watch db))))]
(when count
(println "current count:" (m/?< count))))
(catch Cancelled _))))
(swap! db assoc :count (m/signal! (m/watch count))))
prn prn))
However, if you run this the reactor complains about cyclic dependencies, this is the second error.
2. The subscription is in the wrong direction. The reactor enforces the dag structure by maintaining a total order on nodes and forbidding subscriptions inconsistent with it. In this case, the subscription is forbidden because the signal was created after the stream in the boot function. One possible fix is to declare the signal before :
(let [count (atom 0)
db (atom {})]
((m/reactor
(let [<count (m/signal! (m/watch count))]
(m/stream!
(m/ap
(try
(let [count (:count (m/?< (m/signal! (m/watch db))))]
(when count
(println "current count:" (m/?< count))))
(catch Cancelled _))))
(swap! db assoc :count <count)))
prn prn))
It is also possible to declare the signal in reaction to the db change, the reactor then considers it as a child of the stream and sees it as inferior in the total order, allowing further subscriptions :
(let [count (atom 0)
db (atom {})]
((m/reactor
(m/stream!
(m/ap
(try
(let [count (:count (m/?< (m/signal! (m/watch db))))]
(when count
(println "current count:" (m/?< (m/signal! count)))))
(catch Cancelled _))))
(swap! db assoc :count (m/watch count)))
prn prn))