core-async

phronmophobic 2025-08-13T00:12:51.630379Z

One of the neat things about core.async is that timeouts are reified as channels. With core.async.flow, there are fewer explicit channels. Are there any resources that talk about how timeouts work within flows or do timeouts exist outside of flow definitions?

neumann 2025-08-13T00:53:58.334359Z

That's an interesting question! I don't know of any resources about using timeouts and core.async.flow. What are some examples where you'd like to use timeouts?

2025-08-13T00:57:45.552399Z

https://git.sr.ht/~hiredman/resderelictae/tree/master/item/irc.clj#L357 makes a process to send a periodic message using a go loop and a timeout channel

phronmophobic 2025-08-13T03:36:57.621649Z

> makes a process to send a periodic message using a go loop and a timeout channel that's kind of what I what I meant by a timeout living "outside" the flow

phronmophobic 2025-08-13T03:38:15.783619Z

@neumann I don't have any examples in mind right now. I've written lots of code with core.async and timeouts are pretty common. I was just curious if anyone had any thoughts or experience regarding how timeouts should be integrated in flows.

2025-08-13T03:38:54.143069Z

I have explored the in ports stuff completely, it seems like it would just work to throw a timeout in there too

phronmophobic 2025-08-13T03:41:07.854669Z

I'm asking less about how you could integrate timeouts and more about how you should integrate timeouts.

phronmophobic 2025-08-13T03:46:43.036189Z

In ports only cover using timeouts for inputs from outside of the flow, but don't cover timeouts between processes within a flow or timeouts when sending information out of a flow. I see how you could do that by just having all the timeouts live outside of the flow, but I'm wondering if there's a better way. My intuition is that there's some way to datafy timeouts inside the flow context so you can get all the benefits of the otherwise pure functions and data that make up flows.

neumann 2025-08-13T04:31:48.818769Z

I have used my own little framework (that pre-dates flow) to make some fairly complex dataflow graphs. I used timeouts in there, and they were generated by the outside and would sent a message into a process. That made sense to me because a timeout is a side effect. That's why I'm finding your question particularly intriguing. It makes me curious what making timeouts a part of the flow spec would look like and lead to.

Ovi Stoica 2025-08-13T08:21:42.517559Z

In simulflow, I introduced the idea of commands for sending realtime audio with a delay (timeout): https://github.com/shipclojure/simulflow/blob/11e889b57de158df0fbba45ed23bb0cce395e108/src/simulflow/transport/out.clj#L58 Having an external process that handles the timeout can maintain the transform function being pure. Example:

(defn proc
  ;; describe
  ([] {:ins {:in "Input"}
       :outs {:out "Out"}})
  ;; init
  ([params]
   (let [commands-chan (chan 1024)
         pass-to-out (chan 1024)]
     ;; outside process that handles timeouts and other impure stuff
     (vthread-loop []
       (when-let [command (!! pass-to-out (:data command))))
       (recur))
     {::flow/out-ports {:command-write commands-chan}
      ::flow/in-ports {:pass-to-out pass-to-out}}))
  ;; transition
  ([{::flow/keys [in-ports out-ports] :as state} transition]
   ;; Cleanup
   (when (= transition ::flow/stop)
       (doseq [port (concat (vals in-ports) (vals out-ports))]
      (a/close! port)))
   state)
  ;; pure, testable transform
  ([state in msg]
   (cond
     (= in :pass-to-out)
     [state {:out [msg]}]

     (= in :in)
     ;; some logic
     [state {:command-write [{:command/kind :command/cool-command
                              :delay-until (+ (u/mono-time) 50)
                              :data {:cool :stuff}}]}])))