#core-async
2020-11-02
ghaskins00:11:07

@smith.adriane that's a good thought... let me play with that

phronmophobic00:11:33

also, I'm just realizing that you probably want (comp (drop offset) (take limit)) rather than (comp (take limit) (drop offset))

phronmophobic00:11:26

while you typically supply functions to comp in "reverse order", you don't with transducers. see https://groups.google.com/g/clojure/c/OeXZOJYydAs for a longer explanation
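for reference, a quick REPL sketch (not from the original thread) contrasting the two orders:

```clojure
;; Plain function comp applies right-to-left:
((comp inc #(* 2 %)) 3)
;; => 7 (doubles first, then increments)

;; With transducers the leftmost xform sees each input first,
;; so (drop offset) must come before (take limit):
(into [] (comp (drop 2) (take 3)) (range 10))
;; => [2 3 4]
(into [] (comp (take 3) (drop 2)) (range 10))
;; => [2]
```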

ghaskins00:11:14

@smith.adriane oh wow, I didn't know that either.. ty

ghaskins00:11:18

your idea did work, btw

ghaskins00:11:42

it was a little awkward, as the "to" channel was supplied to me, so I had to put two async/pipes in to get the transducer in the right spot
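a hypothetical sketch of that shape (my reconstruction, not ghaskins's actual code) — the xform lives on a middle channel and two pipes bracket it:

```clojure
(require '[clojure.core.async :as async])

;; src and to stand in for the channels that were supplied externally
(let [src (async/to-chan! (range 10))
      to  (async/chan 8)
      mid (async/chan 1 (comp (drop 2) (take 3)))] ;; xform on the middle channel
  (async/pipe src mid) ;; first pipe: supplied "from" into mid
  (async/pipe mid to)  ;; second pipe: mid into supplied "to"
  (async/<!! (async/into [] to)))
;; => [2 3 4]
```

when the (take 3) transducer terminates, mid closes, and the second pipe closes `to` in turn.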

ghaskins00:11:51

it's functional.. thank you

ghaskins00:11:22

(would be nice if async/pipe had an arity for an xform...

ghaskins00:11:25

but i digress

phronmophobic00:11:36

there are other alternatives

phronmophobic00:11:44

you can run the transducer yourself

phronmophobic00:11:36

(def xform (comp
            (drop 1)
            (take 3)))


(def my-rf (xform #(do %2)))

(my-rf nil 1) ;; nil
(my-rf nil 2) ;; 2
(my-rf nil 3) ;; 3
(my-rf nil 4) ;; (reduced 4)
which could be used like:
(def xform (comp
            (drop 1)
            (take 3)))

(def my-rf (xform #(do %2)))

(go-loop []
  (when-some [x (<! ch)]
    (let [v (my-rf nil x)]
      (if (reduced? v)
        (do-thing-with-v (unreduced v))
        (do
          (when v ;; nil means the xform dropped the item
            (do-thing-with-v v))
          (recur))))))

ghaskins00:11:21

interesting, ty for the details

phronmophobic00:11:23

I'm sure there's a nice way to wrap up that functionality if that's not already done somewhere in core.async

ghaskins00:11:04

I'm somewhat green on transducers, so I'll take a bit to decode that, but very cool

phronmophobic00:11:46

every time I try to do something with transducers that's not the typical example, it always starts out as like 30 lines of code and gets reduced (😉) to like 5 lines once I figure it out

phronmophobic00:11:12

I'm not sure what process you're building to consume from the channel, but async/reduce might also be a good building block to look into
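for example, a minimal sketch of async/reduce (my addition, assuming the standard clojure.core.async API):

```clojure
(require '[clojure.core.async :as async])

;; async/reduce consumes a channel until it closes and returns a
;; channel that delivers the single accumulated value.
(let [ch (async/to-chan! [1 2 3 4])]
  (async/<!! (async/reduce + 0 ch)))
;; => 10
```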

👍 3
ghaskins00:11:00

I'm basically building a join from two Cassandra CQL queries, with pagination

phronmophobic00:11:41

do the results get aggregated into a collection or are they streamed to some other process?

phronmophobic00:11:57

either way, you should be able to use async/transduce:
• stream to collection:

(async/transduce
  (comp (drop offset) (take limit))
  conj
  []
  ch)

• stream to another process:

(async/transduce
  (comp (drop offset) (take limit))
  (completing
    ;; return the channel so it threads through as the accumulator
    (fn [out v] (async/put! out v) out))
  other-process-chan
  ch)

it should even work if you're streaming to another process using something besides a core.async channel:

(async/transduce
  (comp (drop offset) (take limit))
  (completing
    ;; assuming usage like (socket-send! conn data), with conn returned as the accumulator
    (fn [conn data] (socket-send! conn data) conn))
  socket-connection
  ch)

edit: all the examples were missing the channel to consume argument, ch

ghaskins00:11:22

oh wow, that is really cool

ghaskins00:11:48

it's streaming out to a gRPC client via protojure, which uses core.async

ghaskins00:11:56

so your async/>!! example is likely perfect

🤞 3
phronmophobic00:11:26

oh whoops, pretty sure async/>!! should be async/put!

ghaskins01:11:36

I made one slight variation

(async/transduce
  (comp (drop offset) (take limit))
  (fn
    ([] nil)
    ([acc] (async/close! output-ch))
    ([acc x] (async/>!! output-ch x)))
  nil
  ch)

ghaskins01:11:57

otherwise I was getting an arity error for the [acc] case

ghaskins01:11:11

I'm sure there are more elegant ways to do this...

phronmophobic01:11:55

that looks pretty good to me, but I wouldn't be surprised if there was a more idiomatic solution. I might pass output-ch as the initial value and use it as the accumulator value rather than ignoring acc, but it's a minor nitpick. more importantly, async/>!! should probably be async/put! so it doesn't block the go loop used by async/transduce if output-ch isn't consuming as fast as the input channel is producing
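a sketch of that suggested variant (my reconstruction, not tested against the original setup) — output-ch rides along as the accumulator, and put! keeps the transduce go-loop unblocked:

```clojure
(require '[clojure.core.async :as async])

;; offset/limit/ch/output-ch stand in for the values from the thread
(let [offset 2
      limit 3
      ch (async/to-chan! (range 10))
      output-ch (async/chan 8)]
  (async/transduce
    (comp (drop offset) (take limit))
    (fn
      ([out] (async/close! out) out)    ;; completion: close downstream
      ([out x] (async/put! out x) out)) ;; step: forward, return the acc
    output-ch
    ch)
  (async/<!! (async/into [] output-ch)))
;; => [2 3 4]
```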

ghaskins02:11:22

ah, good point... the only problem with put! is the opposite case: a slow reader can hit the 1024 pending-put limit in put!. Since it's an external client I was trying to be defensive, but blocking the go pool isn't attractive either

ghaskins02:11:56

anyway, I think I understand the issues now, so I can tune this

ghaskins02:11:02

ty very much, again

ghaskins00:11:49

that looks promising...ty for the tips

👍 3
Jakub Holý (HolyJak)08:11:57

Hello! I am trying to make a variant of take-while that closes the source and destination channels once it takes all it can. Is there a better way to implement it than this:

(defn chan-while [src dst pred]
  (let [filtered (a/chan 1 (take-while pred))]
    (a/pipe src filtered)
    (a/go-loop []
      (if-let [val (a/<! filtered)]
        (and (a/>! dst val)
             (recur))
        (do (a/close! src)
            (a/close! dst))))))
?

Jakub Holý (HolyJak)08:11:17

To test it I use this as the src:

(def src (a/to-chan! [1 3 5 2 4 6]))
this as dst:
(defn print-ch []
  (let [ch (a/chan)]
    (a/go-loop []
      (if-let [val (a/<! ch)]
        (do (println "VAL: " val)
            (recur))
        (println "CLOSED dst")))
    ch))
and invoke it all via:
(chan-while src (print-ch) odd?)

Jan K11:11:35

I'm actually getting the output you expect from that code.

Jakub Holý (HolyJak)18:11:01

Thanks a lot! You are right, I must have screwed up my testing.