#missionary
2024-04-20
kawas21:04:02

As a learning exercise, I want to implement a timeout function like the ReactiveX timeout operator (https://reactivex.io/documentation/operators/timeout.html). Do you see any errors? Is there a more idiomatic solution? I box values because I want to give each value an identity, so the timeout works even if the flow emits the same value every time. Gotcha: I have to use ?= when creating the twin delayed values so that they are emitted at the same pace. Question: the ReactiveX timeout ends with an error. How can I end the flow and signal an error? Just by throwing an exception? Edit: updated the snippet, replacing System/nanoTime with an AtomicLong to give each value an id.

Andrew Wilcox23:04:30

> how can I end the flow and signal an error? Just by throwing an exception?

Yes, if you throw from an m/ap, the exception will be delivered to the consumer of the flow when it takes the next item.

👍 1
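
A minimal sketch of that behavior, assuming missionary is on the classpath and the code runs on the JVM (where m/? can block the calling thread). The flow `failing` and the `ex-info` payload are made up for illustration:

```clojure
(require '[missionary.core :as m])

;; Hypothetical flow: emits 1 and 2, then throws on the third item.
(def failing
  (m/ap
    (let [x (m/?> (m/seed [1 2 3]))]
      (if (= x 3)
        (throw (ex-info "boom" {:x x}))
        x))))

;; m/reduce consumes the flow. m/? blocks until the reduction finishes
;; and rethrows the failure raised inside the ap body.
(def result
  (try
    (m/? (m/reduce conj [] failing))
    (catch clojure.lang.ExceptionInfo e
      (ex-data e))))
```

The consumer here is m/reduce. It sees the exception when it takes the third item, so the reduction fails instead of returning a vector.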
leonoel13:04:41

Here is an alternative solution

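(require '[missionary.core :as m])
(import 'missionary.Cancelled)

(defn timeout [delay input]
  (m/ap
    ;; !last holds the time of the most recent emission (or of startup)
    (let [!last (atom (System/currentTimeMillis))]
      (m/amb=
        ;; timer branch: restarted by ?< each time !last changes
        (try (m/? (m/sleep (- (+ delay (m/?< (m/watch !last)))
                             (System/currentTimeMillis))))
             (throw (java.util.concurrent.TimeoutException.))
             ;; a sleep preempted by a newer value is cancelled, emit nothing
             (catch Cancelled _ (m/amb)))
        ;; input branch: forward each value and record its arrival time
        (let [x (m/?> input)]
          (reset! !last (System/currentTimeMillis))
          x)))))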

🙏 2
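
A hedged usage sketch for the solution above, assuming missionary on the JVM. The `timeout` definition is repeated from the message so the snippet runs standalone. The flow `slow` and the delay values are hypothetical, chosen so that the second value arrives well after the 100 ms limit:

```clojure
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

;; Same definition as in the message above.
(defn timeout [delay input]
  (m/ap
    (let [!last (atom (System/currentTimeMillis))]
      (m/amb=
        (try (m/? (m/sleep (- (+ delay (m/?< (m/watch !last)))
                             (System/currentTimeMillis))))
             (throw (java.util.concurrent.TimeoutException.))
             (catch Cancelled _ (m/amb)))
        (let [x (m/?> input)]
          (reset! !last (System/currentTimeMillis))
          x)))))

;; Hypothetical source: emits 10 after 10 ms, then stalls for 500 ms.
(def slow
  (m/ap
    (let [ms (m/?> (m/seed [10 500]))]
      (m/? (m/sleep ms ms)))))

;; Consume with a 100 ms limit and report how the flow ended.
(def outcome
  (try
    (m/? (m/reduce conj [] (timeout 100 slow)))
    (catch java.util.concurrent.TimeoutException _
      :timed-out)))
```

The first value resets the timer. The 500 ms stall then exceeds the limit, so the reduction is expected to fail with the TimeoutException thrown by the timer branch.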
leonoel13:04:57

@kawas your version disables backpressure on the input because of (m/?= flow) on line 8 of your snippet. If the producer is faster than the consumer, ap's internal buffer will grow indefinitely.

leonoel13:04:52

Also, it doesn't time out if the input never emits a value.

kawas13:04:01

OK, I understand. Having to use ?= should have been a warning sign that my solution was going the wrong way 😅 Thanks for the very nice solution and the code review 😀 By the way, is an atom necessary in your solution? Can the !last value be read from multiple threads even though we read the input with backpressure?

kawas13:04:34

... oh, maybe the m/watch code and the m/amb= code could run on different threads

leonoel13:04:49

the atom can be manipulated from different threads but never concurrently

leonoel14:04:54

I mean the thread calling the reset! also performs the switch

👌 1