#core-async
2019-04-24
tianshu13:04:55

Sometimes core.async throws an assert failure if I put a nil on a chan.

Exception in thread "async-thread-macro-39" java.lang.AssertionError: Assert failed: (not (nil? itm))
but usually this error gives me no information about where it happened in my source code.

Alex Miller (Clojure team)15:04:58

nils are invalid channel values. if you're putting a nil on a channel in a go block or thread, you need to have some try/catch handling around the puts to be able to catch those errors

Alex Miller (Clojure team)15:04:31

although it would be better to ensure you don't put it in the first place
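A minimal sketch of both suggestions; compute-value and ch are made-up placeholders, not code from the thread:

(require '[clojure.core.async :as a :refer [go >! chan]])

;; hypothetical sketch: guard the put so nil never reaches the channel,
;; and wrap the put in try/catch so a failure surfaces where it happens
(defn compute-value []            ;; stand-in producer; sometimes returns nil
  (rand-nth [42 nil]))

(def ch (chan 10))

(go
  (try
    (let [v (compute-value)]
      (if (some? v)
        (>! ch v)
        (println "skipping nil value from compute-value")))
    (catch Throwable t
      (println "producer failed:" (.getMessage t)))))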

Ivan Koz16:04:12

from docs to pipeline

Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
But how can transducers return multiple results?

Ivan Koz16:04:44

a transducer calls through to the next reducing function with a result and a value (one value)

Alex Miller (Clojure team)16:04:57

It can do that more than once

Ivan Koz16:04:09

interesting

Alex Miller (Clojure team)16:04:17

`mapcat` actually does it with a nested reduce iirc
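A small illustration (not from the thread): a mapcat transducer on a channel can emit zero or more outputs per input.

(require '[clojure.core.async :as a])

;; each input n expands to n outputs; 0 expands to nothing
(def out (a/chan 10 (mapcat #(range %))))

(a/>!! out 3)
(a/>!! out 0)
(a/close! out)

(a/<!! (a/into [] out))   ;; => [0 1 2]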

tianshu16:04:53

I know I can't put a nil on a channel, but if this happens, how do I find where the problem occurred? Is there a trick, or do I have to check one by one? (there's a lot of usage in my code)

Ivan Koz16:04:17

a breakpoint on exception in Cursive may help, i guess, to examine the threads' stacks

Ivan Koz16:04:07

don't you see the source of your nil value in the stack trace?

noisesmith16:04:32

core.async stack traces are often far from helpful

noisesmith16:04:20

because the method that actually called your code is calling a part of the state machine that core.async made from your go block, not any named object identifiable directly in your code

Ivan Koz16:04:17

too bad, some metadata on the generated code would be helpful

robertfw16:04:02

We have some chains of async blocks in our code, and I ended up writing a guarded-go macro and a guarded-take! macro for our async bits. guarded-go is a go block wrapped in a try/catch; if an exception occurs, it will catch it and return it on the channel. for consuming those channels, guarded-take! will check if the value returned is an exception, and re-raise it after attaching some information about the call location

robertfw16:04:27

It's not the prettiest, definitely felt wrong writing it, but it has helped make exceptions easier to deal with in some of our more complex async code

robertfw16:04:55

the exception eventually winds its way out to the caller and you can see exactly what path caused the issue

robertfw16:04:46

(or in the case of something recoverable or that needs special handling, can be caught and dealt with)
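A rough sketch of that pattern (hypothetical, not robertfw's actual macros):

(require '[clojure.core.async :as a])

;; guarded-go: catch anything thrown in the body and return it on the channel
(defmacro guarded-go [& body]
  `(a/go
     (try
       ~@body
       (catch Throwable t# t#))))

;; guarded-take!: for use inside a go block; re-throws a taken Throwable,
;; wrapped with information about where the take happened
(defmacro guarded-take! [ch]
  `(let [v# (a/<! ~ch)]
     (if (instance? Throwable v#)
       (throw (ex-info "exception came over channel"
                       {:take-site ~(str *ns* ":" (:line (meta &form)))}
                       v#))
       v#)))

Used together, a (guarded-take! ch) inside a consumer go block re-surfaces the original exception with a hint about where it was consumed.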

tianshu16:04:07

the full exception stack trace is

Exception in thread "async-thread-macro-15" java.lang.AssertionError: Assert failed: (not (nil? itm))
        at clojure.core.async.impl.protocols$add_BANG_.invokeStatic(protocols.clj:40)
        at clojure.core.async.impl.protocols$add_BANG_.invoke(protocols.clj:37)
        at clojure.core$map$fn__5864$fn__5865.invoke(core.clj:2742)
        at clojure.core.async.impl.channels$chan$fn__6842.invoke(channels.clj:300)
        at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:143)
        at clojure.core.async$_GT__BANG__BANG_.invokeStatic(async.clj:143)
        at clojure.core.async$_GT__BANG__BANG_.invoke(async.clj:138)
        at clojure.core.async$pipeline_STAR_$process__11435.invoke(async.clj:492)
        at clojure.core.async$pipeline_STAR_$fn__11447.invoke(async.clj:507)
        at clojure.core.async$thread_call$fn__11358.invoke(async.clj:442)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
can I confirm it comes from a pipeline-*?

robertfw16:04:40

That would be my first guess, but I couldn't say for sure

tianshu16:04:50

If I pass the exception through the async channel, what is the downside?

robertfw16:04:47

You'll need to check for exceptions manually on the other side; they won't be thrown automatically. That's why I pair my guarded-go with a guarded-take!

markmarkmark16:04:36

@doglooksgood is the issue at line 2742 of your core.clj file?

markmarkmark16:04:30

that's in clojure's core.clj

markmarkmark16:04:21

I do think that the issue is happening in a pipeline, in which case you aren't going to be able to write any replacement macros that handle the case

tianshu17:04:48

is there an elegant way to deal with the possible exceptions with core.async? I don't want try/catch everywhere. I can write a predicate, but sometimes it may not cover every case.

Ivan Koz17:04:21

always return a default value?

tianshu17:04:16

maybe it's not a good way I think

hiredman17:04:18

the elegant way is to not put nils on channels

hiredman17:04:30

in general, if you have something that returns "nil" it is likely some io operation, which people insist on putting in transducers on pipelines for some god awful reason

hiredman17:04:06

like, if you don't consume and produce meaningful values, don't put it in the middle of a pipeline

hiredman17:04:50

I know some company wrote some terrible blog post that did that and it made the front page of various aggregators, but it is bad

tianshu17:04:25

that's right, I should rewrite some pipeline-blocking

hiredman17:04:32

or just, you know, use an executor

robertfw17:04:10

yup. ultimately the exception stuff I describe is our last ditch "something went wrong" handler to ensure that we return a valid response (this is for a web service). in "regular" usage, it shouldn't be hit, only when something goes terribly wrong. I wouldn't use it as a core building block

Ivan Koz17:04:28

i'm looking at whether it's even possible to create a debouncing transducer which drops every element except the last one within a period of time. Looks like there is no way to deliver a value when the timeout is hit, because we need one more transducer call for that, which may never happen for a channel. Any ideas?

hiredman17:04:50

that is not a transducer

hiredman17:04:15

that is a special case of copying from channel A to channel B

Ivan Koz17:04:52

yeah, due to the sequential nature of transducers and abstracted iteration, i don't think it's possible

Ivan Koz17:04:29

channel to channel copying is easy tho

hiredman17:04:43

core.async is largely about writing machines, not data processing; transducers are for data processing. therefore, if you are reaching for transducers while writing a core.async program, the odds are pretty good that you are making a mistake

Ivan Koz17:04:22

data processing machines

Alex Miller (Clojure team)17:04:28

that makes no sense re transducers

Alex Miller (Clojure team)17:04:08

one of the major reasons transducers exist is core.async

hiredman17:04:26

but if you are writing little stateful machines, there is very little demand for sticking functional data processing (map, filter, etc) in the middle of the queues connecting those machines. And it is super weird to stick io operations in the middle of the queues connecting those machines.

hiredman17:04:48

if you are using core.async as a substrate for parallel data processing then it does make sense

Ivan Koz17:04:10

you write data processing code and core.async makes it a state machine; i think you're missing that part

hiredman17:04:46

so it depends on which type of core.async usage you have, and I would argue the more compelling reason to use core.async is for the machines, not parallel data processing

hiredman17:04:03

I am well aware of the go macro and the state machine transform

noisesmith17:04:20

yeah, there's other stuff for clojure that is much better at parallelizing things

hiredman17:04:06

the state machine transform the go macro does is largely about stateful machines processing queues (channels)

hiredman17:04:55

and there are even programs that do both kinds of core.async usage, and pipelines are, in my mind, basically the interface between those two kinds of core.async programs

tianshu17:04:10

I found my problem; thanks to mount I can start my program partially. 😎

hiredman17:04:31

(I have opinions about mount too)

robertfw17:04:43

Would love to hear them. We considered using it on our project but went with component instead

robertfw17:04:19

but this is my first big clojure project, so trying to sponge up as much as I can these days

hiredman17:04:17

mount encourages reliance on global singletons which is not a good way to structure a program

robertfw18:04:19

that was our conclusion as well. we went with component as it seemed to be a more manageable approach, plus being able to run multiple systems concurrently meant nice isolation for repl/test

tianshu17:04:12

it took me about 30 mins to find out where the exception came from.

Ajay17:04:30

hi, I have a use case where I'm working with 30-40 items; for each item I need to make 3 http calls (each http call returns a piece of information about the item), and at the end I want aggregated results, so the total number of http calls will be between 90-120. I'm using pipeline-async with 3 stages; although it works, I get javax.net.ssl.SSLException: SSLEngine is closing/closed or timeout exceptions for some calls. I want to limit the number of http calls in progress at any point; is that possible here?

Ajay17:04:29

here is my setup

tianshu17:04:47

maybe you can try pipeline-blocking with a connection pool for the http client?

Ajay17:04:37

Ok, what is the difference between pipeline-async and pipeline-blocking when applied to the above setup? Will pipeline-async trigger the 1st call, then trigger the next call without waiting for the result, while pipeline-blocking triggers at most n calls (as specified in the concurrency parameter)?

hiredman17:04:44

pipeline-async because of its construction barely limits anything in flight

hiredman17:04:08

so if you need to limit the number of external calls in flight, pipeline-async isn't a great choice

hiredman17:04:22

I would start with creating an executor with a fixed thread count, and making sure all your http requests happen on that executor

hiredman17:04:33

then no matter what the code that makes http requests looks like (pipelines, different threads, etc) you can always limit the number in flight based on the number of threads available to the executor
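A minimal sketch of that idea; call-on-pool and the pool size of 8 are made up for illustration:

(import '(java.util.concurrent Executors ExecutorService))

;; fixed pool: at most 8 calls run at once, no matter which thread or pipeline submits them
(def ^ExecutorService http-pool (Executors/newFixedThreadPool 8))

(defn call-on-pool [f & args]
  ;; .submit returns a Future; .get blocks the caller until the call completes
  (.get (.submit http-pool ^Callable (fn [] (apply f args)))))

Any code that makes an HTTP request then goes through call-on-pool, so the pool size caps the number of requests in flight.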

Ajay17:04:55

Ok, I think I will try pipeline-blocking first before going to the executor level
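For reference, a hypothetical pipeline-blocking setup where the first argument caps how many calls run concurrently; enrich-item and items are placeholders, not code from the thread:

(require '[clojure.core.async :as a])

(defn enrich-item [item]          ;; stand-in for one blocking http call
  (assoc item :enriched true))

(def items (map #(hash-map :id %) (range 40)))

(def in  (a/to-chan items))
(def out (a/chan 40))

;; at most 5 enrich-item calls run at the same time
(a/pipeline-blocking 5 out (map enrich-item) in)

(a/<!! (a/into [] out))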

Ajay18:04:13

thanks for the ideas! 👍

Ivan Koz20:04:03

Made a debouncer using just core.async, not even sure if it's correct. Clearly it could be done using a standalone resettable timer instead of the deliver function. Would be nice to get some feedback.

(defn debounce
  ([c ms] (debounce c (chan) ms))
  ([c c' ms]
   (let* [vkey (volatile! nil)
          deliver (fn [key val]
                    (go (<! (timeout ms))
                        (when (= key @vkey)
                          (>! c' val))))]
     (go-loop []
       (let [val (<! c)
             key (vreset! vkey (keyword (gensym "id_")))]
         (deliver key val))
       (recur)))
   c'))

hiredman20:04:07

I don't even know what you are doing

hiredman20:04:38

the volatile thing is weird, the gensym keyword is weird

Ivan Koz20:04:25

something like that

hiredman20:04:27

(defn debounce [in out t]
  (async/go
    (let [msg (async/<! in)]
      (if (some? msg)
        (if (async/>! out msg)
          (let [to (async/timeout t)]
            (loop []
              (async/alt! to
                          ([_]
                           (debounce in out t))
                          in
                          ([msg]
                           (if (some? msg)
                             (recur)
                             (async/close! out))))))
          (async/close! in))
        (async/close! out)))))

hiredman20:04:52

(never actually ran that or pasted it in a repl, so ymmv)

Ivan Koz20:04:50

yeah thanks for example, looking into it but i need some feedback on my implementation

noisesmith20:04:01

volatile in core.async seems like a bad idea

Ivan Koz20:04:27

even considering vreset?

hiredman20:04:38

the whole thing is just weird

hiredman20:04:33

creating go blocks without waiting for them

noisesmith20:04:11

@nxtk volatiles are faster than atoms because they do nothing to ensure values stay in sync between threads, and core.async go means you can't control what thread that code is running on

hiredman20:04:24

at the very least I would use (Object.) instead of that keyword gensym thing

Ivan Koz20:04:36

@noisesmith aren't (go) blocks about single-threaded CSP?

Ivan Koz20:04:11

i just started with core.async this morning; too much information for one day, i may be missing some pieces

noisesmith20:04:13

on the JVM, go uses a thread pool, on JS there's no volatile! iirc

noisesmith20:04:26

it's small, but it's usually going to be more than one thread

Ivan Koz20:04:52

so one thread is only a cljs-specific aspect of go?

Ivan Koz20:04:01

yeah then my solution is not optimal

hiredman20:04:10

you are also generating timeouts for everything that goes through, regardless of whether it will be ignored or not

hiredman20:04:28

all the callbacks that go blocks turn into

hiredman20:04:35

all of which are supposed to be relatively cheap

hiredman20:04:24

but timeouts, for example, are globals, so if you create a timeout and then wait on consuming a value from it, that isn't eligible for gc until the timeout ends

Ivan Koz20:04:35

thanks guys