Fork me on GitHub
#aleph
<
2020-05-03
>
Sam DeSota13:05:45

Looking through the Aleph manifold API, is there an easy way to merge 2 streams? I've primarily familiar with the Rx model of streams, I know manifold has different semantics. I'd like to do a merge-map operation, like mapcat but instead of concating the streams, it merges all returned streams together, emitting events concurrenty. Tried to create the method myself, but struggling to get a working solution.

mccraigmccraig15:05:05

what are the semantics of the op you want @me1260? are you wanting to merge maps from each stream or are you wanting the contents of the streams unchanged but mixed?

Sam DeSota15:05:29

Ended up implementing via:

(defn merge-streams
  "Takes a stream of streams, and merges them into a single stream."
  [s]
  (let [out (s/stream)]
    (s/consume #(s/consume (fn [value] (s/put! out value)) %) s)
    (s/source-only out)))

Sam DeSota15:05:39

Not sure how well this compares to concat , but the semantics are to merge each stream unchanged into the destination stream. Seems like it would be a useful operator to have in the library.

Sam DeSota15:05:07

The use case is a websocket server that recieves a subscribe message, then starts streaming updates about the resource to the client. Each time a new resource is subscribed via a websocket message, I merge in a new stream of updates about the resource.

mccraigmccraig20:05:42

@me1260 ah, ok, you aren't doing anything with the values on the stream - in similar circumstances I've connected the new streams to the merged stream