This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2021-06-03
Channels
- # announcements (1)
- # asami (13)
- # babashka (14)
- # beginners (58)
- # calva (49)
- # cljs-dev (6)
- # clojure (99)
- # clojure-europe (28)
- # clojure-gamedev (4)
- # clojure-nl (19)
- # clojure-spec (4)
- # clojure-uk (39)
- # clojured (5)
- # clojurescript (39)
- # code-reviews (3)
- # conjure (4)
- # cryogen (12)
- # cursive (21)
- # data-science (4)
- # datalog (1)
- # datomic (16)
- # duct (4)
- # events (1)
- # fulcro (6)
- # graalvm (1)
- # graphql (1)
- # jobs (13)
- # jobs-discuss (23)
- # kaocha (2)
- # lsp (15)
- # malli (24)
- # missionary (6)
- # off-topic (21)
- # polylith (75)
- # releases (2)
- # remote-jobs (4)
- # shadow-cljs (47)
- # sql (35)
- # vim (10)
- # xtdb (4)
Hi. First, just wanted to say thanks for missionary, I'm using it a lot and it is such a joy to work with. Makes complex concurrent systems much easier to build.
Second, my question is what would be the most idiomatic/best way to consume a BlockingQueue and turn it into a flow?
Currently I have this:
(->> (ms/observe (fn [event]
(let [q (d/tx-report-queue (./dev-conn))
f (future
(loop [u (.take q)]
(try
(event [u])
(catch Exception ex
(tap> [::ex ex])))
(recur (.take q))))]
#(do
(future-cancel f)
(d/remove-tx-report-queue (./dev-conn))))))
(ms/relieve #(into %1 %2)))
But I'm not sure if there is a better way to do it that does not involve future
@mjmeintjes more idiomatic would be to register/deregister the queue at the edge of your app, then start the pipeline with (m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk (.take q)))))))
observe
/`relieve` works but there's no backpressure so it's effectively an unbounded buffer, it could grow out of control if the consumer is too slow
Thanks, I did not realize I could return tasks from seed
Thanks, I did not realize I could return tasks from seed