Hello, I am playing with core.async and I am noticing that printing to the console is not reliably reproducible when I try to print from multiple go blocks. Is there a way of printing messages from different go blocks? Or is there a better way to approach this from a workflow PoV? All help appreciated.
There are two common issues when printing from multiple threads, and there's nothing specific to core.async:
• Lack of flushing: the main symptom is delayed or absent output. Use (println ...) instead of (print ...) since the former flushes for you by default, or call (flush) manually
• Output interleaving: e.g. you (println "ab") in one thread or go block and (println "cd") in another, and instead of seeing ab\ncd\n in the output you might see something like acb\nd\n. For that, you have to use some locking
For the last item, something like
(defn my-println [& args]
  (locking println ;; Using the function as a lock object.
    (apply println args)))
would do the trick.
thanks @p-himik, for that. Do you have any idea for the resolution of 1st point?
Uhm, doesn't the description of the first point also contain the resolution? :) The second sentence in the first point.
I tried that... and no resolution
I replaced all the prints
and I still don't see any outputs
I am using vscode repl
...if that helps...
• Make sure that whatever you did to the code has actually been applied. If you aren't sure, just restart the REPL and repeat the process
• Make sure that it's indeed printing that's to blame and not e.g. the fact that the code block where the printing is just doesn't get executed for whatever reason
checking
Could this have to do with the *out* binding of the thread not being your editor's REPL?
😄 I will defer to your experience... I am noticing that this issue is always there whenever I am working with core.async
I see a similar thing when I work with web servers in my editor's REPL
I am open to any/all ideas.... without these prints working I am finding it hard to deal with different threads
Check this out https://stackoverflow.com/a/27056185
Ah, forgot about *out*, right.
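For what it's worth, a minimal sketch of the *out* idea: capture the writer that is bound when the form is evaluated at the REPL, then rebind it wherever you print. The names repl-out and println-to-repl are made up for illustration, and whether this helps depends on how the editor's REPL routes output.

```clojure
;; Evaluate this at the REPL so that *out* is the REPL's writer at that moment.
(def repl-out *out*)

(defn println-to-repl
  "Like println, but always writes to the writer captured in repl-out."
  [& args]
  (binding [*out* repl-out]
    (apply println args)))

;; A plain Thread does not inherit the REPL's thread-local bindings,
;; so a bare println there may end up on the process stdout instead.
(doto (Thread. #(println-to-repl "hello from another thread"))
  (.start)
  (.join))
```

The same wrapper can be called from inside go blocks or (thread ...) bodies.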
@abhishek.mazy if it's for debugging, you should most definitely prefer tap>.
@michael.e.loughlin the stackoverflow solution didn't work for me on VSCODE + Calva REPL
@p-himik I am going to try the tap> and let you know
Note that it won't print as println would, not by itself. There must also be at least one call to add-tap. I myself prefer using Portal with it, it's designed to work with taps.
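A small sketch of the add-tap / tap> pairing, assuming Clojure 1.10 or newer. The atom tapped and the function remember! are hypothetical names; a tool like Portal would register its own tap function instead.

```clojure
;; Collect tapped values in an atom; any one-argument function can be a tap.
(def tapped (atom []))

(defn remember! [x]
  (swap! tapped conj x))

(add-tap remember!)

;; tap> does not block the caller; values are delivered on a separate thread.
(tap> {:from :worker-1 :event :started})
(tap> {:from :worker-2 :event :started})

;; Later, inspect what arrived and unregister the tap when done:
;; @tapped
;; (remove-tap remember!)
```

Because delivery is asynchronous, the atom may be empty for a brief moment right after the tap> calls.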
yes... reading the docs now
the tap> solution works!! Thanks so much all!
Hello I am trying to have an application which will have multiple persistent threads. For this I am using core.async. So far for persistent threads I've come up with a general structure as follows:
(defn threadname [...args...]
  (go-loop [... loop params...]
    (when-let [...wait for some event...]
      ....do something....
      (recur ....))))
Firstly, can you please confirm whether this is the most idiomatic way of declaring persistent threads?
Secondly, how does one handle a persistent thread which will repeatedly execute some instructions every few hundred msecs? I have experimented with timeout and also with Thread/sleep, but neither of these approaches seems to work. This is because the when-let statement doesn't have any way to hold up the thread the way waiting on data from a channel does. It is possible I am thinking about this all wrong... all suggestions welcome.
What do you mean by "persistent"?
If you need to run a non-blocking op every X units of time, I'd suggest using a ScheduledExecutorService, something like:
(import '(java.util.concurrent Executors TimeUnit))
(def scheduler (Executors/newScheduledThreadPool 1))
(.scheduleAtFixedRate scheduler the-fn 0 100 TimeUnit/MILLISECONDS)
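A slightly fuller sketch of the scheduler approach that can also be stopped. start-periodic and stop-periodic are made-up names. The try/catch is there because a scheduled task that throws has all its later runs suppressed by the executor.

```clojure
(import '(java.util.concurrent Executors ScheduledExecutorService TimeUnit))

(defn start-periodic
  "Runs f every period-ms milliseconds on a single scheduler thread.
  Returns the scheduler so the caller can stop it later."
  [f period-ms]
  (let [scheduler (Executors/newScheduledThreadPool 1)
        ;; Catch and report failures so that one bad run does not
        ;; silently cancel all the following ones.
        safe-f    (fn []
                    (try
                      (f)
                      (catch Exception e
                        (println "periodic task failed:" (.getMessage e)))))]
    (.scheduleAtFixedRate scheduler ^Runnable safe-f 0 (long period-ms) TimeUnit/MILLISECONDS)
    scheduler))

(defn stop-periodic
  "Stops the scheduler returned by start-periodic."
  [^ScheduledExecutorService scheduler]
  (.shutdownNow scheduler))
```

Usage would be along the lines of (def s (start-periodic #(println "tick") 100)) followed later by (stop-periodic s).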
It's also possible to achieve that with go-loop where you wait on timeout and then execute the task, but it bears a risk if your operation can block - go blocks use a very limited thread pool and aren't designed for blocking operations. So you'd have to use it in combination with thread if anything can block, something like:
(defn run-periodically [f]
  (a/go-loop []
    (a/<! (a/timeout 100))   ;; park for 100 ms
    (a/<! (a/thread (f)))    ;; run f on a separate thread and wait for it
    (recur)))
That's a simplified example; in reality you'd probably want something that would allow you to actually stop the loop. So a scheduler is both simpler and easier.
And just in case - neither of those handles the situation where you want multiple calls to (f) to overlap if some (f) takes longer than 100 ms.
by persistent I mean... the app starts then splits into many threads along with the mainthread. All threads are holding and waiting for data to arrive so that these threads can pick up that data from their input channels and process the data; possibly put the results on to their individual output channels.
I'll try your solutions out and see how they work
thanks again @p-himik
your go loop recipe is working for my use-case quite well @p-himik! Thanks again!
If you want cancellation, a common approach is to have a separate channel and alt! on it. That channel should be returned from the function that starts the whole thing and, when you want to cancel it, just deliver any value to it.
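A rough sketch of that cancellation pattern, assuming core.async is on the classpath and aliased as a. The function name and the period-ms parameter are made up for illustration. Here the stop channel is closed to cancel, but putting any value on it would work too.

```clojure
(require '[clojure.core.async :as a])

(defn run-periodically
  "Calls f roughly every period-ms milliseconds until the returned
  stop channel receives a value or is closed."
  [f period-ms]
  (let [stop-ch (a/chan)]
    (a/go-loop []
      (a/alt!
        ;; Anything arriving on stop-ch (including a close) ends the loop.
        stop-ch nil
        ;; Otherwise, once the timeout fires, run f on a real thread
        ;; (it may block), wait for it to finish, then loop again.
        (a/timeout period-ms) (do (a/<! (a/thread (f)))
                                  (recur))))
    stop-ch))

;; Usage:
;; (def stop (run-periodically #(println "tick") 100))
;; (a/close! stop)
```

A run of f that is already in flight when the stop signal arrives still completes; only the next iteration is skipped.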
It's not super clear what you want. But say you want some kind of asynchronous command process executor, you'd want something like:
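;; Assumes (require '[clojure.core.async :refer [chan thread <!! >!! close!]])
;; and that foo and bar are defined elsewhere.
(defn make-process
  [input-chan]
  (let [output-chan (chan)]
    (thread
      (loop []
        (if-let [command (<!! input-chan)]
          (do
            ;; recur is not allowed inside try/catch, so handle the
            ;; command inside try and recur once, after it.
            (try
              (let [{:keys [op args]} command
                    result (case op
                             :foo (apply foo args)
                             :bar (apply bar args)
                             ::unknown-op)]
                (cond
                  (= result ::unknown-op)
                  (>!! output-chan {:error :unknown-op
                                    :command command})
                  (nil? result)
                  (>!! output-chan {:error :nil-result
                                    :command command})
                  :else
                  (>!! output-chan result)))
              (catch Exception e
                (>!! output-chan {:error :exception
                                  :command command
                                  :ex e})))
            (recur))
          ;; input-chan was closed: close the output and stop.
          (close! output-chan))))
    output-chan))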
It looks a bit complicated, but that's mostly because the error handling is a little tricky. So this handles that as well.
But if you need something simpler, like just transforming input -> output with a pure function or simple IO, where it's always the same function you apply (so no "dispatching to a per-message command"), you should use pipeline, pipeline-blocking or pipeline-async instead.
Say for example you always wanted only foo done on input. And foo is a possibly blocking or heavy computation. You can just do:
(defn make-foo-process
  [input-chan n]
  (let [output-chan (chan)
        xf (map (fn [msg]
                  (try
                    (let [result (foo msg)]
                      (if (nil? result)
                        {:error :nil-result :command msg}
                        result))
                    (catch Exception e
                      {:error :exception :command msg :ex e}))))]
    (pipeline-blocking n output-chan xf input-chan true)
    output-chan))
Actually, even the first version, if you need "dispatch", is probably cleaner using pipeline-blocking as well, just like this:
(defn make-process
  [input-chan n]
  (let [output-chan (chan)
        xf (map (fn [command]
                  (try
                    (let [{:keys [op args]} command]
                      (case op
                        :foo (if-let [r (apply foo args)]
                               r
                               {:error :nil-result :command command})
                        :bar (if-let [r (apply bar args)]
                               r
                               {:error :nil-result :command command})
                        {:error :unknown-op :command command}))
                    (catch Exception e
                      {:error :exception :command command :ex e}))))]
    (pipeline-blocking n output-chan xf input-chan true)
    output-chan))
@didibus I'll try out your suggested approaches. Many thanks!
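To see the pipeline approach end to end, here is a tiny self-contained run. slow-inc is a hypothetical stand-in for a blocking foo, and to-chan! assumes a reasonably recent core.async release (older ones call it to-chan).

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for a blocking or heavy foo.
(defn slow-inc [x]
  (Thread/sleep 10)
  (inc x))

(defn run-demo []
  (let [in  (a/to-chan! (range 5))   ; closes once all items are consumed
        out (a/chan)]
    ;; 2 worker threads; out is closed automatically when in closes.
    (a/pipeline-blocking 2 out (map slow-inc) in)
    ;; Collect everything from out into a vector (blocks until out closes).
    (a/<!! (a/into [] out))))

(run-demo) ;; => [1 2 3 4 5]
```

The results come out in input order even with several workers, since the pipeline functions preserve ordering.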
Is there a way to find out how many core.async threads are still knocking about?
Do a thread dump, look for threads with names starting with async-.
Note that some of those threads could be there simply because they're cached. But they could still be unused. I don't know for sure, but I would expect for the threads that are actually doing something to be in the RUNNABLE and maybe BLOCKED state.
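If a full thread dump is more than you need, something like this can be run from the REPL. It is only a sketch: async-threads is a made-up helper, and it relies on the thread names starting with async- as mentioned above.

```clojure
(require '[clojure.string :as str])

(defn async-threads
  "Returns the name and state of every live thread whose name starts with async-."
  []
  (->> (keys (Thread/getAllStackTraces))
       (filter (fn [^Thread t] (str/starts-with? (.getName t) "async-")))
       (map (fn [^Thread t] {:name  (.getName t)
                             :state (str (.getState t))}))
       (sort-by :name)))

;; (async-threads)
;; (count (async-threads))
```

Idle pooled threads will still show up here, typically in a waiting state, so the count is an upper bound on the threads actually doing work.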
Hello, I'm taking a look at the function range and according to the docs if I pass a third parameter to it, I can specify the "step" for each number of the result. Is there a way to pass something like a function to the third parameter? I ask that because I'm trying to create a range of numbers that are multiple of 3 or 5, so I don't need to create two ranges and then merge them. Is that possible without using lazy-seq?
range takes only what the doc describes
add more constraints by transforming the collection that range returns (e.g. using filter or remove)
what’s more, you can see in the Clojure source (https://github.com/clojure/clojure/blob/clojure-1.11.1/src/clj/clojure/core.clj#L3036) that range pivots off of whether the step param is a Long or not, and you can see in the Java docs that both implementations only take int/long:
• https://www.javadoc.io/static/org.clojure/clojure/1.5.0-RC6/clojure/lang/Range.html#Range(clojure.lang.IPersistentMap,%20int,%20int) takes an int step
• https://www.javadoc.io/static/org.clojure/clojure/1.8.0/clojure/lang/LongRange.html#create(long,%20long,%20long) takes a Long step
My first attempt was the following:
(defn sum-n [n]
  (->> (range n)
       (filter #(or (zero? (mod % 3))
                    (zero? (mod % 5))))
       (reduce +)))
Which works fine even for large ranges. Right now I'm trying to find a way to create the range without filtering, because in theory, it would be faster than creating a range and then filtering.
I also attempted to use a lazy-seq to generate the list of the numbers divisible by 3 and 5. This approach works, but is something around 30 - 40% slower than the code above.
in theory “in theory is better” is always better. in practice, “in theory is better” is not always better. but rough numbers:
(time (let [n 500]
        (->> (range n)
             (filter #(or (zero? (mod % 3))
                          (zero? (mod % 5))))
             (reduce +))))
"Elapsed time: 4.167208 msecs"
57918
(time (letfn [(multiple-of [n] #(zero? (mod % n)))]
        (transduce (filter (some-fn (multiple-of 3) (multiple-of 5)))
                   +
                   0
                   (range 500))))
"Elapsed time: 1.737542 msecs"
57918
Here is the code I wrote to generate the multiple of 3 and 5:
(defn multiples-of-3-and-5
  ([limit]
   (lazy-seq
     (cons 0 (multiples-of-3-and-5 limit 0 0))))
  ([limit cur idx]
   (let [pattern [3 2 1 3 1 2 3]
         cur-idx (if (>= idx (count pattern)) 0 idx)
         cur-sum (+ cur (get pattern cur-idx))]
     (when (< cur-sum limit)
       (lazy-seq
         (cons cur-sum (multiples-of-3-and-5 limit cur-sum (inc cur-idx))))))))
(defn sum-n3 [n]
  (->> (multiples-of-3-and-5 n)
       (reduce +)))
And here are the benchmarks:
"Elapsed time for sum-n: 7485.135447 msecs"
"Elapsed time for sum-n3: 10631.884368 msecs"
Is something wrong with my implementation?
Your approach is 100% faster than mine @dpsutton, could transduce be what makes the difference?
absolutely. You are messing with cons and lazy sequences, creating thunks to lazily realize the sequence. Transduce turns this on its head. range is optimized to call a function on each value in its range. especially with how dense multiples of 3 and 5 are in the integers, calling a function that accepts more than 1/3 of the time is faster than the construction of sequences containing the values
if this were multiples of much larger numbers i suspect your version becomes more efficient at some point. But your way is more fun. Kinda writing an infinite, lazy merge sort on infinite streams of integers
For some reason, my approach is faster in one very specific case: when the limit is 10000000 (ten million), but I could not find a reason for that.
that is surprising to me for intervals of 3 and 5
(time (letfn [(multiple-of [n] #(zero? (mod % n)))]
        (transduce (filter (some-fn (multiple-of 3) (multiple-of 5)))
                   +
                   0
                   (range 10000000))))
"Elapsed time: 465.324084 msecs"
23333331666668
Your approach is still faster than mine for 10 million, but my approach is faster than a simple range - reduce
if you’re benchmarking, make sure you’re trying to start ‘fresh’ often - maybe not in this case, but it can often be easy to ‘warm up’ the JVM and your runtime for one test just to be wowed by speed increases on a subsequent test 🙂
user=> (time (sum-n3 10000000))
"Elapsed time: 919.840041 msecs"
23333331666668
i find mine twice as fast as yours on 10000000
It is, your approach is the fastest so far
ah i thought you said yours was faster on a large number
and @ebernard is correct. profiling is very tricky. we’re using a very coarse instrument with time and the jvm might be doing more work for us in some cases so it’s not apples to apples
@ebernard How can I "start fresh"? I'm not used to the JVM world, so I would really appreciate if you suggest anything that I can study.
in short: maybe in this case, try with a ‘fresh’ JVM (e.g. if you’re running this at the REPL, consider creating a new REPL for each test case and only run a test once). like @dpsutton said, the JVM has been developed over many, many years (as well as our CPUs, and our operating systems’ control over memory, etc. etc. etc…) to try to make educated guesses as to what to return based on a given instruction. during execution, the various layers at play are building up their education to make those guesses. ergo, a benchmark run against a system that just benchmarked something else may very well perform differently (better? worse? who’s to say!) than if this were the first benchmark you’d run
there’s only so much one can reasonably do, and if you’re just comparing speeds on a couple of implementations like this, a fresh JVM is probably a good bang for your buck. and if you don’t notice a difference, then it may very well not even apply in this case. like i said - it’s a complex problem!
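A complementary, rough trick inside a single JVM is to call the function a number of times before timing it, so the JIT has had a chance to compile it. A sketch only: time-after-warmup is a made-up helper, and the warm-up count is a guess, not a recommendation.

```clojure
(defn time-after-warmup
  "Calls f warmup-runs times untimed, then times one more call.
  Returns the result of that last call and the elapsed milliseconds."
  [f warmup-runs]
  (dotimes [_ warmup-runs] (f))
  (let [start  (System/nanoTime)
        result (f)
        end    (System/nanoTime)]
    {:result result
     :ms     (/ (- end start) 1e6)}))

;; Example: time a small reduction after 100 warm-up calls.
(time-after-warmup #(reduce + (range 1000)) 100)
```

A single timed call is still noisy, so treat the number as a rough indication at best.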
Got it, thanks for the information @ebernard! I think this small experience (The first function was basically the solution to the first problem from Project Euler) made me learn new things today. Also, thanks @dpsutton for the transducer example, I'll try to apply it more frequently on my daily studies. Have a great day!
sure thing - I’m not well-versed enough in benchmarking to really be able to say if that’s at play here - I just know I’ve been somewhat bit in the past by re-using processes and not otherwise accounting for the system components running the thing I was benchmarking, and it’s easy to miss 🙂
if they don’t quite click for you yet the reference page (https://clojure.org/reference/transducers) is really, really good
Kind of tangential... if the end goal is to sum them, it might not be necessary to even create the range - the answer is always going to be a multiple of the number, and with a bit of analysis, I think it's possible to get the answer with some division and some multiplication. It's not a direct "how to deal with the ranges" answer, but it could make the ranges unnecessary for this particular problem, and may be instructive when doing math-y things like this (maybe Project Euler?)
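To make that concrete, here is one way the arithmetic could look, as a sketch. The multiples of k below n are k, 2k, ..., mk with m = floor((n - 1) / k), so they sum to k * m * (m + 1) / 2; multiples of 15 are counted under both 3 and 5, so they get subtracted once. The function names are made up.

```clojure
(defn sum-multiples-below
  "Sum of all positive multiples of k that are strictly below n."
  [k n]
  (let [m (quot (dec n) k)]          ; how many multiples of k are below n
    (* k (quot (* m (inc m)) 2))))   ; k * (1 + 2 + ... + m)

(defn sum-3-or-5
  "Sum of all numbers below n divisible by 3 or 5, without building a range."
  [n]
  (- (+ (sum-multiples-below 3 n)
        (sum-multiples-below 5 n))
     (sum-multiples-below 15 n)))    ; multiples of 15 were counted twice

(sum-3-or-5 500)      ;; => 57918
(sum-3-or-5 10000000) ;; => 23333331666668
```

Both results match the numbers from the range-based versions earlier in the thread.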
btw, transduce is pretty powerful! you may occasionally bump into other occasions where something like (-> m foo bar baz) may be better served by, instead, using comp to compose those functions together and pass it as a transducing function. it can often be a computational speed boost
Now that you mentioned, it totally makes sense to use comp, I have seen this function in the past, but could not really understand why use it instead of thread operators. Thanks a lot for this example Evan!!!
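A small side-by-side of the two styles, as a sketch. One detail worth knowing: comp over transducers applies the steps left to right, in the same order you would read a ->> pipeline.

```clojure
;; Threaded sequence version: each step builds an intermediate lazy seq.
(defn sum-odd-squares-seq [n]
  (->> (range n)
       (filter odd?)
       (map #(* % %))
       (reduce +)))

;; Transducer version: the steps are composed once with comp and
;; applied in a single pass, with no intermediate sequences.
(def odd-squares
  (comp (filter odd?)
        (map #(* % %))))

(defn sum-odd-squares-xf [n]
  (transduce odd-squares + 0 (range n)))

(sum-odd-squares-seq 10) ;; => 165
(sum-odd-squares-xf 10)  ;; => 165
```

The composed transducer can also be reused with into, sequence, or a core.async channel.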
a user friendly language should have a built in fizz-buzz in core
Even shorter:
(reductions + (cycle [3 2 1 3 1 2 3]))
(for [i (range)
      d [3 5 6 9 10 12 15]]
  (+ (* i 15) d))
btw: you can use criterium to get more accurate benchmarks.
(require 'clojure.repl.deps)
(clojure.repl.deps/add-lib 'criterium/criterium {:mvn/version "0.4.6"})
(require '[criterium.core :as cc])
then
(cc/quick-bench (sum-n 1000))
(->> (mapcat vector
             (iterate (partial *' 3) 1)
             (iterate (partial *' 5) 1))
     (take 100)
     (run! println))
No idea if performant or not 👆
I suspect the presence of duplicated numbers and the fact that the numbers are out of order are violating unstated requirements.
that said, I compared it to a transducing version of the same thing and the perf is pretty close
(defn v1 []
  (->> (mapcat vector
               (iterate (partial *' 3) 1)
               (iterate (partial *' 5) 1))
       (take 100)
       (into [])))
(defn v2 []
  (into []
        (take 100)
        (mapcat vector
                (iterate (partial *' 3) 1)
                (iterate (partial *' 5) 1))))
(assert (= (v1) (v2)))
(defn measure-time
  [f]
  (let [start (System/nanoTime)
        _ (f)
        end (System/nanoTime)]
    (- end start)))
(defn dumb-crit
  [f n]
  (/ (apply + (repeatedly n #(measure-time f)))
     (* n (Math/pow 10 9))))
(println 'v1 (dumb-crit v1 1000))
(println 'v2 (dumb-crit v2 1000))
v1 4.0298073E-5
v2 2.623332E-5