missionary

maybenot 2024-11-10T05:19:10.520719Z

Out of curiosity, the comment https://github.com/leonoel/missionary/issues/37 stays that > ap already covers the fan-in, therefore it should be enough to implement fan-out. What’s meant by “covers fan-in” here? Also maybe there’s an example somewhere about implementing pubsub like thing? Say, user A and user B are conditionally interested in flow of posts F

leonoel 2024-11-12T20:10:17.149669Z

Here is a proof of concept

(defn stream-groups "
For each key+flow pair consumed from input, call given function with the key, the flow published as a stream, and
optional arguments. Merge and emit values produced by resulting flows. Each stream is kept alive for the time extent of
the flow process.
" [input f & args]
  (m/ap
    (let [[tag group] (m/?> ##Inf input)
          event-stream (m/stream group)]
      (m/amb= (do (m/?> event-stream) (m/amb))
        (m/?> (apply f tag event-stream args))))))

(defn topic-store "
For each key+flow pair consumed from input, accumulate the successive maps associating each key with its stream, call
given function with states published as a signal, and optional arguments. Emit values produced by resulting flow. The
signal is kept alive for the time extent of the flow process.
" [input f & args]
  (m/ap
    (let [topics (m/signal (m/reductions conj {} (stream-groups input #(m/ap [%1 %2]))))]
      (m/amb= (do (m/?> topics) (m/amb))
        (m/?> (apply f topics args))))))

(defn get-topic "
Subscribe to the stream associated with given key in given signal of maps, as soon as it's available.
" [topics tag]
  (m/ap (m/?> (m/? (m/reduce (comp reduced {}) nil
                     (m/eduction (keep tag) topics))))))

(defn user-subs "
For each user in given flow, call given function with the user name, a map associating each topic of interest to its
stream of events, and optional arguments. Merge and emit values produced by resulting flows.
" [topics users f & args]
  (m/ap
    (let [{:keys [name interests]} (m/?> ##Inf users)]
      (m/?> (apply f name (into {} (map (juxt identity (partial get-topic topics))) interests) args)))))


(comment
  (def ps ((m/reduce {} nil
             (topic-store
               (m/group-by :tag
                 (m/observe
                   (fn [!]
                     (defn create-post [tag text]
                       (! {:tag tag :text text}))
                     #())))
               user-subs
               (m/seed #{{:name "alice" :interests #{:math :science}}
                         {:name "bob" :interests #{:math :clojure}}})
               (fn [name interests]
                 (m/ap (let [[tag msgs] (m/?> (count interests) (m/seed interests))]
                         (prn name '- (m/?> msgs)) (m/amb))))))
           prn prn))

  (create-post :math "linear algebra")
  ;;  "alice" - {:tag :math, :text "linear algebra"}
  ;;  "bob" - {:tag :math, :text "linear algebra"}

  (create-post :clojure "transducers")
  ;;  "bob" - {:tag :clojure, :text "transducers"}

  (ps))

leonoel 2024-11-12T20:32:03.589149Z

Additional notes • disconnection is left as an exercise for the reader • memory footprint is linear with the number of tags in total history (`a/pub` may have the same problem) • I think it's close to what a/pub does under the hood, but I doubt there is a useful abstraction here - prove me wrong

🔥 1
maybenot 2024-11-13T07:09:10.835369Z

Thank you! Will play with the sample to understand how it works

leonoel 2024-11-10T11:47:44.536909Z

fan-in : when a node consumes from many upstream nodes fan-out : when a node is consumed by many downstream nodes All functional operators with many inputs are fan-in, like ap with amb= or ?> with concurrency >1 - multiple child branches with their own subprocess

1
leonoel 2024-11-10T11:58:19.524679Z

pubsub - can you elaborate ?

maybenot 2024-11-10T16:10:29.594079Z

re: fan-in Understood, thank you! re: pubsub I meant something like this, not sure how to express this with missionary

(require '[clojure.core.async :as a])

(def new-posts-channel (chan 1))
(def post-updates (a/pub new-posts-channel :tag))

(def users 
  (atom {0 {:name :A
            :interests #{:math :science}}
         1 {:name :B
            :interests #{:math :clojure}}}))

(defn send! [socket msg]
  (println (str "socket [" socket "]: " msg)))


(defn socket-connected [socket]
  (let [user (@users socket)
        interests (user :interests)
        updates (a/chan)]
    (doseq [interest interests]
      (a/sub post-updates interest updates))
    (swap! users assoc-in [socket :updates-chan] updates)
    (go-loop [upd (a/<! updates)]
      (when upd
        (send! socket upd)
        (recur (a/<! updates))))))

(defn socket-disconnected [socket]
  (when-let [upd (get-in @users [socket :updates-chan])]
    (a/close! upd)))

(defn create-post [tag text]
  (a/>!! new-posts-channel
   {:tag tag :text text}))

(socket-connected 0)
(socket-connected 1)


(create-post :clojure "Hello world")
; socket [1]: {:tag :clojure, :text "Hello world"}

(create-post :math "Hello world")
; socket [1]: {:tag :math, :text "Hello world"}
; socket [0]: {:tag :math, :text "Hello world"}

(socket-disconnected 0)

(create-post :math "Hello world")
; socket [1]: {:tag :math, :text "Hello world"}