This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-02-17
Hello all, is it possible to split a flow into two paths and recombine them at a later stage? I’m playing with something like the following, but I’m struggling to figure out what is needed to make this work (or whether it is possible at all):
(defn fetch-missing-items [cache-response-flow]
  (let [cache-hits   (->> cache-response-flow ;; path 1
                          (m/eduction (filter :result)))
        cache-misses (->> cache-response-flow ;; path 2
                          (m/eduction (filter #(not (:result %))) (map :key) (partition-all 100))
                          fetch-details
                          (m/eduction cat))]
    (m/not-sure-what cache-hits cache-misses)))
(let [page-numbers '(1 2 3 4 5 6 7 8 9 10 11)
      inputs (map #(m/via m/blk (m/? (backoff (request %) delays))) page-numbers)
      values (m/ap
               (let [flow (m/seed inputs)   ;; create a flow of tasks to execute
                     task (m/?> 5 flow)]    ;; from here, fork on every task in **parallel**
                 (m/? task)))
      ;; drain the flow of values and count them
      all (m/? ;; tasks are executed, and the flow is consumed here!
            (->> values
                 (m/eduction (partition-all 25))
                 fetch-product-details-cache
                 (m/eduction cat)
                 fetch-missing-products
                 (m/reduce (fn [p v]
                             (prn :v v)
                             (conj p v))
                           ())))]
  (println :all-count (count all) :all all))
Cheers,
Chris

(defn fetch-missing-items [cache-response-flow]
  (m/ap (let [[hit? flow] (m/?> 2 (m/group-by #(contains? % :result) cache-response-flow))]
          (m/?> (if hit?
                  flow
                  (->> flow
                       (m/eduction (map :key) (partition-all 100))
                       fetch-details
                       (m/eduction cat)))))))
Nice! That worked beautifully. Thank you so much for this!
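For anyone reading along later, here is a minimal sketch of the same split-and-recombine pattern on toy data. It assumes missionary is on the classpath in a version that provides `m/group-by` and accepts a parallelism argument to `m/?>` (both are used in the thread above). The function name `double-odds` and the even/odd split are purely illustrative and not from the thread: even numbers stand in for cache hits and pass through untouched, odd numbers stand in for cache misses and go through an extra transformation before being merged back.

```clojure
(require '[missionary.core :as m])

;; Hypothetical example: split a flow of numbers by `even?`,
;; transform only the odd group, and merge both groups back into one flow.
(defn double-odds [numbers-flow]
  (m/ap
    ;; m/group-by emits one [key group-flow] pair per distinct key.
    ;; Forking with parallelism 2 keeps both groups consumed concurrently;
    ;; this assumes the key function yields at most two distinct keys.
    (let [[even-group? group] (m/?> 2 (m/group-by even? numbers-flow))]
      (m/?> (if even-group?
              group                                   ;; "path 1": pass through
              (m/eduction (map #(* 2 %)) group))))))  ;; "path 2": extra step

;; Run the flow to completion and collect everything it emits.
;; The interleaving of the two groups is not assumed, so sort before comparing.
(def result
  (m/? (m/reduce conj [] (double-odds (m/seed (range 6))))))

(println (sort result))
```

The parallelism passed to `m/?>` matches the number of groups on purpose: if the key function produced more groups than the fork allows to run at once, some groups would not be consumed and the pipeline could stall.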