#missionary
2021-06-03
mjmeintjes 00:06:06

Hi. First, just wanted to say thanks for missionary, I'm using it a lot and it is such a joy to work with. Makes complex concurrent systems much easier to build.

mjmeintjes 00:06:27

Second, my question is what would be the most idiomatic/best way to consume a BlockingQueue and turn it into a flow?

mjmeintjes 00:06:09

Currently I have this:

    (->> (ms/observe
           (fn [event]
             (let [q (d/tx-report-queue (./dev-conn))
                   f (future
                       (loop [u (.take q)]
                         (try (event [u])
                              (catch Exception ex (tap> [::ex ex])))
                         (recur (.take q))))]
               #(do (future-cancel f)
                    (d/remove-tx-report-queue (./dev-conn))))))
         (ms/relieve #(into %1 %2)))

mjmeintjes 00:06:38

But I'm not sure if there is a better way to do it that does not involve `future`.

leonoel 06:06:00

@mjmeintjes more idiomatic would be to register/deregister the queue at the edge of your app, then start the pipeline with `(m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk (.take q)))))))`. `observe`/`relieve` works, but there's no backpressure, so it's effectively an unbounded buffer: it could grow out of control if the consumer is too slow.
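
For reference, a minimal sketch of that suggestion as a standalone function. The names `queue-flow`, `q` and `result` are made up for illustration, and a plain `LinkedBlockingQueue` stands in for the Datomic tx-report queue, which would still be registered and removed outside the flow:

```clojure
(ns example.queue-flow
  (:require [missionary.core :as m])
  (:import (java.util.concurrent BlockingQueue LinkedBlockingQueue)))

(defn queue-flow
  "Discrete flow of the items taken from q (hypothetical helper).
   Each blocking .take runs on the m/blk executor, and the next take
   only starts once the consumer has accepted the previous value."
  [^BlockingQueue q]
  (m/ap
    (m/? (m/?> (m/seed (repeat (m/via m/blk (.take q))))))))

;; Usage sketch: a stand-in queue pre-filled with three items.
(def q (LinkedBlockingQueue.))
(doseq [x [1 2 3]] (.put q x))

;; Collect the first three items, then stop consuming the flow.
(def result
  (m/? (m/reduce conj [] (m/eduction (take 3) (queue-flow q)))))
```

Since `m/?>` is used without a parallelism argument here, there should be at most one pending `.take` at a time, so a slow consumer leaves items in the queue rather than accumulating them inside the pipeline.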

mjmeintjes 20:06:27

Thanks, I did not realize I could return tasks from seed