Fork me on GitHub
#missionary
<
2023-02-17
>
Chris Lowe17:02:01

Hello all, Is it possible to split a flow into two paths and recombine at a later stage? I’m playing with something like the following, but I’m struggling to figure out what is needed to make this work (or if, indeed, this is possible)

(defn fetch-missing-items [cache-response-flow]
  (let [cache-hits (->> cache-response-flow               ;; path 1
                          (m/eduction (filter :result)))
          cache-misses (->> cache-response-flow           ;; path 2
                            (m/eduction (filter #(not (:result %))) (map :key) (partition-all 100))
                            fetch-details
                            (m/eduction cat)]
    (m/not-sure-what cache-hits cache-misses)))

(let [page-numbers '(1 2 3 4 5 6 7 8 9 10 11)
      inputs (map #(m/via m/blk (m/? (backoff (request %) delays))) page-numbers)
      values (m/ap
               (let [flow (m/seed inputs)                   ;; create a flow of tasks to execute
                     task (m/?> 5 flow)]                    ;; from here, fork on every task in **parallel**
                 (m/? task)))
      ;; drain the flow of values and count them
      all (m/?                                              ;; tasks are executed, and flow is consume here!
            (->> values
                 (m/eduction (partition-all 25))
                 fetch-product-details-cache
                 (m/eduction cat)
                 fetch-missing-products
                 (m/reduce (fn [p v]
                             (prn :v v)
                             (conj p v))
                           ())))]
  (println :all-count (count all) :all all))
Cheers, Chris

leonoel20:02:05

(defn fetch-missing-items [cache-response-flow]
  (m/ap (let [[hit? flow] (m/?> 2 (m/group-by #(contains? % :result) cache-response-flow))]
          (m/?> (if hit?
                  flow
                  (->> flow
                    (m/eduction (map :key) (partition-all 100))
                    fetch-details
                    (m/eduction cat)))))))

Chris Lowe20:02:00

Nice! That worked beautifully. Thank you so much for this!