#missionary
2024-04-15
Eric Dvorsak 09:04:19

When my io-task throws an error, it can take up to a minute before the log/error happens. Is it a sign that something is really wrong in the inputs-flow, or can it be normal when there's quite a big stack of tasks/flows, including many m/via m/blk calls?

((m/reduce
   (fn [_ input]
     (m/? (m/via m/blk (io-task input))))
   []
   inputs-flow)
 (fn [_]
   (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))

leonoel 10:04:17

try this pattern instead

((m/reduce (constantly nil)
   (m/ap (let [input (m/?> inputs-flow)]
           (m/? (m/via m/blk (io-task input))))))
 (fn [_] (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))

xificurC 10:04:39

also, what are your requirements? Should the tasks run sequentially or concurrently?

Eric Dvorsak 10:04:59

sequentially — the flow is made of lots of tasks that are running concurrently, and it ends up in that reduction to save the outputs sequentially

Eric Dvorsak 10:04:27

@U053XQP4S Thanks I'll keep this pattern, but it's similar, 2 minutes between the throw and the log

leonoel 11:04:28

if there's a propagation delay it is likely due to another part of the pipeline failing to react to the cancellation signal

leonoel 11:04:58

maybe some blocking call ignores thread interruption
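A hypothetical illustration of the failure mode leonoel describes (stubborn-sleep is a made-up name for this sketch, not part of the thread's code):

```clojure
;; A blocking call that ignores thread interruption. If this runs inside
;; (m/via m/blk ...), cancelling the task interrupts the worker thread, but
;; the loop swallows the InterruptedException and keeps blocking until the
;; deadline — so cancellation, and the error propagation that depends on it,
;; is delayed by up to the full sleep duration.
(defn stubborn-sleep [ms]
  (let [deadline (+ (System/currentTimeMillis) ms)]
    (loop []
      (let [left (- deadline (System/currentTimeMillis))]
        (when (pos? left)
          (try (Thread/sleep left)
               (catch InterruptedException _)) ; interruption ignored here
          (recur))))))
```

A call that instead lets the InterruptedException propagate (or returns promptly on interrupt) hands control back to missionary as soon as the task is cancelled.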

xificurC 11:04:43

unlikely to be a missionary bug, try minimizing your code, adding printlns etc.

Eric Dvorsak 15:04:32

Working on that, it seems like this construct might be a culprit:

(defn flow-from-inputs [inputs]
  (m/ap
    (m/?>
     (m/?> ##Inf
           (m/seed
            (mapv (fn [input]
                    (flow-from-input input))
                  inputs))))))
flow-from-input is a function that returns a flow, and the idea is to have a single flow that gets values from multiple flows from the inputs

Eric Dvorsak 15:04:27

In manifold I had a stream, and I was creating streams for each value of this stream, then concatenated the streams into a single stream (getting values from a pipeline that was transforming a stream of SSE, starting new streams of SSE from these values and pushing all the final values into a single stream)

xificurC 15:04:43

https://github.com/hyperfiddle/electric/blob/45f7881df46f86e91a1730e893a05bed6cf4e728/src/contrib/missionary_contrib.cljc#L6 that matches your definition, minus the flow-from-input. (defn flow-from-inputs [inputs] (apply mx/mix (mapv flow-from-input inputs))). Therefore I don't think the construct itself is problematic

Eric Dvorsak 15:04:19

If (count flows) is unknown I assume you can use ##Inf? Or alternatively a lower number if you want to limit concurrency?

Eric Dvorsak 15:04:51

does this construct have any advantage over or difference from the above, apart from being limited to 2 flows?

(m/ap
      (m/amb= (m/?> >flow1)
              (m/?> >flow2)))

leonoel 15:04:47

it is literally sugar over the above
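For what it's worth, the equivalence can be checked directly; a small sketch with two toy seed flows standing in for >flow1 / >flow2 (names are made up, and element interleaving is nondeterministic, so the results are compared as multisets):

```clojure
(require '[missionary.core :as m])

;; Toy flows standing in for >flow1 / >flow2 above.
(def >flow1 (m/seed [1 2 3]))
(def >flow2 (m/seed [:a :b :c]))

;; Sugared form: fork on each flow inside m/amb=.
(def merged-sugar
  (m/ap (m/amb= (m/?> >flow1)
                (m/?> >flow2))))

;; Desugared form: concurrent fork over a flow of flows, as in the
;; flow-from-inputs construct above.
(def merged-desugar
  (m/ap (m/?> (m/?> ##Inf (m/seed [>flow1 >flow2])))))

;; Both merge the same elements: 1 2 3 :a :b :c, in some interleaving.
(= (frequencies (m/? (m/reduce conj [] merged-sugar)))
   (frequencies (m/? (m/reduce conj [] merged-desugar))))
```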

Eric Dvorsak 15:04:04

seems like I have to redo my experiments because somehow println works but taoensso.timbre/log does not in the error callback 🧩

Eric Dvorsak 15:04:33

is it possible that it is because it's async and sometimes gets killed before it prints?

leonoel 15:04:00

yes, try to wrap your task with m/compel if you want to be sure

Eric Dvorsak 15:04:25

((m/sp
          (m/? (m/via m/blk (log/error "generate-questions-flow-failure"))))
        #() #())
this construct in the on-cancel worked but m/compel didn't
((m/compel
   (m/reduce
      (fn [_ input]
        (m/? (m/via m/blk (io-task input))))
      []
      inputs-flow)
     (fn [_]
       (log/info "Inputs flow consumed"))
     #(log/error "Error during consumption of input flow" %)))
Although I might have put it in the wrong place
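Here is where I'd expect m/compel to go, per leonoel's suggestion: m/compel wraps the task itself, and the callbacks stay on the invocation of the compelled task (in the snippet above they ended up inside the m/compel call). A runnable sketch with stand-in io-task/inputs-flow and println in place of timbre — all hypothetical stubs:

```clojure
(require '[missionary.core :as m])

;; Stand-ins for the real pipeline, just so the sketch runs.
(def inputs-flow (m/seed [:a :b :c]))
(defn io-task [input] input)

;; m/compel wraps a *task* and shields it from cancellation; the success and
;; failure callbacks are passed to the compelled task, outside m/compel.
(def cancel
  ((m/compel
    (m/reduce
     (fn [_ input]
       (m/? (m/via m/blk (io-task input))))
     []
     inputs-flow))
   (fn [_] (println "Inputs flow consumed"))   ; log/info in the original
   #(println "Error during consumption" %)))  ; log/error in the original
```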

Eric Dvorsak 06:04:25

@U053XQP4S is it possible that your fix from yesterday also fixed this? I had a couple of test runs and so far only a couple seconds max between the throw and the logs

leonoel 09:04:44

it is not impossible but I've no idea why a delay is observed

Eric Dvorsak 09:04:03

*was 😅

Dallas Surewood 16:04:10

Are flows appropriate for creating a queue? It seems like all the examples involve making a flow from a finite literal value, except for m/watch, which creates a continuous flow (which discards values; probably bad for a queue). I'm wondering how I would take infinite input from something like a client and create a queue from that using missionary, like taking tasks from users that need to be run

leonoel 16:04:06

Discrete flows (i.e. not continuous) don't discard values. A flow is basically an adapter between a producer and a consumer, so it solves the same problem as a queue. It is possible to create flows from asynchronous event sources, including infinite ones.

Dallas Surewood 16:04:52

Didn't I say continuous flows are the ones that discard values? My understanding was discrete flows throw an error if there's no consumer ready

leonoel 16:04:44

yes, you're right about continuous flows

leonoel 16:04:01

the "no consumer ready" error is specific to m/observe, there are other ways to build discrete flows that fully support backpressure

leonoel 16:04:28

the exact recipe will depend on what your input looks like
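For example, if the input is something you can poll with a blocking call, one possible recipe (my assumption, not official guidance) is to back the flow with a java.util.concurrent.BlockingQueue and repeat a blocking take forever:

```clojure
(require '[missionary.core :as m])
(import 'java.util.concurrent.LinkedBlockingQueue)

(def queue (LinkedBlockingQueue.))

;; An infinite discrete flow fed by the queue. Each .take runs on missionary's
;; blocking-call executor, and the next take is only forked once the previous
;; branch has completed, so the producer is backpressured by the consumer and
;; no value is discarded.
(def >queue-flow
  (m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk (.take queue))))))))

;; Producers push from anywhere:
;;   (.put queue task-from-user)
;; Consumers attach like any flow, e.g.:
;;   (m/reduce (fn [_ v] (handle v)) nil >queue-flow)
```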

Dallas Surewood 16:04:54

What is the point of using forking without binding it to a symbol like in the code below? It's not being used concurrently. Isn't forking normally a convenience macro for binding a flow value?

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

leonoel 16:04:05

It's just more concise. If the value is used in a single place then the binding is unnecessary and thus the fork can be inlined
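Spelled out, the two spellings of forever (hypothetical names for this sketch) are:

```clojure
(require '[missionary.core :as m])

;; Fork bound to a symbol, then used in one place...
(defn forever-bound [task]
  (m/ap (let [t (m/?> (m/seed (repeat task)))]
          (m/? t))))

;; ...versus the fork inlined, as in the snippet above. Same behavior:
;; run task to completion, emit its result, then fork the next one, forever.
(defn forever-inline [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))
```

For instance, (m/eduction (take 3) (forever-inline (m/sp :x))) yields :x three times with either definition.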