When my io-task throws an error, it can take up to a minute before the log/error happens. Is that a sign that something is really wrong in the inputs-flow, or can it be normal when there's quite a big stack of tasks/flows including many m/via m/blk calls?
((m/reduce
   (fn [_ input]
     (m/? (m/via m/blk (io-task input))))
   []
   inputs-flow)
 (fn [_]
   (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))
try this pattern instead
((m/reduce (constantly nil)
   (m/ap (let [input (m/?> inputs-flow)]
           (m/? (m/via m/blk (io-task input))))))
 (fn [_] (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))
also, what are your requirements? Should the tasks run sequentially or concurrently?
sequentially, the flow is made of lots of tasks that are running concurrently and it ends up in that reduction to save the outputs sequentially
@leonoel Thanks I'll keep this pattern, but it's similar, 2 minutes between the throw and the log
if there's a propagation delay it is likely due to another part of the pipeline failing to react to the cancellation signal
maybe some blocking call ignores thread interruption
unlikely to be a missionary bug, try minimizing your code, adding printlns etc.
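A minimal sketch of the "blocking call ignores thread interruption" case, in plain Clojure without missionary. The names stubborn-call, cooperative-call and ms-until-stopped are hypothetical stand-ins, not part of the pipeline discussed here. Cancelling an m/via task interrupts the thread running its body, so a body that swallows InterruptedException keeps running until it finishes on its own, which would show up as a delay between the throw and the log.

```clojure
;; Hypothetical blocking body that swallows interruption: it keeps
;; sleeping until its own deadline, no matter how often it is interrupted.
(defn stubborn-call [ms]
  (let [deadline (+ (System/currentTimeMillis) ms)]
    (loop []
      (let [remaining (- deadline (System/currentTimeMillis))]
        (when (pos? remaining)
          (try (Thread/sleep remaining)
               (catch InterruptedException _ nil)) ; interruption ignored
          (recur))))
    :done))

;; Hypothetical blocking body that lets InterruptedException propagate,
;; so it stops as soon as its thread is interrupted.
(defn cooperative-call [ms]
  (Thread/sleep ms)
  :done)

;; Runs (f 1000) on its own thread, interrupts it after 50 ms, and
;; returns how many milliseconds the thread needed to stop afterwards.
(defn ms-until-stopped [f]
  (let [t (Thread. (fn []
                     (try (f 1000)
                          (catch InterruptedException _ nil))))]
    (.start t)
    (Thread/sleep 50)
    (let [start (System/nanoTime)]
      (.interrupt t)
      (.join t)
      (quot (- (System/nanoTime) start) 1000000))))

(comment
  (ms-until-stopped cooperative-call) ; stops almost immediately
  (ms-until-stopped stubborn-call))   ; runs on until its own deadline
```

Timing each blocking call in the pipeline this way is one option for narrowing down which one holds up the cancellation.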
Working on that, it seems like this construct might be a culprit:
(defn flow-from-inputs [inputs]
  (m/ap
    (m/?>
      (m/?> ##Inf
        (m/seed
          (mapv (fn [input]
                  (flow-from-input input))
                inputs))))))
flow-from-input is a function that returns a flow, and the idea is to have a single flow that gets values from multiple flows from the inputs
In manifold I had a stream, and I was creating streams for each value of this stream, then concatenating the streams into a single stream (getting values from a pipeline that was transforming a stream of SSE, starting new streams of SSE from these values and pushing all the final values into a single stream)
https://github.com/hyperfiddle/electric/blob/45f7881df46f86e91a1730e893a05bed6cf4e728/src/contrib/missionary_contrib.cljc#L6
that matches your definition, minus the flow-from-input:
(defn flow-from-inputs [inputs] (apply mx/mix (mapv flow-from-input inputs)))
Therefore I don't think the construct itself is problematic
If (count flows) is unknown I assume you can use ##Inf? Or alternatively a lower number if you want to limit concurrency?
yes
does this construct have any advantage/difference compared with the above, apart from being for 2 flows only?
(m/ap
  (m/amb= (m/?> >flow1)
          (m/?> >flow2)))
it is literally sugar over the above
ok
seems like I have to redo my experiments because somehow println works but taoensso.timbre/log does not in the error callback 🧩
is it possible that it is because it's async and sometimes gets killed before it prints?
yes, try to wrap your task with m/compel if you want to be sure
((m/sp
   (m/? (m/via m/blk (log/error "generate-questions-flow-failure"))))
 #() #())
this construct in the on-cancel worked
but m/compel didn't
((m/compel
   (m/reduce
     (fn [_ input]
       (m/? (m/via m/blk (io-task input))))
     []
     inputs-flow)
   (fn [_]
     (log/info "Inputs flow consumed"))
   #(log/error "Error during consumption of input flow" %)))
Although I might have put it in the wrong place
it is not impossible but I've no idea why a delay is observed
*was 😅
@leonoel is it possible that your fix from yesterday also fixed this? I had a couple of test runs and so far only a couple seconds max between the throw and the logs
Are flows appropriate for creating a queue? It seems like all the examples involve making a flow from a finite literal value, except for m/watch which creates a continuous flow (which discards values. Probably bad for a queue). I'm wondering how I would take infinite input from something like a client and create a queue from that using missionary. Like taking tasks from users that need to be run
Discrete flows (i.e. not continuous) don't discard values. A flow is basically an adapter between a producer and a consumer, so it solves the same problem as a queue. It is possible to create flows from asynchronous event sources, including infinite ones.
Didn't I say continuous flows are the ones that discard values? My understanding was discrete flows throw an error if there's no consumer ready
yes, you're right about continuous flows
the "no consumer ready" error is specific to m/observe, there are other ways to build discrete flows that fully support backpressure
the exact recipe will depend on what your input looks like
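One possible recipe, as a rough sketch rather than the recommended one: post incoming jobs to a mailbox (m/mbx) and read it as a discrete flow by repeating the fetch task. The names inbox, jobs and take-jobs are hypothetical. It assumes missionary is on the classpath and that an unbounded buffer is acceptable, since a mailbox never pushes back on whoever posts; m/rdv is the synchronous alternative, where giving a value parks until a consumer takes it.

```clojure
(require '[missionary.core :as m])

;; Turns a task into a flow that runs it again and again,
;; emitting each result in turn.
(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

;; Hypothetical job queue: (inbox x) posts a value, and the mailbox
;; itself is a task completing with the oldest posted value.
(def inbox (m/mbx))

;; Discrete flow of posted jobs; nothing is dropped while no consumer runs.
(def jobs (forever inbox))

;; Blocks the calling thread until n jobs have been received (JVM only).
(defn take-jobs [n]
  (m/? (m/reduce conj [] (m/eduction (take n) jobs))))

(comment
  (inbox :a)
  (inbox :b)
  (inbox :c)
  (take-jobs 3))
```

A real consumer would more likely reduce over jobs inside an m/sp or m/ap process than block a thread with a top-level m/?.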
What is the point of using forking without binding it to a symbol like in the code below? It's not being used concurrently. Isn't forking normally a convenience macro for binding a flow value?
(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))
It's just more concise. If the value is used in a single place then the binding is unnecessary and thus the fork can be inlined