Fork me on GitHub

how can i rewrite this code in a better way guys?


Some hints: - use transducers on your channel, something like (map (fn [url] <code from your fetch fn>)) that way you just need to put the url on your channel and at the other end you get fetch results. - use pipelines for parallelism? - use the time fn?


this will perform very poorly, and will block other go blocks from running, once you ask for a larger list of URLs


go blocks are not meant for blocking work, they are meant for coordinating on channels


if you run into an error on any of the fetches, and the channel write doesn't happen, it will lock up your code


(because you simply do blocking reads of the same number of messages as you made requests)


if you changed the fetch function to (defn fetch [url ch] (async/thread ... (>!! ...))) it would no longer risk clogging up all of core.async


I’ve read a few tutorials (not core-async ones, but libraries) advocate go blocks, than I learned about this issue with IO in go blocks being a no-no. What would be then the use of go blocks?


I mean, my experience so far is that you reach for asynchronous stuff to do non-blocking IO. Perhaps I lack some context?


go blocks are for coordinating channels, because you can park on channel reads and writes without blocking the thread


they are not for blocking work


async/thread uses a separate thread pool, and returns a channel you can park on


but this code is not even that async that it needs to park / coordinate, it just wants results in the order available


Right, it’s the “coordinating” part that I lack some concrete example for. Is it something like “take a value from this channel, transform it and push it to some other channels”?


or "if result a comes back first wait on channel b, otherwise if c comes first close b, cancel a, and put something on d"


Right ok and that of course doesn’t do any IO, only channel work - so that goes into a go block.


it's great for turning callback hell into orderly step by step code


I guess many people (myself including) first see core-async as a replacement for threads/futures/promises


right, the distinction between parallelism and async is important here


Because it can do those things - but you could also write that fetcher function just fine with a future.


typically it goes like: 1) you need more speed 2) you go parallel 3) async coordination is now needed


core.async is good for helping with step 3, once you get there


@orestis yeah, a future plus a queue


but then you'd likely want to control the number of threads, and the number of threads talking to a given specific host at any time (to avoid being an unintentional DoS attack) and pretty fast you do find a use for core.async in real world cases


How does core async help with that? I had the impression that you can’t give it a thread pool or executor, can you?


I mean, for thread


but how do you do "only n requests per host"


channels (and the other abstractions on top of them) help with that kind of logic


and you can control parallelism by starting N threads that do a read,execute,write loop


or N blocking channels with no buffer, and some logic around the dispatch


Damn I’m close but I still don’t get it. Do you mean, I can just create 10 plain threads, each one just does blocking reads on a channel, receives a URL, does a blocking fetch, puts the result in some other channel and loop back? And crucially, no go loop in sight?


that's one way to do it - because the task you described doesn't require asynchronous coordination


So that way I have a very clear and bounded worker group, no need to worry about using some bounded thread pool etc


but if you don't want all those threads hanging around for resource reasons, have channels used in N go blocks, each one parks, then executes a thread, reads from the resulting channel and puts the result on your result chan


since go blocks are cheaper than threads


I need some core-async design patterns book :)


Or some cookbook. Perhaps I just don’t deal with problems that are of that nature :)


there's some good talks out there, I saw a good one from Tim Baldridge at clojure/west 2017 in particular


yeah - I learned this stuff by using core.async wrong (and also using futures wrong...) and ending up with an app that misbehaved on a massive scale


I’ve seen that - I’m taking his advice to just not use it if it doesn’t solve some specific problem :)


but hey, if you don't have the throughput that leads to needing this stuff, that's even easier :D


I don't use core.async any more (and very little threading), because my current job doesn't have the massive throughput demands my last one did


I’m trying to understand the characteristics of JVM web applications. For example, I know that Jetty has a threadpool it uses to run request handlers. I then wander if it’s ok to block that with file or DB IO, and it seems a common way to signal async execution is by returning a channel (at least in pedestal)


So I guess it’s fine to use a go block to do that, as long as you’re careful to do all blocking IO in threads.


right - and in the jetty case the thread pool likely expands (you can likely control the max size)


the core.async pool doesn't expand, and isn't meant to be a worker pool


But I guess in the end, you’re still just shuffling threads around, since most DBs are synchronous anyway. Just looking t from a different point of view.


also http is IO, so jetty is already opted in for those threads to be (relatively) long lived


I think jetty does NIO these days - it’s hard to find some concrete document about those things, it’s a bit frustrating.


the main thing here is that they typical usage of eg. ring in clojure would use a non-nio method to consume the request body


once you do that, sure the server is using NIO, but your handler is doing blocking IO and sitting on a thread etc. thanks to that


That’s a great resource, thanks.


I need to go now unfortunately - I learned a lot, thanks!


I do feel Go blocks are most useful with IO.


io directly in go blocks with large enough throughput will choke core.async


Because you can easilly queue up non blocking IO requests at a much faster rate then the IO will finish.


and core.async only uses a small threadpool


I'm talking non blocking IO exclusively


So it lets you do IO multiplexing


Like make 10 io calls and then wait on the first one to finish


Basically anything you have an NxM problem. Where you need to do N thing but only have M threads.


So you want to schedule N tasks on M threads, and interleave that work. Now, the task cannot block the thread, because then it just steals away a worker thread and that defeats the point.


But the interleaving is key. That's what gives you concurrency.


