#core-async
2021-01-14
stephenmhopper02:01:57

I’m building an app that has a handful of background processes that pass messages to each other via core async channels. Does anyone have links to general patterns I should be following when building larger core.async systems? Right now, I’m tracking workers and channels in various atoms, but I feel like it’s growing out of control. So I’m tempted to just have a single channel and have everyone pub-sub to it as needed. Thoughts?

hiredman03:01:43

I use component for everything, and use the lifecycle start and stop to handle workers. We do have some pubsub stuff, but that is mostly used for distribution (it is wired into redis pubsub). Processes communicating locally are most often wired directly together via chans. We have a few mults that are just part of the component system map (you can stick anything in there) which components can depend on in the normal way if they want to tap them
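A minimal sketch of that arrangement, assuming the `com.stuartsierra/component` library and `core.async` are on the classpath. The `Consumer` and `Producer` records, `new-system`, and the `results` atom are hypothetical names made up for illustration; this is not hiredman's actual code.

```clojure
(ns example.system
  (:require [clojure.core.async :as a]
            [com.stuartsierra.component :as component]))

;; Hypothetical worker component: start creates its input channel and a
;; go-loop that reads from it; stop closes the channel so the loop exits.
(defrecord Consumer [results in done]
  component/Lifecycle
  (start [this]
    (let [in (a/chan 16)]
      (assoc this
             :in in
             :done (a/go-loop []
                     (when-some [msg (a/<! in)]
                       (swap! results conj msg)
                       (recur))))))
  (stop [this]
    (some-> in a/close!)
    (assoc this :in nil :done nil)))

;; Hypothetical upstream component: component injects the started
;; :consumer, so its channel is available here as (:in consumer).
(defrecord Producer [consumer]
  component/Lifecycle
  (start [this]
    (a/>!! (:in consumer) :hello)
    this)
  (stop [this] this))

(defn new-system
  "Builds the system map; results is an atom the consumer appends to."
  [results]
  (component/system-map
   :consumer (map->Consumer {:results results})
   :producer (component/using (map->Producer {}) [:consumer])))
```

`(component/start (new-system (atom [])))` starts `:consumer` before `:producer`, and `component/stop` tears them down in the reverse order.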

hiredman03:01:36

I think clojure applied might have a chapter on this kind of thing, not sure

hiredman03:01:03

Chapter 6 of clojure applied has a section called "Connecting Components with Channels"

stephenmhopper03:01:45

Thanks for the response, that’s very helpful. My system is already designed in such a way that moving to component wouldn’t be hard at all, so maybe I’ll just do that. Wiring everything up when my app starts isn’t hard. The issue I’m not sure how to deal with is development in the REPL. Suppose I have two workers which are really just go-loops that exit when they read a nil from their source channel. They communicate via two separate async channels. I want to recreate one of the workers (maybe the defn for that worker changed). I can’t just call close! on the old worker go-loop; I have to close the source chan that it’s reading from in order to get it to exit. Which means I need to make a new source chan for the new worker, and I need to find all of the other workers that were sending messages to that chan and recreate those as well. How do you deal with that situation?
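For reference, the kind of worker described here might look like the sketch below. `start-worker` and its arguments are hypothetical names. The only way to stop this loop is to close `source`, which is why replacing the worker forces a new source chan.

```clojure
(require '[clojure.core.async :as a])

(defn start-worker
  "Hypothetical worker: applies f to each message read from source and
  puts the result on sink. Exits when source is closed (a take returns
  nil). f must not return nil, since nil cannot be put on a channel."
  [source sink f]
  (a/go-loop []
    (if-some [msg (a/<! source)]
      (do (a/>! sink (f msg))
          (recur))
      :stopped)))

;; Usage: closing the source chan is what stops the loop.
(comment
  (def source (a/chan 8))
  (def sink (a/chan 8))
  (def worker (start-worker source sink inc))
  (a/>!! source 1)
  (a/<!! sink)
  (a/close! source)
  (a/<!! worker))
```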

stephenmhopper03:01:20

If I use component and the aforementioned source chan is a component and I call stop and then start on it, will component automatically recreate the downstream components (i.e. the workers) for me too?

seancorfield03:01:17

@stephenmhopper I'll put in a suggestion to subscribe to http://PurelyFunctional.tv for a month (or more!) and go through the core.async Patterns course on there.

seancorfield03:01:26

I don't remember the specifics but I'm pretty sure that course covers the scenario you're talking about (and several others). I'll need to go through it a few more times for it to sink in for me, but it's greatly improved my understanding of how to use core.async in the real world.

stephenmhopper03:01:29

Thanks for the recommendation, I’ll have to check that out.

jjttjj14:01:31

i've been struggling with the async components thing too. with how components relate to data flow. If my data flow graph looks like this, how exactly do i structure this as components? I feel like I clearly need some layer above core.async to manage the data flow but I'm not quite sure what it should look like

jjttjj14:01:49

like should each node there be its own component? My code is currently just a hodgepodge of "constructor" fns for each node but it gets pretty complicated to make changes https://gist.github.com/jjttjj/754083476035548aab4132ee9031b458

stephenmhopper14:01:59

I ended up just creating a package named workers (although “actors” might also be appropriate) that has all of my async workers in it. There’s a namespace in there called system which handles the setup / teardown of all of these workers. Every chan in my system is stored in an atom in the namespace for whatever worker consumes from that chan. And each namespace defines a single function for creating an acceptable chan that can be stored in that atom. However, no code in that namespace makes direct references to those channel atoms. Instead, my system ns handles the creation of all channels by calling swap! on each channel atom, calling close! on the previous value, and then creating a new channel by calling the function from the channel’s namespace. After closing the old channels and creating the new ones, system then creates the workers by derefing whatever channel atoms each worker needs and passing those in. If my system becomes any more complicated than this, I’m going to move it to component, but for now it’s working well.
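A rough sketch of that setup, collapsed into one namespace for brevity. All names (`parser-in`, `new-parser-chan`, `reset-chan!`, `start-parser`, `restart!`) are hypothetical, and `swap-vals!` (Clojure 1.9+) stands in for `swap!` here because it returns the previous value.

```clojure
(require '[clojure.core.async :as a])

;; Would live in the worker's own namespace: the atom holding its input
;; chan, plus the one function that knows how to build that chan.
(defonce parser-in (atom nil))
(defn new-parser-chan [] (a/chan 32))

(defn start-parser
  "Hypothetical worker: collects messages from in into sink-atom until
  in is closed."
  [in sink-atom]
  (a/go-loop []
    (when-some [msg (a/<! in)]
      (swap! sink-atom conj msg)
      (recur))))

;; Would live in the system namespace: replace the chan in the atom,
;; close the old one (stopping the old worker), return the new one.
(defn reset-chan!
  [chan-atom make-chan]
  (let [[old new] (swap-vals! chan-atom (fn [_] (make-chan)))]
    (when old (a/close! old))
    new))

(defn restart!
  "Recreates the channels first, then starts workers on the new chans."
  [sink-atom]
  (reset-chan! parser-in new-parser-chan)
  (start-parser @parser-in sink-atom))
```

Calling `restart!` again from the REPL closes the previous chan, so the old go-loop exits and a fresh worker picks up the new one.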

stephenmhopper14:01:58

@jjttjj It looks like all of those functions just create channels, no? You could probably build something similar to what I described above. Alternatively, you could use the component framework. Each of those channels could be its own component (with some of them having dependencies on the other channel components in your system), and anything that uses those channels would be a component that takes them as a dependency

jjttjj15:01:14

I guess the difficult part of that is they are both channels and mults (when they fan out), and it's somewhat common for something that was once a channel to suddenly want multiple subscribers, at which point I make it a mult, which requires changes in all connections to and from that component. I suppose I could start with everything defaulting to having a mult
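One way the "default to a mult" idea could look, as a sketch only. `node` and `subscribe` are hypothetical helpers, not taken from the linked gist.

```clojure
(require '[clojure.core.async :as a])

(defn node
  "Hypothetical node constructor: every node exposes an input chan and
  a mult over it, so adding a second subscriber never changes its shape."
  []
  (let [in (a/chan 16)]
    {:in in :out (a/mult in)}))

(defn subscribe
  "Taps the node's mult and returns the tapped chan."
  ([n] (subscribe n (a/chan 16)))
  ([n ch] (a/tap (:out n) ch)))

;; Usage: both subscribers see every message put on :in.
(comment
  (def n (node))
  (def s1 (subscribe n))
  (def s2 (subscribe n))
  (a/>!! (:in n) :tick)
  [(a/<!! s1) (a/<!! s2)])
```

One caveat with this default: a mult drops any value that arrives while it has no taps, so subscribers need to be attached before producers start putting.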