I came up with a Missionary use case that I don't yet know how to model. I want to build a flow switch. Say I have a flow of incoming tasks that comes from the main server; when that flow stops because the main server is down, it should switch over to the backup server. In other words, depending on some state, an entirely different flow must be consumed. I made an example that I tried to get working with the fork operator, but it is not working. My example has an atom and a clock flow. If the atom has a non-nil value, the switch should output values from the atom. If the atom is nil, it should output values from the clock flow. The demo.switch namespace is my test; it fails to switch back to the atom flow when the atom has values again.
It depends how you want to react to new values when the flow is not active. The rdv solution works, another option is to share the flow with m/stream or m/signal and add a concurrent consumer to keep the flow alive
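A minimal sketch of the second option, assuming a Missionary version that provides `m/stream` (the names `clock`, `shared-clock` and `stop-keepalive` are made up for illustration). The idea is that a background consumer keeps the shared process running even when nothing else is subscribed:

```clojure
(require '[missionary.core :as m])

;; Hypothetical discrete clock: emits 0, 1, 2, ... roughly once per second.
(def clock
  (m/ap (let [i (m/?> (m/seed (range)))]
          (m/? (m/sleep 1000 i)))))

;; Publisher sharing a single run of the clock between all its subscribers.
(def shared-clock (m/stream clock))

;; Concurrent consumer that discards every value. While it is running,
;; the shared clock stays alive, so other subscribers can come and go
;; without restarting it. Running the task returns a cancel function.
(def stop-keepalive
  ((m/reduce (constantly nil) shared-clock)
   (fn [_] nil)     ; success callback
   (fn [_] nil)))   ; failure callback (also reached on cancellation)

;; Later, to let the clock process shut down:
;; (stop-keepalive)
```

A subscriber that joins late only sees values emitted from that point on, so whether this fits depends on how missed values should be treated.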
The switch operator is ?<. Compare with ?>, which waits for inner branch to terminate. If the inner branch pulls an infinite flow (here, the clock) then the next value will never be seen.
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

(defn switch [a f]
  (m/ap
    ;; ?< switches: a new value of the atom cancels the current branch
    (try (let [a-cur (m/?< (m/watch a))]
           (if a-cur a-cur (m/?> f)))
         (catch Cancelled _ (m/amb)))))

@rajeev22.18 yes, this is the behavior I want. My "real clock" gets calculated from the actual changing time, and that needs to start from scratch, because on start it needs to get the current system time.
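For reference, a sketch of how the `switch` function above could be exercised from a REPL. The names `!src`, `clock` and `stop` are illustrative, and `switch` is repeated so the snippet runs on its own:

```clojure
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

(defn switch [a f]
  (m/ap
    (try (let [a-cur (m/?< (m/watch a))]
           (if a-cur a-cur (m/?> f)))
         (catch Cancelled _ (m/amb)))))

;; Hypothetical fallback flow: emits 0, 1, 2, ... roughly once per second.
(def clock
  (m/ap (let [i (m/?> (m/seed (range)))]
          (m/? (m/sleep 1000 i)))))

(def !src (atom nil))

;; Run the switched flow in the background, printing each value.
;; Running the task returns a function that cancels it.
(def stop
  ((m/reduce (fn [_ x] (println :out x)) nil (switch !src clock))
   (fn [_] (println :done))
   (fn [e] (println :failed e))))

;; (reset! !src :from-atom) ; the clock branch is cancelled, :from-atom is emitted
;; (reset! !src nil)        ; back to the clock, which starts again from 0
;; (stop)                   ; cancel the whole process
```

Because each switch back to `nil` runs the clock flow anew, the counter restarts every time.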
@leonoel what's the recommended way to dynamically switch between two or more flows without restarting? Would you recommend piping the flows to rdvs?
Thanks @leonoel
I just finished writing a flow failover routine that works.
But I guess with the ?< operator this is way easier. hahahah
With your solution, I think the clock flows will restart from 0 every time you switch sources. Is that the behavior you want?
Okay, so I've poked around for a while with Missionary, and am now trying to figure out if it would be the right fit for what I'm doing.
I'm looking for a DAG propagation and maintenance system. Essentially, every non-root node has a list of dependencies, and every time one of them updates, the function in the node should be run. If that changes the value of that node's output, then propagation continues.
Looking at this from a Missionary perspective, nodes are flows, but I don't know how to keep the prior values around for comparison.
An ap block seems to be the best way to merge values, so would I then make a transducer that only lets a value out of an eduction when it differs from the previous one?
Finally, it seems that (reasonably so) Missionary is lazy. So I'll have all flows ultimately being merged down into one, which is being reduced with a side-effecting multimethod.
IIUC the question is "how do I stop propagation of a flow when the next value is = to the previous one?"
In the world of continuous flows (where you only care about latest values and can skip intermediate states, e.g. latest-mouse-position, latest-atom-value) you already get this for free with m/latest and m/cp.
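As a rough sketch of what a DAG node could look like with continuous flows: root nodes are atoms observed with `m/watch`, and a derived node combines its dependencies with `m/latest`. The atoms, the `page` node and the rendering function are invented for illustration:

```clojure
(require '[missionary.core :as m])

;; Root nodes: plain atoms holding the latest inputs.
(def !config (atom {:title "Site"}))
(def !markdown (atom "# Hello"))

;; Derived node: recomputed from the latest values of its dependencies.
(def page
  (m/latest (fn [config md] (str (:title config) ": " md))
            (m/watch !config)
            (m/watch !markdown)))

;; Sample the node's current value: take the first value, then stop.
(def current-page
  (m/? (m/reduce (fn [_ x] (reduced x)) nil page)))
```

A long-lived consumer would reduce over `page` with a side-effecting function instead of stopping after the first value.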
In the world of discrete flows (where you have to react on every value change, e.g. transactions, button clicks usually) you can use (m/eduction (dedupe) ..)
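A small sketch of the discrete case, using `m/seed` as a stand-in for a real event source. `dedupe` is the standard `clojure.core` transducer, so only consecutive duplicates are dropped:

```clojure
(require '[missionary.core :as m])

;; Stand-in for a discrete flow of node outputs.
(def outputs (m/seed [1 1 2 2 3 1]))

;; Drop a value when it equals the one emitted just before it.
(def changes (m/eduction (dedupe) outputs))

;; Collect everything that gets through.
(def result (m/? (m/reduce conj changes)))
;; result is [1 2 3 1]: the trailing 1 is kept because it differs from 3.
```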
That's something I'm still turning over in my head, whether the flows are continuous or discrete. I'm ultimately writing a static site generator, but one with a greater deal of interpage referencing. Inputs are mostly configuration and markdown files changing, which seems like a case where I only care about the most recent version, except I worry that I'd miss something like a template update if I use continuous flows. Edit: I wanted a graph maintenance library, because I could see so many bugs occurring if I tried to manually track and validate the various states.
Continuous flows will always process updates; the difference is that they can skip values in between. E.g. if your markdown file changes twice in quick succession, it might be that only the latest version gets processed. In that case this is preferable anyway, since you don't want to process a stale file.
@shawx538 have you seen clj-reload?
@chromalchemy not that one specifically. I'd been looking at tools.namespace for setting up networks of side-effecting flows that wouldn't be duplicated every time I reloaded a file. clj-reload being able to do that at a var level seems very nifty. Or am I missing something?