missionary

stagmoose 2025-02-16T08:18:11.085089Z

I am currently testing with audio IO, and I wonder what the recommended way is to stop an opened IO resource and then return its value. Is it possible to call (cancel) -> do some cleanup (like closing the IO) -> then return the result? Also, I think (cancel) will not clean up the opened IO? Thanks! The code:

(ns testing.debug
  (:require [missionary.core :as m])
  (:import [javax.sound.sampled
            AudioSystem
            AudioFormat
            DataLine$Info
            TargetDataLine
            SourceDataLine]))

 (def audio-format
  "Audio format used for both capture and playback: 44100 Hz sample rate,
  16-bit samples, 1 channel (mono), signed, big-endian."
  (AudioFormat. 44100 16 1 true true))

 (defn get-mixer-with-line
  "Return the first installed mixer that reports support for a line of
  `line-class` (TargetDataLine or SourceDataLine) in the given `format`,
  or nil when no mixer qualifies."
  [line-class format]
  (let [wanted (DataLine$Info. line-class format)]
    (first
     (for [mixer-info (AudioSystem/getMixerInfo)
           :let [candidate (AudioSystem/getMixer mixer-info)]
           :when (.isLineSupported candidate wanted)]
       candidate))))

(defn get-input-line
  "Obtain a TargetDataLine for `audio-format` from the first mixer that
  supports it. Throws Exception when no such mixer exists."
  []
  (let [wanted (DataLine$Info. TargetDataLine audio-format)]
    (if-let [input-mixer (get-mixer-with-line TargetDataLine audio-format)]
      (.getLine ^javax.sound.sampled.Mixer input-mixer wanted)
      (throw (Exception. "No suitable audio input device found")))))

(defn play-audio
  "Play `audio-data` (a byte array of samples in `audio-format`) on the first
  output device that supports that format, blocking until the line has
  drained.

  Throws Exception when no mixer supports a SourceDataLine in `audio-format`.
  The output line is closed even when opening, writing or draining fails."
  [audio-data]
  (let [buffer-size (alength audio-data)
        line-info (DataLine$Info. SourceDataLine audio-format)
        mixer (get-mixer-with-line SourceDataLine audio-format)]
    ;; Validate the mixer rather than the line: calling .getLine on a nil mixer
    ;; would throw a NullPointerException before this message could be raised.
    (when (nil? mixer)
      (throw (Exception. "No output device found supporting the audio format")))
    (let [^SourceDataLine source-line (.getLine ^javax.sound.sampled.Mixer mixer line-info)]
      (try
        (doto source-line
          (.open audio-format)
          (.start))
        (.write source-line audio-data 0 buffer-size)
        ;; Wait for queued samples to finish playing before stopping.
        (.drain source-line)
        (.stop source-line)
        (finally
          ;; Always release the device, including on failure.
          (.close source-line))))))
 
 (defn record-audio
  "Open the audio input line and return a map with two entries:

  :task    - task that, when run via m/blk, prints a start message, starts
             the line and appends every chunk read to an in-memory stream
             until the stop function clears the running flag; it completes
             with :recording-end.
  :stop-fn - zero-arg function that clears the running flag, stops and
             closes the line, writes a WAVE file when `filename` is non-nil,
             and returns the captured bytes."
  ([filename]
   (let [recording? (atom true)
         chunk-size 4096  ; smaller buffer => more frequent reads
         chunk (byte-array chunk-size)
         captured (java.io.ByteArrayOutputStream.)
         ^TargetDataLine mic (doto ^TargetDataLine (get-input-line)
                               (.open audio-format))
         ;; Write the captured bytes to `filename` as a WAVE file.
         save-wav! (fn [audio-data]
                     (let [frame-count (/ (alength audio-data)
                                          (.getFrameSize audio-format))]
                       (with-open [wav-input (javax.sound.sampled.AudioInputStream.
                                              (java.io.ByteArrayInputStream. audio-data)
                                              audio-format
                                              frame-count)]
                         (AudioSystem/write
                          wav-input
                          javax.sound.sampled.AudioFileFormat$Type/WAVE
                          (java.io.File. filename)))))]
     {:task (m/via m/blk
                   (println "starting recording")
                   (.start mic)
                   (loop []
                     (when @recording?
                       (let [n (.read mic chunk 0 chunk-size)]
                         (.write captured chunk 0 n))
                       (recur)))
                   :recording-end)
      :stop-fn (fn []
                 (reset! recording? false)
                 (.stop mic)
                 (.close mic)
                 (let [audio-data (.toByteArray captured)]
                   (when filename
                     (save-wav! audio-data))
                   audio-data))})))


