This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-07-15
Is there a way to combine two flows like (?> (amb= f1 f2)), but where if one flow is cancelled (terminates), the other is cancelled? If it helps for a solution, only the outputs of the first flow are needed; the second flow is only used to run side effects. However, the flows have to be unsynchronized, unlike with zip.
I'm trying to create a function that runs a process, where it takes as an argument a flow to be written to stdin, and returns a flow of lines from stdout. So f1 is the flow of stdout, and f2 is an AP reading from the in flow and writing to stdin.
I detect some wording mismatch here. Cancellation is a signal sent to a process from outside; this is not the same as a process terminating spontaneously. As I understand it, f1 is able to self-terminate and you want to cancel f2 when that happens, right?
mix will race N flows and collect the results; if some flow crashes, the siblings will be cancelled. Note it uses the alternate arity of ?> which specifies the parallelism factor:
(defn mix [& flows] (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))
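A minimal sketch of how mix behaves, assuming missionary.core is required as m. The flows f1 and f2 are made up for illustration: f1 emits two values and then crashes on purpose, f2 ticks forever unless it is cancelled. If siblings are cancelled on crash as described, the reduce below fails instead of hanging on f2.

```clojure
(require '[missionary.core :as m])

(defn mix [& flows]
  (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))

;; Hypothetical flow: emits 1 and 2, then crashes instead of terminating normally.
(def f1
  (m/ap (m/?> (m/amb (m/seed [1 2])
                     (m/ap (throw (ex-info "f1 is done" {})))))))

;; Hypothetical flow: emits 0, 1, 2, ... every 10 ms until it is cancelled.
(def f2
  (m/ap (let [n (m/?> (m/seed (range)))]
          (m/? (m/sleep 10 n)))))

;; f2 never terminates on its own, so this only returns
;; if the crash of f1 cancels f2.
(def outcome
  (try (m/? (m/reduce conj [] (mix f1 f2)))
       :completed
       (catch Throwable _ :crashed)))

;; With only finite flows, mix simply merges them (order is not guaranteed).
(def merged
  (m/? (m/reduce conj [] (mix (m/seed [1 2 3]) (m/seed [4 5])))))
```

Here outcome should be :crashed, and merged holds the five values in some interleaving.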
Thanks, you were exactly right about me getting my terminology mixed up. Changing the flow reading from stdout from
(-> p
    :out
    io/reader
    line-seq
    m/seed)
to
(m/ap
  (m/?>
    (m/amb (-> p
               :out
               io/reader
               line-seq
               m/seed)
           (m/ap (throw Cancelled)))))
seems to do what I want.
You should not consume line-seq with m/seed because it is blocking; consider this pattern instead:
https://clojurians.slack.com/archives/CL85MBPEF/p1665862256595759?thread_ts=1665835286.602949&cid=CL85MBPEF
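For readers who cannot open the link, here is one possible way to avoid seeding a blocking line-seq. It is only a sketch and not necessarily the linked pattern: each blocking .readLine call is wrapped in a task running on m/blk, and the flow repeats that task until the reader returns nil. blocking-lines is a hypothetical helper name, not part of missionary.

```clojure
(require '[missionary.core :as m])
(import '(java.io BufferedReader StringReader))

;; Hypothetical helper: a flow of lines read from a BufferedReader.
(defn blocking-lines [^BufferedReader rdr]
  ;; A task that performs one blocking read on the m/blk thread pool.
  (let [read-line (m/via m/blk (.readLine rdr))]
    (->> (m/ap
           ;; `repeat` is lazy and non-blocking, so seeding it is fine;
           ;; the blocking work happens inside the task run by m/?.
           (m/? (m/?> (m/seed (repeat read-line)))))
         ;; .readLine returns nil at end of stream: stop there.
         (m/eduction (take-while some?)))))

;; Usage with an in-memory reader standing in for a process's stdout.
(def collected
  (m/? (m/reduce conj []
         (blocking-lines (BufferedReader. (StringReader. "a\nb\nc"))))))
```

Because the flow ends through early termination of eduction, one extra read may be started and then cancelled at the end of the stream.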
Hi all,
I have been playing with flows and I don't understand why this gist doesn't make the same number of requests for each version of items-flow.
Just using seed seems to eagerly make an extra request. If I add a tiny sleep, (m/sleep 0 i), it makes the right number of requests.
Is it related to the advice of not using seed for blocking stuff?
(defn fetch-item-task [idx]
  (m/via m/blk
    (println "costly request" idx)
    (Thread/sleep 1000) idx))
(defn items-flow []
  (m/ap (m/? (let [i (m/?> (m/seed (range)))]
               (fetch-item-task i)))))
(defn items-flow' []
  (m/ap (m/? (let [i (m/?> (m/seed (range)))]
               (fetch-item-task (m/? (m/sleep 0 i)))))))
(comment
  ;; Bad: This code will make 3 requests!
  (->> (items-flow) (m/eduction (take 2))
       (m/reduce conj []) m/?)
  ;; Good: This code will make 2 requests :)
  (->> (items-flow') (m/eduction (take 2))
       (m/reduce conj []) m/?))
> Is it related to the advice of not using seed for blocking stuff?
No, seed is fine here because range is not blocking.
The third request is started and immediately cancelled. The event sequence is:
1. ap is ready to transfer the second item
2. eduction transfers the second item from ap
3. ap starts the third request because its output buffer is now free
4. eduction processes item 2 and cancels ap due to early termination
5. ap cancels the third request
6. the third request crashes, ap crashes, eduction terminates, reduce terminates
What you observed is sleep intercepting the cancellation and crashing, which prevents the request from starting. It is technically a race condition: another valid outcome would be sleep completing before the cancellation and the request starting. In any case, the request will eventually be cancelled, interrupting (Thread/sleep 1000).
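To make the cancelled request visible, here is a small variation of the gist. It is a sketch only: the log atom and the shorter sleep are additions for illustration. Each request records when it starts and when its thread is interrupted.

```clojure
(require '[missionary.core :as m])

;; Hypothetical log of events, added for illustration.
(def log (atom []))

(defn fetch-item-task [idx]
  (m/via m/blk
    (swap! log conj [:start idx])
    (try
      (Thread/sleep 200)
      idx
      (catch InterruptedException e
        ;; Cancelling a via task interrupts the thread running it.
        (swap! log conj [:interrupted idx])
        (throw e)))))

(defn items-flow []
  (m/ap (m/? (fetch-item-task (m/?> (m/seed (range)))))))

;; Only two items are requested downstream.
(def result
  (->> (items-flow)
       (m/eduction (take 2))
       (m/reduce conj [])
       m/?))
```

result contains the first two items. Inspecting @log afterwards typically shows a third request that was started and then interrupted, matching the event sequence above, though the exact entries depend on timing.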