This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-11-02
Channels
- # announcements (1)
- # babashka (19)
- # beginners (85)
- # calva (1)
- # cider (23)
- # clara (19)
- # clj-kondo (28)
- # clojars (4)
- # clojure (60)
- # clojure-australia (8)
- # clojure-dev (14)
- # clojure-europe (117)
- # clojure-nl (3)
- # clojure-uk (11)
- # clojurescript (68)
- # conjure (2)
- # core-async (39)
- # cryogen (11)
- # cursive (7)
- # data-science (1)
- # datomic (9)
- # emacs (10)
- # etaoin (1)
- # events (1)
- # fulcro (1)
- # helix (10)
- # jobs (4)
- # kaocha (2)
- # keechma (3)
- # leiningen (1)
- # malli (6)
- # pathom (15)
- # pedestal (10)
- # re-frame (5)
- # reitit (1)
- # remote-jobs (1)
- # rum (3)
- # shadow-cljs (18)
- # tools-deps (30)
- # vim (6)
@smith.adriane thats a good thought...let me play with that
also, I'm just realizing that you probably want (comp (drop offset) (take limit))
rather than (comp (take limit) (drop offset))
while you typically supply functions to comp in "reverse order", you don't with transducers. see https://groups.google.com/g/clojure/c/OeXZOJYydAs for a longer explanation
@smith.adriane oh wow, I didnt know that either..ty
it was a little awkward, as the "to" channel was supplied to me, so I had to put two async/pipes in to get the transducer in the right spot
there are other alternatives
you can run the transducer yourself
(def xform (comp
(drop 1)
(take 3)))
(def my-rf (xform #(do %2)))
(my-rf nil 1) ;; nil
(my-rf nil 2) ;; 2
(my-rf nil 3) ;; 3
(my-rf nil 4) ;; (reduced 4)
which could be used like:
(def xform (comp
(drop 1)
(take 3)))
(def my-rf (xform #(do %2)))
(go-loop []
(let [v (my-rf nil (<! ch))]
(when v
(if (reduced? v)
(do-thing-with-v (unreduced v))
(do
(do-thing-with-v v)
(recur))))))
I'm sure there's a nice way to wrap up that functionality if that's not already done somewhere in core.async
every time I try to do something with transducers that's not the typical example, it always starts out as like 30 lines of code and gets reduced (😉) to like 5 lines once I figure it out
I'm not sure what the process you're building that's consuming from the channel, but async/reduce
might also be a good building block to look into
do they results get aggregated into a collection or are they streamed to some other process?
either way, you should be able to use async/transduce
:
• stream to collection:
(async/transduce
(comp (drop offset) (take limit))
conj
[]
ch)
• stream to other process
(async/transduce
(comp (drop offset) (take limit))
async/put!
other-process-chan
ch)
it should even work if you're streaming to another process using something besides a core async channel:
(async/transduce
(comp (drop offset) (take limit))
socket-send! ;; assuming usage like (socket-send! conn data)
socket-connection
ch)
edit: all the examples were missing the channel to consume argument, ch
oh whoops, pretty sure async/>!!
should be async/put!
I made one slight variation
(async/transduce
(comp (drop offset) (take limit))
(fn
([] nil)
([acc] (async/close! output-ch))
([acc x] (async/>!! output-ch x)))
nil)
that looks pretty good to me, but I wouldn't be surprised if there was a more idiomatic solution. I might pass output-ch
as the initial value and use it as the accumulator value rather than ignoring acc
, but it's a minor nitpick.
more importantly, async/>!!
should probably be async/put!
so it doesn't block the go loop used by async/transduce
if output-ch
isn't consuming as fast as the input channel is producing
ah, good point...the only problem with put! is the opposite case of a slow reader causing the 1024 buffer limit in put!. Since its an external client I was trying to be defensive, but go-pool blocking isnt attractive either
async/transduce
uses async/reduce
for implementation, https://github.com/clojure/core.async/blob/e9dc83dad06161176f176cad0f56f8bb235c51e5/src/main/clojure/clojure/core/async.clj#L651
Hello! I am trying to make a variant of take-while
that closes the source and destination channels once it takes all it can. Is there a better way to implement it than this:
(defn chan-while [src dst pred]
(let [filtered (a/chan 1 (take-while pred))]
(a/pipe src filtered)
(a/go-loop []
(if-let [val (a/<! filtered)]
(and (a/>! dst val)
(recur))
(do (a/close! src)
(a/close! dst))))))
?To test it I use this as the src
:
(def src (a/to-chan! [1 3 5 2 4 6]))
this as dst
:
(defn print-ch []
(let [ch (a/chan)]
(a/go-loop []
(if-let [val (a/<! ch)]
(do (println "VAL: " val)
(recur))
(println "CLOSED dst")))
ch))
and invoke it all via:
(chan-while src (print-ch) odd?)
Thanks a lot! You are right, I must have screwed up my testing.