Out of curiosity, the comment https://github.com/leonoel/missionary/issues/37 stays that > ap already covers the fan-in, therefore it should be enough to implement fan-out. What’s meant by “covers fan-in” here? Also maybe there’s an example somewhere about implementing pubsub like thing? Say, user A and user B are conditionally interested in flow of posts F
Here is a proof of concept
(defn stream-groups "
For each key+flow pair consumed from input, call given function with the key, the flow published as a stream, and
optional arguments. Merge and emit values produced by resulting flows. Each stream is kept alive for the time extent of
the flow process.
" [input f & args]
(m/ap
(let [[tag group] (m/?> ##Inf input)
event-stream (m/stream group)]
(m/amb= (do (m/?> event-stream) (m/amb))
(m/?> (apply f tag event-stream args))))))
(defn topic-store "
For each key+flow pair consumed from input, accumulate the successive maps associating each key with its stream, call
given function with states published as a signal, and optional arguments. Emit values produced by resulting flow. The
signal is kept alive for the time extent of the flow process.
" [input f & args]
(m/ap
(let [topics (m/signal (m/reductions conj {} (stream-groups input #(m/ap [%1 %2]))))]
(m/amb= (do (m/?> topics) (m/amb))
(m/?> (apply f topics args))))))
(defn get-topic "
Subscribe to the stream associated with given key in given signal of maps, as soon as it's available.
" [topics tag]
(m/ap (m/?> (m/? (m/reduce (comp reduced {}) nil
(m/eduction (keep tag) topics))))))
(defn user-subs "
For each user in given flow, call given function with the user name, a map associating each topic of interest to its
stream of events, and optional arguments. Merge and emit values produced by resulting flows.
" [topics users f & args]
(m/ap
(let [{:keys [name interests]} (m/?> ##Inf users)]
(m/?> (apply f name (into {} (map (juxt identity (partial get-topic topics))) interests) args)))))
(comment
(def ps ((m/reduce {} nil
(topic-store
(m/group-by :tag
(m/observe
(fn [!]
(defn create-post [tag text]
(! {:tag tag :text text}))
#())))
user-subs
(m/seed #{{:name "alice" :interests #{:math :science}}
{:name "bob" :interests #{:math :clojure}}})
(fn [name interests]
(m/ap (let [[tag msgs] (m/?> (count interests) (m/seed interests))]
(prn name '- (m/?> msgs)) (m/amb))))))
prn prn))
(create-post :math "linear algebra")
;; "alice" - {:tag :math, :text "linear algebra"}
;; "bob" - {:tag :math, :text "linear algebra"}
(create-post :clojure "transducers")
;; "bob" - {:tag :clojure, :text "transducers"}
(ps))Additional notes
• disconnection is left as an exercise for the reader
• memory footprint is linear with the number of tags in total history (`a/pub` may have the same problem)
• I think it's close to what a/pub does under the hood, but I doubt there is a useful abstraction here - prove me wrong
Thank you! Will play with the sample to understand how it works
fan-in : when a node consumes from many upstream nodes
fan-out : when a node is consumed by many downstream nodes
All functional operators with many inputs are fan-in, like ap with amb= or ?> with concurrency >1 - multiple child branches with their own subprocess
pubsub - can you elaborate ?
re: fan-in Understood, thank you! re: pubsub I meant something like this, not sure how to express this with missionary
(require '[clojure.core.async :as a])
(def new-posts-channel (chan 1))
(def post-updates (a/pub new-posts-channel :tag))
(def users
(atom {0 {:name :A
:interests #{:math :science}}
1 {:name :B
:interests #{:math :clojure}}}))
(defn send! [socket msg]
(println (str "socket [" socket "]: " msg)))
(defn socket-connected [socket]
(let [user (@users socket)
interests (user :interests)
updates (a/chan)]
(doseq [interest interests]
(a/sub post-updates interest updates))
(swap! users assoc-in [socket :updates-chan] updates)
(go-loop [upd (a/<! updates)]
(when upd
(send! socket upd)
(recur (a/<! updates))))))
(defn socket-disconnected [socket]
(when-let [upd (get-in @users [socket :updates-chan])]
(a/close! upd)))
(defn create-post [tag text]
(a/>!! new-posts-channel
{:tag tag :text text}))
(socket-connected 0)
(socket-connected 1)
(create-post :clojure "Hello world")
; socket [1]: {:tag :clojure, :text "Hello world"}
(create-post :math "Hello world")
; socket [1]: {:tag :math, :text "Hello world"}
; socket [0]: {:tag :math, :text "Hello world"}
(socket-disconnected 0)
(create-post :math "Hello world")
; socket [1]: {:tag :math, :text "Hello world"}