Hello everyone!
Why does this start! function only begin processing incoming queue messages when the third message arrives? The messages are processed in the correct order and with the parallelism I need, but nothing starts until the third one arrives... =(
What I want to achieve: messages within each group (same group-id) are processed sequentially, while all groups are processed in parallel. Maybe this approach is not the best, but I tried several others and every one was "on hold" until the third message arrived...
Maybe you can suggest an idiomatic way?
Thank you. I’ll try it and reply.
I tried code like this, but unfortunately it still waits for the third message from the queue before it starts processing (processing does start from the first message).
• the queue-message->payload and get-group-id functions are just simple get-in calls under the hood;
• processor/processor is a function that takes incoming data and returns an m/sp task.
It probably isn't important, but FYI: I am testing with messages that all have the same group-id.
sounds like your producer is dropping messages because the consumer is not ready. i'd check if the queue is buffered.
is pull-message a blocking function?
@leonoel, oh yes! pull-message is a blocking function! That could be the cause. But if I try to wrap it as #(m/? (m/via m/blk (pull-message))), it fails with:
Exception in thread "missionary blk-1" java.lang.NullPointerException: Cannot read field "prev" because "c" is null
at missionary.impl.Ambiguous.done(Ambiguous.java:277)
at missionary.impl.Ambiguous.ready(Ambiguous.java:361)
at missionary.impl.Ambiguous$2.invoke(Ambiguous.java:440)
at missionary.impl.Thunk$Process.run(Thunk.java:71)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
at java.base/java.lang.Thread.run(Thread.java:1474)
Maybe my approach is fundamentally wrong somewhere?
What is the idiomatic Missionary way to take a potentially infinite flow of messages (read with the blocking pull-message), group it into flows by some group-id, and process the groups in parallel while processing the items within each group sequentially? The number of group-ids is unknown, and a new group-id can appear anywhere in the initial flow.
This problem does not look impossible to solve... but maybe something is conceptually wrong in my understanding of processes?
Does nested flow sampling work well in the Ambiguous implementation?
To repeatedly get values from a blocking function, use this pattern:
(m/ap (loop [] (m/amb (m/? (m/via m/blk (pull-message))) (recur))))
The rest of the pipeline looks correct to me.
Yes, that works! Thank you!
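The blocking-read pattern can be tried in isolation. This is a minimal sketch, assuming a java.util.concurrent.LinkedBlockingQueue stands in for the real queue: `queue` and this `pull-message` are hypothetical, and the `.poll` timeout with the nil check exists only so the demo flow terminates (a real consumer would simply keep blocking).

```clojure
(require '[missionary.core :as m])
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical in-memory queue standing in for the real one.
(def queue (LinkedBlockingQueue. [{:n 1} {:n 2} {:n 3}]))

(defn pull-message
  "Hypothetical blocking read: waits up to 100 ms, returns nil on timeout."
  []
  (.poll ^LinkedBlockingQueue queue 100 TimeUnit/MILLISECONDS))

(def >messages
  ;; Each pull runs on the blocking thread pool (m/blk), so the blocking
  ;; call never ties up the thread that drives the flow.
  (m/ap
    (loop []
      (if-some [msg (m/? (m/via m/blk (pull-message)))]
        (m/amb msg (recur))
        ;; Demo only: end the flow (fork zero times) once the queue stays empty.
        (m/?> (m/seed []))))))

;; On the JVM, m/? outside a process block blocks the calling thread.
(def result (m/? (m/reduce conj [] >messages)))
```

Each m/amb emits the message just pulled and then recurs, so the next blocking call is only made after downstream has consumed the previous value.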
m/observe version: https://clojurians.slack.com/archives/CL85MBPEF/p1685970532152309?thread_ts=1684166929.251079&cid=CL85MBPEF
m/? calls inside the reducing function are illegal. Move them into an m/ap:
(require '[missionary.core :as m])

(defn make-task [[_ >flow]]
  ;; m/? runs inside m/ap, never inside the reducing function
  (m/reduce conj [] (m/ap (m/? (processor/processor (m/?> >flow))))))

(defn start!
  "`pull-message` is a function that pulls a queue message from a queue.
  `processor/processor` is a function that processes a queue message.
  `get-group-id` is a function that gets a group id from a queue message."
  []
  (let [seed    (repeatedly number-to-pull pull-message)
        >source (m/eduction eduction-fn (m/seed seed))
        >groups (m/group-by get-group-id >source)
        ;; ##Inf: one concurrent branch per group
        >tasks  (m/ap (m/? (make-task (m/?> ##Inf >groups))))
        ?task   (m/reduce conj [] >tasks)]
    (try (let [result (m/? ?task)]
           (println "Reduction finished: " result))
         (catch Exception err
           (println "Reduction failed: " err)))))
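Combining the two answers gives one possible shape for the full pipeline: the blocking source built with m/via m/blk, m/group-by to split by group-id, ##Inf parallelism across groups, and default (sequential) forking within each group. This is a sketch under assumptions: `queue`, `pull-message`, `get-group-id` and `process` below are hypothetical stand-ins for the real queue client and processor/processor, and the source ends on a poll timeout only so the example terminates.

```clojure
(require '[missionary.core :as m])
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical in-memory queue standing in for the real one.
(def queue
  (LinkedBlockingQueue.
    [{:group-id :a :n 1} {:group-id :b :n 10}
     {:group-id :a :n 2} {:group-id :b :n 20}
     {:group-id :c :n 100}]))

(defn pull-message
  "Hypothetical blocking read; returns nil after 100 ms with no message."
  []
  (.poll ^LinkedBlockingQueue queue 100 TimeUnit/MILLISECONDS))

(defn get-group-id [msg] (:group-id msg))

(defn process
  "Hypothetical stand-in for processor/processor: a task that sleeps
  10 ms, then returns the message number."
  [msg]
  (m/sp (m/? (m/sleep 10)) (:n msg)))

(def >source
  ;; Each blocking pull runs on m/blk; the flow ends when the queue stays empty.
  (m/ap
    (loop []
      (if-some [msg (m/? (m/via m/blk (pull-message)))]
        (m/amb msg (recur))
        (m/?> (m/seed []))))))

(defn process-group
  "Processes one group's messages one at a time (m/?> with its default
  parallelism of 1) and returns [group-id results-in-arrival-order]."
  [[group-id >group]]
  (m/sp [group-id (m/? (m/reduce conj [] (m/ap (m/? (process (m/?> >group))))))]))

(def >results
  ;; ##Inf: fork one branch per group, so all groups run concurrently.
  (m/ap (m/? (process-group (m/?> ##Inf (m/group-by get-group-id >source))))))

(def result (m/? (m/reduce conj {} >results)))
```

Within each group the results keep arrival order; collecting into a map means the order in which the concurrent groups finish does not matter.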