I am currently testing with audio IO,
and I wonder what the recommended way is to stop an opened IO resource and then return its value.
Is it possible to call (cancel), do some cleanup (like closing the IO), and then return the result?
Also, I think (cancel) will not clean up the opened IO?
Thanks!
The code:
(ns testing.debug
  (:require [missionary.core :as m])
  (:import [javax.sound.sampled
            AudioSystem
            AudioFormat
            DataLine$Info
            TargetDataLine
            SourceDataLine]))

(def audio-format
  (AudioFormat. 44100   ; sample rate
                16      ; sample size in bits
                1       ; channels (mono)
                true    ; signed
                true))  ; big endian

(defn get-mixer-with-line
  "Find a mixer that supports the given line type (TargetDataLine or SourceDataLine)"
  [line-class format]
  (let [info (DataLine$Info. line-class format)
        mixers (AudioSystem/getMixerInfo)]
    (->> mixers
         (map #(AudioSystem/getMixer %))
         (filter #(.isLineSupported % info))
         first)))

(defn get-input-line []
  (let [line-info (DataLine$Info. TargetDataLine audio-format)
        mixer (get-mixer-with-line TargetDataLine audio-format)]
    (if mixer
      (.getLine ^javax.sound.sampled.Mixer mixer line-info)
      (throw (Exception. "No suitable audio input device found")))))

(defn play-audio
  "Play audio data from byte array"
  [audio-data]
  (let [buffer-size (alength audio-data)
        line-info (DataLine$Info. SourceDataLine audio-format)
        mixer (get-mixer-with-line SourceDataLine audio-format)
        ;; check the mixer before calling .getLine, otherwise a missing
        ;; device surfaces as a NullPointerException instead of this message
        _ (when (nil? mixer)
            (throw (Exception. "No output device found supporting the audio format")))
        source-line (.getLine ^javax.sound.sampled.Mixer mixer line-info)]
    (doto ^SourceDataLine source-line
      (.open audio-format)
      (.start))
    (.write ^SourceDataLine source-line audio-data 0 buffer-size)
    (.drain ^SourceDataLine source-line)
    (.stop ^SourceDataLine source-line)
    (.close ^SourceDataLine source-line)))

(defn record-audio
  ([filename]
   (let [running (atom true)
         buffer-size 4096 ; Smaller buffer for more frequent reads
         buffer (byte-array buffer-size)
         output-stream (java.io.ByteArrayOutputStream.)
         ^TargetDataLine target-line (doto ^TargetDataLine (get-input-line)
                                       (.open audio-format))]
     {:task (m/via m/blk
              (println "starting recording")
              (.start target-line)
              (while @running
                (let [bytes-read (.read target-line buffer 0 buffer-size)]
                  (.write output-stream buffer 0 bytes-read)))
              :recording-end)
      :stop-fn (fn []
                 (reset! running false)
                 (.stop target-line)
                 (.close target-line)
                 (let [audio-data (.toByteArray output-stream)]
                   (when filename
                     (with-open [audio-input (javax.sound.sampled.AudioInputStream.
                                               (java.io.ByteArrayInputStream. audio-data)
                                               audio-format
                                               (/ (alength audio-data)
                                                  (.getFrameSize audio-format)))]
                       (AudioSystem/write
                         audio-input
                         javax.sound.sampled.AudioFileFormat$Type/WAVE
                         (java.io.File. filename))))
                   audio-data))})))

(comment
  (let [{:keys [stop-fn task]} (record-audio nil)
        cancel (task #(println "success: " %) #(println "error: " %))]
    #_(cancel) ;; how to modify this to clean up & get results
    (do
      (Thread/sleep 3000)
      (play-audio (stop-fn)))))

Hi @leonoel, with this suggestion:
> I would rather have a minimal m/observe with just the close logic
(defn record-audio-v5
  []
  (m/sp
    (let [result (atom nil)
          buffer-size 4096
          buffer (byte-array buffer-size)
          ^TargetDataLine target-line (doto ^TargetDataLine (get-input-line)
                                        (.open audio-format))
          _ (.start target-line)
          record-task (m/reduce conj (m/seed (repeat (m/via m/blk
                                                       (let [bytes-read (.read target-line buffer 0 buffer-size)]
                                                         bytes-read)))))
          cancel-record-task (record-task (fn [v]
                                            (reset! result v))
                                          (fn [_]))]
      (m/?
        (m/reduce {} nil
          (m/observe
            (fn [_]
              (fn cleanup []
                (cancel-record-task)
                (.stop target-line)
                (.close target-line))))))
      @result)))

(let [task (record-audio-v5)
      cancel (task #(println "success: " %) #(println "error: " %))
      _ (def kill cancel)]
  )
(kill)
Why would this block the REPL before I can call kill?
I thought that cancel-record-task (record-task ... would not block the sp block?

(m/reduce conj (m/seed (repeat x))) is a malformed task: on boot it will perform the reduction as long as there are available values, i.e. forever.
I guess what you want is to run the task repeatedly and emit the results as a flow. You need ap for that:
(m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk
                                   (let [bytes-read (.read target-line buffer 0 buffer-size)]
                                     bytes-read)))))))

Thanks for all your help!
After reading many m/observe examples, the common pattern I found is that there is an external producer, and I add an event listener that feeds the data back to the "subject callback".
If I go for the m/observe approach, does that mean I need to create a producer by putting (.read target-line buffer 0 buffer-size) in a while loop on a new Thread inside m/observe?
m/observe can't emit anything after cancellation so it would not work in this case. I would rather have a minimal m/observe with just the close logic, and run it concurrently with the effect running the loop.
thanks for your suggestion! i need to think about that for a while. 🙏
Doesn't m/observe have the ability to emit a terminal value when cancelled? It is up to the consumer whether they want to look at it or (in the case of a switch) flush and discard the terminal value.
m/observe, e.g. https://github.com/dustingetz/missionary-quickstart/blob/386026e1535a44dcc5d2e663005b4ae788b7a581/src/quickstart.cljs#L220-L229
Also, please format large code snippets using the Slack attachment feature rather than a preformatted block: cmd-shift-enter on Mac to create an attachment, or drag and drop a file.
• m/observe gives you cleanup handlers when the flow is cancelled
• you can dynamically start and cancel flows using if inside m/cp and m/ap
If you want to run some code when the task is cancelled, you can race the task with an infinite one that runs this code on cleanup. Example with m/never (`m/observe` would work too but may be less idiomatic here):
;; assumes ByteArrayOutputStream, ByteArrayInputStream, AudioInputStream and
;; AudioFileFormat$Type have been added to the ns :import list
(defn record-audio
  ([filename]
   (let [buffer-size 4096 ; Smaller buffer for more frequent reads
         buffer (byte-array buffer-size)
         output-stream (ByteArrayOutputStream.)
         ^TargetDataLine target-line (doto ^TargetDataLine (get-input-line)
                                       (.open audio-format))]
     (m/race (m/sp (try (m/? m/never) (finally (.close target-line))))
             (m/via m/blk
               (println "starting recording")
               (.start target-line)
               (loop []
                 (let [bytes-read (.read target-line buffer 0 buffer-size)]
                   (when (pos? bytes-read)
                     (.write output-stream buffer 0 bytes-read)
                     (recur))))
               (.stop target-line)
               (let [audio-data (.toByteArray output-stream)]
                 (when filename
                   (with-open [audio-input (AudioInputStream.
                                             (ByteArrayInputStream. audio-data)
                                             audio-format
                                             (/ (alength audio-data)
                                                (.getFrameSize audio-format)))]
                     (AudioSystem/write
                       audio-input
                       AudioFileFormat$Type/WAVE
                       (java.io.File. filename))))
                 audio-data))))))

You're saying that finally runs on cancellation in m/sp, m/ap and m/cp?
in this example, finally is run right after (m/? m/never) returns, i.e. on cancellation
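For anyone who wants to try the race pattern without audio hardware, here is a minimal sketch. The names `fake-recording`, `record-until-cancelled`, `open?` and `outcome` are hypothetical stand-ins invented for this illustration: the atom plays the role of the TargetDataLine, and resetting it plays the role of `.close`. It assumes, as the example above does, that a cancelled m/via body can still return normally and have its value delivered.

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for the recording loop: counts iterations on a
;; blocking thread until the fake resource `open?` is closed.
(defn fake-recording [open?]
  (m/via m/blk
    (loop [n 0]
      (if @open?
        (do (try (Thread/sleep 10)
                 ;; cancelling m/via interrupts the thread; keep looping
                 ;; until the cleanup branch has closed the resource
                 (catch InterruptedException _ nil))
            (recur (inc n)))
        n))))

;; Race the loop against a task that never completes. The finally clause
;; runs once (m/? m/never) is cancelled and closes the fake resource.
(defn record-until-cancelled [open?]
  (m/race (m/sp (try (m/? m/never)
                     (finally (reset! open? false))))
          (fake-recording open?)))

(def outcome (promise))

(let [open? (atom true)
      cancel ((record-until-cancelled open?)
              #(deliver outcome [:success %])
              #(deliver outcome [:failure %]))]
  (Thread/sleep 100)
  (cancel))

(println (deref outcome 1000 :timeout))
```

Under those assumptions, calling `(cancel)` should close the fake resource first and then let the loop finish on its own, so the race should complete through the success callback with the loop's value, which is the cancel, cleanup, result sequence asked about at the top of the thread.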
I do not recommend using finally in m/ap and m/cp because it will run for each value returned by the body, which is usually not what you expect from a cleanup construct
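A small way to observe this yourself is sketched below. It is an illustration only: `finally-runs` and `values` are made-up names, and the sketch relies on the behaviour described in the message above rather than on anything I have checked against the missionary sources.

```clojure
(require '[missionary.core :as m])

;; counts how many times the finally clause is executed
(def finally-runs (atom 0))

;; m/? blocks the calling thread when used outside of a coroutine (JVM only)
(def values
  (m/? (m/reduce conj
         (m/ap (try (m/?> (m/seed [1 2 3]))
                    (finally (swap! finally-runs inc)))))))

(println "values:" values "finally runs:" @finally-runs)
```

If the counter ends up tracking the number of emitted values rather than staying at one, that confirms `finally` is the wrong tool for one-shot cleanup inside m/ap.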
how would you idiomatically run a cleanup effect on cancellation in a cp or ap context, without m/observe?
with flows I would use m/observe
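A minimal sketch of what that could look like, using the same `(m/reduce ... (m/observe ...))` shape as record-audio-v5 above. The helper name `on-cancel` and the `cleaned?` and `done` vars are hypothetical, introduced only for this illustration.

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: a flow that never emits anything. The function
;; returned from the subject is the cleanup thunk m/observe calls when the
;; flow is cancelled.
(defn on-cancel [cleanup!]
  (m/observe (fn [_emit!] cleanup!)))

(def cleaned? (atom false))
(def done (promise))

;; Consume the flow with m/reduce to turn it into a task, then cancel it.
(let [task (m/reduce (fn [_ _] nil) nil
                     (on-cancel #(reset! cleaned? true)))
      cancel (task #(deliver done [:success %])
                   #(deliver done [:failure %]))]
  (cancel))

;; wait for the task to terminate before inspecting the flag
(deref done 1000 :timeout)
(println "cleanup ran?" @cleaned?)
```

Because this flow never emits, it only carries the cleanup logic; the values themselves would come from a separate flow or task run concurrently with it, as suggested earlier in the thread.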