#missionary
2023-11-26
telekid16:11:32

Is there a way to mark a flow as terminated from within e.g. an m/ap block?

telekid16:11:12

See comment in above snippet

telekid16:11:03

Also quick feedback: It took me a little while to work through how I should run a task within m/observe. My first instinct was obviously to reach for m/?, but that blocks. Eventually I realized I could invoke the task manually and pass ! into the task's callbacks. I think this is a pattern that should be clarified a bit – I'm definitely not sure I'm doing things the best way here.

telekid16:11:03

Maybe my original question is actually closer to "how do I terminate a flow created by m/observe?"

telekid16:11:27

Or restating to avoid a possible XY problem: I want the flow returned by -driver to terminate when the flow passed as the second argument to -driver terminates.

leonoel19:11:16

> Is there a way to mark a flow as terminated from within e.g. an m/ap block?
there is no way, ap terminates when its last child process terminates

> how do I terminate a flow created by m/observe?
you can transform it with (m/eduction (take-while #(not= % ::complete)) ,,,)
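
A minimal sketch of both points above, assuming a JVM REPL with missionary on the classpath. The `m/seed` flow is only a stand-in for the `m/observe` flow from the question, and `::complete` is an arbitrary sentinel value, not something missionary defines. In the real case the producer would push `::complete` through the observe callback when it is done.

```clojure
(require '[missionary.core :as m])

;; Stand-in for the observed flow: emits two values, then the sentinel.
;; The trailing 3 shows that nothing after the sentinel gets through.
(def upstream (m/seed [1 2 ::complete 3]))

;; take-while ends the transduction at the sentinel, so the resulting
;; flow terminates there instead of running until cancelled.
(def until-complete
  (m/eduction (take-while #(not= % ::complete)) upstream))

;; m/? outside of m/sp blocks the calling thread (JVM only).
(def collected (m/? (m/reduce conj [] until-complete)))

;; An m/ap flow has no explicit "terminate" call: it ends once every
;; forked child has finished, here one m/sleep task per input value.
(def from-ap
  (m/? (m/reduce conj []
         (m/ap (m/? (m/sleep 10 (m/?> (m/seed [:a :b :c]))))))))
```

Here `collected` should hold only the values that came before the sentinel, and `from-ap` should hold one result per forked child.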

👍 1
leonoel19:11:40

here is another idea

(defn run-tasks [driver flow]
  (m/reduce (fn [_ x] (prn x)) nil
    (m/ap (m/? ((m/?> flow) driver)))))

(defn with-driver [browser f & args]
  (m/sp
    (let [driver (browser)]
      (try (m/? (apply f driver args))
           (finally (e/quit driver))))))

(def run (with-driver e/chrome run-tasks (m/seed [market-hotel/run #_brooklyn-made/run])))

telekid20:11:21

Yeah that's definitely cleaner. I got stuck in my thinking that I should be using m/observe for RAII

telekid20:11:33

That's great. Here's a parallelized version of run:

(defn run-tasks [driver flow]
  (m/reduce conj []
            (m/ap (m/? ((m/?> flow) driver)))))

(defn with-driver [browser f & args]
  (m/sp
   (let [driver (browser)]
     (try (m/? (apply f driver args))
          (finally (e/quit driver))))))

(defn run [browser tasks parallelism]
  (->> (m/eduction (partition-all parallelism) (map m/seed) (m/seed tasks))
       (m/?> parallelism)
       (with-driver browser run-tasks)
       (m/?)
       (m/ap)
       (m/reduce conj [])))

((run e/chrome [brooklyn-made/run market-hotel/run example-3 example-4] 2)
 prn
 prn)