missionary

2025-10-10T13:31:59.166779Z

Hello everyone! Why does this start! function only begin processing incoming queue messages once the third message arrives? Messages are processed in the correct order and with the parallelism I need, but nothing starts until the third one arrives... =( What I want to achieve: messages within each group (same group-id) are processed sequentially, while all groups are processed in parallel. Maybe this approach is not the best, but I tried several others, and every one was "on hold" until the third message arrived... Could you suggest an idiomatic way?

2025-10-12T14:42:48.556169Z

Thank you. I'll try it and report back.

2025-10-14T18:05:47.537939Z

I tried code like this, but unfortunately it still waits for the third message from the queue before it starts processing (processing does start from the first message).
• queue-message->payload and get-group-id are just simple get-ins under the hood;
• processor/processor is a function that takes incoming data and returns an m/sp task.
It does not look important, but FYI: I am testing with messages that all have the same group-id.
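The thread does not show these helpers. As a rough sketch only, assuming a hypothetical message shape {:attributes {:group-id ...} :body ...} (the real key paths are unknown), the get-in helpers and a processor that returns an m/sp task might look like this. Here processor is a hypothetical stand-in for processor/processor, and m/sleep merely simulates asynchronous work.

```clojure
(ns example.helpers
  (:require [missionary.core :as m]))

;; HYPOTHETICAL message shape: {:attributes {:group-id "g1"} :body {...}}.
;; The real key paths are not shown in the thread.

(defn get-group-id
  "Reads the group id from a queue message (hypothetical key path)."
  [msg]
  (get-in msg [:attributes :group-id]))

(defn queue-message->payload
  "Reads the payload from a queue message (hypothetical key path)."
  [msg]
  (get-in msg [:body]))

(defn processor
  "Hypothetical stand-in for processor/processor: takes a queue message
  and returns an m/sp task. Nothing runs until the task is started."
  [msg]
  (m/sp
    (m/? (m/sleep 10))               ; simulate asynchronous work
    (queue-message->payload msg)))
```

Returning a task (rather than doing the work eagerly) is what lets the pipeline decide when, and on which branch, each message is processed.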

Ian Chow 2025-10-15T03:01:25.832979Z

sounds like your producer is dropping messages because the consumer is not ready. i'd check if the queue is buffered.

leonoel 2025-10-15T07:57:10.544899Z

Is pull-message a blocking function?

2025-10-15T10:06:35.141969Z

@leonoel, oh yes! pull-message is a blocking function! That could be the cause. But if I try to wrap it as #(m/? (m/via m/blk (pull-message))), it fails with:

Exception in thread "missionary blk-1" java.lang.NullPointerException: Cannot read field "prev" because "c" is null
	at missionary.impl.Ambiguous.done(Ambiguous.java:277)
	at missionary.impl.Ambiguous.ready(Ambiguous.java:361)
	at missionary.impl.Ambiguous$2.invoke(Ambiguous.java:440)
	at missionary.impl.Thunk$Process.run(Thunk.java:71)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
	at java.base/java.lang.Thread.run(Thread.java:1474)

2025-10-15T10:28:00.983729Z

Maybe my approach is fundamentally wrong somewhere? What is the idiomatic Missionary way to take a potentially infinite flow of messages (from a blocking pull-message), group it into flows by group-id, and process the groups in parallel while processing the items within each group sequentially? The number of group-ids is unknown, and a new group-id can appear anywhere in the initial flow. This does not look like an impossible problem... so maybe something is conceptually wrong in my understanding of the process?

2025-10-15T10:36:02.531199Z

Does nested flow sampling work well in the Ambiguous implementation?

leonoel 2025-10-15T10:53:07.678279Z

To repeatedly get values from a blocking function, use this pattern:

(m/ap (loop [] (m/amb (m/? (m/via m/blk (pull-message))) (recur))))
The rest of the pipeline looks correct to me.
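A concrete, self-contained version of this pattern could look like the sketch below. It assumes a java.util.concurrent.LinkedBlockingQueue as a stand-in for the real queue and a hypothetical ::eof sentinel so that the flow can terminate in a test (a real consumer would typically loop forever). Each blocking .take runs on the m/blk executor through m/via, so the blocking call does not tie up the thread that drives the rest of the pipeline.

```clojure
(ns example.source
  (:require [missionary.core :as m])
  (:import (java.util.concurrent LinkedBlockingQueue)))

;; HYPOTHETICAL end-of-stream marker, used only so the flow can terminate.
(def eof ::eof)

(defn queue-flow
  "Flow of the messages taken from q, in queue order, ending at eof.
  Each blocking .take runs on the m/blk executor via m/via."
  [^LinkedBlockingQueue q]
  (m/ap
    (loop []
      (let [msg (m/? (m/via m/blk (.take q)))]
        (if (= msg eof)
          (m/?> (m/seed []))          ; fork on an empty flow: emit nothing, stop
          (m/amb msg (recur)))))))    ; emit msg, then take the next one
```

Forking on an empty seed gives that branch no value, so once the sentinel is seen no further branches are created and the flow ends.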

2025-10-15T11:25:42.040729Z

Yes, that works! Thank you!

leonoel 2025-10-11T08:51:59.158369Z

m/? calls in the reducing function are illegal. Move them into an m/ap:

(defn make-task [[_ >flow]]
  (m/reduce conj [] (m/ap (m/? (processor/processor (m/?> >flow))))))

(defn start!
  "`pull-message` is a function that pulls a queue message from a queue.
     `processor/processor` is a function that processes a queue message.
     `get-group-id` is a function that gets a group id from a queue message."
  []
  (let [seed (repeatedly number-to-pull pull-message)
        >source (m/eduction eduction-fn (m/seed seed))
        >groups (m/group-by get-group-id >source)
        >tasks (m/ap (m/? (make-task (m/?> ##Inf >groups))))
        ?task (m/reduce conj [] >tasks)]
    (try (let [result (m/? ?task)]
           (println "Reduction finished: " result))
         (catch Exception err
           (println "Reduction failed: " err)))))
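The start! above depends on names not shown in the thread (number-to-pull, eduction-fn, pull-message, processor/processor). A self-contained sketch of the same grouping shape, assuming an in-memory vector of hypothetical messages keyed by :group and a hypothetical process-message function in place of processor/processor, could look like this. It collects a map from group id to results so the per-group ordering can be checked.

```clojure
(ns example.grouped
  (:require [missionary.core :as m]))

;; HYPOTHETICAL stand-in for processor/processor: returns an m/sp task
;; that yields the message's :n after a short random delay.
(defn process-message [msg]
  (m/sp
    (m/? (m/sleep (rand-int 20)))
    (:n msg)))

(defn make-task
  "Reduces one group flow into [group-id results]. (m/?> >group) uses the
  default parallelism of 1, so messages within a group are processed one
  at a time and their results are kept in arrival order."
  [[group-id >group]]
  (m/sp
    [group-id (m/? (m/reduce conj []
                             (m/ap (m/? (process-message (m/?> >group))))))]))

(defn run-grouped
  "Groups messages by :group and processes all groups concurrently:
  (m/?> ##Inf ...) forks one branch per group. Returns a task."
  [messages]
  (m/reduce conj {}
            (m/ap (m/? (make-task (m/?> ##Inf (m/group-by :group (m/seed messages))))))))
```

Every m/? sits inside an m/sp or m/ap body rather than in a reducing function, following the rule above. For the real queue, the m/seed source would be replaced by a flow built with the m/via m/blk pattern from the thread.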
