#core-async
2020-01-06
didibus05:01:40

Say I have a go block, and I need to mapcat a collection inside that go block, but for every element inside the collection I also use a go block? So what I did is:

(go (into [] (mapcat (fn [e] (go (something e)))) [1 2 3]))

didibus05:01:47

But now I don't want my outer go block to return a vector of channels, so I'd like to take the value out of them, but I would need another map or for or anything which would have a nested fn with a go block doing a take again in it, and then I'd have the same issue

didibus06:01:42

Hum, my solution for now is to do this:

(go
  (loop [acc []
         coll (into []
                    (map (fn [c]
                           (go (<! (somefn c)))))
                    [1 2 3])]
    (if-let [ch (first coll)]
      (recur (conj acc (<! ch)) (next coll))
      (into [] cat acc))))

hiredman06:01:40

You can just map somefn

hiredman06:01:16

And there is some combinator in core.async for this, I just don't recall what it is

hiredman06:01:38

Takes a collection of channels and returns a channel of a collection

didibus19:01:34

I'm guessing that's merge

hiredman19:01:08

into in fact

hiredman19:01:54

(async/<! (async/into [] (map some-channel-returning-fn [1 2 3 4])))

hiredman19:01:14

but you might want to look at replacing that with one of the pipeline variations

didibus19:01:08

Oh... interesting.

didibus19:01:11

Too bad that async/into doesn't take a transducer

didibus20:01:42

Hum, I don't think that works

didibus20:01:07

Because the second arg of async/into is supposed to be a channel, not an ISeq of channels
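
A minimal sketch of one combinator that does accept a collection of channels: `async/map` takes a function and a collection of source channels and applies the function to the first item taken from each, in the order the channels were given. `chan-fn` is a hypothetical stand-in for the channel-returning function discussed above.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical channel-returning fn, used only for illustration.
(defn chan-fn [e]
  (async/go [e (inc e)]))

;; async/map receives the collection of channels directly and calls
;; `vector` on the first value taken from each one, in input order.
(def collected
  (async/<!! (async/map vector (map chan-fn [1 2 3]))))

;; Passing `concat` instead of `vector` flattens the per-element results.
(def flattened
  (async/<!! (async/map (comp vec concat) (map chan-fn [1 2 3]))))
```

This sketch assumes a non-empty input collection and that every channel yields exactly one value, as channels returned by `go` do.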

noisesmith20:01:26

it would work with async/merge in place of map though

noisesmith20:01:37

err, merge wrapping map, sorry
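
A rough sketch of the "merge wrapping map" suggestion, assuming a hypothetical `some-channel-returning-fn` that returns a `go` channel. `async/merge` turns the collection of channels into a single channel, which is the shape `async/into` expects. The order in which values arrive across channels is not guaranteed.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical channel-returning fn, used only for illustration.
(defn some-channel-returning-fn [e]
  (async/go [e (* 10 e)]))

;; merge: collection of channels -> one channel
;; into:  one channel -> channel yielding a single collection
(def merged-result
  (async/<!!
    (async/into []
                (async/merge (map some-channel-returning-fn [1 2 3])))))

;; Flatten the collected vectors, which gives the mapcat-like result.
(def flattened (into [] cat merged-result))
```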

didibus20:01:55

Also, why does this not work? (doto (async/chan [10]) async/close!)

noisesmith20:01:44

is [10] really a valid buff-or-n ?

didibus20:01:00

So it does seem you can, async/merge and then async/into and then async/<!. At that point, I might prefer my loop variant 😛

didibus20:01:20

Oh, right, I was being dumb, and treating chan like atom

didibus20:01:30

This is what I wanted: (doto (async/chan) (async/put! [10]) (async/close!))
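
A small sketch of the difference, using a buffer size of 1 (an assumption made so that the put lands in the buffer immediately): the argument to `async/chan` is a buffer or a buffer size, not an initial value, so the value has to be put onto the channel separately.

```clojure
(require '[clojure.core.async :as async])

;; The 1 here is a buffer capacity, not a value held by the channel.
(def c (doto (async/chan 1)
         (async/put! [10])   ; the value goes into the buffer
         (async/close!)))    ; buffered values remain takeable after close

(def first-take (async/<!! c))   ; the buffered value
(def second-take (async/<!! c))  ; nil once the closed channel is drained
```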

didibus20:01:40

That's a lot of contortion just to mapcat with a channel-returning-fn :S

hiredman20:01:23

You don't need merge

hiredman20:01:01

I don't know what you are doing now, but if you wrap in apply concat, my async/into line completely replaces your earlier loop+into

didibus20:01:52

(apply concat
       (async/into []
                   (map (constantly (async/go [1 2])) [1 2 3])))

Throws java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.LazySeq

didibus20:01:02

Because async/into takes a channel, not a collection

didibus20:01:19

Fundamentally I'm trying to do:

(async/<!!
       (async/go
         (mapcat (constantly (async/go [1 2])) [1 2 3])))

didibus20:01:57

So mapcat inside a go block over a non channel coll where the mapping fn is a channel returning one

hiredman20:01:12

Merge is not ordered
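
Since merge does not preserve order, one possible order-preserving sketch of the "mapcat over a channel-returning fn" goal is to start all the channels eagerly and then take from them one at a time in a single `go-loop`. The names `mapcat-chans` and `chan-fn` are hypothetical and not part of core.async.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical channel-returning fn, used only for illustration.
(defn chan-fn [e]
  (async/go [e (inc e)]))

(defn mapcat-chans
  "Returns a channel that yields one vector: the concatenation, in input
  order, of the collection delivered by each channel (f x)."
  [f coll]
  (let [chans (mapv f coll)]            ; mapv starts every go block up front
    (async/go-loop [acc [] remaining chans]
      (if-let [ch (first remaining)]
        ;; The take happens directly in the go-loop body, not in a nested fn.
        (recur (into acc (async/<! ch)) (next remaining))
        acc))))

(def ordered (async/<!! (mapcat-chans chan-fn [1 2 3])))
```

The work still runs concurrently because every channel is created before the first take; only the collection of results is sequential.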