Fork me on GitHub

Hi, I want to know if I'm doing this correctly.

(defn long-running-shell-command [x]
    (:out (sh "foo" x))))

(defn handler [val res-ch]
    (a/>! res-ch (a/<! long-running-shell-command val))
    (a/close! res-ch)))

(let [out-ch (a/chan)]
  (a/pipeline-async 5 out-ch handler (a/to-chan (range 10))))
Since the long-running-shell-command is blocking io, it shouldn't be run on a go block instead I run it on a core async thread and pull from inside the go block in handler. Is this the correct approach on doing this, or is there a better way?


I would recommend using pipeline-blocking


pipeline-async has some odd behavior that can be very intuitive, like if I recall correctly your example will happily run your shell command with more than 5 concurrent executions


Yep, thats correct it does upto n + 2 concurrent executions. I'm fine with that behaviour. What I really want to know is pulling from a thread in a go block is that okay?


For the other pipelines it is n+2


For pipeline-async, again it has been a while, but it is more complicated, but basically the n doesn't limit anything


Basically your xf will have at most n concurrent calls and returns, but because xf is assumed to be asynchronous, it continues to run even after it has returned, so you can effectively unlimited overlap of asynchronous executions


I need to write transformed events on a channel out to a file using transit-clj. I presume I just wrap a with-open in a async/thread and do a synchronous take from the channel and write each object to the file? (this is in a batch system)


I suppose just doing a normal loop/recur in the async/thread will keep things going. I just haven't thought my way around terminating the loop yet (I suppose on the nil from the channel)

Ben Sless09:11:49

nil from the channel is the correct "pattern". If you take nil it means the channel is closed, which means work is surely done. Use channel closure to signal process stoppage instead of "special" messages. See example


@UK0810AQ2 that looks like a useful example and exactly what I am after. Thx!

👍 3
Ben Sless10:11:06

Note the blocking produce/consume versions are agnostic of your threading model. You can run them in a future, async/thread or something else.


but don't use them inside a go block :D

☝️ 6
Ben Sless21:11:21

Yup. Until we get virtual threads never do blocking operations in go blocks.


Yeah, don't block the thread pool


I'm just wondering if I'd want it to return anything as it is just a side effect to write out a file. Possibly the path to the file so a future might be easier for that. Idk


Thread returns a channel iirc

Jakub Holý (HolyJak)23:11:44

Interesting library, thanks!


Hello, I’m trying to build a chat-like feature using websockets, and i need users (only 2) to send some initial data to each other. this is an example of what I’ve done so far, but i’m missing something:

(def signals (chan))
(def publication (pub signals :source))

(defn user-handler [signals publication room user_id other_id data]
  (let [user-chan (chan)
        topic (keyword (str room "_" user_id))]
    (sub publication topic user-chan)
      (>! signals {:source   (keyword (str room "_" other_id))
                   :room     room
                   :user_id  user_id
                   :other_id other_id
                   :data     data}))
      (let [timeout-chan (timeout 15000)]
          timeout-chan :timeout
          user-chan ([received-data] (println received-data)))))))

  (user-handler signals publication "room1" 1 2 "Hello from user1")
  (user-handler signals publication "room1" 2 1 "Hello from user2")

;; expect to print:
{:source :room1_2, :room "room1", :user_id 1, :other_id 2, :data "Hello from user1"}
{:source :room1_1, :room "room1", :user_id 2, :other_id 1, :data "Hello from user2"}

;; got:
{:source :room1_1, :room "room1", :user_id 2, :other_id 1, :data "Hello from user2"}