#core-async
2018-11-11
abdullahibra10:11:21

How can I rewrite this code in a better way, guys?

fmjrey14:11:59

Some hints:
- use transducers on your channel, something like (map (fn [url] <code from your fetch fn>)); that way you just need to put the url on your channel and at the other end you get fetch results
- use pipelines for parallelism?
- use the time fn?
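
A minimal sketch of the transducer hint, with a hypothetical pure stand-in for the real fetch (a real one would do HTTP; note the blocking-IO caveats raised later in this discussion, since the transducer runs inside the channel's put/take machinery):

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the real fetch fn; a real one would do HTTP here.
(defn fetch [url] (str "fetched " url))

;; Channel with a transducer: put urls on it, take fetch results off it.
(def results (async/chan 10 (map fetch)))
```

Putting "http://example.com" on `results` yields "fetched http://example.com" on the taking side; this shape fits pure transforms better than blocking work.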

noisesmith17:11:42

this will perform very poorly, and will block other go blocks from running, once you ask for a larger list of URLs

noisesmith17:11:08

go blocks are not meant for blocking work, they are meant for coordinating on channels

noisesmith17:11:23

if you run into an error on any of the fetches, and the channel write doesn't happen, it will lock up your code

noisesmith17:11:46

(because you simply do blocking reads of the same number of messages as you made requests)

noisesmith17:11:20

if you changed the fetch function to (defn fetch [url ch] (async/thread ... (>!! ...))) it would no longer risk clogging up all of core.async
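
A fleshed-out sketch of that suggestion; `http-get` is a hypothetical stand-in for the real blocking HTTP call. The try/catch also addresses the lock-up concern above: the channel gets written even when the fetch fails, so readers never hang.

```clojure
(require '[clojure.core.async :as async :refer [>!!]])

;; Hypothetical blocking HTTP call; substitute your real client here.
(defn http-get [url] (slurp url))

;; Blocking IO runs on async/thread's pool, not the small go-block pool,
;; and something is always written to ch, even on failure.
(defn fetch [url ch]
  (async/thread
    (>!! ch (try (http-get url)
                 (catch Exception e e)))))
```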

orestis17:11:40

I’ve read a few tutorials (not core.async ones, but libraries) advocating go blocks; then I learned about this issue with IO in go blocks being a no-no. What then would be the use of go blocks?

orestis17:11:39

I mean, my experience so far is that you reach for asynchronous stuff to do non-blocking IO. Perhaps I lack some context?

noisesmith17:11:30

go blocks are for coordinating channels, because you can park on channel reads and writes without blocking the thread

noisesmith17:11:38

they are not for blocking work

noisesmith17:11:04

async/thread uses a separate thread pool, and returns a channel you can park on

noisesmith17:11:29

but this code isn't even so async that it needs to park / coordinate, it just wants results in the order they become available

orestis17:11:47

Right, it’s the “coordinating” part that I lack some concrete example for. Is it something like “take a value from this channel, transform it and push it to some other channels”?

noisesmith17:11:20

or "if result a comes back first wait on channel b, otherwise if c comes first close b, cancel a, and put something on d"
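
That scenario could be sketched with alt!; the channels a, b, c, and d are hypothetical, and "cancel a" is approximated here by simply closing it:

```clojure
(require '[clojure.core.async :as async :refer [go alt! <! >! close!]])

;; If a value arrives on `a` first, wait on `b`; if `c` wins instead,
;; close `b`, close `a` (our stand-in for cancelling), and put on `d`.
(defn coordinate [a b c d]
  (go
    (alt!
      a ([v] [v (<! b)])
      c ([v] (close! b) (close! a) (>! d v)))))
```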

orestis17:11:54

Right ok and that of course doesn’t do any IO, only channel work - so that goes into a go block.

noisesmith17:11:56

it's great for turning callback hell into orderly step by step code

orestis17:11:33

I guess many people (myself included) first see core.async as a replacement for threads/futures/promises

noisesmith17:11:53

right, the distinction between parallelism and async is important here

orestis17:11:21

Because it can do those things - but you could also write that fetcher function just fine with a future.

noisesmith17:11:31

typically it goes like: 1) you need more speed 2) you go parallel 3) async coordination is now needed

noisesmith17:11:47

core.async is good for helping with step 3, once you get there

noisesmith17:11:16

@orestis yeah, a future plus a queue

noisesmith17:11:33

but then you'd likely want to control the number of threads, and the number of threads talking to any given host at a time (to avoid becoming an unintentional DoS attack), and pretty quickly you find a use for core.async in real-world cases

orestis17:11:21

How does core async help with that? I had the impression that you can’t give it a thread pool or executor, can you?

orestis17:11:32

I mean, for thread

noisesmith17:11:39

but how do you do "only n requests per host"

noisesmith17:11:32

channels (and the other abstractions on top of them) help with that kind of logic

noisesmith17:11:06

and you can control parallelism by starting N threads that do a read,execute,write loop

noisesmith17:11:15

or N blocking channels with no buffer, and some logic around the dispatch

orestis17:11:29

Damn I’m close but I still don’t get it. Do you mean, I can just create 10 plain threads, each one just does blocking reads on a channel, receives a URL, does a blocking fetch, puts the result in some other channel and loop back? And crucially, no go loop in sight?

noisesmith17:11:21

that's one way to do it - because the task you described doesn't require asynchronous coordination
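
A sketch of that worker-pool shape, with the blocking fetch passed in as a hypothetical `http-get`; closing and draining the url channel shuts the workers down, and no go block is involved:

```clojure
(require '[clojure.core.async :as async :refer [thread <!! >!!]])

;; N real threads; each blocks for a url, fetches, writes the result, loops.
;; When `urls` is closed and drained, <!! returns nil and the worker exits.
(defn start-workers [n urls results http-get]
  (dotimes [_ n]
    (thread
      (loop []
        (when-let [url (<!! urls)]
          (>!! results (http-get url))
          (recur))))))
```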

orestis17:11:06

So that way I have a very clear and bounded worker group, no need to worry about using some bounded thread pool etc

noisesmith17:11:12

but if you don't want all those threads hanging around for resource reasons, use N go blocks: each parks on the input channel, runs the blocking work via async/thread, reads from the channel it returns, and puts the result on your result chan

noisesmith17:11:30

since go blocks are cheaper than threads
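
A sketch of that go-block variant; each of the N go blocks parks cheaply, and a real thread is only occupied for the duration of the blocking call itself (`http-get` again being a hypothetical blocking fetch):

```clojure
(require '[clojure.core.async :as async :refer [go-loop thread <! >!]])

;; N go blocks park on `urls`; the blocking fetch runs on async/thread's
;; pool, and the go block parks on the channel that thread returns.
(defn start-go-workers [n urls results http-get]
  (dotimes [_ n]
    (go-loop []
      (when-let [url (<! urls)]
        (>! results (<! (thread (http-get url))))
        (recur)))))
```

core.async's pipeline-blocking offers essentially this pattern prepackaged.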

orestis17:11:30

I need some core.async design patterns book :)

orestis17:11:21

Or some cookbook. Perhaps I just don’t deal with problems that are of that nature :)

noisesmith17:11:24

there's some good talks out there, I saw a good one from Tim Baldridge at clojure/west 2017 in particular

noisesmith17:11:03

yeah - I learned this stuff by using core.async wrong (and also using futures wrong...) and ending up with an app that misbehaved on a massive scale

orestis17:11:16

I’ve seen that - I’m taking his advice to just not use it if it doesn’t solve some specific problem :)

noisesmith17:11:26

but hey, if you don't have the throughput that leads to needing this stuff, that's even easier :D

noisesmith17:11:25

I don't use core.async any more (and very little threading), because my current job doesn't have the massive throughput demands my last one did

orestis17:11:48

I’m trying to understand the characteristics of JVM web applications. For example, I know that Jetty has a thread pool it uses to run request handlers. I then wonder if it’s OK to block that with file or DB IO; it seems a common way to signal async execution is by returning a channel (at least in Pedestal)

orestis17:11:24

So I guess it’s fine to use a go block to do that, as long as you’re careful to do all blocking IO in threads.

noisesmith17:11:07

right - and in the jetty case the thread pool likely expands (you can likely control the max size)

noisesmith17:11:26

the core.async pool doesn't expand, and isn't meant to be a worker pool

orestis17:11:35

But I guess in the end you’re still just shuffling threads around, since most DBs are synchronous anyway. Just looking at it from a different point of view.

noisesmith17:11:24

also http is IO, so jetty is already opted in for those threads to be (relatively) long lived

orestis17:11:11

I think jetty does NIO these days - it’s hard to find some concrete document about those things, it’s a bit frustrating.

noisesmith17:11:51

the main thing here is that the typical usage of e.g. ring in Clojure would use a non-NIO method to consume the request body

noisesmith17:11:18

once you do that, sure the server is using NIO, but your handler is doing blocking IO and sitting on a thread etc. thanks to that

orestis17:11:43

That’s a great resource, thanks.

orestis17:11:08

I need to go now unfortunately - I learned a lot, thanks!

didibus18:11:10

I do feel Go blocks are most useful with IO.

noisesmith18:11:07

io directly in go blocks with large enough throughput will choke core.async

didibus18:11:33

Because you can easily queue up non-blocking IO requests at a much faster rate than the IO will finish.

noisesmith18:11:14

and core.async only uses a small threadpool

didibus18:11:00

I'm talking about non-blocking IO exclusively

didibus18:11:32

So it lets you do IO multiplexing

didibus18:11:26

Like make 10 io calls and then wait on the first one to finish

didibus18:11:16

Basically any time you have an NxM problem, where you need to do N things but only have M threads.

didibus18:11:45

So you want to schedule N tasks on M threads, and interleave that work. Now, the task cannot block the thread, because then it just steals away a worker thread and that defeats the point.

didibus18:11:58

But the interleaving is key. That's what gives you concurrency.

didibus18:11:09

This talk probably explains it best: https://vimeo.com/49718712

fmjrey18:11:50

Interesting discussion, learning too. What do you guys think of the use of core.async in kinsky, which is a client lib for kafka: https://github.com/pyr/kinsky It uses a separate poller thread for consuming from kafka, and on the producing side it's async.

fmjrey19:11:48

In my case kinsky is making my dev/test workflow much easier, as I can substitute kinsky channels with plain channels, meaning I can run quick "end-to-end" tests using core.async instead of kafka.

noisesmith19:11:33

their producer function hides a future inside and returns a channel. I would have opted to take the channel to write to as an arg to the producer and return the future; or better, replace the future with async/thread and return the channel that async/thread returns

noisesmith19:11:15

it's relatively minor, but the difference is you could detect / act on failure states in the producer

noisesmith19:11:52

and taking channels you write to as an arg means the consumer can pass in a channel with the buffering technique or transducer that gives the behavior they want

noisesmith19:11:45

oh - I notice both namespaces have a producer function, I mean async/producer

didibus19:11:22

Let me put it another way. The whole point of go blocks is to maximize the use of a thread and minimize threading overhead. Until you find yourself in a situation where a thread is being wasted doing nothing (such as while it waits for non-blocking IO), your program has more things it needs to do, and spawning a new thread for them costs too much in memory and spawn/context-switch time, you do not need go blocks. If what you need to do concurrently is a small number of long computations or long waits, the thread overhead won't be a problem. It's only when you have a lot of small tasks that the thread overhead becomes an issue. And the smallest thing you can have a lot of is non-blocking IO. Thus it is a great use case for go blocks.

fmjrey19:11:50

@noisesmith not sure how to understand you when you say "their producer function hides a future", I'm guessing you mean kinsky producer hides the future returned by kafka/send, and uses an out channel common to all send operations. Actually I started to work around this issue here: https://github.com/fmjrey/kinsky/commit/5e9ae38351781ba847e89421447c47cdcc968840

noisesmith19:11:27

I mean that it creates a future, and doesn't return it

noisesmith19:11:42

this is a "hot take" - I have only skimmed the code, so there's probably more context

fmjrey19:11:40

OIC, yes that future is just to use a separate thread, actually I replaced it with async/thread

noisesmith19:11:29

right - I just tend to be suspicious when a new thread is created, might fail in a meaningful way, and there's no handle around to check on it

didibus19:11:20

I think in the case of kinsky it makes total sense. That's because the poller thread is infinite. And every time it polls, you have to dispatch the work to process the message to another thread to handle. And channels are exactly for that. A future only dispatches a single value once. But a poller thread will create an infinite stream of values to be handled.

noisesmith19:11:21

it's only infinite if it doesn't fail

didibus19:11:03

Failure should just be caught, and made into another message to be handled.

noisesmith19:11:17

the point of having a handle to the thread is you can wake up and do something if the thread dies (async/thread is perfect here, as you can park on the channel it returns, then handle the condition)

didibus19:11:27

Or dropped, depends on your use case.

noisesmith19:11:54

if that loop in the future exits, the one who started it presumably cares

noisesmith19:11:32

just a try/catch is relatively clumsy, a catch that writes to a channel on failure is not an improvement on a channel that gives you a value if the thread exits the loop

fmjrey19:11:16

in the kinsky case I also added some exception handling because there wasn't any, and an exception message is sent out. The thread loop terminates with a stop operation on the control channel

didibus19:11:26

So, you'd let the polling thread crash, and then restart it?

noisesmith19:11:40

maybe restart it, maybe bail on the task - you can decide contextually when it happens. Just saying, async/thread (or holding onto the future and doing something if/when it realizes) is a perfect abstraction here because it has the precise semantics you need - if this exits, look at the result then do whatever
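
A minimal sketch of that supervision idea, assuming hypothetical `poll-loop` and `restart?` functions; the channel async/thread returns yields exactly one value when the thread exits, whatever the reason:

```clojure
(require '[clojure.core.async :as async :refer [thread <!!]])

;; Run poll-loop on its own thread; when it exits (return or throw), decide
;; contextually, via restart?, whether to restart or bail with the outcome.
(defn run-supervised [poll-loop restart?]
  (loop []
    (let [outcome (<!! (thread
                         (try (poll-loop) :done
                              (catch Throwable t t))))]
      (if (restart? outcome)
        (recur)
        outcome))))
```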

noisesmith19:11:26

as opposed to the current behavior: if/when this fails, the code waiting on this channel is stalled

fmjrey19:11:18

kinsky async producer and consumer return a vector of two channels, but not the channel from the inner thread....

didibus19:11:42

Hum.. I mean maybe, in the case of a catastrophic failure that doesn't get caught. You could do that too. But it does seem simpler to me to try/catch the poller and just not let the thread die.

noisesmith19:11:51

depending on the nature of error, a restart might be a bad idea

didibus19:11:52

Oh wait, you're talking about the producer future?

fmjrey19:11:59

I think @noisesmith is more concerned about putting the exception to a channel; and that could block too, possibly forever

didibus19:11:30

Ya ok. I see. That's true.

fmjrey19:11:02

actually I used an async/put! to send the exception

fmjrey19:11:25

so it should not block, it may just die without getting a chance to say why

noisesmith19:11:46

something I see in that block - if record is nil, that means the input channel is closed and you will get nils from the channel as fast as you can read from it

noisesmith19:11:59

I doubt you actually want to send nils to your consumer in a hot loop

noisesmith19:11:19

that is, client/send, I assume that writes to kafka - so if someone closes the input channel, it will write nils to kafka in a loop as fast as it can

fmjrey19:11:00

yes you're right @noisesmith, I handled that in my local copy, I have not pushed that change yet

fmjrey19:11:05

it's handled the same way as a close operation

noisesmith19:11:36

that try/catch only catches Exception, if you used the channel returned by async/thread to make a message for the consumer, it would get a message regardless of why the thread exits (which could be some sort of Error for example)

fmjrey19:11:14

yes that's a good idea. I wonder if catching Throwable instead would help too, but maybe I should let the thread channel return such uncaught exception

noisesmith19:11:54

I'm going to go have a weekend 👋

fmjrey19:11:45

ok thanks for the useful insights @noisesmith and @didibus

fmjrey19:11:31

I have not used core.async a lot; I started with this small project I'm working on, which uses kinsky and kafka, and learned a lot. This "no blocking IO in core.async" point is quite important indeed; using core.async for coordination is its sweet spot, thanks for reminding us of that @noisesmith. I think kinsky avoided that. @didibus I think you summarise it well by saying core.async should be used to maximise thread usage in a non-blocking IO context!

didibus20:11:42

Ya, it serves two purposes: 1) inter-thread communication, and 2) cooperative (aka non-preemptive) scheduling of tasks onto threads. I see a lot of people reaching for core.async who aren't in need of either of these things. A lot of people try out core.async, don't understand what to do with it, and try to force it into problems that have simpler solutions. If you don't come to it because you've hit a limitation, either in trying to have two threads communicate, or because you've maxed out your throughput and are looking for ways to increase it, then core.async just won't make sense, and you should avoid it.