#missionary
2023-07-15
Reily Siegel 14:07:38

Is there a way to combine two flows like (?> (amb= f1 f2)), but where, if one flow is cancelled or terminates, the other is cancelled? If it helps for a solution, only the outputs of the first flow are needed; the second flow is only used to run side effects. However, the flows have to be unsynchronized, unlike with zip.

Reily Siegel 14:07:41

I'm trying to create a function that runs a process: it takes as an argument a flow to be written to stdin and returns a flow of lines from stdout. So f1 is the flow of stdout lines, and f2 is an ap reading from the input flow and writing to stdin.

leonoel 14:07:54

I detect some wording mismatch here. Cancellation is a signal sent to a process from the outside; this is not the same as a process terminating spontaneously. As I understand it, f1 is able to self-terminate and you want to cancel f2 when that happens, right?

leonoel 14:07:22

One way to achieve that is to make f1 crash instead of terminating successfully
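(As a rough illustration of that suggestion, not code from the thread: a hypothetical crash-on-end wrapper re-emits f1's values and throws missionary's Cancelled exception once f1 terminates, so racing it against f2 with the (?> (amb= ...)) pattern from the question cancels f2 when f1 ends. The combined flow then crashes with Cancelled, which the consumer can treat as normal termination. This assumes the usual missionary.core alias and that missionary.Cancelled accepts a message.)

(require '[missionary.core :as m])

(defn crash-on-end
  "Re-emits the values of flow f, then crashes once f terminates normally."
  [f]
  (m/ap
    (m/amb (m/?> f)
           (throw (missionary.Cancelled. "upstream flow terminated")))))

(defn until-first-ends
  "Emits values from both flows; when f1 ends, f2 is cancelled and the flow crashes."
  [f1 f2]
  (m/ap (m/?> (m/amb= (crash-on-end f1) f2))))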

Dustin Getz 15:07:56

mix will race N flows and collect the results; if some flow crashes, the siblings will be cancelled. Note it uses the alternate arity of ?>, which specifies a parallelism factor:

(defn mix [& flows]
  (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))
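(A quick, made-up usage example of this mix helper, just to show the behaviour; the exact output order depends on timing:)

(m/? (m/reduce conj []
       (mix (m/seed [1 2 3])
            (m/ap (m/? (m/sleep 10 :late))))))
;; likely => [1 2 3 :late], values arrive as each branch produces them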

Reily Siegel 15:07:04

Thanks, you were exactly right about me getting my terminology mixed up. Changing the flow reading from stdout from

(-> p
    :out
    io/reader
    line-seq
    m/seed)
to
(m/ap
  (m/?>
   (m/amb (-> p
              :out
              io/reader
              line-seq
              m/seed)
          (m/ap (throw (missionary.Cancelled. "stdout flow terminated"))))))
seems to do what I want.

leonoel 16:07:38

you should not consume line-seq with m/seed because it is blocking; consider this pattern instead: https://clojurians.slack.com/archives/CL85MBPEF/p1665862256595759?thread_ts=1665835286.602949&cid=CL85MBPEF

👍 2
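(The linked pattern is not reproduced here, but the general idea is to perform each blocking read on missionary's blocking executor instead of seeding a lazy, blocking seq. A rough sketch under that assumption, with a hypothetical lines-flow taking the BufferedReader returned by io/reader; note it will typically read one line ahead because of ap's output buffer, as discussed further below:)

(defn lines-flow [^java.io.BufferedReader rdr]
  (->> (m/ap
         (m/?> (m/seed (repeat :read)))        ; one fork per line, driven by downstream demand
         (m/? (m/via m/blk (.readLine rdr))))  ; blocking read off the consumer thread; nil at EOF
       (m/eduction (take-while some?))))       ; stop at EOF, cancelling the ap
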
kawas 22:07:35

Hi all, I have been playing with flows and I don't understand why this gist doesn't make the same number of requests for each version of items-flow. Just using seed seems to eagerly make an extra request. If I add a tiny sleep (m/sleep 0 i), it makes the right number of requests. Is it related to the advice of not using seed for blocking stuff?

(defn fetch-item-task [idx]
  (m/via m/blk
         (println "costly request" idx)
         (Thread/sleep 1000)
         idx))

(defn items-flow []
  (m/ap (m/? (let [i (m/?> (m/seed (range)))]
               (fetch-item-task i)))))

(defn items-flow' []
  (m/ap (m/? (let [i (m/?> (m/seed (range)))]
               (fetch-item-task (m/? (m/sleep 0 i)))))))

(comment
  ;; Bad: This code will make 3 requests!
  (->> (items-flow) (m/eduction (take 2))
       (m/reduce conj []) m/?)

  ;; Good: This code will make 2 requests :)
  (->> (items-flow') (m/eduction (take 2))
       (m/reduce conj []) m/?))

leonoel 09:07:53

> Is it related to the advice of not using seed for blocking stuff?
No, seed is fine here because range is not blocking. The third request is started and immediately cancelled. The event sequence is:
1. ap is ready to transfer the second item
2. eduction transfers the second item from ap
3. ap starts the third request because its output buffer is now free
4. eduction processes item 2 and cancels ap due to early termination
5. ap cancels the third request
6. the third request crashes, ap crashes, eduction terminates, reduce terminates
What you observed is sleep intercepting cancellation and crashing, preventing the request from starting. It is technically a race condition; another valid outcome would be sleep completing before cancellation and starting the request. In any case, the request will eventually be cancelled, interrupting (Thread/sleep 1000).

🙏 2
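(To observe the cancellation of that third request, one could instrument the task to log the thread interrupt; a hypothetical variant of fetch-item-task, not from the thread:)

(defn fetch-item-task* [idx]
  (m/via m/blk
         (try
           (println "costly request" idx)
           (Thread/sleep 1000)
           idx
           (catch InterruptedException e
             (println "request" idx "interrupted by cancellation")
             (throw e)))))

Substituting it into items-flow and running the first comment example should print the start of the third request (index 2) followed by its interruption message, matching the event sequence above.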