Fork me on GitHub

I was a little surprised by this:

(let [c (doto (a/chan) (a/offer! 1))]
    (Thread/sleep 100)
    (a/poll! c))
;;=> nil

Drew Verlee02:01:34

why did you add the sleep line? What did that have to do with your question? Just trying to understand the idea here.


So with an unbuffered channel, offer! never succeeds? Is that why I get nil ? The handshake can't happen with offer! ?


That makes sense, based on the docstring:

user=> (doc clojure.core.async/offer!)
([port val])
  Puts a val into port if it's possible to do so immediately.
   nil values are not allowed. Never blocks. Returns true if offer succeeds.
user=> (require '[clojure.core.async :as a])
user=> (a/offer! (a/chan) 1)
Channels are synchronization points and an unbuffered channel has no "space" to put values into.


user=> (a/offer! (a/chan 1) 1)


Ya, it does make sense, but also is a bit surprising.


I'm surprised that you find it surprising 🙂


I don't know, feel like if you asked this in some pretend core.async quiz, a lot of people would think it was a trick question


It happened to me because something was offering to a chan, and in my repl I did a (<!! c) on it, which I know better to always use poll!, but I forgot this time, and it froze my REPL. Looking at the code I was confused for a while. Especially when I switched to a poll! and was getting nil, until I finally clued in

Ben Sless07:01:54

The questions to always ask yourself in c.a. are "is this blocking", " can this block"


It reminds me of some of the popup quiz back in the day in school for Java haha, Like questions about if i++ or ++i would result in different outcomes 😛

Ben Sless07:01:22

You wouldn't want to use a blocking op in the REPL. Ever

Ben Sless07:01:04

Channel size can also be a bit leaky. Putting in a channel blocks depending on its current "capacity"

Ben Sless07:01:35

An unbuffered channel has zero capacity. Better to think about it like a rendezvous

Drew Verlee02:01:47

what is a rendezvous? I can't seem to find this term defined in a context that makes sense to me. It pops up here

Ben Sless05:01:53

A rendezvous is a point in space and time where two meet, in this case, the processes under discussion meet at the channel, and only once they're both there they exchange some data and go about their business

Drew Verlee05:01:06

I think i get it, ill have to check the paper for context.

Ben Sless05:01:47

The original paper has only unbuffered channels

Ben Sless07:01:51

Channels do more than one thing. They're queues, they're synchronization primitives. Whenever you think about them, you have to keep in mind both time and space


Yeah, for me it took several years to "get" that (unbuffered) channels are not queues, they're rendezvous points.

Ben Sless07:01:46

Reading Hoare's paper helped. Implementing the examples really helped


Heh, given that I studied CSP and Occam at university back in the early '80s, I really ought to have found core.async a lot more natural than I did...


(and one of my early jobs was writing a transpiler to Parallel C to run on a grid of Transputers)

Ben Sless07:01:32

Core async is hard to "grok"

Ben Sless07:01:55

You have to think about time, space, and synchronous code, while clojure isn't really synchronous


Occam was easier to grok? Or you just forgot everything cause it been so long?


Mostly the latter. That was forty years ago.

Ben Sless07:01:13

This is one reason I'm currently evaluating missionary.


I'm writing an async library on top of core.async. It brings JS like async/await and all of its accompanying functions like race, any, all as well as the functions Promesa adds like then, handle, alet, do!, etc.

Ben Sless07:01:02

Why not just use promesa?

Ben Sless07:01:59

It's just so easy to make terrible mistakes and leak resources with core async

Ben Sless07:01:26

You already have race and any in c.a.

Ben Sless07:01:40

Race is alt, any is a merge which closes after the first result


Oh and I added compute and blocking as alternative to async as well for running heavy computation and blocking ops

Ben Sless07:01:00

But that's only valid of you work with channels as promises


The whole thing works over promise-chan

Ben Sless07:01:49

> The 80s were 40 years ago

🤯 1

My first programming was almost 50 years ago at this point.


(I had a Sinclair Programmable Calculator in the early/mid-'70s)


I find the async/await model way simpler to grok haha. So even though I'm close-ish to done and implemented it all over core.async, I still get confused by basics of core.async when I go back to that haha

Ben Sless07:01:59

I hate async await :)


I've been watching Missionary. I think it's very interesting. But it's a pretty serious DSL to learn and it is pretty cryptic.


Well, the nice thing is its still core.async, you can mix and match.

Ben Sless07:01:46

I used to think it was a cryptic DSL but it turns out that's not so much compared to core async


But I added some niceties, cancelation, error propagation and handling, etc.

Ben Sless07:01:14

How can you cancel in core.async?


Well, not really in core.async 😛 but in my layer above it you can.

Ben Sless07:01:57

Without leaking?


I think so 😛


What exactly leaks?


(-> (race [(blocking
              (Thread/sleep 100)
              (when (cancelled?)
                (println :cancelled)))
              (Thread/sleep 100)
              (when (cancelled?)
                (println :cancelled)))
              (Thread/sleep 100)
              (when (cancelled?)
                (println :cancelled)))
             (async :done)])
      (handle println))
