This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-04-20
Channels
- # announcements (4)
- # babashka (10)
- # beginners (8)
- # calva (13)
- # cherry (20)
- # cider (9)
- # clojure (11)
- # clojure-austin (2)
- # clojure-europe (3)
- # clojure-norway (3)
- # clojurescript (1)
- # datomic (5)
- # dev-tooling (9)
- # fulcro (5)
- # hyperfiddle (5)
- # instaparse (1)
- # lingy (1)
- # malli (21)
- # matrix (1)
- # missionary (9)
- # off-topic (12)
- # releases (2)
- # ring-swagger (8)
- # scittle (9)
- # shadow-cljs (20)
As a learning practice, I want to implement a timeout
function like the https://reactivex.io/documentation/operators/timeout.html.
Do you see any errors? Is there a more idiomatic solution?
I box values because I want to give an identity to each values. The timeout should work even if the flow emit the same value every time.
Gotchas: I have to use ?=
when creating twin delayed values to emit them at the same pace.
Question: The ReactiveX timeout ends with an error, how can I end the flow and signal an error? Just by throwing an exception?
Edit: Update snippet, replace the use of System/nanoTime by an AtomicLong to provide an id to each value.
> how can I end the flow and signal an error? Just by throwing an exception?
Yes, if you throw from an m/ap
, the exception will be delivered to the consumer of the flow when it takes the next item.
Here is an alternative solution
(defn timeout [delay input]
(m/ap
(let [!last (atom (System/currentTimeMillis))]
(m/amb=
(try (m/? (m/sleep (- (+ delay (m/?< (m/watch !last)))
(System/currentTimeMillis))))
(throw (java.util.concurrent.TimeoutException.))
(catch Cancelled _ (m/amb)))
(let [x (m/?> input)]
(reset! !last (System/currentTimeMillis))
x)))))
@U0569GLFZ your version disables backpressure on input due to (m/?= flow)
on L8. If the producer is faster than consumer then ap
's internal buffer will grow indefinitely
Ok, I understand. Using ?=
should have been some kind of smell warning that my solution was going wrong way 😅
Thanks for the very nice solution and your code review 😀
By the way in your solution, does an Atom is necessary? Do the !last
value can be read from multiple threads even if we read input with back pressure?