missionary

Eric Dvorsak 2024-04-15T09:40:19.930899Z

When my io-task throws an error, it takes up to a minute before the log/error call happens. Is that a sign that something is really wrong in the inputs-flow, or can it be normal when there's quite a big stack of tasks/flows including many, many m/via m/blk calls?

((m/reduce
   (fn [_ input]
     (m/? (m/via m/blk (io-task input))))
   []
   inputs-flow)
 (fn [_]
   (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))

leonoel 2024-04-15T10:01:17.036629Z

try this pattern instead

((m/reduce (constantly nil)
   (m/ap (let [input (m/?> inputs-flow)]
           (m/? (m/via m/blk (io-task input))))))
 (fn [_] (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))
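A self-contained sketch of the suggested pattern, for experimenting outside the real pipeline. It assumes a hypothetical blocking `io-task` function (here it sleeps briefly and doubles its input) and a hypothetical finite `inputs-flow` built with `m/seed`. The sequential fork `m/?>` waits for each `m/via` call before pulling the next input, so results arrive in input order; `conj` replaces `(constantly nil)` only so the result can be inspected.

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for the real blocking I/O function:
;; it blocks briefly, then doubles its input.
(defn io-task [input]
  (Thread/sleep 10)
  (* 2 input))

;; Hypothetical finite input flow.
(def inputs-flow (m/seed [1 2 3]))

;; Same shape as the suggested pattern, but collecting results with conj
;; instead of discarding them with (constantly nil).
(def pipeline
  (m/reduce conj []
    (m/ap (let [input (m/?> inputs-flow)]
            (m/? (m/via m/blk (io-task input)))))))

;; Run the task with success and failure callbacks, as in the chat.
(def outcome (promise))
(pipeline #(deliver outcome [:ok %])
          #(deliver outcome [:error %]))
```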

xificurC 2024-04-15T10:12:39.846419Z

also, what are your requirements? Should the tasks run sequentially or concurrently?

Eric Dvorsak 2024-04-15T10:35:59.946409Z

sequentially: the flow is made of lots of tasks running concurrently, and it ends up in that reduction, which saves the outputs sequentially

Eric Dvorsak 2024-04-15T10:41:27.916589Z

@leonoel Thanks, I'll keep this pattern, but the result is similar: 2 minutes between the throw and the log

leonoel 2024-04-15T11:14:28.057459Z

if there's a propagation delay it is likely due to another part of the pipeline failing to react to the cancellation signal

leonoel 2024-04-15T11:39:58.964799Z

maybe some blocking call ignores thread interruption
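The effect of an interruption-ignoring call can be reproduced in a small sketch. It assumes `m/via` interrupts its thread when cancelled and that `m/join` cancels the remaining tasks after one fails, then reports the failure once all of them have terminated. `well-behaved-io`, `stubborn-io`, `failing-task` and `ms-until-failure` are hypothetical names for the demo, not part of the original pipeline.

```clojure
(require '[missionary.core :as m])

;; A blocking call that honours interruption: when the task is cancelled,
;; m/via interrupts the thread and Thread/sleep throws right away.
(defn well-behaved-io []
  (m/via m/blk (Thread/sleep 10000) :done))

;; Hypothetical blocking call that swallows the interruption and keeps
;; working, so cancellation only takes effect once it returns on its own.
(defn stubborn-io []
  (m/via m/blk
    (try (Thread/sleep 10000)
         (catch InterruptedException _
           (Thread/sleep 10000)))
    :done))

;; A sibling task that fails after about 100 ms.
(defn failing-task []
  (m/sp (m/? (m/sleep 100))
        (throw (ex-info "boom" {}))))

;; Runs `io` next to the failing task and returns the milliseconds elapsed
;; until the failure is reported by m/join.
(defn ms-until-failure [io]
  (let [start (System/currentTimeMillis)]
    (try (m/? (m/join vector (io) (failing-task)))
         :no-failure
         (catch Throwable _
           (- (System/currentTimeMillis) start)))))
```

`(ms-until-failure well-behaved-io)` should return after roughly 100 ms, while `(ms-until-failure stubborn-io)` should take about 10 seconds: a small-scale version of the propagation delay described above.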

xificurC 2024-04-15T11:41:43.062609Z

unlikely to be a missionary bug, try minimizing your code, adding printlns etc.

Eric Dvorsak 2024-04-15T15:00:32.791829Z

Working on that, it seems like this construct might be a culprit:

(defn flow-from-inputs [inputs]
  (m/ap
    (m/?>
     (m/?> ##Inf
           (m/seed
            (mapv (fn [input]
                    (flow-from-input input))
                  inputs))))))
flow-from-input is a function that returns a flow, and the idea is to have a single flow that gets values from multiple flows from the inputs

Eric Dvorsak 2024-04-15T15:04:27.315879Z

In manifold I had a stream, created a new stream for each value of that stream, then concatenated those streams into a single stream (the values came from a pipeline transforming a stream of SSE; each value started a new SSE stream, and all the final values were pushed into a single stream)

xificurC 2024-04-15T15:16:43.400959Z

https://github.com/hyperfiddle/electric/blob/45f7881df46f86e91a1730e893a05bed6cf4e728/src/contrib/missionary_contrib.cljc#L6 matches your definition, minus the flow-from-input, i.e. (defn flow-from-inputs [inputs] (apply mx/mix (mapv flow-from-input inputs))). Therefore I don't think the construct itself is problematic.
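A sketch of a `mix` helper with the same structure as the definition discussed here (check the linked file for the actual contrib code, which may differ in details). The outer `m/?>` forks one branch per input flow, with parallelism equal to the number of flows, and the inner `m/?>` forwards every value of that branch's flow, so the merged output order depends on timing. `collect` is a hypothetical helper for the demo.

```clojure
(require '[missionary.core :as m])

;; Merge several discrete flows into one, running all of them concurrently
;; (parallelism = number of flows). Output order depends on timing.
(defn mix [& flows]
  (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))

;; Hypothetical helper: run a finite flow to completion and collect values.
(defn collect [flow]
  (m/? (m/reduce conj [] flow)))
```

With this helper, `flow-from-inputs` could be written as `(apply mix (map flow-from-input inputs))`, where `flow-from-input` is Eric's own function.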

Eric Dvorsak 2024-04-15T15:19:19.838679Z

If (count flows) is unknown, I assume you can use ##Inf? Or a lower number if you want to limit concurrency?

leonoel 2024-04-15T15:19:34.103999Z

yes
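A sketch of the bounded variant, assuming the input flows are given as a collection. `merge-bounded` is a hypothetical name, and the instrumented `tracked-flow` only exists to record how many input flows run at the same time; with a parallelism of 2, at most two should ever be active.

```clojure
(require '[missionary.core :as m])

;; Like mix, but runs at most `par` input flows at the same time.
;; Pass ##Inf when the number of flows is unknown and no limit is wanted.
(defn merge-bounded [par flows]
  (m/ap (m/?> (m/?> par (m/seed flows)))))

;; Instrumentation for the demo: current and peak number of active flows.
(def active (atom 0))
(def peak (atom 0))

(defn tracked-flow
  "Hypothetical input flow: emits x once, after a short async delay."
  [x]
  (m/ap (swap! peak max (swap! active inc))
        (m/? (m/sleep 50))
        (swap! active dec)
        x))

(def results
  (m/? (m/reduce conj [] (merge-bounded 2 (map tracked-flow (range 6))))))
```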

Eric Dvorsak 2024-04-15T15:23:51.345139Z

does this construct have any advantage over, or difference from, the above, apart from working with 2 flows only?

(m/ap
      (m/amb= (m/?> >flow1)
              (m/?> >flow2)))

leonoel 2024-04-15T15:24:47.735669Z

it is literally sugar over the above
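A small check of that equivalence, with hypothetical `>flow1` and `>flow2` built from `m/seed`: the `m/amb=` version and an explicit concurrent fork over the two flows emit the same values, in a timing-dependent order.

```clojure
(require '[missionary.core :as m])

(def >flow1 (m/seed [1 2]))  ; hypothetical inputs
(def >flow2 (m/seed [3 4]))

;; amb= evaluates both branches concurrently...
(def with-amb
  (m/ap (m/amb= (m/?> >flow1)
                (m/?> >flow2))))

;; ...like a concurrent fork over the collection of flows.
(def with-fork
  (m/ap (m/?> (m/?> 2 (m/seed [>flow1 >flow2])))))

;; Hypothetical helper: run a finite flow and collect its values.
(defn collect [flow]
  (m/? (m/reduce conj [] flow)))
```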

Eric Dvorsak 2024-04-15T15:25:12.045569Z

ok

Eric Dvorsak 2024-04-15T15:26:04.583689Z

seems like I have to redo my experiments, because somehow println works in the error callback but taoensso.timbre/log does not 🧩

Eric Dvorsak 2024-04-15T15:28:33.171129Z

is it possible that it is because it's async and sometimes gets killed before it prints?

leonoel 2024-04-15T15:30:00.171019Z

yes, try to wrap your task with m/compel if you want to be sure

Eric Dvorsak 2024-04-15T15:33:25.651949Z

((m/sp
   (m/? (m/via m/blk (log/error "generate-questions-flow-failure"))))
 (fn [_]) (fn [_]))
this construct in the on-cancel worked but m/compel didn't
((m/compel
   (m/reduce
      (fn [_ input]
        (m/? (m/via m/blk (io-task input))))
      []
      inputs-flow)
     (fn [_]
       (log/info "Inputs flow consumed"))
     #(log/error "Error during consumption of input flow" %)))
Although I might have put it in the wrong place
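For comparison, a sketch of how `m/compel` is meant to be applied: it takes a single task and returns a new task that ignores cancellation, and the success/failure callbacks go to that returned task rather than to `m/compel` itself. The `log-task` here is hypothetical (a sleep standing in for slow, asynchronous logging), and `run-and-cancel` cancels each task immediately after starting it.

```clojure
(require '[missionary.core :as m])

;; Hypothetical logging task standing in for (log/error ...).
(defn log-task [!log msg]
  (m/sp (m/? (m/sleep 100))   ; simulate slow, asynchronous logging
        (swap! !log conj msg)
        :logged))

;; Start `task`, cancel it right away, and report how it ended.
;; Callbacks are passed to the task returned by m/compel, not to m/compel.
(defn run-and-cancel [task]
  (let [done (promise)
        cancel (task #(deliver done [:ok %])
                     #(deliver done [:error (type %)]))]
    (cancel)
    (deref done 2000 :timeout)))
```

Cancelling the plain task makes it fail before anything is logged, while the compelled one ignores the cancellation and completes.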

Eric Dvorsak 2024-04-18T06:23:25.354649Z

@leonoel is it possible that your fix from yesterday also fixed this? I had a couple of test runs and so far there were only a couple of seconds at most between the throw and the logs

leonoel 2024-04-18T09:21:44.786949Z

it is not impossible but I've no idea why a delay is observed

Eric Dvorsak 2024-04-18T09:57:03.756339Z

*was 😅

Dallas Surewood 2024-04-15T16:27:10.622539Z

Are flows appropriate for creating a queue? All the examples seem to make a flow from a finite literal value, except for m/watch, which creates a continuous flow (and discards values, which is probably bad for a queue). I'm wondering how I would take infinite input from something like a client and build a queue from it with missionary, e.g. taking tasks from users that need to be run.

leonoel 2024-04-15T16:35:06.039389Z

Discrete flows (i.e. not continuous) don't discard values. A flow is basically an adapter between a producer and a consumer, so it solves the same problem as a queue. It is possible to create flows from asynchronous event sources, including infinite ones.

Dallas Surewood 2024-04-15T16:35:52.630169Z

Didn't I say continuous flows are the ones that discard values? My understanding was discrete flows throw an error if there's no consumer ready

leonoel 2024-04-15T16:36:44.165489Z

yes, you're right about continuous flows

leonoel 2024-04-15T16:38:01.736089Z

the "no consumer ready" error is specific to m/observe, there are other ways to build discrete flows that fully support backpressure

leonoel 2024-04-15T16:38:28.642299Z

the exact recipe will depend on what your input looks like
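A minimal sketch of one such recipe, assuming producers can run inside a missionary process. A rendez-vous (`m/rdv`) gives backpressure because each `(queue x)` task completes only when a consumer takes the value, and the `forever` helper quoted further down in this channel turns the take task into an infinite discrete flow. `submit-all` and `take-jobs` are hypothetical names for illustration. If producers must never wait, a mailbox (`m/mbx`) could be used instead; it buffers without bound rather than applying backpressure.

```clojure
(require '[missionary.core :as m])

;; Turns a task into a discrete flow that runs it over and over.
(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

;; Rendez-vous: (queue x) is a task completing once a consumer took x,
;; so producers are back-pressured by the consumer.
(def queue (m/rdv))

(def jobs (forever queue))  ; infinite discrete flow of submitted jobs

;; Hypothetical producer, e.g. a request handler submitting user jobs.
(defn submit-all [xs]
  (m/sp (loop [xs (seq xs)]
          (when xs
            (m/? (queue (first xs)))
            (recur (next xs))))))

;; Hypothetical consumer: take the first n jobs, then stop (which cancels
;; the infinite flow).
(defn take-jobs [n]
  (m/reduce (fn [acc x]
              (let [acc (conj acc x)]
                (if (= n (count acc)) (reduced acc) acc)))
            [] jobs))
```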

Dallas Surewood 2024-04-15T16:37:54.394189Z

What is the point of forking without binding the result to a symbol, as in the code below? It's not being used concurrently. Isn't forking normally a convenience macro for binding a flow value?

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

leonoel 2024-04-15T16:47:05.694759Z

It's just more concise. If the value is used in a single place then the binding is unnecessary and thus the fork can be inlined
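To make that concrete, a sketch showing that the inlined fork and an explicit `let` binding behave the same. `forever-bound`, `first-n` and `counter-task` are hypothetical helpers for the demo; `counter-task` returns the next value of a counter each time the task runs.

```clojure
(require '[missionary.core :as m])

;; Inlined fork, as in the question.
(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

;; Equivalent version that binds the forked task first.
(defn forever-bound [task]
  (m/ap (let [t (m/?> (m/seed (repeat task)))]
          (m/? t))))

;; Collect the first n values of a (possibly infinite) flow.
(defn first-n [n flow]
  (m/? (m/reduce (fn [acc x]
                   (let [acc (conj acc x)]
                     (if (= n (count acc)) (reduced acc) acc)))
                 [] flow)))

;; Hypothetical task: each run increments and returns a private counter.
(defn counter-task []
  (let [n (atom 0)]
    (m/sp (swap! n inc))))
```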