;; Prints:

Ben Sless07:01:59

Will the sleep be interrupted by cancel?


Sleep no, it works like how you need to handle thread interup

Ben Sless07:01:35

yes, but that's a leak


By checking for (cancelled?) explicitly

Ben Sless07:01:52

you could be hogging CPU and you need to stop immediately and return the resource

Ben Sless07:01:03

or socket, or connection in a pool

Ben Sless07:01:41

This is why async is hard and terrible and a pain to get right


Hum... I don't think this would be a problem apart for Thread/sleep, which why would you use that anyways in async code?

Ben Sless07:01:26

Because now it's not sleep

Ben Sless07:01:33

it's very-costly-calculation

Ben Sless07:01:08

It's network-request

Ben Sless07:01:40

have you watched Leo's talk from reClojure?


(-> (compute
       (dotimes [i 10000000000]
         (when-not (cancelled?)
           (fib i))))
      (handle println))


Ideally, your fib also checks for (cancelled?) inside itself

Ben Sless07:01:43

that's not a good example, and if you have to check for cancelled inside everything that's a leaky implementation

Ben Sless07:01:50

and I can't use any expensive function from the outside world


That's just how Threads work though

Ben Sless07:01:02

you're forcing me to mix "business logic" with framework

Ben Sless07:01:46

nope, threads can be interrupted, then they raise an interrupt exception which you can catch and handle

Ben Sless07:01:51

even in medias res

Ben Sless07:01:27

all of this to say, it's hard


Hum... I don't think that's correct


Thread.sleep throws an InterruptedException if the thread is interupted


Within your computation, you need to check for Thread.interrupted() explicitly


> The interrupt mechanism is implemented using an internal flag known as the interrupt status. Invoking Thread.interrupt sets this flag. When a thread checks for an interrupt by invoking the static method Thread.interrupted, interrupt status is cleared. The non-static isInterrupted method, which is used by one thread to query the interrupt status of another, does not change the interrupt status flag.


Yea, I think all the interrupt/cancellation options are cooperative.


so your workers have to cooperatively check as @didibus was saying.

Ben Sless07:01:29

that's pretty terrible from a correctness perspective

Ben Sless07:01:48

and another point in favor of missionary which does it for you


It's a performance trade-off, otherwise the runtime has to insert a check in between every CPU instructions


Missionary can auto-magically interup a long running computation?


missionary can only sort of do that for you right? unless it's implementing its own java vm?


if you have a loop that missionary can see, I can imagine how it could re-write it to cooperatively check every loop, but if you call another function that missionary can't inspect, then it would have to apply some deep magic to overcome that


it's cooperative, like threads


So what would you need to do if you wanted to be able to cancel:

(while true (+ 1 1))
In Missionary?


insert checks with !, same pattern as Thread/interrupted or your cancelled?


Hum, not a bad idea to have it fallback to thread interup outside a process


Also, can you explain this part: > Throws an exception if current computation is required to terminate


