Fork me on GitHub
Timur Latypoff06:02:28

Hi all! I am having "random" issues with async pubs (or maybe interaction of them with the REPL) — would you please help me with diagnosing what I am doing wrong. Below is the sample code. A stream of news messages is coming through all-news chan. The :country of the message shows to which country the news piece relates, absent or nil :country means the message relates to the whole world. What I want is to route the messages through the a/pub, so that individual subscribers can get only the news they want. There I create a chan russian-news which subscribes to news from country "RU" and to global news at the same time.

(ns myproject.test-async
  (:require [clojure.core.async :as a]))

(def all-news (a/chan (a/sliding-buffer 1000)))
(def the-pub (a/pub all-news :country (fn [_] (a/sliding-buffer 1000))))
(def russian-news (a/chan (a/sliding-buffer 1000)))
(a/sub the-pub "RU" russian-news)
(a/sub the-pub nil russian-news)

(a/offer! all-news {:country "RU" :title "News from Russia 1"})
(a/offer! all-news {:country "TR" :title "News from Turkey 1"})
(a/offer! all-news {:title "Global news 1"})
(a/offer! all-news {:country "TR" :title "News from Turkey 2"})
(a/offer! all-news {:country "RU" :title "News from Russia 2"})
(a/offer! all-news {:title "Global news 2"})
(a/close! all-news)

(print (a/<!! (a/into [] russian-news)))
When I run the code, I expect two Russian and two global news to be printed. I don't rely on the order of Russian news relative to global news, but I would like to rely that all the news messages will get eventually properly routed (in this case, the input channel is closed in the end, so we can ensure eventuality). The problem is: as I run the same code in the REPL multiple times, one after the other, I get different results:
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:country RU, :title News from Russia 2}]Loaded
In short, I sometimes (2 out of 6 times) don't get some of the news in the resulting stream. What could be the problem?


You have a race condition when subscribing the same channel to more than one topic. When you close the source chan all-news the go-loop created by pub closes the topic channels when it’s done, which causes the go-loops created by mult for each topic to close the subscribed channel by default. One of these mult go-loops will close russian-news before the other, and that other could be in the middle of still putting values on russian-news. It will silently fail and untap automatically.

👍 4
Timur Latypoff12:02:15

@U1B0DFD25 thank you! Would you please suggest, what is the proper way to subscribe a single channel to multiple topics? I’d assume it’s a/mix, but the whole thing starts looking really heavy-weight with multiple (4—5) intermediary buffers.


you could use a mult directly and tap with a channel using a filter transducer.

👍 4

not sure if you'll run into similar races with that though


any particular reason to use offer! instead of put!?

Timur Latypoff07:02:52

@U051SS2EU I guess put! without callback is equivalent to offer! — I didn’t know the callback-less form existed. I usually use offer!/poll! in REPL for convenience.


it's different in that offer! fails to put anything on the channel (without error) if there's no buffer space or immediate consumer


put! has its own buffer


the reason I bring this up is that using offer silently drops your data if you don't account for that, I didn't look close but it can be a source of unexpected behavior for that reason

Timur Latypoff19:02:19

@U051SS2EU ah yes, that's the expected behavior of offer! for me when I'm REPLing, it returns false if the buffer is full, I believe. In my case above, @U1B0DFD25 was 100% correct — indeed that was a race condition, the topic channel gets closed when input channel is closed, so in my case I sometimes end up not receiving all the messages.


You can subscribe two different channels and a/merge them into one. The merged channel will be closed only after both the sources are closed.


Or subscribe without closing the destination channel

Timur Latypoff19:02:29

Thank you! I even have uses for both of the ways :)


should I close every channel I open?


Probably not


Closing a channel means you will not put any more values on it, and consumers know to stop working (when they take a nil value from the channel)


I see, thanks!


I was just worried if creating a channel makes it registered somewhere so it might lead to memory leaks


No, it doesn’t even release pending puts on the closed channel, that’s why it’s the responsibility of the producer thread to close the channel after it’s done + by default piping a channel into another channel will close the destination channel when the source is drained, causing a cascading automatic closing of all channels.


is it an okay way to implement delayed retries (assuming fn asynchronously puts one value to fn-ch, and delays is a seq of millis)?

(defn with-retries [fn fn-ch delays out-ch]
  (fn fn-ch)
  (async/go-loop [val (<! fn-ch)
                  delays delays]
    (if (instance? Exception val)
      (let [[delay & delays] delays]
        (if delay
          (do (<! (async/timeout delay))
              (fn fn-ch)
              (recur (<! fn-ch) delays))
          (>! out-ch val)))
      (>! out-ch val))))

Ben Sless13:02:21

Do you intend for the process to take another value from the channel while the previous is being timed out?


no, I know there will be at mose one value from fn-ch

Ben Sless13:02:57

Then it makes sense at a first glance


thanks for the review!


Was there any plan to differ in implementation between pipeline and pipeline-blocking? (they are identical atm afaik)


I guess that would be a question for @alexmiller

Alex Miller (Clojure team)14:02:09

they used to be different, we decided blocking was not correct so atm they are the same. they may be modified to be different in the future

Alex Miller (Clojure team)14:02:34

this was a recent change


makes sense. thx

Alex Miller (Clojure team)14:02:40

I would recommend still picking the function that best reflects whether it is computation only or potentially blocking operations


does it use a separate thread pool or do they share one between blocking and "normal"?


pipeline and pipeline-blocking are the same and they both use async/thread internally (so the internal core.async cachedthreadpool). pipeline-async doesn't assume any execution context, it runs in go blocks


not sure what's "normal" 🙂

Alex Miller (Clojure team)15:02:17

note that the pool used by thread is different than the pool used by go blocks

Alex Miller (Clojure team)15:02:13

the former is a cached thread pool (grows without bound, reuses if possible, goes to sleep and dies if not used)

Alex Miller (Clojure team)15:02:24

the latter is a fixed size thread pool

Alex Miller (Clojure team)15:02:10

pipeline or pipeline-blocking will have concurrency up to N


yes, it's quite similar to using an executorservice in practice


since it's bound

Alex Miller (Clojure team)15:02:57

it is literally using an executorservice


I mean per pipeline

Alex Miller (Clojure team)15:02:42

it would be reasonable to use a fixed size pool for pipeline instead as the tasks should be computation-only and not block

Alex Miller (Clojure team)15:02:24

but the behavior would not be much different in practice


I am saying that because I used to be a proponent to have the ability to specify executor a bit everywhere in core async api, but with usage/time I am not sure it's really useful anymore, defaults are good


as long as the user knows what runs where


only downside I can think of now is we don't get nice thread names in logs 🙂, which is ok


and writing a variant of async/thread that takes an executor as arg takes ~10 lines

Alex Miller (Clojure team)15:02:29

you'll get something like "async-thread-macro-1"


yes, sometimes you'd prefer names that indicates what it's responsible for, but that's ok, there are other places in logs to express that and enough information to figure it out

Alex Miller (Clojure team)15:02:15

the issue with caching threads is that they can be more than one thing of course over time


the issue with naming you mean?

Alex Miller (Clojure team)15:02:14

it is possible to change thread names dynamically (and this is a super sneaky debugging technique) but there are perf impacts


I guess you could cheat and use Thread/setName


at Run time, but yuk

Alex Miller (Clojure team)15:02:37

I've totally abused this for debugging :)


me too 🙂