I'm trying to create a flow that dynamically pulls from other flows based on an input flow's latest value. I'd like to cancel its subscription to any flow that it was listening to for a previous value of the input stream. The below is clearly wrong, but I hope it conveys the basic meaning I'm after ... is there a clean way of doing what I'm attempting to do, or am I barking up the wrong tree?
(defn switch-between-flows
  [inputs]
  (ap
    (let [n (?> inputs)]
      (amb (case n
             1 (?> flow-1)
             2 (?> flow-2))))))

Use ?< in ap (if discrete) or cp (if continuous). Note: with ap you'll need to guard the switch with try/catch; this is a legacy quirk that will eventually be fixed.
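For the discrete case mentioned above, a minimal sketch of what guarding the switch with try/catch might look like. The flow-1 and flow-2 definitions are hypothetical stand-ins, the function name is made up for illustration, and pulling the selected flow with m/?> is an assumption about the intended behavior rather than something stated in the thread.

```clojure
(ns example.switch
  (:require [missionary.core :as m])
  (:import (missionary Cancelled)))

;; Hypothetical stand-ins for the flows named in the question.
(def flow-1 (m/seed [:a :b]))
(def flow-2 (m/seed [:c :d]))

(defn switch-between-flows-discrete
  [inputs]
  (m/ap
    ;; ?< switches: when inputs produces a new value, the branch
    ;; running for the previous value is cancelled.
    (let [n (m/?< inputs)]
      (try
        (m/?> (case n
                1 flow-1
                2 flow-2))
        ;; A cancelled branch surfaces as missionary.Cancelled;
        ;; swallow it and emit nothing for that branch.
        (catch Cancelled _
          (m/?> m/none))))))
```

The catch clause is the guard: without it, the cancellation of the previous branch would propagate as an error out of the ap block.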
oh wow, that is perfect and does exactly what I wanted.
I was expecting to need to clean up the old flow (cancel it) whenever the input value changed... this is so much simpler.
I think I need to go read more about the difference between ?> and ?<
For anyone coming along later, here is the working code
(defn switch-between-flows
  [inputs]
  (cp
    (let [n (?< inputs)]
      (case n
        1 (?< flow-1)
        2 (?< flow-2)))))

Probably a dumb question; trying to figure out how m/ap works with recursion.
Is loop handled specially, or am I missing something?
I've got an example I've been playing with that works when using loop but doesn't when using a named function.
I've unwrapped the m/amb I was using into m/?> over (m/seed (range 2)) plus a case, to try to figure out the source of the "No matching clause: missionary.impl.Ambiguous" error in the second example.
(m/?
  (m/reduce
    #(println %2) nil
    (m/ap
      (loop [x 0]
        (if (> x 10)
          (m/?> m/none)
          (let [y (m/?> (m/seed (range 2)))]
            (println y)
            (case y
              0 x
              1 (recur (inc x)))))))))

(m/?
  (m/reduce
    #(println %2) nil
    (m/ap
      ((fn example [x]
         (if (> x 10)
           (m/?> m/none)
           (let [y (m/?> (m/seed (range 2)))]
             (println y)
             (case y
               0 x
               1 (example (inc x))))))
       0))))
;; y = #object[missionary.impl.Ambiguous$Branch 0x797ee69c missionary.impl.Ambiguous$Branch@797ee69c]
;; Execution error (IllegalArgumentException) at scratch/eval15485$cr15488-block-0$example (REPL:60).
;; No matching clause: missionary.impl.Ambiguous$Branch@797ee69c

The second example is illegal because m/?> is an indirect call there: it sits inside a lambda, and m/ap doesn't rewrite code inside lambdas.
That makes sense; thanks for explaining! 🙂
Curious if it'd make sense to make m/amb expand to have a default value that throws a more meaningful error in situations like this?
I agree about bad DX, it should be an error in this case https://github.com/leonoel/missionary/issues/127
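One possible way to keep the named-function shape, sketched under the assumption that the recursion can be expressed as a function returning a flow: each call builds its own m/ap, so every m/?> is a direct call inside an ap body instead of inside a plain lambda. This is an illustrative rewrite, not something proposed in the thread.

```clojure
(ns example.recursion
  (:require [missionary.core :as m]))

(defn example
  "Returns a flow. Every m/?> sits directly inside this call's own m/ap."
  [x]
  (m/ap
    (if (> x 10)
      (m/?> m/none)
      (case (m/?> (m/seed (range 2)))
        0 x
        ;; Recurse by forking on the flow returned for the next x.
        1 (m/?> (example (inc x)))))))

;; JVM only: m/? blocks the calling thread outside of a coroutine block.
;; Collects the emitted values into a vector.
(m/? (m/reduce conj [] (example 0)))
```

This should emit the same x values as the loop version, with recur replaced by a fork on a nested flow.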