it is just a different api, instead of returning true or false it returns nil or throws


like assert


Oh, so you mean, if it was canceled it will throw, ok ok. I just read the sentence wrong, I thought you could like flag that the process is not allowed to be terminated and then it would throw haha.


I didn't think about throwing instead of returning false... It could make it easier for people to add cancellation checks inside a process. Thanks for that, I'll think about it if it might be a better API design


BTW keep in mind Thread/interrupted is side-effectful


In my coming-soon lib you'd do:

(def infinite-adder (compute (while (not (cancelled?)) (+ 1 1))
                               (println "I've been cancelled!")))

(cancel infinite-adder)
;;> I've been cancelled!


If you were feeling ambitious, you could have a deep walking macro that expands while to see that it's a loop/recur and replace the recur with recur + interrupt check


Maybe eventually I'll experiment haha. Seems it would be too janky though, like would not work in all scenarios


another idea is to have a namespace that adds interrupt checking to a number of useful looping constructs


yea, it would be pretty challenging


I did wonder if I could hijack clojure.core fn so that all fn check to see for cancelation, that would cover all recursion. And then I guess there'd be loop/recur to deal with. But ya, also thought that's just too invasive to be reliable


next level would be analyzing compiled bytecode and also inspecting called functions and either rewriting or rejecting function calls that don't check for interrupts


or you could just switch to erlang's BEAM vm


Haha, yup, I think switching to Clojerl at that point would start to make sense


maybe if we wait long enough, project loom will help


It probably will. But, I was also inspired by this:

🆒 1

Which seem to say that, in the end, you kind of need to be aware and in control of your async logic.


So I basically made: async/compute/blocking//await


async for small computations that are basically IO polls, moving things around, etc, running on the core.async go machinery and go threadpool. compute for heavy computation running on a pool of num of cores. blocking for blocking things running on the go thread pool which is unbounded and cached.


another idea I've thought about is that if clojure-in-clojure becomes a thing, you could add another compiler pass that does stuff like adds interrupt checks

Ben Sless08:01:51

This guide is nice but inaccurate

Ben Sless08:01:11

An unbound pool for blocking operations is a good way to OOM


> To be clear, this is a very dangerous type of thread pool. It isn't going to prevent you from just allocating more and more threads as the others block, which is a very dangerous state of affairs. You need to make sure that any data flow which results in running actions on this pool is externally bounded, meaning that you have semantically higher-level checks in place to ensure that only a fixed number of blocking actions may be outstanding at any point in time (this is often done with a non-blocking bounded queue).


To be fair, it does state that it is a very dangerous thread pool because of that. But he recommends being externally bounded, like say bounding the number of incoming requests

Ben Sless08:01:22

Nice to see this caveat was added


Hum.... actually, ya that part I had forgotten about. I'm not sure I get what they say about using a non-blocking bounded queue

Ben Sless08:01:45

Sometimes I just want to have monads when dealing with async


I'm not sure even Monads save you from everything. Once you go multi-threaded and blocking it gets tough. That's why say in NodeJS Async is super easy, cause in a single-threaded with only non-blocking IO its very straightforward

Ben Sless08:01:37

Monads do fix it, but good luck working out the types there


I just don't see how a Monad fix the problem of interupting something arbitrarily

Ben Sless08:01:15

Doesn't fix, but makes it easier if you build everything from the ground up with them


BEAM VM kinds of fix it by having each entry into a function yield after some N number of entries, and all looping is recursive and causes an entry into a function. But still it can't interupt midway into a function.


Ya, Monads are hard enough to grok and use in a synchronous single threaded context 😛


NodeJS async doesn't handle the case where you have a stream of incoming events that you don't initiate, right? For example, a stream of mouse clicks


Wait, so does any of you know what was meant by: > this is often done with a non-blocking bounded queue I wonder if I can implement that under the hood for people using the lib


By the way, I am doing this library mostly for fun, and so that I can point people to an async/await lib when they complain that Clojure doesn't have async/await 😛


Though now that it's taken shape, I'm kind of like, async/await + some macro sugar and functional goodness is actually pretty neat


