#core-async
2018-10-25
isaac06:10:58

Why does core.async name it alt! instead of select like Go?

Dormo07:10:25

Clojure isn't the only language

Dormo07:10:45

That language (Occam) is from '83, and it names the equivalent ALT

seancorfield07:10:03

Occam is fun! I did that at university!

Dormo07:10:23

python-csp apparently calls it alt as well.

Dormo07:10:33

I'd never heard of Occam until just now, after some googling lol

seancorfield07:10:47

(and went on to write a compiler from actuarial formulae to Parallel C on a grid of Transputers!)

seancorfield07:10:20

Well, Go's machinery and core.async are both based on CSP for which Occam is sort of the "reference implementation".

seancorfield07:10:36

So anything based on CSP is likely to use alt I suspect.

Dormo07:10:11

Ah. Yeah, I was specifically googling for what term most other CSP implementations used. Didn't realize Occam was the original implementation

Dormo07:10:17

or a very early one, anyway

seancorfield07:10:21

(I guess my question would be "What language(s) use select?")

Dormo07:10:57

I would appreciate a bit of an explanation of the terminology in the docs, I suppose

Dormo07:10:03

alt! isn't really the most intuitive name
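
For reference, a minimal sketch of what alt! does (hypothetical channels c1/c2; alt! has to run inside a go block):

(require '[clojure.core.async :as a :refer [go chan alt!]])

(let [c1 (chan)
      c2 (chan)]
  (go (a/>! c1 :from-c1))
  ;; alt! parks until one of the listed channels is ready, then runs
  ;; the matching clause: Go's select, Occam's ALT
  (a/<!! (go (alt!
               c1 ([v] [:c1 v])
               c2 ([v] [:c2 v])))))
;; => [:c1 :from-c1]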

Dormo07:10:01

But I also do (not (empty? col)) instead of (seq col) like people on the internet say to do so shrug

seancorfield07:10:43

Yeah, alt! seems natural if you're used to CSP in any form. But it's all about idioms and I will say that core.async has always made a lot of assumptions in that area.

borkdude11:10:43

(require '[clojure.core.async :as a])

(def c (a/chan nil))   ; nil buffer => an unbuffered channel
(a/put! c :val)        ; async put, returns true immediately
(a/take! c #(debug %)) ; debug is the author's logging fn
I can put many times before taking. Doesn’t this mean the channel has a buffer? How does this work again?

borkdude11:10:38

same with:

(def c (a/chan 1 (dedupe)))  ; buffer of 1 with a dedupe transducer
(a/put! c :val)
(a/take! c #(debug %))

mpenet11:10:48

there's a pending put (and take) buffer per chan yes

mpenet11:10:53

size is 1024
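
A quick REPL sketch of that pending-put queue (illustrative values, assuming the a alias for clojure.core.async):

(def c (a/chan))     ; no buffer at all
(a/put! c :v1)       ; => true, queued as a pending put
(a/put! c :v2)       ; => true, also queued (the limit is 1024)
(a/take! c println)  ; prints :v1, consuming the first pending put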

mpenet11:10:14

there's a talk from rh (Rich Hickey) that explains all this stuff relatively well

mpenet11:10:46

I think that might be that one https://vimeo.com/100518968

borkdude11:10:08

I was there actually in the room 😉

mpenet11:10:30

put! is async; if you want blocking behavior constrained by the buffer size you need to use >!!
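
For contrast, a sketch of the blocking variant (same a alias; >!! is only legal outside a go block):

(def c (a/chan 1))  ; buffer of size 1
(a/>!! c :a)        ; returns immediately, the buffer has room
;; (a/>!! c :b)     ; would block the calling thread until :a is taken
(a/put! c :b)       ; put! never blocks; :b just becomes a pending put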

borkdude11:10:31

but I forgot. I never used channel buffers that much. How does it differ from the put buffer?

borkdude11:10:42

don’t want to block, just understand

mpenet11:10:44

it's a pending put buffer

mpenet11:10:05

I think if there's room in the channel buffer, the value goes straight into it and doesn't sit in the pending-put buffer

borkdude11:10:34

k. I think (chan 1 (dedupe)) is a reasonable choice for my problem then

mpenet11:10:34

Generally using put! or take! directly is not what you want I think

mpenet11:10:42

even tho for take! this is arguable

borkdude11:10:46

yeah, this is just for testing in the REPL

borkdude11:10:22

I do use put! though. I don’t have to block. I use it for server-sent events. Each client (web browser connected to our web server) has a channel. I put some messages in it. When the client connects (or is already connected), it reads those messages.

borkdude11:10:14

(defn send-message-to-user
  [sse user-id message]
  ;; channel-for-user is the author's helper: it returns the SSE
  ;; channel backing this user's connection
  (let [c (channel-for-user sse user-id)]
    (a/put! c message)))  ; async put: never blocks the caller

mpenet11:10:43

there's no backpressure there tho

borkdude11:10:03

there doesn’t have to be. if the client will never connect, I don’t want to block anything.

mpenet11:10:31

right, it's just the first message basically

borkdude11:10:09

could be multiple, but sometimes the same, hence dedupe

borkdude11:10:41

it’s a notification system

mpenet11:10:20

I am not familiar with SSE, but I used to do something similar with WS; usually you want to have control over the rate of sending of those messages, since slow consumers can cause issues (waste etc)

mpenet11:10:47

I guess it also depends on the use case (long-lived connections or just a small one-time burst of messages)

borkdude11:10:09

SSE is similar to WS, only the communication is one-way and it’s over HTTP, so I can use all my interceptors.

borkdude11:10:38

Actually I want to dedupe on the pending-put queue, I think. When the client is not connected yet, I don’t want to send the same notification more than once. But once connected, it can be that I send the same message twice with a 15-minute interval, and that should actually be received.

mpenet11:10:59

I am not sure that happens at that time

mpenet11:10:17

I would think the pending put! queue ignores the xform

borkdude11:10:22

yes, I think so too, but now I realize I don’t want dedupe on the channel 🙂
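
For reference, a sketch of how a (dedupe) transducer on a channel behaves: it drops only consecutive duplicates, with no notion of whether a client is connected:

(def c (a/chan 1 (dedupe)))
(a/put! c :ping)
(a/put! c :ping)     ; dropped by the transducer (consecutive duplicate)
(a/put! c :other)
(a/take! c println)  ; :ping
(a/take! c println)  ; :other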

mpenet11:10:09

the pending put! queue is also something that will show up in other contexts. A put can happen in a go block via >! and so on. It's generally good to understand this stuff when using core.async

mpenet11:10:21

same with take

mpenet11:10:41

ex:

(require '[clojure.core.async :as async])

(def ch (async/chan))
(dotimes [i 1025]
  (async/go (async/>! ch i)))  ; the 1025th pending put throws
will blow up

mpenet11:10:58

it's basically a channel level thing

borkdude11:10:29

you mean the go block will blow up, but a put! won’t blow up, it will just return false?

mpenet11:10:19

any 'async' put on a channel (no matter from where) that's full (or without a buffer) and already has 1024 pending puts will blow up, in a go block or otherwise

mpenet11:10:46

it will actually throw an exception

mpenet11:10:57

> java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
> (< (.size puts) impl/MAX-QUEUE-SIZE)
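
The "windowed buffer" the assertion suggests is one of the lossy buffers, e.g. (a sketch):

(def c (a/chan (a/sliding-buffer 1024)))   ; when full, drops the oldest value
(def c2 (a/chan (a/dropping-buffer 1024))) ; when full, drops the newest value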

borkdude11:10:26

Good to know. Maybe it makes sense to not even send a message when the client is not connected. I can solve things a different way.

mpenet11:10:46

I am not sure about the SSE part of pedestal, but the WS side of things had (maybe still has) some questionable code

mpenet11:10:59

(if you are using pedestal)

borkdude11:10:13

I’m using yada. It only supports SSE

mpenet11:10:28

aleph is more solid for sure

borkdude11:10:57

The way I use it: send a simple command via SSE. Then the client knows it should do a certain request. So I can hook into the same re-frame events we already had for those requests.

borkdude11:10:21

No “big” data goes over the SSE wire, that is done via normal requests, possibly transit etc.

Jan K14:10:26

race condition of the day: would you expect this to blow up in about 1% of calls? async stuff is so hard 😞

(let [ch (chan)
      sub-ch (chan)
      p (pub ch (constantly :topic))]
  (>!! ch :hi)
  (sub p :topic sub-ch)
  (Thread/sleep 1)
  (assert (not (poll! sub-ch))))
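
A sketch of the race: >!! hands :hi to the pub's internal go loop, which then races with the sub registration; about 1% of the time the tap is already in place when :hi gets distributed, so sub-ch isn't empty and the assert fails. Subscribing before putting makes it deterministic:

(require '[clojure.core.async :refer [chan pub sub >!! <!!]])

(let [ch (chan)
      sub-ch (chan)
      p (pub ch (constantly :topic))]
  (sub p :topic sub-ch)  ; register the subscriber first
  (>!! ch :hi)
  (<!! sub-ch))          ; => :hi, every time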

mccraigmccraig15:10:16

@borkdude aleph is fine with websockets - we use yada but drop to aleph for websocket support

borkdude15:10:08

@mccraigmccraig ok cool. I’m managing with SSE right now.

mrchance22:10:35

Can someone explain core.async garbage collection to me? E.g., if I have channel a, mult b (of a), and channel c, and my code only keeps a reference to c to read out the result, will a and b be garbage collected? They probably wouldn't be, because the tap is implemented as a goroutine, which still has a reference to the mult, which in turn has a reference to a. If I then untap the link between b and c, would a and b eventually get garbage collected?

mrchance22:10:08

(assuming there is a tap from b to c before, sorry if the explanation isn't very clear 😕 )

mccraigmccraig22:10:10

maybe post some code matching what you are thinking @mrchance - i'm having difficulty visualizing

hiredman22:10:52

go blocks turn into callbacks which get attached to channels

hiredman22:10:03

so the gc story is the same as with callbacks on things

mrchance22:10:46

(require '[clojure.core.async :refer [chan mult tap]])

(def a (chan))
(def b (mult a))  ; b distributes everything put on a to its taps
(def c (chan))
(tap b c)         ; c now receives a's values via the mult

mrchance22:10:22

(And subsequently forget about a and b, let's say they just were in a let)

hiredman22:10:35

in general, anything that copies from a to b must have a reference to a and b

mrchance22:10:12

sure, so my conclusion above is correct? It will continue to run and not be garbage collected until I call untap?

hiredman22:10:02

well, in your example everything is globally def'ed

mrchance22:10:31

ok, let's make it a let then, and only return c
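
Concretely, the let version under discussion might look like this (make-tapped-chan is a hypothetical name):

(defn make-tapped-chan []
  (let [a (chan)
        b (mult a)
        c (chan)]
    (tap b c)
    c))  ; only c escapes; a and b stay reachable only through the mult's internals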

hiredman22:10:51

c won't stop the other channels from being gc'ed

mrchance22:10:35

Hm, I assumed the tap would still be there, somewhere in the core async scheduler

hiredman22:10:37

the go block that drives the mult has a reference to a and c

hiredman22:10:46

there is no scheduler

hiredman22:10:05

it is all inverted

hiredman22:10:11

go blocks don't exist

hiredman22:10:16

there are only callbacks on channels

mrchance22:10:11

sounds like I have to watch one of @tbaldridge's talks about async internals and the go macro 😉

mrchance22:10:21

I thought it gets rewritten into some kind of state machine

hiredman22:10:07

actually, c may or may not have a mult callback attached to it depending on where the mult is in its loop, and that mult callback might close over the reference to a

hiredman22:10:31

state machine in this case is a fancy kind of callback

hiredman22:10:26

ah, but mult is actually written in such a way that its callback isn't ever attached to c

mrchance22:10:38

I'm confused 🙂

mrchance22:10:20

What do you mean by "not attached to c"? I mean, it has to get stuff into c somehow, right?

hiredman22:10:01

the callback would get added to c if the go block used a parking op like >! or <!

mrchance22:10:35

but mult does park until all receivers are through, doesn't it?

hiredman22:10:58

it does, but it does it in a very indirect way, and actually it is the case that the mult callback will be referenced from c, but via an intermediate internal channel

hiredman22:10:41

c -> mult internal done callback that doesn't reference a -> dchan -> callback from the go block in the mult that references a

hiredman22:10:03

so referring to c may or may not keep the mult's state (which includes a) alive, depending on whether the mult is stalled waiting for input or waiting to output

mrchance22:10:31

Ok, sounds like I chose a particularly involved example ^^

mrchance22:10:48

But in general, a rule of thumb is that the channels are the central constructs, and only keeping them around and referenced will likely ensure that pipelines don't go away

mrchance22:10:34

Makes sense, actually, not very useful to have the pipeline still active when you can't actually put something in it

mrchance22:10:31

Thanks, considerably clearer now

mrchance22:10:05

Is there any resource you recommend to learn more about it? The talks I mentioned? Reading the code?

hiredman22:10:01

reading the code, also looking for (or implementing) some simpler toy csp implementations might help

mrchance23:10:12

Will do :thumbsup: