missionary

2024-05-18T08:31:54.502709Z

Is there a nicer way to do this? I want to write to a mbx at any point and be able to subscribe for a certain duration. This works, but it requires me to push a dummy value to offset the (drop 1) eduction.

leonoel 2024-05-22T08:59:58.994009Z

Instead of m/relieve I would rather spawn a global subscriber bound to the lifecycle of your application, consuming events ASAP. That would ensure events don't accumulate in the mailbox.

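;; assumes (require '[missionary.core :as m])
(defn with-log [app]
  (let [!log (m/mbx)                      ; mailbox: call it to post, use it as a task to take
        ;; endless flow of mailbox values, shared between subscribers by m/stream
        read-log (m/stream (m/ap (loop [] (m/amb (m/? !log) (recur)))))]
    (m/memo (m/join {}                    ; ({} a b) returns b, i.e. the app's result
              ;; global subscriber: drains the stream so values never pile up in the mailbox
              (m/reduce (constantly nil) read-log)
              (app !log read-log)))))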
Example:
(def cancel
  ((with-log
     (fn [! >]
       (def log! !)
       (def >log >)
       m/never))
   prn prn))

(def result (atom []))
((m/timeout (m/reduce (fn [_ x] (swap! result conj x)) nil >log) 3000) prn prn)
(log! :foo)
(log! :bar)
(log! :baz)
@result
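
As a sketch only, the timed subscription above could be wrapped in a reusable task for the handler use case. The name `collect-for` is hypothetical (it is not part of missionary), and the sketch assumes `m/timeout` cancels the reduction and completes with nil once the delay elapses, which is why the values are accumulated in an atom and not in the reduction result.

```clojure
(require '[missionary.core :as m])

(defn collect-for
  "Hypothetical helper: returns a task that subscribes to the flow `>log`,
   collects everything it emits for at most `ms` milliseconds, then
   completes with a vector of the collected values."
  [>log ms]
  (m/sp
    (let [acc (atom [])]
      ;; The reduction only runs for its side effect on `acc`. If the delay
      ;; elapses first, m/timeout cancels it and completes with nil, so the
      ;; accumulated values have to live outside the reduction.
      (m/? (m/timeout (m/reduce (fn [_ x] (swap! acc conj x) nil) nil >log) ms))
      @acc)))
```

With the `>log` defined in the example above, `((collect-for >log 3000) prn prn)` would start a three-second subscription and print the collected vector when it ends.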

2024-05-22T09:21:04.876469Z

Thank you!!

leonoel 2024-05-21T09:54:23.345149Z

Could you explain more about what you're trying to do, especially why m/stream is needed and what should happen to the mailbox when there is no subscriber?

2024-05-21T13:14:56.388769Z

I’m logging console outputs from a browser to a clj back-end via websockets. I have a handler that should be able to check these outputs for a duration. Requests that come in when there are no listeners can be dropped.

2024-05-21T13:15:35.717459Z

I think stream is needed because multiple tasks can be interested in the console output at the same time.

leonoel 2024-05-21T19:39:48.014799Z

Is (drop 1) a hack or is it part of the problem statement?

2024-05-21T19:58:23.253269Z

It’s a hack because it seems there is always a latest value when a new subscription happens.