Something else I've been pondering to add, is I want to also add a "process" abstraction. So now I added a one asynchronous value at a time model. You kick off an async process that should return once and be done. async fires it, and await waits for its result. I wanted to add something that can be a stream of ins/outs as well. But also hoping it fits in well with the rest, and is relatively more straightforward maybe to grok. What I've arrived at is something that conceptually look a bit like this:


The idea would be, you create a process-factory which specifies ins and outs. The ins are channels, and the outs are channels. And the body is the block of code running the process, where you are free to take from the ins and put in the out as you see fit. Channels would default to a buffer of 1, but you could specify batches if you wanted to make the process batching instead of streaming. When you call the process-factory, it would return something like:

{:ins [chan chan chan]
 :out [chan]}
This is now a running "process". Then you'd have process-connectors that connects outs of one process to ins of another. Some process-connectors can be one to one, others can fan-out and replicate a message from an out to multiple ins. Some process connectors could do buffering or rate limits, etc. They can always be canceled, because they'd wrap a go-loop/thread-loop which checks for (cancelled?) implicitly.
(deffactory random-number-generator-factory
  :ins []
  :outs [random-number]
  (sleep 1000)
  (>! random-number (rand-int 10000))

(deffactory file-appender-factory
  :ins [line]
  :outs []
  (when-let [line (<! line)]
    (spit file-name line :append true)))

(def random-number-generator1 (random-number-generator-factory))
(def random-number-generator2 (random-number-generator-factory))
(def file-appender (file-appender-factory "/tmp/test.txt"))

(connect random-number-generator1 :random-number file-appender :line)
(connect random-number-generator2 :random-number file-appender :line)
Curious if anyone ever tried something of that sort over core.async? P.S.: Got a bit carried away here, sorry for the spamming, still core.async adjacent, but probably should re-post this to a forum, anyways still taking feedback here if anyone has any thoughts.

Ben Sless09:01:10

This is what I have in more.async.dataflow


Interesting. I'll have a look at it for inspo


how do i create a "void" channel? sometimes i need it to put in the to of a pipeline a channel that just accepts and discard I'm using a channel with a sliding buffer with size=1

Ben Sless17:01:30

/dev/null channel 🙂

Alex Miller (Clojure team)17:01:51

some would call this a sink

Alex Miller (Clojure team)18:01:05

we have a ticket for that actually I think

Alex Miller (Clojure team)18:01:28

although I guess that's slightly different than a sink channel

Ben Sless18:01:27

And there I thought I was being creative when I implemented one myself 🙃

Ben Sless18:01:27

Is there a respective ask.clojure for it I can vote up?


This is already way better than what we have now The issue is with the meaning When I create a channel with a sliding buffer with size=1, it may be confusing to the next developer if it does means something or not (doto (chan) sink) is way more explicit in what I'm doing

Alex Miller (Clojure team)18:01:33

there's actually a drain in the old labs namespace too think

Alex Miller (Clojure team)18:01:49

nvm guess that was removed


is there a way to get the current size of a channel? I'm thinking to have something that picks the channel with the most items

Alex Miller (Clojure team)18:01:54

You could make an observable buffer if you wanted to do something like that

👍 1

I realized maybe it's good enough if I pick the first one and remove it from the list. What is the best way to lump things together by topic? Say I have flower-send events and I want to lump them by country and city. Then every second I send flowers to all countries but only 1 city for each country. I was thinking about using an agent with the events aswell.


You could use mult and filter in the receiver.


a pubsub is the way to split things, multiple splits use multiple pubsubs


the tricky thing is if you want to dynamically add a consumer if one doesn't exist


yea indeed. It's probably an atom with a map or set of consumers somehow


pubsubs are mutable state, combining mutating that with mutating your atom is asking for a race condition


ah like adding a sup and also adding this sup to an atom say? And when I do that both I'm asking for that to not work?


unless you are very careful, you will end up doing things like adding a record to your atom then adding a subscription, and between those two operations things will be dropped, or in the other order you'll end up with multiple consumers