Fork me on GitHub
Eric Dvorsak09:04:19

When my io-task throws an error, in take up to a minute before the log/error happens, is it a sign that something really wrong is done in the inputs-flow ? or can in be normal when there's quite a big stack of tasks/flows including many many m/via m/block calls?

      (fn [_ input]
        (m/? (m/via m/blk (io-task input))))
     (fn [_]
       (log/info "Inputs flow consumed"))
     #(log/error "Error during consumption of input flow" %))


try this pattern instead

((m/reduce (constantly nil)
   (m/ap (let [input (m/?> inputs-flow)]
           (m/? (m/via m/blk (io-task input))))))
 (fn [_] (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))


also, what are your requirements? Should the tasks run sequentially or concurrently?

Eric Dvorsak10:04:59

sequentially, the flow is made of lots of tasks that are running concurrently and it ends up in that reduction to save the outputs sequentially

Eric Dvorsak10:04:27

@U053XQP4S Thanks I'll keep this pattern, but it's similar, 2 minutes between the throw and the log


if there's a propagation delay it is likely due to another part of the pipeline failing to react to the cancellation signal


maybe some blocking call ignores thread interruption


unlikely to be a missionary bug, try minimizing your code, adding printlns etc.

Eric Dvorsak15:04:32

Working on that, it seems like this construct might be a culprit:

(defn flow-from-inputs [inputs]
     (m/?> ##Inf
            (mapv (fn [input]
                    (flow-from-input input))
flow-from-input is a function that returns a flow, and the idea is to have a single flow that gets values from multiple flows from the inputs

Eric Dvorsak15:04:27

In manifold I had a stream, and I was creating streams for each value of this stream, then concatenated the streams into a single stream (getting values from a pipeline that was transforming a stream of SSE, starting new streams of SSE from these values and pushing all the final values into a single stream)

xificurC15:04:43 that matches your definition, minus the flow-from-input. (defn flow-from-inputs [inputs] (apply mx/mix (mapv flow-from-input inputs))). Therefore I don't think the construct itself is problematic

Eric Dvorsak15:04:19

If (count flows) is unknown I assume you can use ##Inf? Or alternatively a lower number if you want to limit concurrency?

Eric Dvorsak15:04:51

does this construct have any advantage/difference with the above appart for being for 2 flows only?

      (m/amb= (m/?> >flow1)
              (m/?> >flow2)))


it is literally sugar over the above

Eric Dvorsak15:04:04

seems like I have to redo my experiments because somehow println works but taoensso.timbre/log does not in the error callback 🧩

Eric Dvorsak15:04:33

is it possible that it is because it's async and sometimes gets killed before it prints?


yes, try to wrap your task with m/compel if you want to be sure

Eric Dvorsak15:04:25

          (m/? (m/via m/blk (log/error "generate-questions-flow-failure"))))
        #() #())
this construct in the on-cancel worked but m/compel didn't
      (fn [_ input]
        (m/? (m/via m/blk (io-task input))))
     (fn [_]
       (log/info "Inputs flow consumed"))
     #(log/error "Error during consumption of input flow" %)))
Although I might have put it in the wrong place

Eric Dvorsak06:04:25

@U053XQP4S is it possible that your fix from yesteday also fixed this? I had a couple of test runs and so far only a couple seconds max between the throw and the logs


it is not impossible but I've no idea why a delay is observed

Eric Dvorsak09:04:03

*was 😅

Dallas Surewood16:04:10

Are flows appropriate for creating a queue? It seems like all the examples involve making a flow from a finite literal value, except for m/watch which creates a continuous flow (which discards values. Probably bad for a queue). I'm wondering how I would take infinite input from something like a client and create a queue from that using missionary. Like taking tasks from users that need to be run


Discrete flows (i.e. not continuous) don't discard values. A flow is basically an adapter between a producer a consumer, so it solves the same problem as a queue. It is possible to create flows from asynchronous event sources, including infinite ones.

Dallas Surewood16:04:52

Didn't I say continuous flows are the ones that discard values? My understanding was discrete flows throw an error if there's no consumer ready


yes, you're right about continuous flows


the "no consumer ready" error is specific to m/observe, there are other ways to build discrete flows that fully support backpressure


the exact recipe will depend on how your input looks like

Dallas Surewood16:04:54

What is the point of using forking without binding it to a symbol like in the code below? It's not being used concurrently. Isn't forking normally a convenience macro for binding a flow value?

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))


It's just more concise. If the value is used in a single place then the binding is unnecessary and thus the fork can be inlined