This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-04-15
Channels
- # announcements (2)
- # beginners (23)
- # calva (22)
- # cider (4)
- # clara (15)
- # clojure (24)
- # clojure-europe (24)
- # clojure-korea (16)
- # clojure-nl (1)
- # clojure-norway (7)
- # clojure-uk (8)
- # clojurescript (30)
- # conjure (1)
- # datahike (1)
- # datomic (7)
- # emacs (10)
- # events (1)
- # fulcro (22)
- # graalvm (18)
- # graphql (6)
- # jobs (1)
- # jobs-discuss (66)
- # lsp (22)
- # malli (27)
- # missionary (31)
- # polylith (7)
- # portal (6)
- # releases (1)
- # remote-jobs (3)
- # ring (9)
- # shadow-cljs (16)
- # squint (1)
- # tools-deps (9)
- # vim (6)
- # xtdb (15)
When my io-task throws an error, it can take up to a minute before the log/error happens. Is that a sign that something really wrong is going on in the inputs-flow, or can it be normal when there's quite a big stack of tasks/flows including many, many m/via m/blk calls?
((m/reduce
  (fn [_ input]
    (m/? (m/via m/blk (io-task input))))
  []
  inputs-flow)
 (fn [_]
   (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))
try this pattern instead
((m/reduce (constantly nil)
           (m/ap (let [input (m/?> inputs-flow)]
                   (m/? (m/via m/blk (io-task input))))))
 (fn [_] (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))
also, what are your requirements? Should the tasks run sequentially or concurrently?
Sequentially; the flow is made of lots of tasks that run concurrently, and it ends up in that reduction to save the outputs sequentially.
@U053XQP4S Thanks, I'll keep this pattern, but it behaves similarly: 2 minutes between the throw and the log.
if there's a propagation delay it is likely due to another part of the pipeline failing to react to the cancellation signal
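(Editor's sketch, not from the thread: one common way a pipeline fails to react to cancellation. m/via m/blk cancels its task by interrupting the worker thread, so a body that swallows the interrupt keeps running and delays error propagation. The 60-second sleep here is an illustrative stand-in.)

```clojure
(require '[missionary.core :as m])

;; m/via m/blk cancels by interrupting the worker thread; catching
;; and ignoring InterruptedException keeps this task alive for the
;; full sleep, stalling cancellation (and thus the error callback)
;; for everything downstream of it.
(def slow-to-cancel
  (m/via m/blk
    (try (Thread/sleep 60000)
         (catch InterruptedException _ ::ignored-cancellation))))
```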
Working on that, it seems like this construct might be a culprit:
(defn flow-from-inputs [inputs]
  (m/ap
    (m/?>
      (m/?> ##Inf
        (m/seed
          (mapv (fn [input]
                  (flow-from-input input))
                inputs))))))
flow-from-input is a function that returns a flow, and the idea is to have a single flow that gets values from the multiple flows built from the inputs. In manifold I had a stream, and I was creating streams for each value of this stream, then concatenating the streams into a single stream (getting values from a pipeline that was transforming a stream of SSE, starting new streams of SSE from these values, and pushing all the final values into a single stream).
https://github.com/hyperfiddle/electric/blob/45f7881df46f86e91a1730e893a05bed6cf4e728/src/contrib/missionary_contrib.cljc#L6 that matches your definition, minus the flow-from-input
(defn flow-from-inputs [inputs]
  (apply mx/mix (mapv flow-from-input inputs)))
Therefore I don't think the construct itself is problematic
If (count flows) is unknown I assume you can use ##Inf? Or alternatively a lower number if you want to limit concurrency?
does this construct have any advantage/difference over the above, apart from being for 2 flows only?
(m/ap
  (m/amb= (m/?> >flow1)
          (m/?> >flow2)))
seems like I have to redo my experiments, because somehow println works but taoensso.timbre/log does not in the error callback 🧩
is it possible that it is because it's async and sometimes gets killed before it prints?
((m/sp
   (m/? (m/via m/blk (log/error "generate-questions-flow-failure"))))
 (fn [_]) (fn [_])) ; task callbacks take one argument; #() would throw an ArityException
this construct in the on-cancel worked
but m/compel didn't
((m/compel
   (m/reduce
     (fn [_ input]
       (m/? (m/via m/blk (io-task input))))
     []
     inputs-flow))
 (fn [_]
   (log/info "Inputs flow consumed"))
 #(log/error "Error during consumption of input flow" %))
Although I might have put it in the wrong place. @U053XQP4S is it possible that your fix from yesterday also fixed this? I had a couple of test runs and so far only a couple of seconds max between the throw and the logs
*was 😅
Are flows appropriate for creating a queue? It seems like all the examples involve making a flow from a finite literal value, except for m/watch, which creates a continuous flow (and continuous flows discard values, probably bad for a queue). I'm wondering how I would take infinite input from something like a client and create a queue from that using missionary, like taking tasks from users that need to be run.
Discrete flows (i.e. not continuous) don't discard values. A flow is basically an adapter between a producer and a consumer, so it solves the same problem as a queue. It is possible to create flows from asynchronous event sources, including infinite ones.
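(Editor's sketch of the "flow from an asynchronous event source" point: m/observe turns a subscribe/unsubscribe pair into a discrete flow. The listener registry here is a toy stand-in for whatever real callback API you have.)

```clojure
(require '[missionary.core :as m])

;; toy callback-based event source, standing in for any real API
(def listeners (atom #{}))
(defn add-listener! [f] (swap! listeners conj f))
(defn remove-listener! [f] (swap! listeners disj f))
(defn fire! [event] (doseq [f @listeners] (f event)))

;; m/observe turns subscribe/unsubscribe into a discrete flow;
;; the returned thunk is the cleanup, run when the flow is cancelled
(def >events
  (m/observe
    (fn [emit!]
      (add-listener! emit!)
      #(remove-listener! emit!))))
```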
Didn't I say continuous flows are the ones that discard values? My understanding was discrete flows throw an error if there's no consumer ready
the "no consumer ready" error is specific to m/observe; there are other ways to build discrete flows that fully support backpressure
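(Editor's sketch of a backpressured discrete flow, assuming a JVM REPL where m/? may block the calling thread: m/seed only transfers a value when the consumer is ready, so a slow consumer delays the producer instead of triggering an error or dropping values.)

```clojure
(require '[missionary.core :as m])

;; m/seed builds a discrete flow from a collection; each value is
;; transferred only when the consumer asks for it, so nothing is dropped
(def >xs (m/seed (range 5)))

;; m/? outside a process block blocks the calling JVM thread
(m/? (m/reduce conj [] >xs))
;; => [0 1 2 3 4]
```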
What is the point of using forking without binding it to a symbol like in the code below? It's not being used concurrently. Isn't forking normally a convenience macro for binding a flow value?
(defn forever [task]
(m/ap (m/? (m/?> (m/seed (repeat task))))))
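(Editor's note, hedged: the point of the unbound fork is repetition, not binding. m/?> forks the rest of the ap block once per element of (repeat task), so m/? runs the same task again for each value transferred downstream. Combined with a mailbox, this gives one answer to the queue question above; the names below are illustrative.)

```clojure
(require '[missionary.core :as m])

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

;; sketch: a mailbox-backed queue exposed as a discrete flow
(def inbox (m/mbx))        ; post with (inbox v); fetching it is a task
(def >queue (forever inbox))

(inbox 1)
(inbox 2)
;; a consumer of >queue now transfers 1 then 2, in order, nothing dropped
```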