(comment
  ;; REPL experiment: record for three seconds, then stop and play the capture.
  (let [{stop-fn :stop-fn task :task} (record-audio nil)
        cancel (task #(println "success: " %) #(println "error: " %))]
    #_(cancel) ;; how to modify this to clean up & get results
    (Thread/sleep 3000)
    (play-audio (stop-fn))))

stagmoose 2025-02-19T13:35:34.610529Z

hi @leonoel , with this suggestion: > I would rather have a minimal m/observe with just the close logic

(defn record-audio-v5
  "Return a task that opens and starts the audio input line, launches a
  background recording task that repeatedly reads from the line, and then
  waits on an `m/observe` flow whose only job is to run cleanup (cancel the
  recording task, stop and close the line) when that flow is cancelled.

  NOTE(review): the background task accumulates the byte counts returned by
  `.read`, not the audio data itself, and `result` is only set when that task
  succeeds -- confirm what should be returned once cancellation is involved."
  []
  (m/sp
   (let [result (atom nil)
         buffer-size 4096
         buffer (byte-array buffer-size)
         ^TargetDataLine target-line (doto ^TargetDataLine (get-input-line)
                                       (.open audio-format))
         _ (.start target-line)
         ;; One blocking read of up to `buffer-size` bytes, run on m/blk.
         read-task (m/via m/blk
                          (.read target-line buffer 0 buffer-size))
         ;; m/ap with m/?> and m/? runs `read-task` again and again and emits
         ;; each result as a flow value. Reducing (m/seed (repeat read-task))
         ;; directly would instead fold an infinite sequence of task objects as
         ;; soon as the task is started, and never return to the caller.
         record-task (m/reduce conj
                               (m/ap (m/? (m/?> (m/seed (repeat read-task))))))
         cancel-record-task (record-task (fn [v]
                                           (reset! result v))
                                         (fn [_]))]
     (m/?
      (m/reduce {} nil
                (m/observe
                 (fn [_]
                   (fn cleanup []
                     (cancel-record-task)
                     (.stop target-line)
                     (.close target-line))))))
     @result)))

;; Start the recording task and keep its cancel function in the `kill` var.
(let [on-success #(println "success: " %)
      on-failure #(println "error: " %)
      cancel ((record-audio-v5) on-success on-failure)]
  (def kill cancel)
  nil)

;; Cancel the running recording.
(kill)
Why would this block the REPL before I can call kill? I thought the cancel-record-task (record-task ...) binding would not block the sp block?

leonoel 2025-02-20T19:52:40.696229Z

(m/reduce conj (m/seed (repeat x))) is a malformed task, on boot it will perform the reduction as long as there are available values, i.e. forever

leonoel 2025-02-20T19:55:23.938419Z

I guess what you want is to run the task repeatedly and emit the results as a flow, you need ap for that

;; Flow that runs the blocking read task over and over, emitting each read's byte count.
(let [read-task (m/via m/blk
                  (.read target-line buffer 0 buffer-size))]
  (m/ap (m/? (m/?> (m/seed (repeat read-task))))))

stagmoose 2025-02-17T14:56:40.208999Z

Thanks for all your help! After reading many m/observe examples, the common pattern I found is that there is an external producer, and then I add an event listener and feed the data back to the "subject callback". If I go for the m/observe approach, does that mean I need to create a producer by putting (.read target-line buffer 0 buffer-size) in a while loop in a new Thread inside m/observe?

leonoel 2025-02-17T15:03:05.001289Z

m/observe can't emit anything after cancellation so it would not work in this case. I would rather have a minimal m/observe with just the close logic, and run it concurrently with the effect running the loop.

👀 1
🙏 1
stagmoose 2025-02-17T15:09:01.955779Z

thanks for your suggestion! i need to think about that for a while. 🙏

Dustin Getz (Hyperfiddle) 2025-02-17T15:31:06.690739Z

Doesn't m/observe have the ability to emit a terminal value when cancelled? It is up to the consumer whether they want to look at it or (in the case of a switch) flush-and-discard the terminal value.

Dustin Getz (Hyperfiddle) 2025-02-16T12:13:43.886679Z

also, please format large code snippets using slack attachment feature not preformatted block - cmd-shift-enter on mac to create attachment, or drag drop a file

👌 1
Dustin Getz (Hyperfiddle) 2025-02-16T12:15:37.301229Z

• m/observe gives you cleanup handlers when the flow is cancelled • you can dynamically start and cancel flows using if inside m/cp and m/ap

leonoel 2025-02-16T15:12:39.349339Z

If you want to run some code on cancellation event, you can race the task with an infinite one that runs this code on cleanup. Example with m/never (`m/observe` would work too but may be less idiomatic here)

(defn record-audio
  "Open the audio input line and return a task that races two tasks:

  - an `m/sp` that waits on `m/never` and closes the line in its `finally`,
    which runs right after `(m/? m/never)` returns, i.e. on cancellation;
  - an `m/via m/blk` task that prints a start message, starts the line,
    copies every chunk read into an in-memory stream until `.read` returns a
    non-positive count, stops the line, writes a WAVE file when `filename`
    is non-nil, and returns the captured bytes.

  NOTE(review): presumably closing the line is what makes the pending
  `.read` return and end the loop -- confirm against the javax.sound docs.

  Class names are fully qualified because the namespace's :import list only
  covers part of javax.sound.sampled."
  ([filename]
   (let [buffer-size 4096  ; Smaller buffer for more frequent reads
         buffer (byte-array buffer-size)
         output-stream (java.io.ByteArrayOutputStream.)
         ^TargetDataLine target-line (doto ^TargetDataLine (get-input-line)
                                       (.open audio-format))]
     (m/race (m/sp (try (m/? m/never) (finally (.close target-line))))
       (m/via m/blk
         (println "starting recording")
         (.start target-line)
         (loop []
           (let [bytes-read (.read target-line buffer 0 buffer-size)]
             ;; Stop looping once a read yields no data.
             (when (pos? bytes-read)
               (.write output-stream buffer 0 bytes-read)
               (recur))))
         (.stop target-line)
         (let [audio-data (.toByteArray output-stream)]
           (when filename
             (with-open [audio-input (javax.sound.sampled.AudioInputStream.
                                       (java.io.ByteArrayInputStream. audio-data)
                                       audio-format
                                       ;; stream length is expressed in frames
                                       (/ (alength audio-data)
                                          (.getFrameSize audio-format)))]
               (AudioSystem/write
                 audio-input
                 javax.sound.sampled.AudioFileFormat$Type/WAVE
                 (java.io.File. filename))))
           audio-data))))))

Dustin Getz (Hyperfiddle) 2025-02-16T15:14:54.912409Z

you're saying that finally runs on cancellation in m/sp m/ap m/cp ?

leonoel 2025-02-16T15:16:31.770339Z

in this example, finally is run right after (m/? m/never) returns, i.e. on cancellation

💡 1
leonoel 2025-02-16T15:18:53.086389Z

I do not recommend using finally in m/ap and m/cp because it will run for each value returned by the body, which is usually not what you expect from a cleanup construct

Dustin Getz (Hyperfiddle) 2025-02-16T15:19:31.496629Z

how would you idiomatically run a cleanup effect on cancellation in a cp or ap context, without m/observe?

leonoel 2025-02-16T15:22:53.360059Z

with flows I would use m/observe