missionary

2024-04-20T21:20:02.628569Z

As a learning practice, I want to implement a timeout function like the https://reactivex.io/documentation/operators/timeout.html. Do you see any errors? Is there a more idiomatic solution? I box values because I want to give an identity to each values. The timeout should work even if the flow emit the same value every time. Gotchas: I have to use ?= when creating twin delayed values to emit them at the same pace. Question: The ReactiveX timeout ends with an error, how can I end the flow and signal an error? Just by throwing an exception? Edit: Update snippet, replace the use of System/nanoTime by an AtomicLong to provide an id to each value.

Andrew Wilcox 2024-04-20T23:59:30.717539Z

> how can I end the flow and signal an error? Just by throwing an exception? Yes, if you throw from an m/ap, the exception will be delivered to the consumer of the flow when it takes the next item.

👍 1
leonoel 2024-04-22T13:23:41.655649Z

Here is an alternative solution

(defn timeout [delay input]
  (m/ap
    (let [!last (atom (System/currentTimeMillis))]
      (m/amb=
        (try (m/? (m/sleep (- (+ delay (m/?< (m/watch !last)))
                             (System/currentTimeMillis))))
             (throw (java.util.concurrent.TimeoutException.))
             (catch Cancelled _ (m/amb)))
        (let [x (m/?> input)]
          (reset! !last (System/currentTimeMillis))
          x)))))

🙏 1
leonoel 2024-04-22T13:39:57.457369Z

@kawas your version disables backpressure on input due to (m/?= flow) on L8. If the producer is faster than consumer then ap's internal buffer will grow indefinitely

leonoel 2024-04-22T13:42:52.605089Z

also it doesn't timeout if the input doesn't emit any value

🤦 1
2024-04-22T13:49:01.592699Z

Ok, I understand. Using ?= should have been some kind of smell warning that my solution was going wrong way 😅 Thanks for the very nice solution and your code review 😀 By the way in your solution, does an Atom is necessary? Do the !last value can be read from multiple threads even if we read input with back pressure?

2024-04-22T13:50:34.813539Z

... oh maybe the m/watch code and the m/amb= code could be different threads

leonoel 2024-04-22T13:57:49.976089Z

the atom can be manipulated from different threads but never concurrently

leonoel 2024-04-22T14:00:54.850329Z

I mean the thread calling the reset! also performs the switch

👌 1