This talk probably explains it best:


Interesting discussion, learning too. What do you guys think of the use of core.async in kinsky, which is a client lib for kafka: It uses a separate poller thread for consuming from kafka, and on the producing side it's async.


In my case kinsky is making my dev/test workflow much easier, as I can substitute kinsky channels with plain channels, meaning I can run quick "end-to-end" tests using core.async instead of kafka.


their producer function hides a future inside, and returns a channel, I would have opted to take the channel to write to as an arg to the producer, and return the future, or better replace the future with async/thread and return the channel that async/thread returns


it's relatively minor, but the difference is you could detect / act on failure states in the producer


and taking channels you write to as an arg means the consumer can pass in a channel with the buffering technique or transducer that gives the behavior they want


oh - I notice both namespaces have a producer function, I mean async/producer


Let me put it another way. The whole point of Go blocks is to maximize the use of a thread, and minimize the threading overhead. So until you find yourself in a situation where your thread is being wasted not doing anything, such as while it waits for non-blocking IO. And you have more things your program needs to do. And spawning a new thread to do them turns out to be too much overhead in memory and spawn time+context switching time. Then you do not need go blocks. Now if what you need to do concurrently is a small number of long computation, or long waits, then the thread overhead won't be a problem. So it is only if you've got a lot of small tasks to do that the thread overhead becomes an issue. And the smallest thing possible that you have a lot of is going to be non-blocking IO. Thus it is a great use case for Go Blocks.


@noisesmith not sure how to understand you when you say "their producer function hides a future", I'm guessing you mean kinsky producer hides the future returned by kafka/send, and uses an out channel common to all send operations. Actually I started to work around this issue here:


I mean that it creates a future, and doesn't return it


this is a "hot take" - I have only skimmed the code, so there's probably more context


OIC, yes that future is just to use a separate thread, actually I replaced it with async/thread


right - I just tend to be suspicious when a new thread is created, might fail in a meaningful way, and there's no handle around to check on it


I think in the case of kinsky it makes total sense. That's because the poller thread is infinite. And every time it polls, you have to dispatch the work to process the message to another thread to handle. And channels are exactly for that. A future only dispatches a single value once. But a poller thread will create an infinite stream of values to be handled.


it's only infinite if it doesn't fail


Failure should just be caught, and made into another message to be handled.


the point of having a handle to the thread is you can wake up and do something if the thread dies (async/thread is perfect here, as you can park on the channel it returns, then handle the condition)


Or dropped, depends on your use case.


if that loop in the future exits, the one who started it presumably cares


just a try/catch is relatively clumsy, a catch that writes to a channel on failure is not an improvement on a channel that gives you a value if the thread exits the loop


in the kinsky case I also added some exception handling cause there wasn't any, and an exception message is sent out. Thread loop terminates with a stop operation on the control channel


So, you'd let the polling thread crash, and then restart it?


maybe restart it, maybe bail on the task - you can decide contextually when it happens. Just saying, async/thread (or holding onto the future and doing something if/when it realizes) is a perfect abstraction here because it has the precise semantics you need - if this exits, look at the result then do whatever


as opposed to the current behavior: if/when this fails, the code waiting on this channel is stalled


kinsky async producer and consumer return a vector of two channels, but not the channel from the inner thread....


Hum.. I mean maybe, in the case of a catastrophic failure that doesn't get caught. You could do that too. But it does seem simpler to me to try/catch the poller and just not let the thread die.


depending on the nature of error, a restart might be a bad idea


Oh wait, you're talking about the producer future?


I think @noisesmith is more concerned about putting the exception to a channel; and that could block too, possibly forever


Ya ok. I see. That's true.


actually I used a async/put! to send the exception


so it should not block, it may just die without getting a chance to say why


something I see in that block - if record is nil, that means the input channel is closed and you will get nils from the channel as fast as you can read from it


I doubt you actually want to send nils to your consumer in a hot loop


that is, client/send, I assume that writes to kafka - so if someone closes the input channel, it will write nils to kafka in a loop as fast as it can


yes you're right @noisesmith, I handled that in my local copy, I have not pushed that change yet


it's handled the same way as a close operation


that try/catch only catches Exception, if you used the channel returned by async/thread to make a message for the consumer, it would get a message regardless of why the thread exits (which could be some sort of Error for example)


yes that's a good idea. I wonder if catching Throwable instead would help too, but maybe I should let the thread channel return such uncaught exception


I'm going to go have a weekend 👋


ok thanks for the useful insights @noisesmith and @didibus


I have not used core.async a lot, started with this small project I'm working on, which uses kinsky and kafka, and learned a lot. This no blocking IO in core.async is quite important indeed, using core.async for coordination is its sweet spot, thanks for reminding us of that @noisesmith. I think kinsky avoided that. @didibus I think you summarise it well by saying core.async must be used to maximise thread usage in a non-blocking IO context!


Ya, it serves two purposes: 1) Inter-thread communication. 2) Cooperative (aka non-premptive) scheduling of tasks unto threads. I see a lot of people reaching to core.async that aren't in need of either these things. A lot of people try out core.async and don't understand what to do with it. And try to force it into problems that have simpler solutions. If you don't come to it because you reached a limitation either in trying to have two threads communicate or because you maxed out your throughput and are looking for ways to increase it, then core.async just won't make sense. And you should avoid it.