This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2019-04-24
Channels
- # beginners (64)
- # calva (40)
- # cider (76)
- # clara (13)
- # clojure (72)
- # clojure-dev (34)
- # clojure-italy (4)
- # clojure-nl (14)
- # clojure-poland (1)
- # clojure-uk (30)
- # clojurescript (58)
- # clr (10)
- # core-async (101)
- # cursive (31)
- # datomic (9)
- # emacs (20)
- # fulcro (2)
- # jackdaw (1)
- # jobs (3)
- # juxt (3)
- # luminus (4)
- # lumo (15)
- # mount (4)
- # nrepl (29)
- # nyc (1)
- # off-topic (27)
- # qlkit (1)
- # quil (5)
- # re-frame (19)
- # reitit (8)
- # remote-jobs (4)
- # rewrite-clj (5)
- # shadow-cljs (45)
- # spacemacs (22)
- # sql (9)
- # uncomplicate (1)
- # xtdb (14)
Sometimes core.async throws an assert failure if you put a nil on a chan.
Exception in thread "async-thread-macro-39" java.lang.AssertionError: Assert failed: (not (nil? itm))
but usually this error gives me no information about where it happened in my source code.
nils are invalid channel values. if you're putting a nil on a channel in a go block or thread, you need to have some try/catch handling around the puts to be able to catch those errors
although it would be better to ensure you don't put it in the first place
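A minimal sketch of the point above (the `safe-put!`/`safe-take!` helpers are hypothetical, not from this thread): nil blows up at put time, so one defensive option is to wrap it in a sentinel keyword before it reaches the channel and unwrap it on the other side.

```clojure
(require '[clojure.core.async :as async])

;; nil is an invalid channel value; putting one throws an AssertionError.
;; Hypothetical helpers: round-trip nil through a channel via a sentinel.
(defn safe-put! [ch v]
  (async/>!! ch (if (nil? v) ::nil v)))

(defn safe-take! [ch]
  (let [v (async/<!! ch)]
    (when-not (= ::nil v) v)))

(def result
  (let [c (async/chan 1)]
    (safe-put! c nil)      ;; a bare (>!! c nil) would throw here
    (safe-take! c)))
;; result is nil, delivered safely through the channel
```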
from docs to pipeline
Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
But how can transducers return multiple results?
It can call the reducing function more than once
Like mapcat
`mapcat` actually does it with a nested reduce iirc
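A quick made-up example (not from the thread) of a transducer producing several outputs per input, using `mapcat` on a channel:

```clojure
(require '[clojure.core.async :as async])

;; The mapcat transducer calls the reducing step once per expanded element,
;; so a single put can yield multiple takes on the other side.
(def out
  (let [c (async/chan 1 (mapcat (fn [x] [x x x])))]
    (async/>!! c :a)       ;; one input...
    (async/close! c)
    (async/<!! (async/into [] c))))
;; out => [:a :a :a]      ;; ...three outputs
```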
I know I can't put a nil on a channel. but if this happens, how do I find where the problem occurred? is there any trick, or do I have to check one by one? (there's a lot of usage in my code)
core.async stack traces are often far from helpful
because the method that actually called your code is calling a part of the state machine that core.async made from your go block, not any named object identifiable directly in your code
We have some chains of async blocks in our code, and I ended up writing a `guarded-go` macro and a `guarded-take!` macro for our async bits. `guarded-go` is a `go` block wrapped in a try/catch; if an exception occurs it will catch it and return it on the channel. for consuming those channels, `guarded-take!` will check if the value returned is an exception, and re-raise it after attaching some information about the call location
It's not the prettiest, definitely felt wrong writing it, but it has helped make exceptions easier to deal with in some of our more complex async code
the exception eventually winds its way out to the caller and you can see exactly what path caused the issue
(or in the case of something recoverable or that needs special handling, can be caught and dealt with)
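A hypothetical reconstruction of the `guarded-go`/`guarded-take!` idea described above — the names, the blocking `guarded-take!!` variant, and the details here are my assumptions, not the poster's actual code:

```clojure
(require '[clojure.core.async :as async])

;; Like go, but catches any Throwable and returns it on the channel
;; instead of losing it on a pool thread.
(defmacro guarded-go [& body]
  `(async/go
     (try ~@body
          (catch Throwable t# t#))))

;; Blocking take that re-throws a delivered Throwable, attaching
;; call-site context so the failure path is visible to the caller.
(defn guarded-take!! [ch context]
  (let [v (async/<!! ch)]
    (if (instance? Throwable v)
      (throw (ex-info (str "async failure at " context) {} v))
      v)))

(def ok (guarded-take!! (guarded-go 42) "demo"))
;; ok => 42; a throwing body would instead be re-raised at the take site
```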
@robertfrederickwarner I think this works
the full exception stack trace is
Exception in thread "async-thread-macro-15" java.lang.AssertionError: Assert failed: (not (nil? itm))
at clojure.core.async.impl.protocols$add_BANG_.invokeStatic(protocols.clj:40)
at clojure.core.async.impl.protocols$add_BANG_.invoke(protocols.clj:37)
at clojure.core$map$fn__5864$fn__5865.invoke(core.clj:2742)
at clojure.core.async.impl.channels$chan$fn__6842.invoke(channels.clj:300)
at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:143)
at clojure.core.async$_GT__BANG__BANG_.invokeStatic(async.clj:143)
at clojure.core.async$_GT__BANG__BANG_.invoke(async.clj:138)
at clojure.core.async$pipeline_STAR_$process__11435.invoke(async.clj:492)
at clojure.core.async$pipeline_STAR_$fn__11447.invoke(async.clj:507)
at clojure.core.async$thread_call$fn__11358.invoke(async.clj:442)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
can I confirm it comes from a `pipeline-*`?
You'll need to check for exceptions manually on the other side; they won't be thrown automatically. That's why I pair my `guarded-go` with a `guarded-take!`
@doglooksgood is the issue at line 2742 of your core.clj file?
ah, nevermind
that's in clojure's core.clj
I do think that the issue is happening in a pipeline, in which case you aren't going to be able to write any replacement macros that handle the case
is there an elegant way to deal with the possible exceptions with core.async? I don't want to try/catch everywhere. I can write a predicate, but sometimes it may not cover every case.
in general, if you have something that returns nil, it is likely some io operation which people insist on putting in transducers on pipelines for some god awful reason
like, if you don't consume and produce meaningful values, don't put it in the middle of a pipeline
I know some company wrote some terrible blog post that did that and it made the front page of various aggregators, but it is bad
yup. ultimately the exception stuff I describe is our last ditch "something went wrong" handler to ensure that we return a valid response (this is for a web service). in "regular" usage, it shouldn't be hit, only when something goes terribly wrong. I wouldn't use it as a core building block
i'm looking into whether it's even possible to create a debouncing transducer which drops every element except the last one in a period of time. It looks like there is no way to deliver a value when the timeout is hit, because we'd need one more transducer call for that, which may never happen for a channel. Any ideas?
yeah, due to the sequential nature of transducers and abstracted iteration, i don't think it's possible
core.async is largely about writing machines, not data processing; transducers are for data processing. therefore, if you are reaching for transducers while writing a core.async program, the odds are pretty good that you are making a mistake
that makes no sense re transducers
one of the major reasons transducers exist is core.async
but if you are writing little stateful machines, there is very little demand for sticking functional data processing (map, filter, etc.) in the middle of the queues connecting those machines. And it is super weird to stick io operations in the middle of the queues connecting those machines.
if you are using core.async as a substrate for parallel data processing then it does make sense
you write data processing code, core.async makes it a state machine; i think you're missing that part
so it depends on which type of core.async usage you have, and I would argue the more compelling reason to use core.async is for the machines, not parallel data processing
yeah, there's other stuff for clojure that is much better at parallelizing things
the state machine transform the go macro does is largely about stateful machines processing queues (channels)
and there are even programs that do both kinds of core.async usage, and pipelines are, in my mind, basically the interface between those two kinds of core.async programs
Would love to hear them. We considered using it on our project but went with component instead
but this is my first big clojure project, so trying to sponge up as much as I can these days
mount encourages reliance on global singletons which is not a good way to structure a program
that was our conclusion as well. we went with component as it seemed to be a more manageable approach, plus being able to run multiple systems concurrently meant nice isolation for repl/test
hi, I have a use case where I am working with 30-40 items. for each item I need to make 3 http calls (each http call returns a piece of information about the item), and at the end I want aggregated results, so the number of http calls will be between 90-120. I am using pipeline-async with 3 stages; although it works, I get javax.net.ssl.SSLException: SSLEngine is closing/closed or a timeout exception for some calls. I want to limit the number of http calls that are in progress at any point. is that possible here?
Ok, what is the difference between pipeline-async and pipeline-blocking when applied to the above setup? Will pipeline-async trigger the 1st call, then trigger the next call without waiting for the result, while pipeline-blocking triggers at most n calls (as specified in the concurrency parameter)?
so if you need to limit the number of external calls inflight pipeline-async isn't a great choice
I would start with creating an executor with a fixed thread count, and making sure all your http requests happen on that executor
then no matter what the code that makes http requests looks like (pipelines, different threads, etc) you can always limit the number in flight based on the number of threads available to the executor
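A sketch of the fixed-executor approach described above. `fake-http-call` is a placeholder standing in for a real HTTP client; the pool size of 3 is the assumed in-flight limit:

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent Executors ExecutorService))

;; A fixed-size pool caps in-flight work at 3, no matter how many
;; callers submit to it.
(def ^ExecutorService http-pool (Executors/newFixedThreadPool 3))

(defn fake-http-call [item]        ;; placeholder, not a real request
  (Thread/sleep 10)
  {:item item :status 200})

;; Submit a call to the pool; deliver its result on a channel.
(defn submit-call [item]
  (let [c (async/chan 1)]
    (.submit http-pool
             ^Runnable (fn [] (async/>!! c (fake-http-call item))))
    c))

(def results
  (->> (range 10)
       (mapv submit-call)          ;; all 10 submitted immediately...
       (mapv async/<!!)))          ;; ...but at most 3 run concurrently
(.shutdown http-pool)
```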
Made a debouncer using just core.async, not even sure if it's correct. Clearly it could be done using a standalone resettable timer instead of the deliver function. Would be nice to get some feedback.
(defn debounce
  ([c ms] (debounce c (chan) ms))
  ([c c' ms]
   (let [vkey (volatile! nil)
         deliver (fn [key val]
                   (go (<! (timeout ms))
                       (when (= key @vkey)
                         (>! c' val))))]
     (go-loop []
       (let [val (<! c)
             key (vreset! vkey (keyword (gensym "id_")))]
         (deliver key val))
       (recur))
     c')))
(defn debounce [in out t]
(async/go
(let [msg (async/<! in)]
(if (some? msg)
(if (async/>! out msg)
(let [to (async/timeout t)]
(loop []
(async/alt! to
([_]
(debounce in out t))
in
([msg]
(if (some? msg)
(recur)
(async/close! out))))))
(async/close! in))
(async/close! out)))))
yeah thanks for example, looking into it but i need some feedback on my implementation
volatile in core.async seems like a bad idea
@nxtk volatiles are faster than atoms because they do nothing to ensure values stay in sync between threads, and with core.async go blocks you can't control which thread your code is running on
@noisesmith aren't go blocks single-threaded CSP?
i just started with core.async this morning, too much information for one day, may be missing some pieces
on the JVM, go uses a thread pool, on JS there's no volatile! iirc
it's small, but it's usually going to be more than one thread
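A small demonstration of the point: a go block can resume on a different dispatch-pool thread after parking (the exact thread names are a JVM implementation detail, and the two names may or may not happen to be equal on any given run):

```clojure
(require '[clojure.core.async :as async])

;; Record the pool thread before and after a park point.
(def threads
  (async/<!!
   (async/go
     (let [before (.getName (Thread/currentThread))]
       (async/<! (async/timeout 20))   ;; park; may resume on another thread
       [before (.getName (Thread/currentThread))]))))
;; both entries are go-block pool threads; equality is not guaranteed,
;; which is why unsynchronized volatiles inside go blocks are risky
```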
right
you are also generating timeouts for everything that goes through, regardless of whether it will be ignored or not