#core-async
2020-01-30
otfrom09:01:50

hmm... I don't understand something about the interaction between core.async/take and mults

otfrom09:01:17

this simplified example works:

otfrom09:01:19

(let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)

        plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        all-chan (a/into [] (a/tap input-mult (a/chan 32)))
        zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

    (a/pipe (a/to-chan (range 33)) input-chan)

    {:all (a/<!! all-chan)
     :plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)})
  ;; => {:all [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32], :plus 528, :zero [0]}

otfrom09:01:57

this hangs forever

(let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)

        plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        all-chan (a/into [] (a/tap input-mult (a/chan 32)))
        zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

    (a/pipe (a/to-chan (range 34)) input-chan)

    {:all (a/<!! all-chan)
     :plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)})

otfrom09:01:07

an even simpler example:

(let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)

        plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

    (a/pipe (a/to-chan (range 34)) input-chan)

    {:plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)})
  ;;; hangs forever

  (let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)

        plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

    (a/pipe (a/to-chan (range 33)) input-chan)

    {:plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)})
  ;; => {:plus 528, :zero [0]}

otfrom09:01:38

creating multiple things off the same input is a really common pattern for me so I'd love to figure out what I'm doing wrong with the mult

bortexz10:01:13

@otfrom I think the problem is with this channel: (a/take 1 (a/tap input-mult (a/chan 32))), specifically the tapped channel with the buffer of 32. Once the take has taken 1 item and returned its channel, the channel created for the tap is no longer consumed, so when its buffer is full it prevents the mult from distributing any more items. The range 33 version works because the tapped channel is able to buffer all the items. If you give the tapped channel under the take a bigger buffer, then it works

bortexz10:01:59

Not being able to distribute all the messages means the reduce channel never closes, so it never produces a result

bortexz10:01:39

(let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)
        plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 33))))]
    (a/pipe (a/to-chan (range 34)) input-chan)
    {:plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)})
With just 33 on the zero-chan tap channel, it works
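
The stall described above can be reproduced in isolation. The following is a minimal sketch, not code from the conversation: `read-with-timeout` and `stalled-mult-demo` are hypothetical helper names, and the 200 ms timeout is an arbitrary value chosen for illustration. It assumes the behaviour discussed here, namely that a mult waits for every tap to accept an item before it takes the next one from its source.

```clojure
(require '[clojure.core.async :as a])

(defn read-with-timeout
  "Hypothetical helper: takes one value from ch, or returns :timeout
  if nothing arrives within ms milliseconds."
  [ch ms]
  (let [[v port] (a/alts!! [ch (a/timeout ms)])]
    (if (= port ch) v :timeout)))

(defn stalled-mult-demo
  "Taps a mult twice. The `stuck` tap is never read, so once its
  1-slot buffer is full the mult cannot hand out further items,
  and the `fast` tap stops receiving as well."
  []
  (let [in    (a/chan)
        m     (a/mult in)
        fast  (a/tap m (a/chan 10))
        stuck (a/tap m (a/chan 1))] ; nobody ever reads this tap
    (a/pipe (a/to-chan (range 5)) in)
    ;; Item 0 fits in both buffers. Item 1 reaches `fast` but parks on
    ;; `stuck`, so the mult never gets as far as item 2.
    [(read-with-timeout fast 200)
     (read-with-timeout fast 200)
     (read-with-timeout fast 200)]))

(stalled-mult-demo)
;; => [0 1 :timeout]
```

The `fast` tap has plenty of buffer space left, yet it starves because the unread tap holds up the whole mult.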

otfrom10:01:49

ok. I had thought that the take would close its consuming channel when it got its one item thus not blocking the rest

otfrom10:01:49

forcing the channel for the a/take to be the size of the whole input doesn't feel like what I was hoping for with a/take

bortexz10:01:53

yeah I agree, a dropping or sliding buffer wouldn’t need to know the length, but if you want to close the channel so it doesn’t keep consuming from the mult after the take, you could implement your own take that closes the input channel
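
One way the "own take that closes the input channel" idea might look is sketched below. This is an illustration under stated assumptions, not an existing core.async function: `take-then-close` and `run-example` are hypothetical names. It relies on a mult dropping a tap once a put to that tap fails because the channel is closed. The sketch also drains the tapped channel after closing it, on the assumption that a put from the mult could already be parked there.

```clojure
(require '[clojure.core.async :as a])

(defn take-then-close
  "Hypothetical variant of a/take: returns a channel that yields at
  most n items from ch, then closes ch itself so that an upstream
  mult stops feeding it."
  [n ch]
  (let [out (a/chan n)]
    (a/go-loop [i 0]
      (if (< i n)
        (if-some [v (a/<! ch)]
          (do (a/>! out v)
              (recur (inc i)))
          (a/close! out))          ; source ended before n items arrived
        (do (a/close! out)
            (a/close! ch)
            ;; Drain whatever is still buffered or parked on ch so a
            ;; producer waiting on it is released.
            (loop []
              (when (some? (a/<! ch))
                (recur))))))
    out))

(defn run-example
  "Same topology as in the conversation, with a/take swapped for the
  hypothetical take-then-close."
  [n]
  (let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)
        plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        zero-chan  (a/into [] (take-then-close 1 (a/tap input-mult (a/chan 32))))]
    (a/pipe (a/to-chan (range n)) input-chan)
    {:plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)}))

(run-example 100)
;; => {:plus 4950, :zero [0]}
```

Closing a channel that was handed in from outside is a design choice with side effects for any other reader of that channel, so this only suits cases like the tap above, where the channel exists solely to feed the take.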

otfrom10:01:22

ah, so just use a different kind of buffer. Thx. Hadn't twigged that one

otfrom10:01:35

excellent. This works:

(let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)

        plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan (a/dropping-buffer 1)))))]

    (a/pipe (a/to-chan (range 100)) input-chan)

    {:plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)})

👍 4
otfrom10:01:09

thx for the answer 🙂

otfrom11:01:16

so for some of the results above (like the all-chan) what I really want to do is just stream it out to a file. I worry a bit about blocking the threadpool if I do that. Are there any good examples out there of taking something off a channel and streaming each record (or batches) to a file or other io?

otfrom13:01:39

ah, looks like the async_edn.clj has a good example of writing things out from a channel: https://github.com/Datomic/mbrainz-importer/blob/master/src/cognitect/xform/async_edn.clj

otfrom13:01:17

good use of anomalies too I think

noisesmith17:01:48

yeah, the usual idiom is to use thread, with blocking ops inside. helpfully thread returns a channel so it can be used in a go block idiomatically
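
The thread idiom mentioned above could look roughly like this. It is a minimal sketch rather than the approach used in the linked async_edn.clj: `write-chan-to-file!` is a hypothetical name, and writing one `pr-str` form per line is an assumed output format.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.java.io :as io])

(defn write-chan-to-file!
  "Hypothetical helper: drains ch on a dedicated thread, writing each
  value to the file at path as one printed form per line. Returns a
  channel that yields the number of values written once ch closes."
  [ch path]
  (a/thread
    (with-open [^java.io.Writer w (io/writer path)]
      (loop [n 0]
        ;; <!! blocks, which is fine here: a/thread runs this body on
        ;; its own thread, not on the go-block thread pool.
        (if-some [v (a/<!! ch)]
          (do (.write w (str (pr-str v) "\n"))
              (recur (inc n)))
          n)))))
```

The returned channel can be ignored when only the side effect matters, or taken from with `<!` inside a go block to wait for the write to finish.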

otfrom17:01:56

tho here I think I just want it for the side effects

otfrom11:01:42

thx for the help @bertofer

bortexz11:01:38

np, I enjoyed figuring it out as it was quite puzzling

reborg11:01:43

I’m wondering what advantages the above has compared to:

(let [input (range 34)
      all (future (into [] input))
      plus (future (reduce + input))
      zero (future (take 1 input))]
  {:all (deref all) 
   :plus (deref plus) 
   :zero (deref zero)})

otfrom11:01:49

for simple topologies I've written that exact code

otfrom11:01:02

does future use a pool or a new thread each time?

otfrom11:01:29

I've got other bits where there are shared transforms that go to some downstream components

otfrom11:01:12

I suppose with your example there is no back pressure either

reborg11:01:54

what should slow down if one of those parallel tasks is taking longer?

reborg11:01:16

(it could be me not really understanding some concepts, so genuine questions)

otfrom15:01:01

@reborg do you mean why would I choose a larger buffer for one task rather than another?

otfrom15:01:36

If + was doing real work I'd want that to have a larger buffer

reborg15:01:38

I was thinking about the need for backpressure, that it could be useful in case you don’t control the producer upstream, as a way to communicate the need for a slowdown without crashing the consumer or responding 500. But if you have control of the source of the data (such as a file), the backpressure is already built in with sequences or input streams. So I tend to think backpressure is more important with a streaming consumer (always on). I was under the impression you were dealing with one off processing via files.

otfrom17:01:15

hmm... I wonder if back pressure is the right term. For what I'm doing atm I'm not processing files, I'm processing generated simulations. The buffers let me process things multiple times without having to realise the whole of the sequence into memory at the same time or re-create the seq for multiple outputs