This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-01-13
I was a little surprised by this:
(let [c (doto (a/chan) (a/offer! 1))]
  (Thread/sleep 100)
  (a/poll! c))
;;=> nil
why did you add the sleep line? What did that have to do with your question? Just trying to understand the idea here.
So with an unbuffered channel, offer! never succeeds? Is that why I get nil? The handshake can't happen with offer!?
That makes sense, based on the docstring:
user=> (doc clojure.core.async/offer!)
-------------------------
clojure.core.async/offer!
([port val])
Puts a val into port if it's possible to do so immediately.
nil values are not allowed. Never blocks. Returns true if offer succeeds.
nil
user=> (require '[clojure.core.async :as a])
nil
user=> (a/offer! (a/chan) 1)
nil
Channels are synchronization points and an unbuffered channel has no "space" to put values into.
user=> (a/offer! (a/chan 1) 1)
true
I'm surprised that you find it surprising 🙂
I don't know, feel like if you asked this in some pretend core.async quiz, a lot of people would think it was a trick question
It happened to me because something was offering to a chan, and in my REPL I did a (<!! c) on it. I know better than to do that and should always use poll!, but I forgot this time, and it froze my REPL. Looking at the code I was confused for a while, especially when I switched to poll! and was getting nil, until I finally clued in.
The questions to always ask yourself in core.async are "is this blocking?" and "can this block?"
It reminds me of some of the pop quizzes back in the day in school for Java haha, like questions about whether i++ or ++i would result in different outcomes 😛
Channel size can also be a bit leaky. Putting into a channel blocks depending on its current "capacity"
An unbuffered channel has zero capacity. Better to think about it like a rendezvous
what is a rendezvous? I can't seem to find this term defined in a context that makes sense to me. It pops up here https://en.wikipedia.org/wiki/Communicating_sequential_processes
A rendezvous is a point in space and time where two meet, in this case, the processes under discussion meet at the channel, and only once they're both there they exchange some data and go about their business
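A quick REPL sketch of that rendezvous behaviour (assuming core.async is required as a): offer! on an unbuffered channel fails when nobody is waiting, but succeeds when a taker is already parked at the channel.

```clojure
(require '[clojure.core.async :as a])

;; No taker waiting: the rendezvous can't happen, offer! gives up immediately
(a/offer! (a/chan) 1)
;;=> nil

;; A taker already parked at the channel: offer! completes the exchange
(let [c     (a/chan)
      taker (a/thread (a/<!! c))]
  (Thread/sleep 50)                 ; give the taker time to park
  [(a/offer! c 1) (a/<!! taker)])
;;=> [true 1]
```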
I think I get it, I'll have to check the paper for context.
Channels do more than one thing. They're queues, they're synchronization primitives. Whenever you think about them, you have to keep in mind both time and space
Yeah, for me it took several years to "get" that (unbuffered) channels are not queues, they're rendezvous points.
Heh, given that I studied CSP and Occam at university back in the early '80s, I really ought to have found core.async a lot more natural than I did...
(and one of my early jobs was writing a transpiler to Parallel C to run on a grid of Transputers)
You have to think about time, space, and synchronous code, while clojure isn't really synchronous
I'm writing an async library on top of core.async. It brings JS-like async/await and all of its accompanying functions like race, any, all, as well as the functions Promesa adds like then, handle, alet, do!, etc.
Oh, and I added compute and blocking as alternatives to async as well, for running heavy computation and blocking ops
Mostly the latter. That was forty years ago.
My first programming was almost 50 years ago at this point.
(I had a Sinclair Programmable Calculator in the early/mid-'70s)
I find the async/await model way simpler to grok haha. So even though I'm close-ish to done and implemented it all over core.async, I still get confused by basics of core.async when I go back to that haha
I've been watching Missionary. I think it's very interesting. But it's a pretty serious DSL to learn and it is pretty cryptic.
I used to think it was a cryptic DSL, but it turns out it's not so much compared to core.async
(-> (race [(blocking
             (Thread/sleep 100)
             (when (cancelled?)
               (println :cancelled)))
           (blocking
             (Thread/sleep 100)
             (when (cancelled?)
               (println :cancelled)))
           (blocking
             (Thread/sleep 100)
             (when (cancelled?)
               (println :cancelled)))
           (async :done)])
    (handle println))
;; Prints:
;; :done
;; :cancelled
;; :cancelled
;; :cancelled
Hum... I don't think this would be a problem apart from Thread/sleep, and why would you use that anyway in async code?
(-> (compute
      (dotimes [i 10000000000]
        (when-not (cancelled?)
          (fib i))))
    (handle println))
that's not a good example, and if you have to check for cancelled inside everything, that's a leaky implementation
nope, threads can be interrupted; then they raise an InterruptedException which you can catch and handle
> The interrupt mechanism is implemented using an internal flag known as the interrupt status. Invoking Thread.interrupt sets this flag. When a thread checks for an interrupt by invoking the static method Thread.interrupted, interrupt status is cleared. The non-static isInterrupted method, which is used by one thread to query the interrupt status of another, does not change the interrupt status flag.
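For concreteness, here's a small sketch of that catch-and-handle pattern with plain Java interop (no library assumed): interrupting a thread parked in a blocking call raises InterruptedException in it.

```clojure
;; Interrupting a thread parked in a blocking call raises InterruptedException
(def worker
  (Thread.
    (fn []
      (try
        (Thread/sleep 10000)        ; interruptible blocking call
        (println :finished)
        (catch InterruptedException _
          (println :interrupted))))))

(.start worker)
(.interrupt worker)                  ; sets the interrupt status
;; prints :interrupted
```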
Yea, I think all the interrupt/cancellation options are cooperative.
so your workers have to cooperatively check as @didibus was saying.
It's a performance trade-off; otherwise the runtime has to insert a check between every CPU instruction
missionary can only sort of do that for you, right? unless it's implementing its own Java VM?
if you have a loop that missionary can see, I can imagine how it could rewrite it to cooperatively check every iteration, but if you call another function that missionary can't inspect, then it would have to apply some deep magic to overcome that
So what would you need to do if you wanted to be able to cancel:
(while true (+ 1 1))
in Missionary?
Also, can you explain this part: > Throws an exception if current computation is required to terminate
it is just a different API: instead of returning true or false, it returns nil or throws
Oh, so you mean, if it was canceled it will throw, ok ok. I just read the sentence wrong, I thought you could like flag that the process is not allowed to be terminated and then it would throw haha.
I didn't think about throwing instead of returning false... It could make it easier for people to add cancellation checks inside a process. Thanks for that, I'll think about it if it might be a better API design
In my coming-soon lib you'd do:
(def infinite-adder
  (compute
    (while (not (cancelled?)) (+ 1 1))
    (println "I've been cancelled!")))
(cancel infinite-adder)
;;> I've been cancelled!
If you were feeling ambitious, you could have a deep-walking macro that expands while, sees that it's a loop/recur, and replaces the recur with recur + an interrupt check
Maybe eventually I'll experiment haha. Seems it would be too janky though, like would not work in all scenarios
another idea is to have a namespace that adds interrupt checking to a number of useful looping constructs
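As a sketch of that idea, here's a hypothetical while variant that checks the thread's interrupt flag before each iteration (the name is made up, not from any real library):

```clojure
(defmacro while-interruptible
  "Like `while`, but throws InterruptedException when the current
  thread's interrupt status is set before an iteration runs."
  [test & body]
  `(loop []
     (when (Thread/interrupted)     ; checks and clears the interrupt flag
       (throw (InterruptedException.)))
     (when ~test
       ~@body
       (recur))))

;; (while-interruptible true (+ 1 1)) now stops when its thread is interrupted
```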
yea, it would be pretty challenging
I did wonder if I could hijack clojure.core fn so that all fns check for cancellation; that would cover all recursion.
And then I guess there'd be loop/recur to deal with.
But ya, also thought that's just too invasive to be reliable
next level would be analyzing compiled bytecode and also inspecting called functions and either rewriting or rejecting function calls that don't check for interrupts
or you could just switch to erlang's BEAM vm
maybe if we wait long enough, project loom will help
It probably will. But, I was also inspired by this: https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c
Which seems to say that, in the end, you kind of need to be aware of and in control of your async logic.
async for small computations that are basically IO polls, moving things around, etc., running on the core.async go machinery and go threadpool; compute for heavy computation running on a pool sized to the number of cores; blocking for blocking things running on a thread pool which is unbounded and cached.
another idea I've thought about is that if clojure-in-clojure becomes a thing, you could add another compiler pass that does stuff like adds interrupt checks
> To be clear, this is a very dangerous type of thread pool. It isn't going to prevent you from just allocating more and more threads as the others block, which is a very dangerous state of affairs. You need to make sure that any data flow which results in running actions on this pool is externally bounded, meaning that you have semantically higher-level checks in place to ensure that only a fixed number of blocking actions may be outstanding at any point in time (this is often done with a non-blocking bounded queue).
To be fair, it does state that it is a very dangerous thread pool because of that. But he recommends being externally bounded, like say bounding the number of incoming requests
Hum.... actually, ya that part I had forgotten about. I'm not sure I get what they say about using a non-blocking bounded queue
I'm not sure even Monads save you from everything. Once you go multi-threaded and blocking it gets tough. That's why, say, in NodeJS async is super easy: in a single-threaded runtime with only non-blocking IO it's very straightforward
Doesn't fix, but makes it easier if you build everything from the ground up with them
The BEAM VM kind of fixes it by having each entry into a function yield after some N number of entries, and all looping is recursive so it causes an entry into a function. But it still can't interrupt midway through a function.
Ya, Monads are hard enough to grok and use in a synchronous single threaded context 😛
NodeJS async doesn't handle the case where you have a stream of incoming events that you don't initiate, right? For example, a stream of mouse clicks
Wait, so does any of you know what was meant by: > this is often done with a non-blocking bounded queue I wonder if I can implement that under the hood for people using the lib
By the way, I am doing this library mostly for fun, and so that I can point people to an async/await lib when they complain that Clojure doesn't have async/await 😛
Though now that it's taken shape, I'm kind of like, async/await + some macro sugar and functional goodness is actually pretty neat
Something else I've been pondering adding is a "process" abstraction. So far I've added a one-asynchronous-value-at-a-time model: you kick off an async process that should return once and be done. async fires it, and await waits for its result.
I wanted to add something that can be a stream of ins/outs as well, while also hoping it fits in well with the rest and is relatively straightforward to grok.
What I've arrived at is something that conceptually look a bit like this:
The idea would be, you create a process-factory which specifies ins and outs. The ins are channels, and the outs are channels. And the body is the block of code running the process, where you are free to take from the ins and put in the out as you see fit. Channels would default to a buffer of 1, but you could specify batches if you wanted to make the process batching instead of streaming. When you call the process-factory, it would return something like:
{:ins  [chan chan chan]
 :outs [chan]}
This is now a running "process".
Then you'd have process-connectors that connects outs of one process to ins of another. Some process-connectors can be one to one, others can fan-out and replicate a message from an out to multiple ins. Some process connectors could do buffering or rate limits, etc.
They can always be canceled, because they'd wrap a go-loop/thread-loop which checks for (cancelled?) implicitly.
(deffactory random-number-generator-factory
  :ins []
  :outs [random-number]
  (sleep 1000)
  (>! random-number (rand-int 10000)))

(deffactory file-appender-factory
  [file-name]
  :ins [line]
  :outs []
  (when-let [line (<! line)]
    (spit file-name line :append true)))

(def random-number-generator1 (random-number-generator-factory))
(def random-number-generator2 (random-number-generator-factory))
(def file-appender (file-appender-factory "/tmp/test.txt"))

(connect random-number-generator1 :random-number file-appender :line)
(connect random-number-generator2 :random-number file-appender :line)
Curious if anyone ever tried something of that sort over core.async?
P.S.: Got a bit carried away here, sorry for the spamming; still core.async adjacent, but I probably should re-post this to a forum. Anyways, still taking feedback here if anyone has any thoughts.
how do i create a "void" channel? sometimes I need one to put in the to of a pipeline: a channel that just accepts and discards
I'm using a channel with a sliding buffer with size=1
some would call this a sink
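A minimal sketch of both options (assuming core.async aliased as a): the sliding-buffer trick from above, plus an explicit draining go-loop, whose name sink is just for illustration.

```clojure
(require '[clojure.core.async :as a])

;; Option 1: a sliding buffer of 1 never makes a put wait;
;; each new value simply evicts the previous one
(def void-chan (a/chan (a/sliding-buffer 1)))
(a/offer! void-chan :discarded)
;;=> true

;; Option 2: drain the channel explicitly until it closes
(defn sink
  "Consumes and discards everything put on c."
  [c]
  (a/go-loop []
    (when (some? (a/<! c))
      (recur)))
  c)
```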
we have a ticket for that actually I think
although I guess that's slightly different than a sink channel
should be
https://ask.clojure.org/index.php/597/can-we-get-a-generic-sink-operation
This is already way better than what we have now
The issue is with the meaning
When I create a channel with a sliding buffer with size=1, it may be confusing to the next developer whether it means something or not
(doto (chan) sink) is way more explicit in what I'm doing
there's actually a drain in the old labs namespace too, I think
nvm guess that was removed
is there a way to get the current size of a channel? I'm thinking to have something that picks the channel with the most items
You could make an observable buffer if you wanted to do something like that
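One low-tech version of that (assuming core.async aliased as a): core.async buffers are Counted, so if you keep a reference to the buffer you can ask for its current size. This peeks at implementation behaviour rather than a documented API, so treat it as a sketch.

```clojure
(require '[clojure.core.async :as a])

;; Keep the buffer around so you can count the items sitting in it
(let [buf (a/buffer 10)
      c   (a/chan buf)]
  (a/>!! c 1)
  (a/>!! c 2)
  (count buf))
;;=> 2
```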
I realized maybe it's good enough if I pick the first one and remove it from the list. What is the best way to lump things together by topic? Say I have flower-send events and I want to lump them by country and city. Then every second I send flowers to all countries, but only 1 city for each country. I was thinking about using an agent with the events as well.
pubsubs are mutable state; combining mutating that with mutating your atom is asking for a race condition