I get weird behavior when I take one or two values from a signal:
m/join starts all of its inputs sequentially and synchronously when it boots.
(m/reduce conj [] (m/eduction (take 1) conn)) performs the following actions synchronously on boot:
• subscribes to conn
• transfers one value
• unsubscribes from conn
• terminates
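The boot order described above can be observed with tasks that only print. This is a minimal sketch for a JVM REPL, assuming missionary is on the classpath; the name boot-order-result is made up for illustration. Neither m/sp body parks, so the first input should run to completion before the second one is even started:

```clojure
(require '[missionary.core :as m])

;; Each m/sp body runs synchronously until its first park.
;; These bodies never park, so each input finishes during its own boot.
(def boot-order-result
  (m/? (m/join vector
         (m/sp (println "first input: started and finished") :a)
         (m/sp (println "second input: started") :b))))
```

The two lines should print in argument order, which is the same reason a (take 1) subscription can be completely finished before the next input of the join subscribes.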
In your example, the first input spawns the connection and cancels it immediately because there's no other subscription. If you want to ensure the connection is kept alive, you have to concurrently spawn a subscription with a longer lifespan.
Thanks for explaining the internals of join, @leonoel. I had naively assumed that all tasks would be started in parallel.
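One possible way to spawn such a longer-lived subscription is sketched below. It is an assumption on my part, not necessarily the recommended pattern: keep-alive and with-conn are hypothetical helpers, and the connection is replaced by a stand-in that emits an incrementing counter every 100 ms so the example has no other dependencies. It relies on m/race starting its inputs in argument order, like m/join, and on it cancelling the remaining inputs once one of them succeeds.

```clojure
(require '[missionary.core :as m])

;; Stand-in for the real connection (assumption): a new id every 100 ms.
(def counter (atom 0))

(def >conn
  (m/signal
    (m/ap
      (println "#### creating conn")
      (loop [c (swap! counter inc)]
        (m/amb c
          (do (m/? (m/sleep 100))
              (recur (swap! counter inc))))))))

;; Hypothetical helper: a subscription that discards every value and
;; never completes on its own, because >conn never terminates.
(def keep-alive
  (m/reduce (constantly nil) nil >conn))

;; Hypothetical helper: keep-alive is listed first so that it subscribes
;; before the task does. When the task succeeds, the race cancels
;; keep-alive and completes with the task's result.
(defn with-conn [task]
  (m/race keep-alive task))

(def result
  (m/? (with-conn
         (m/join vector
           (m/reduce conj [] (m/eduction (take 1) >conn))
           (m/reduce conj [] (m/eduction (take 2) >conn))))))
```

With the keep-alive subscription in place, "#### creating conn" should be printed only once, and every vector in result should start with the same id.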
```clojure
(ns demo.connection.signal
  (:require [nano-id.core :refer [nano-id]]
            [missionary.core :as m]))

(def >conn
  (m/signal
    (m/ap
      (println "#### creating conn")
      (loop [c (nano-id 4)]
        (m/amb c
          (do (m/? (m/compel (m/sleep 10000 c)))
              (println "#### re-connecting .. ")
              (recur (nano-id 4))))))))

(m/? (m/join vector
       (m/reduce conj [] (m/eduction (take 1) >conn))
       (m/reduce conj [] (m/eduction (take 2) >conn))
       (m/sp (m/? (m/sleep 1))
             (m/? (m/reduce conj [] (m/eduction (take 2) >conn))))))
;; => [["60Jb"] ["TO0J" "sdAK"] ["TO0J" "sdAK"]]
```
When I take just one value from the eduction, it starts the signal process twice.
What I expected is that m/signal keeps track of all subscribers, so the result for take 1 should be the same as the first entry of take 2.
It seems to me that m/join starts the tasks sequentially, and the first value goes away so fast that the signal publisher gets shut down before the next task is started.
```clojure
(m/? (m/join vector
       (m/reduce conj [] (m/eduction (take 2) conn))
       (m/reduce conj [] (m/eduction (take 1) conn))
       (m/reduce conj [] (m/eduction (take 2) conn))
       (m/reduce conj [] (m/eduction (take 2) conn))
       (m/reduce conj [] (m/eduction (take 2) conn))
       (m/reduce conj [] (m/eduction (take 1) conn))
       (m/reduce conj [] (m/eduction (take 1) conn))
       (m/reduce conj [] (m/eduction (take 1) conn))
       (m/sp (m/? (m/sleep 1))
             (m/? (m/reduce conj [] (m/eduction (take 2) conn))))))
```
When the first eduction takes 2 values, all the returned values are as they should be.
It's surprising to me that m/join has this sort of unpredictable behavior.