#core-async
2017-01-18
adamkowalski00:01:00

Well ideally I would have just one channel which would contain the messages that the consumer is getting from the broker

adamkowalski00:01:48

but the consumer is stateful. It has to maintain its position in the log, and you can subscribe to various topics, pause consumption, and do many other things

adamkowalski00:01:24

So I was thinking about keeping this state inside of a go loop

adamkowalski00:01:59

but then I didn’t know how to interact with the consumer from outside of that go loop besides creating an incoming channel and listening for messages on there

adamkowalski00:01:16

any time I get a new message on that channel, I would call the corresponding method on the object
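A minimal sketch of that pattern, assuming a hypothetical consumer whose state (position, subscribed topics) lives entirely inside the go-loop and is driven by a command channel — all names and command shapes here are illustrative, not from any actual broker client:

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop <!]])

(defn start-consumer []
  (let [commands (chan)]
    (go-loop [state {:position 0 :topics #{} :paused? false}]
      ;; the loop owns the state; the outside world only sends commands
      (when-let [[op arg] (<! commands)]
        (recur (case op
                 :subscribe (update state :topics conj arg)
                 :pause     (assoc state :paused? true)
                 :resume    (assoc state :paused? false)
                 :seek      (assoc state :position arg)
                 state))))
    commands))

;; callers interact by putting commands on the channel instead of
;; calling methods on the consumer directly:
;; (def cmds (start-consumer))
;; (a/put! cmds [:subscribe "my-topic"])
```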

sophiago01:01:54

can anyone give me some insight as to how garbage collection works within go blocks? i somehow reached the limit running a criterium benchmark, but then using quick-bench the computation was actually quite fast. so thinking i need to tweak something to better control memory usage?

sophiago01:01:07

the function i'm testing seems pretty fast, at least by my standards, but if i use any go-blocks as opposed to doing everything with blocking it quickly exceeds 2gb memory and all the threads halt

danielcompton02:01:48

@sophiago if you are blocking, then criterium will run each iteration in serial

danielcompton02:01:08

if your criterium test is creating a go block without blocking for it at any point

danielcompton02:01:20

then criterium will just run that function that creates the go block as fast as it can

danielcompton02:01:39

thousands (millions?) of times per second
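A hedged sketch of the distinction being drawn here (the work inside the go block is a placeholder): benchmarking `launch-work` times only the cost of creating the go block, while thousands of unfinished blocks pile up on the dispatch pool; benchmarking `launch-and-wait` times the actual computation:

```clojure
(require '[clojure.core.async :as a :refer [go <!!]])

(defn launch-work []
  ;; returns a channel immediately; the work runs on the dispatch pool
  (go (reduce + (range 1000000))))

(defn launch-and-wait []
  ;; blocks the benchmark thread until the go block's result arrives
  (<!! (launch-work)))
```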

sophiago02:01:10

i'm sorry, i'm not sure what you mean. are you saying that if i use go blocks then criterium won't pause between iterations as usual?

sophiago02:01:25

at the moment, even if i don't use go-blocks the gc is failing before it reaches 60 iterations and i have to use quick-bench instead. but it seems worse with them

danielcompton09:01:25

Can you share the code?

joshjones16:01:33

I’d like to be sure I understand the behavior of channel ops correctly:
1) `put!` and `take!` — asynchronous put/take. These will not block, regardless of buffering.
2) `>!` and `<!` — parking put/take. These will park the (real or logical) thread and allow it to be reused by other runnable code, if the channel is unbuffered or the buffer is full (for a `>!`) or if the channel is empty (for a `<!`).
3) `>!!` and `<!!` — blocking put/take. Identical to #2, except these will block the real thread if the channel is unbuffered or the buffer is full (for a `>!!`) or if the channel is empty (for a `<!!`).
4) I see that the `ManyToManyChannel` has both a buffer and `puts`/`takes` linked lists in the implementation. From what I can see of the behavior of the underlying data structure, `puts` is basically an “overflow buffer” for the main `buf`. For example, `put!` and `>!!` both put data into the `buf`, but once it is full, `put!` will add to the `puts` linked list, while `>!!` will block as explained above (whether that’s blocking in the main thread, or in its own thread). If `puts` has one element and `buf` is full at three elements, then a `<!!` or a `take!` both take from the primary `buf`, and the element that was in `puts` is then moved to the `buf`. Similarly, `takes` is somewhat of a “reverse overflow” buffer: a `take!` from an empty channel enqueues the handler function provided, and both `put!` and `>!!` will trigger this handler, bypassing the `buf`.
@tbaldridge, can you verify that my understanding is correct here?

tbaldridge16:01:10

@joshjones yep, pretty much. And if you think about it, only puts or takes will have data at a given time, never both. So actually they could be unified as parked-threads with some sort of flag to say parked-direction but that would complicate the code a bit

joshjones16:01:37

when you say puts and takes, you are talking about the fields in the ManyToManyChannel, correct?

joshjones16:01:44

It was a little confusing at first to see that put! will always “succeed,” even if it is called many times on a non-buffered channel. However, (thread (>!! …)) will also always succeed, as it will simply create a thread and block. Is the put! implementation with the “overflow buffer” done this way to provide the illusion of an asynchronous put, in the same way that (thread (>!! …)) does? From what I can see, it simply enqueues the data, but it gives the appearance of a true async put operation that is logically “blocking” on another thread.

tbaldridge16:01:56

So one more thing to notice.... all channel ops take a callback. Everything works on callbacks. Even <!! is just shorthand for creating a promise, adding it to the channel, then derefing the promise (blocking)
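That shape can be sketched directly. This mirrors what tbaldridge describes — the real `<!!` goes through core.async’s internal handler machinery rather than a literal promise, but the structure is the same:

```clojure
(require '[clojure.core.async :as a :refer [chan put! take!]])

(defn blocking-take
  "A blocking take built from the callback primitive."
  [c]
  (let [p (promise)]
    (take! c #(deliver p %))  ;; register a callback on the channel
    @p))                      ;; block the real thread until it fires

;; (let [c (chan 1)]
;;   (put! c 42)
;;   (blocking-take c))  ;; => 42
```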

tbaldridge16:01:13

So put! takes an optional callback. put! is the lowest level primitive (or can be viewed that way). The puts queue then is a list of pairs of values and callbacks to execute once the put has been enqueued into the channel.

tbaldridge16:01:26

actually it's a puts list since there's no guarantee on the execution order of the puts callbacks. (Said in case Rich reads this in the future and feels the need to correct me 😉 )

joshjones16:01:27

Some interesting behavior here related to callbacks:

(def c1 (chan 2))
(put! c1 1)
(put! c1 2)
(put! c1 3 #(println "callback" %))
(<!! c1)  ; results in a value of 1, and the callback fn executing

joshjones16:01:01

I think that’s what you just referred to — no guarantee of execution order of callbacks

tbaldridge16:01:46

right. IIRC that will work differently in CLJS because we use a ring buffer there and the semantics are different. But those semantics are undefined.

tbaldridge16:01:14

The idea is, if you're executing multiple put!s with no callbacks you're basically saying "I don't care about the ordering". If you care, you should be blocking/parking/waiting for the callback.

tbaldridge16:01:38

wait no, that's something else

tbaldridge16:01:25

So think about what you're doing there real quick.... (ascii art incoming). First put, the channel looks like this (puts, buffer, takes): [] [1] []

tbaldridge16:01:52

then: [] [2 1] [], and after the third put: [3] [2 1] []

joshjones16:01:02

with you so far

tbaldridge16:01:10

Finally when you execute the <!!: [] [3 2] []

tbaldridge16:01:34

So the callback gets executed for 3 because the value has been enqueued into your buffer of 2

joshjones16:01:29

ahh, so the callback function for the put! has not been executed yet, as it would have if the buffer were not full

joshjones16:01:25

so in this case:

(def c1 (chan 2))
(put! c1 1)
(put! c1 2 #(println "callback" %))

the callback is executed immediately. In the earlier case, it is deferred until it’s actually put on the buf … got it

joshjones16:01:00

So to close the loop on this portion of my understanding — from what I can see, (put! c1 3) on a channel with a full buffer has the same effect as (thread (>!! c1 3)), in that the thread it’s called from is not blocked, and the puts “overflow buffer” will hold the handler/value. The difference is that put! enqueues the handler/value using the current thread, while the (thread … version creates a new thread. So, I would want to use >!! when I want to either (1) do something computationally intensive and call it from a separate thread, or (2) respect backpressure and avoid doing more ‘producer’ work than my consumer is able to handle, by blocking if the channel is full. Am I on the right track at all with this conclusion, or no @tbaldridge ?

tbaldridge16:01:12

that's all correct.

tbaldridge16:01:04

And more on that, (go (>! c1 3)) is exactly the same as (put! c1 3), except the version with go is slower since it's doing more allocation/bookkeeping
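The backpressure point can be sketched like this (buffer size and counts are arbitrary): a producer using `>!!` on a dedicated thread can never get more than the buffer size ahead of the consumer, whereas a `put!` loop would queue everything into the pending-puts list with no pushback:

```clojure
(require '[clojure.core.async :as a :refer [chan thread >!! <!!]])

(let [c (chan 2)]
  (thread
    (dotimes [i 5]
      (>!! c i)))                ;; blocks whenever the buffer is full
  (vec (repeatedly 5 #(<!! c)))) ;; => [0 1 2 3 4]
```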

tbaldridge16:01:03

And to expound a bit more: Core.async really wants you to deal with bounded buffers, but as you can see puts and takes are unbounded. So the library will throw an error if you try to do this:

tbaldridge16:01:35

(let [c (chan)]
  (dotimes [x 100000]
     (put! c x)))

tbaldridge16:01:28

Same if you did (thread (>!! c x)); at that point you're treating the puts as an unbounded queue, and core.async will error out and tell you that you can't have more than 1024 pending ops on a channel.

tbaldridge16:01:57

And no there's not a technical reason for 1024, it's just large enough to allow correct programs but small enough to slap your hand when something goes wrong.

joshjones16:01:27

i see it’s just the puts overflow buffer that can only be up to 1024 … if it reaches that limit, it means you’re not doing something correct with the actual buffer in the channel

joshjones16:01:52

so now that i understand the internals better, i’ll have to zoom out to the usability zoom level and do some more experimenting to make sure i’m using things correctly (for example, rarely/never doing (go (>! c x)) or (go (<! c)) it would seem)

joshjones17:01:16

thanks so much for your help @tbaldridge