#core-async
2018-05-06
twashing05:05:37

Is there a way to join channel values by some id? For example, if you have 3 channels with values that you may want to join together by :id.

twashing05:05:20

(def c1 (to-chan [{:id 2 :value "a"} {:id 3} {:id 4}]))
(def c2 (to-chan [{:id 1} {:id 2 :value "b"} {:id 3}]))
(def c3 (to-chan [{:id 0} {:id 1} {:id 2 :value "c"}]))

twashing05:05:29

From what I can tell, none of these will help join across channels

twashing05:05:39

clojure.core.async [mult
                      merge mix
                      pub sub
                      map]

  net.cgrand/xforms [group-by for]
  

noisesmith13:05:30

wouldn't merging the chans, followed by pub and sub based on the id, do it?

twashing16:05:17

@noisesmith That’s a good idea. But the subscriber channel is going to get all events with an :id. I still have to manually join messages with the same id.

twashing16:05:38

So I'd have to preserve some state to do that, which is inevitable, I suppose.

noisesmith16:05:21

The subscriptions can dispatch by the value under id

twashing16:05:25

Oh that would be nice. I only see the ability to dispatch by key. https://clojuredocs.org/clojure.core.async/sub

noisesmith16:05:58

topic-fn can be any function

noisesmith16:05:15

Look at the doc for pub

noisesmith16:05:34

In your case, specifying :id to pub as your topic fn should do what you want

twashing16:05:13

Correct. So something like this will collect all messages with an id.

(pub (chan (sliding-buffer 100))
     #(:id %))

twashing16:05:36

But I need to join messages of the same id value. I think there’s no getting around statefulness for that.

noisesmith02:05:55

you're still not understanding - it doesn't collect all items with an id field, it dispatches by the value under id, and each subscriber can specify what id they look for
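[Editor's sketch] The dispatch-by-value behaviour described above can be tried with the three example channels. This is a minimal sketch, not code from the conversation: the names `in`, `by-id`, `id-2` and `id-2-msgs` are made up for illustration, and the intermediate channel `in` is an assumption added so that the subscription exists before any message flows (a pub drops messages whose topic has no subscriber). Newer core.async releases prefer `to-chan!` over `to-chan`.

```clojure
(require '[clojure.core.async :as a])

(def c1 (a/to-chan [{:id 2 :value "a"} {:id 3} {:id 4}]))
(def c2 (a/to-chan [{:id 1} {:id 2 :value "b"} {:id 3}]))
(def c3 (a/to-chan [{:id 0} {:id 1} {:id 2 :value "c"}]))

;; Publish from an intermediate channel, using :id as the topic-fn.
;; The topic of each message is the VALUE under :id (0, 1, 2, ...).
(def in (a/chan))
(def by-id (a/pub in :id))

;; This subscriber only asks for messages whose :id is 2.
(def id-2 (a/chan 10))
(a/sub by-id 2 id-2)

;; Only now start the flow: merge the three sources into the pub's input.
(a/pipe (a/merge [c1 c2 c3]) in)

;; id-2 is closed once all sources are drained, so this collects
;; every message that was published under topic 2.
(def id-2-msgs (a/<!! (a/into [] id-2)))
```

`id-2-msgs` should hold the three `:id 2` messages; their relative order depends on how `merge` interleaves the sources. Messages with other ids are dropped here because nothing subscribed to those topics.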

twashing16:05:55

I’ll play around with topic fns, to see what I get.

mccraigmccraig17:05:25

@twashing i did a thing for manifold which joins values by key from streams sorted by the same key, supporting sort-merge, inner join, outer joins and set ops - there might be some inspiration there - https://github.com/employeerepublic/promisespromises/blob/master/src/prpr/stream/cross.clj

twashing19:05:03

@mccraigmccraig From the prpr.stream.cross namespace, these functions look interesting.

cross-streams
sort-merge-streams
set-streams-union (which uses cross-streams)

twashing19:05:09

To be clear though, does :skey-streams look like the below? My main thing is that I want to be able to join values across streams.

{k1 manifold.stream/stream
 k2 manifold.stream/stream}

twashing19:05:30

I.e., in the previous example, events with :id 2 (or 1 or 3) will have their values joined together.

[{:id 2 :value "a"} {:id 3} {:id 4}]
[{:id 1} {:id 2 :value "b"} {:id 3}]
[{:id 0} {:id 1} {:id 2 :value "c"}]
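[Editor's sketch] One way to picture the join being asked for, using plain core.async and the state mentioned earlier in the thread: fold the merged channels into a map from id to the messages seen for that id. This is an illustrative sketch under the assumption that the channels are finite and close; the names `merged` and `joined` are hypothetical, and this is not how prpr.stream.cross works.

```clojure
(require '[clojure.core.async :as a])

(def merged
  (a/merge [(a/to-chan [{:id 2 :value "a"} {:id 3} {:id 4}])
            (a/to-chan [{:id 1} {:id 2 :value "b"} {:id 3}])
            (a/to-chan [{:id 0} {:id 1} {:id 2 :value "c"}])]))

;; The accumulator is the join state: {id [msg ...]}.
;; a/reduce yields its result only after `merged` closes.
(def joined
  (a/<!! (a/reduce (fn [acc msg]
                     (update acc (:id msg) (fnil conj []) msg))
                   {}
                   merged)))

;; All messages that share :id 2, in arrival order.
(def id-2-group (get joined 2))
```

For channels that never close, the same accumulator would need some policy for when a group counts as complete (a count, a timeout, a watermark), which this sketch leaves out.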

mccraigmccraig19:05:34

@twashing yes, :skey-streams looks like that, though in your case you would have to wrap each of your streams in an ISortedStream with (event-source->sorted-stream :id <stream>) to configure the key extraction

mccraigmccraig19:05:06

then consecutive values with the same keys from each stream will be joined with a cartesian-product

mccraigmccraig19:05:15

and if any of your streams should happen to not be sorted by the key you will get an error

mccraigmccraig19:05:47

if you don’t want a join in the relational/cartesian sense, you could do something like sort-merge-streams or set-streams-union and then partition by your key ?
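[Editor's sketch] The difference between the two options just described (a relational join versus sort-merge followed by partitioning) can be shown on plain vectors, without manifold or prpr. Everything below is an analogy in core Clojure with hypothetical names (`s1`, `s2`, `s3`, `inner-joined`, `partitioned`); it uses a naive nested loop and an in-memory sort, not the streaming algorithm from prpr.stream.cross.

```clojure
;; The three example inputs, each already sorted by :id.
(def s1 [{:id 2 :value "a"} {:id 3} {:id 4}])
(def s2 [{:id 1} {:id 2 :value "b"} {:id 3}])
(def s3 [{:id 0} {:id 1} {:id 2 :value "c"}])

;; Relational inner join: one tuple per combination of messages
;; that agree on :id across ALL three inputs (cartesian product per key).
(def inner-joined
  (for [x s1, y s2, z s3
        :when (= (:id x) (:id y) (:id z))]
    [x y z]))

;; Sort-merge, then partition by key: every message is kept and
;; grouped with its neighbours that share the same :id.
(def partitioned
  (->> (concat s1 s2 s3)
       (sort-by :id)          ; stable sort, so input order is kept within an id
       (partition-by :id)))
```

With this data, `inner-joined` has a single tuple (only id 2 appears in all three inputs), while `partitioned` has one group per distinct id, including ids that appear in only one input.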

twashing19:05:59

@mccraigmccraig This looks really interesting. Can I try your project as is? Or should I just pull out the relevant bits, for a play?

mccraigmccraig19:05:45

you can certainly try it as is - there are a bunch of tests which should show you how to use each fn

mccraigmccraig19:05:51

manifold also supports stream interop with core.async chans, so you should be able to use it with chans (although i haven’t tried that)

twashing20:05:16

@mccraigmccraig Nice one. Thanks for sharing :)