
Sometimes core.async throws an assertion failure if you put a nil on a chan.

Exception in thread "async-thread-macro-39" java.lang.AssertionError: Assert failed: (not (nil? itm))
but this error usually gives me no information about where it happened in my source code.

Alex Miller (Clojure team)15:04:58

nils are invalid channel values. if you're putting a nil on a channel in a go block or thread, you need to have some try/catch handling around the puts to be able to catch those errors
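A minimal sketch of that try/catch around a put (`lookup-thing` and the `out` channel here are hypothetical, not from the thread):

```clojure
(require '[clojure.core.async :refer [chan go >!]])

(def out (chan 1))

(go
  (try
    ;; lookup-thing is a hypothetical fn that may return nil;
    ;; putting its nil result on a channel throws AssertionError
    (>! out (lookup-thing))
    (catch AssertionError e
      (println "tried to put nil:" (.getMessage e)))))
```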

Alex Miller (Clojure team)15:04:31

although it would be better to ensure you don't put it in the first place

Ivan Koz16:04:12

from the docs for pipeline:

Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
But how can a transducer return multiple results?

Ivan Koz16:04:44

a transducer calls the next reducing function with a result and a value (one value)

Alex Miller (Clojure team)16:04:57

It can do that more than once

Alex Miller (Clojure team)16:04:17

`mapcat` actually does it with a nested reduce iirc
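For instance, `mapcat`'s transducer may call the downstream reducing function zero or more times per input element (a quick repl sketch):

```clojure
;; 0 produces zero outputs, 3 produces three
(sequence (mapcat (fn [n] (repeat n n))) [0 1 2 3])
;; => (1 2 2 3 3 3)
```

The same transducer behaves the same way when attached to a channel or a pipeline.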


I know I can't put a nil on a channel. But when this happens, how do I find where the problem occurred? Is there any trick, or do I have to check the puts one by one? (there's a lot of usage in my code)

Ivan Koz16:04:17

a breakpoint on exceptions in Cursive may help, I guess, to examine the threads' stacks

Ivan Koz16:04:07

don't you see the source of your nil value in the stack trace?


core.async stack traces are often far from helpful


because the method that actually called your code is calling a part of the state machine that core.async made from your go block, not any named object identifiable directly in your code

Ivan Koz16:04:17

that's bad; some metadata on the generated code would be helpful


We have some chains of async blocks in our code, and I ended up writing a guarded-go macro and a guarded-take! macro for our async bits. guarded-go is a go block wrapped in a try/catch; if an exception occurs, it catches it and returns it on the channel. For consuming those channels, guarded-take! checks if the value returned is an exception, and re-raises it after attaching some information about the call location
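A rough sketch of that idea (the names and details here are assumptions, not the poster's actual code):

```clojure
(require '[clojure.core.async :refer [go <!]])

(defmacro guarded-go
  "Like go, but any thrown exception is returned on the channel
  instead of being lost on a pool thread."
  [& body]
  `(go (try ~@body
            (catch Throwable t# t#))))

(defmacro guarded-take!
  "Takes from ch inside a go block; if the value is an exception,
  rethrows it wrapped with this call site's line number."
  [ch]
  `(let [v# (<! ~ch)]
     (if (instance? Throwable v#)
       (throw (ex-info "async failure" {:line ~(:line (meta &form))} v#))
       v#)))
```

Note that guarded-take! expands to a `<!`, so it can only be used inside a go block.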


It's not the prettiest, definitely felt wrong writing it, but it has helped make exceptions easier to deal with in some of our more complex async code


the exception eventually winds its way out to the caller and you can see exactly what path caused the issue


(or in the case of something recoverable or that needs special handling, can be caught and dealt with)


the full exception stack trace is

Exception in thread "async-thread-macro-15" java.lang.AssertionError: Assert failed: (not (nil? itm))
        at clojure.core.async.impl.protocols$add_BANG_.invokeStatic(protocols.clj:40)
        at clojure.core.async.impl.protocols$add_BANG_.invoke(protocols.clj:37)
        at clojure.core$map$fn__5864$fn__5865.invoke(core.clj:2742)
        at clojure.core.async.impl.channels$chan$fn__6842.invoke(channels.clj:300)
        at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:143)
        at clojure.core.async$_GT__BANG__BANG_.invokeStatic(async.clj:143)
        at clojure.core.async$_GT__BANG__BANG_.invoke(async.clj:138)
        at clojure.core.async$pipeline_STAR_$process__11435.invoke(async.clj:492)
        at clojure.core.async$pipeline_STAR_$fn__11447.invoke(async.clj:507)
        at clojure.core.async$thread_call$fn__11358.invoke(async.clj:442)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(
        at java.util.concurrent.ThreadPoolExecutor$
can I confirm it comes from a pipeline-*?


That would be my first guess, but I couldn't say for sure


If I pass the exception through the async channel, what is the downside?


You'll need to check for exceptions manually at the other side; they won't be thrown automatically. That's why I pair using my guarded-go with a guarded-take!


@doglooksgood is the issue at line 2742 of your core.clj file?


that's in clojure's core.clj


I do think that the issue is happening in a pipeline, in which case you aren't going to be able to write any replacement macros that handle the case


is there an elegant way to deal with the possible exceptions with core.async? I don't want to try/catch everywhere. I can write a predicate, but sometimes it may not cover every case.

Ivan Koz17:04:21

always return a default value?


maybe that's not a good way, I think


the elegant way is to not put nils on channels
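Two common ways to do that (a sketch, not from the thread; `lookup-thing` and `results` are hypothetical):

```clojure
(require '[clojure.core.async :refer [go >!]])

;; 1. only put when there actually is a value
(go (when-some [v (lookup-thing)]
      (>! results v)))

;; 2. map nil to a sentinel that the consumer unwraps
(defn ->msg [v] (if (nil? v) ::nothing v))
(defn <-msg [m] (when (not= ::nothing m) m))
```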


in general, if you have something that returns nil, it is likely some io operation, which people insist on putting in transducers on pipelines for some god-awful reason


like, if you don't consume and produce meaningful values, don't put it in the middle of a pipeline


I know some company wrote some terrible blog post that did that and it made the front page of various aggregators, but it is bad


that's right, I should rewrite some pipeline-blocking


or just, you know, use an executor


yup. ultimately the exception stuff I describe is our last ditch "something went wrong" handler to ensure that we return a valid response (this is for a web service). in "regular" usage, it shouldn't be hit, only when something goes terribly wrong. I wouldn't use it as a core building block

Ivan Koz17:04:28

I'm looking at whether it's even possible to create a debouncing transducer which drops every element except the last one in a period of time. It looks like there is no way to deliver a value when the timeout is hit, because we need one more transducer call for that, which may never happen for a channel. Any ideas?


that is not a transducer


that is a special case of copying from channel A to channel B

Ivan Koz17:04:52

yeah, due to the sequential nature of transducers and abstracted iteration, I don't think it's possible

Ivan Koz17:04:29

channel to channel copying is easy tho


core.async is largely about writing machines, not data processing; transducers are for data processing. Therefore, if you are reaching for transducers while writing a core.async program, the odds are pretty good that you are making a mistake

Ivan Koz17:04:22

data processing machines

Alex Miller (Clojure team)17:04:28

that makes no sense re transducers

Alex Miller (Clojure team)17:04:08

one of the major reasons transducers exist is core.async


but if you are writing little stateful machines, there is very little demand for sticking functional data processing (map, filter, etc.) in the middle of the queues connecting those machines. And it is super weird to stick io operations in the middle of the queues connecting those machines.


if you are using core.async as a substrate for parallel data processing then it does make sense

Ivan Koz17:04:10

you write data processing code, and core.async makes it a state machine; I think you're missing that part


so it depends on which type of core.async usage you have, and I would argue the more compelling reason to use core.async is for the machines, not parallel data processing


I am well aware of the go macro and the state machine transform


yeah, there's other stuff for clojure that is much better at parallelizing things


the state machine transform the go macro does is largely about stateful machines processing queues (channels)


and there are even programs that do both kinds of core.async usage, and pipelines are, in my mind, basically the interface between those two kinds of core.async programs


I found my problem, thanks to mount I can start my program partly. 😎


(I have opinions about mount too)


Would love to hear them. We considered using it on our project but went with component instead


but this is my first big clojure project, so trying to sponge up as much as I can these days


mount encourages reliance on global singletons which is not a good way to structure a program


that was our conclusion as well. we went with component as it seemed to be a more manageable approach, plus being able to run multiple systems concurrently meant nice isolation for repl/test


it took me about 30 mins to find out where the exception came from.


hi, I have a use case where I am working with 30-40 items; for each item I need to make 3 http calls (each http call returns a piece of information about the item), and at the end I want an aggregated result, so the number of http calls will be between 90-120. I am using pipeline-async with 3 stages; although it works, I get "SSLEngine is closing/closed" or timeout exceptions for some calls. I want to limit the number of http calls that are in progress at any point. Is that possible here?


here is my setup


maybe you can try pipeline-blocking with a connection pool for the http client?


Ok, what is the difference between pipeline-async and pipeline-blocking when applied to the above setup? Will pipeline-async trigger the 1st call, then trigger the next call without waiting for the result, while pipeline-blocking triggers at most n calls (as specified in the concurrency parameter)?


pipeline-async, because of its construction, barely limits anything in flight


so if you need to limit the number of external calls inflight pipeline-async isn't a great choice


I would start with creating an executor with a fixed thread count, and making sure all your http requests happen on that executor


then no matter what the code that makes http requests looks like (pipelines, different threads, etc) you can always limit the number in flight based on the number of threads available to the executor
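A sketch of that approach (`http-get` is a hypothetical blocking http call, and the pool size is an arbitrary example):

```clojure
(import '(java.util.concurrent Executors ExecutorService))

;; at most 10 requests can ever be in flight, no matter who submits them
(def ^ExecutorService http-pool (Executors/newFixedThreadPool 10))

(defn limited-http-get [url]
  ;; .submit returns a Future; deref blocks the calling thread
  ;; until a pool thread has finished the request
  @(.submit http-pool ^Callable (fn [] (http-get url))))
```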


Ok, I think I will try pipeline-blocking first before going to the executor level
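For reference, a pipeline-blocking sketch that caps concurrency (`fetch-info` is a hypothetical blocking http call):

```clojure
(require '[clojure.core.async :as async])

(defn fetch-all [items]
  (let [in  (async/to-chan items)
        out (async/chan)]
    ;; at most 5 invocations of fetch-info run concurrently
    (async/pipeline-blocking 5 out (map fetch-info) in)
    (async/<!! (async/into [] out))))
```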


thanks for the ideas! 👍

Ivan Koz20:04:03

Made a debouncer using just core.async; not even sure if it's correct. Clearly it could be done using a standalone resettable timer instead of the deliver function. Would be nice to get some feedback.

;; assumes (:require [clojure.core.async :refer [chan go go-loop timeout <! >!]])
(defn debounce
  ([c ms] (debounce c (chan) ms))
  ([c c' ms]
   (let [vkey    (volatile! nil)
         deliver (fn [key val]
                   ;; only deliver if no newer value arrived while waiting
                   (go (<! (timeout ms))
                       (when (= key @vkey)
                         (>! c' val))))]
     (go-loop []
       ;; when-some avoids putting nil on c' after c closes
       (when-some [val (<! c)]
         (let [key (vreset! vkey (keyword (gensym "id_")))]
           (deliver key val))
         (recur)))
     c')))


I don't even know what you are doing


the volatile thing is weird, the gensym keyword is weird

Ivan Koz20:04:25

something like that


(defn debounce [in out t]
  (async/go
    (let [msg (async/<! in)]
      (if (some? msg)
        (if (async/>! out msg)
          (let [to (async/timeout t)]
            ;; drop values arriving before the timeout fires,
            ;; then start over with a fresh go block
            (loop []
              (async/alt!
                to (debounce in out t)
                in ([v] (if (some? v)
                          (recur)
                          (async/close! out))))))
          ;; the consumer closed out, so stop consuming in
          (async/close! in))
        ;; in was closed upstream
        (async/close! out)))))


(never actually ran that or pasted it in a repl, so ymmv)

Ivan Koz20:04:50

yeah, thanks for the example; I'm looking into it, but I need some feedback on my implementation


volatile in core.async seems like a bad idea

Ivan Koz20:04:27

even considering vreset?


the whole thing is just weird


creating go blocks without waiting for them


@nxtk volatiles are faster than atoms because they do nothing to ensure values stay in sync between threads, and with core.async go you can't control what thread that code is running on


at the very least I would use (Object.) instead of that keyword gensym thing

Ivan Koz20:04:36

@noisesmith aren't (go) blocks about single-threaded CSP?

Ivan Koz20:04:11

i just started with core.async this morning; too much information for one day, I may be missing some pieces


on the JVM, go uses a thread pool, on JS there's no volatile! iirc


it's small, but it's usually going to be more than one thread

Ivan Koz20:04:52

so the single thread is only a cljs-specific aspect of go?

Ivan Koz20:04:01

yeah then my solution is not optimal


you are also generating timeouts for everything that goes through, regardless of whether it will be ignored or not


all the callbacks that go blocks turn into


all of which are supposed to be relatively cheap


but timeouts, for example, are globals, so if you create a timeout and then wait on consuming a value from it, that isn't eligible for gc until the timeout ends

Ivan Koz20:04:35

thanks guys