Is there a nicer way to do this? I want to write to a mbx and at any point and be able to subscribe for a certain duration. This works, but requires me to push a dummy value to offset the (drop 1) eduction.
Instead of m/relieve I would rather spawn a global subscriber bound to the lifecycle of your application, consuming events ASAP. That would ensure events don't accumulate in the mailbox.
(defn with-log [app]
(let [!log (m/mbx)
read-log (m/stream (m/ap (loop [] (m/amb (m/? !log) (recur)))))]
(m/memo (m/join {}
(m/reduce (constantly nil) read-log)
(app !log read-log)))))
Example :
(def cancel
((with-log
(fn [! >]
(def log! !)
(def >log >)
m/never))
prn prn))
(def result (atom []))
((m/timeout (m/reduce (fn [_ x] (swap! result conj x)) nil >log) 3000) prn prn)
(log! :foo)
(log! :bar)
(log! :baz)
@resultThank you!!
Could you explain more what you're trying to do, especially why m/stream is needed and what should happen to the mailbox when there is no subscriber ?
I’m logging console outputs from a browser to a clj back-end via websockets. I have a handler that should be able to check these outputs for a duration. Requests that come in when there are no listeners can be dropped.
I think stream is needed because multiple tasks can be interested in the console output at the same time.
is (drop 1) a hack or is it part of the problem statement ?
It’s a hack because it seems there is always a latest value when a new subscription happens.