Fork me on GitHub
#core-async
<
2018-05-11
>
twashing05:05:32

@mccraigmccraig I think I’m close to getting something interesting out of promisespromises… It looks really good by-the-way. Thanks for making it!

twashing05:05:34

I tried running the below code. But trying to run A), gives me the error in B). > I’m guessing I’m using the wrong key somewhere? > Can I subsequently use (s/take! result) to pull out values?

(require '[clojure.core.async :refer [to-chan]]
              '[manifold.stream :as s]
              '[prpr.stream.cross :as prpr :refer [event-source->sorted-stream]])


  (def c1 (to-chan [{:id 2 :value "a"} {:id 3} {:id 4}]))
  (def c2 (to-chan [{:id 1} {:id 2 :value "b"} {:id 3}]))
  (def c3 (to-chan [{:id 0} {:id 1} {:id 2 :value "c"}]))

  (def cs1 (s/->source c1))
  (def cs2 (s/->source c2))
  (def cs3 (s/->source c3))

  (def ss1 (prpr/event-source->sorted-stream :id cs1))
  (def ss2 (prpr/event-source->sorted-stream :id cs2))
  (def ss3 (prpr/event-source->sorted-stream :id cs3))

  (def result (prpr/set-streams-union {:default-key-fn :id
                                       :skey-streams {:ss1 ss1
                                                      :ss2 ss2
                                                      :ss3 ss3}}))

  (s/take! result)

twashing05:05:49

A)

(def result (prpr/set-streams-union {:default-key-fn :id
                                       :skey-streams {:ss1 ss1
                                                      :ss2 ss2
                                                      :ss3 ss3}}))

twashing05:05:58

B)

18-05-11 05:14:54 1f4f1184a4fc WARN [prpr.stream.cross:478] - error crossing the streams. op-id:
nil

                                 java.lang.Thread.run              Thread.java:  745
                                                  ...                               
               manifold.utils/thread-factory/reify/fn                utils.clj:   32
   java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  617
    java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1142
                                                  ...                               

twashing05:05:50

… I also played around with some of the tests, for example prpr.stream.cross-test/full-outer-join-records-test. But couldn’t get it to work. I feel I’m close though. https://github.com/employeerepublic/promisespromises/blob/master/test/prpr/stream/cross_test.clj#L704

mccraigmccraig07:05:43

@twashing you were just missing that prpr.stream.cross/set-streams-union returns a Deferred<Stream> rather than a straight up Stream

mccraigmccraig07:05:17

'course don't generally use @ to get at the contents of a Deferred

mccraigmccraig07:05:37

we're quite off-topic for #core-async now... perhaps we should move somewhere else... feel free to PM me

mccraigmccraig07:05:37

keeping it all in the async domain i would normally do something like: https://gist.github.com/03616f9dec96c54a56fc62ee1d04b1a7