As a learning practice, I want to implement a timeout function like the https://reactivex.io/documentation/operators/timeout.html.
Do you see any errors? Is there a more idiomatic solution?
I box values because I want to give an identity to each values. The timeout should work even if the flow emit the same value every time.
Gotchas: I have to use ?= when creating twin delayed values to emit them at the same pace.
Question: The ReactiveX timeout ends with an error, how can I end the flow and signal an error? Just by throwing an exception?
Edit: Update snippet, replace the use of System/nanoTime by an AtomicLong to provide an id to each value.
> how can I end the flow and signal an error? Just by throwing an exception?
Yes, if you throw from an m/ap, the exception will be delivered to the consumer of the flow when it takes the next item.
Here is an alternative solution
(defn timeout [delay input]
(m/ap
(let [!last (atom (System/currentTimeMillis))]
(m/amb=
(try (m/? (m/sleep (- (+ delay (m/?< (m/watch !last)))
(System/currentTimeMillis))))
(throw (java.util.concurrent.TimeoutException.))
(catch Cancelled _ (m/amb)))
(let [x (m/?> input)]
(reset! !last (System/currentTimeMillis))
x)))))@kawas your version disables backpressure on input due to (m/?= flow) on L8. If the producer is faster than consumer then ap's internal buffer will grow indefinitely
also it doesn't timeout if the input doesn't emit any value
Ok, I understand. Using ?= should have been some kind of smell warning that my solution was going wrong way 😅
Thanks for the very nice solution and your code review 😀
By the way in your solution, does an Atom is necessary? Do the !last value can be read from multiple threads even if we read input with back pressure?
... oh maybe the m/watch code and the m/amb= code could be different threads
the atom can be manipulated from different threads but never concurrently
I mean the thread calling the reset! also performs the switch