This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2018-05-06
Channels
- # aws (11)
- # beginners (51)
- # cider (61)
- # cljsrn (37)
- # clojure (51)
- # clojure-spec (5)
- # clojure-uk (6)
- # clojurescript (35)
- # core-async (29)
- # cursive (3)
- # datomic (4)
- # defnpodcast (3)
- # editors (10)
- # emacs (3)
- # fulcro (2)
- # lein-figwheel (9)
- # leiningen (3)
- # mount (3)
- # off-topic (59)
- # parinfer (6)
- # portkey (4)
- # re-frame (6)
- # shadow-cljs (136)
- # spacemacs (1)
- # specter (1)
- # tools-deps (10)
Is there a way to join channel values by some id? For example, if you have 3 channels with values that you may want to join together by :id.
(def c1 (to-chan [{:id 2 :value "a"} {:id 3} {:id 4}]))
(def c2 (to-chan [{:id 1} {:id 2 :value "b"} {:id 3}]))
(def c3 (to-chan [{:id 0} {:id 1} {:id 2 :value "c"}]))
wouldn't mergeing the chans followed by pub and sub based on the id do it?
@noisesmith That’s a good idea. But the subscriber channel is going to get all events with an :id
. I still have to manually join messages with the same id.
The subscriptions can dispatch by the value under id
Oh that would be nice. I only see the ability to dispatch by key. https://clojuredocs.org/clojure.core.async/sub
topic-fn can be any function
Look at the doc for pub
In your case, specifying :id to pub as your topic fn should do what you want
Correct. So something like this will collect all messages with an id.
(pub (chan (sliding-buffer 100))
#(:id %))
But I need to join messages of the same id value. I think there’s no getting around statefulness for that.
you're still not understanding - it doesn't collect all items with an id field, it dispatches by the value under id, and each subscriber can specify what id they look for
@twashing i did a thing for manifold which joins values by key from streams sorted in the same key, supporting sort-merge, inner join, outer joins and set ops - there might be some inspiration there - https://github.com/employeerepublic/promisespromises/blob/master/src/prpr/stream/cross.clj
@mccraigmccraig From the prpr.stream.cross
namespace, these functions look interesting.
cross-streams
sort-merge-streams
set-streams-union (which uses cross-streams)
To be clear though, does :skey-streams
look like the below? My main thing is that I want to be able to join values across streams.
{k1 manifold.stream/stream
k2 manifold.stream/stream}
Ie, in the previous example, events with :id 2 (or 1 or 3) will have their values joined together.
[{:id 2 :value "a"} {:id 3} {:id 4}]
[{:id 1} {:id 2 :value "b"} {:id 3}]
[{:id 0} {:id 1} {:id 2 :value "c"}]
@twashing yes, :skey-streams
looks like that, though in your case you would have to wrap each of your streams in an ISortedStream
with (event-source->sorted-stream :id <stream>)
to configure the key extraction
then consecutive values with the same keys from each stream will be joined with a cartesian-product
and if any of your streams should happen to not be sorted in the key you will get an error
if you don’t want a join in the relational/cartesian sense, you could do something like sort-merge-streams
or set-streams-union
and then partition by your key ?
@mccraigmccraig This looks really interesting. Can I try your project as is? Or should I just pull out the relevant bits, for a play?
you can certainly try it as is - there are a bunch of tests which should show you how to use each fn
manifold also supports stream interop with core.async chans, so you should be able to use it with chans (although i haven’t tried that)
@mccraigmccraig Nice one. Thanks for sharing :)