Alex Miller (Clojure team) 00:05:26

re runtests - no, don’t think

Alex Miller (Clojure team) 00:05:33

re script, yes - but need to check with @dnolen

Alex Miller (Clojure team) 00:05:09

can we split the contributing changes into a separate ticket/patch too?

Alex Miller (Clojure team) 00:05:18

I can apply that one now


Ok, so you want a patch that's everything but

Alex Miller (Clojure team) 00:05:02

yeah, the enhancement separated from the contributing changes


I find that somewhat ridiculously fine-grained, but will put it together.

Alex Miller (Clojure team) 00:05:51

well these will be applied on much different time scales :)

Alex Miller (Clojure team) 00:05:15

I need to get a thumbs up from Rich on the enhancement but I can apply the contributing change now


Well, in terms of the docs, can we loop in @dnolen about cleaning up those misleading (?) scripts as well. And maybe a tiny change to the test runner to show something in the window, not just the console. Doesn't have to be fancy!


Boy Scout: Leave everything better than you found it.

Alex Miller (Clojure team) 00:05:40

sure, I suspect David probably won’t be available till tomorrow but we are planning to do a core.async release tomorrow


@hlship you can indeed remove script/test, that can’t really work since à la carte JS engine builds won’t have an async dispatch mechanism


showing something in the window would be a nice enhancement - I agree


so, i know that closing a channel propagates down (that is, if you close a from channel, usually the to channel will be closed as well), but what happens when you turn that around ? does closing a to channel by default close the chain upwards ?


i should probably test this and figure out for myself… 🙂


@lmergen no -- perhaps there are others reading the from channel still, so you don't want to close it. As the doc says for pipe and pipeline: "Will stop consuming the from channel if the to channel closes"


as the source for pipe shows, from is only referenced in one place, and that's (<! from), so it's never closed:

([from to close?]
     (go-loop []
      (let [v (<! from)]
        (if (nil? v)
          (when close? (close! to))
          (when (>! to v)
            (recur)))))
     to)
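
A small REPL sketch of the behavior described above, assuming JVM Clojure with core.async on the classpath. The buffer sizes are arbitrary choices for illustration. It relies only on the documented return value of `>!!` (true unless the channel is already closed) to show that closing `to` leaves `from` open.

```clojure
(require '[clojure.core.async :as a])

(def from (a/chan 10))
(def to   (a/chan 10))

;; Start piping, then close the downstream end.
(a/pipe from to)
(a/close! to)

;; >!! returns false only when the target channel is closed.
(def to-open?   (a/>!! to :y))    ; to was closed explicitly
(def from-open? (a/>!! from :x))  ; from was never touched by close!
```

Note that the pipe's go-loop may still take `:x` from `from` once before it notices that `to` is closed, so a value put at this point can be dropped.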


yeah, that makes sense


ok, in that case, i’ll have to tailor this to my specific use case


thanks for the info!


@lmergen you're welcome -- note that you would probably not have anyone taking from from if you intend to pipe it to to, as your takes from the to channel would not be consistent then, but I guess it's good to know that you can take a previously piped channel and after closing the to, still continue using it, if for some reason you wanted to; I'm curious as to your use case, as I think about why you'd care about this?


so, in my searching I've seen this question asked several times before but usually without any particularly clear answers - is there a way to identify whether a channel has an item waiting to be consumed without consuming said item? a.k.a. peek? I'm trying to create an event system with routing/subscription using core.async channels, but I want to be able to monitor the feeds for debugging purposes, to warn if there is an event stream that never had a handler attached


the only ideas I've had so far are 1) a peek based monitor, that periodically checks to see if any events have not been handled 2) an ack model, where all events are copied into a collection and aren't removed until an explicit ack message is received on another channel


you are using the standard core.async channels and buffers, correct? in other words, you have not implemented your own ManyToManyChannel etc? @shader


when you say "waiting to be consumed," if you mean "would a <!! block?" then that means that either the channel's buffer is not empty, or that there are pending puts onto the channel. you can check for these two as such, though you're digging into implementation details which may not be the best thing to do.



;; reads ManyToManyChannel's internal fields; may break across core.async versions
(defn can-take [ch]
  (or (> (count (.puts ch)) 0)
      (> (count (.buf ch)) 0)))


yes; I have not implemented any custom channels or buffers. Since I would be using the standard core.async implementation only, I suppose relying on implementation details isn't too bad


@joshjones thanks for the code

Alex Miller (Clojure team)14:05:50

@hlship I removed the script/test script, added a hint to runtests.html, and updated the readme with the cljs test instructions


ok, I was just starting work on a patch for that.


@shader @joshjones why not just use clojure.core.async/peek! ?


I mean, that’s what it is for


I didn't see that when I was looking...


it doesn’t answer the “would reading from this channel block?” question because CSP can never make any such promise, but it returns nil or the item


oh I’m sorry, it’s called poll! - my bad


oh and it’s not peek, because it actually gets the item - so might be wrong for your case


but that’s a better behavior because CSP can never make the promise “reading this would never block” per se, that’s just not a thing it can express, without making a special construct like poll! that maybe reads and maybe doesn’t
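
For reference, a minimal sketch of how `poll!` behaves, assuming JVM Clojure and a buffered channel so the put completes immediately. It shows the point made above: `poll!` never blocks, but it does consume the item when one is available.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 1))

;; Nothing is available yet, so poll! returns nil right away.
(def before (a/poll! ch))

;; The buffer has room, so this put does not block.
(a/>!! ch :event)

;; poll! takes the item off the channel, it does not merely look at it.
(def taken (a/poll! ch))
(def after (a/poll! ch))
```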


I don't really want to prove that reading won't block


I just want to warn if a particular channel hasn't been read in a period of time


well, poll! wouldn’t be the right thing for that either


xform on the chan that updates a mutable “read time”


There is not a "selective receive" like erlang or "peek" in core.async, and my guess is that there will never be.


I think it would lead to terrible errors to expose something like that.


@ghadi right - what I was trying to say but much better put


it’s not an operation that CSP can do correctly


There's probably an analogy to coupling in there. Peeking into another process's internals, rather than relying upon communication


to the poster's actual original concern (now that I understand it) a combination of “count of the channel’s buffer” with “time of last read from the channel” seems reasonable if all you are doing is raising an alarm about backlog or something


You can always introduce an intermediate/proxy process to read from some channels and mediate the distribution of values


just going on channel count would false positive on high throughput


instead of a stateful transducer that tells you how long it's been since the last read from the chan, you could bundle each message as a tuple with a submission time, and track the deltas on each read (with an alarm condition for large deltas)


that seems fruitful ^ (adding metadata to the submission)


(not clojure metadata but just a plain map around the submission)
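
One way the "plain map around the submission" idea might look, as a sketch for JVM Clojure. The names `stamp`, `stale?`, `:payload` and `:submitted-at` are made up for illustration, and `System/currentTimeMillis` would need a different clock call in ClojureScript. The channel's transducer wraps each value as it is put, and the reader compares the submission time with its own clock.

```clojure
(require '[clojure.core.async :as a])

(defn stamp
  "Wrap a message in a plain map carrying its submission time."
  [msg]
  {:payload msg :submitted-at (System/currentTimeMillis)})

(defn stale?
  "True when the envelope waited longer than max-age-ms before now-ms."
  [envelope now-ms max-age-ms]
  (> (- now-ms (:submitted-at envelope)) max-age-ms))

;; The transducer runs when a value is added to the buffer.
(def events (a/chan 16 (map stamp)))

(a/>!! events {:event :clicked})
(def envelope (a/<!! events))
```

A consumer could call `(stale? envelope (System/currentTimeMillis) 5000)` on each read and log a warning when it returns true.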


that's one of the directions I've been thinking; though in my particular case I'm more concerned with mistaken identifiers that result in a channel never being read from, rather than slow processing


the ack-based solution might be better, and affords additional flexibility in some ways, but also puts a greater burden on the recipient to implement the pattern
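
A rough sketch of the bookkeeping an ack model would need, written as pure functions over a map so it stays independent of how the acks arrive. All names here (`track`, `ack`, `overdue`, the id scheme) are hypothetical. In a real system the map would probably live in an atom or in the state of a go-loop that reads acks from a second channel.

```clojure
(defn track
  "Remember msg under id together with the time it was sent."
  [pending id msg sent-at]
  (assoc pending id {:msg msg :sent-at sent-at}))

(defn ack
  "Forget a message once its recipient has acknowledged it."
  [pending id]
  (dissoc pending id))

(defn overdue
  "Messages still unacknowledged after more than max-age-ms."
  [pending now max-age-ms]
  (into {}
        (filter (fn [[_ {:keys [sent-at]}]]
                  (> (- now sent-at) max-age-ms)))
        pending))

(def pending
  (-> {}
      (track 1 :login 1000)
      (track 2 :logout 1000)
      (ack 1)))
```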


So what @shader originally asked was "is there a way to identify whether a channel has an item waiting to be consumed without consuming said item" -- checking the buf and pending puts is a way to verify whether or not this is the case. It doesn't modify state, doesn't receive anything, just takes a count. I'm not sure where the issue would be there.


yep; I'll probably start there honestly


but the root thing is knowing whether it’s being consumed


what do you mean?


is there a way to check if there are any listeners/consumers associated with a channel?


core.async is based on CSP, but it's implemented with plain ol' data structures. I suppose one could (.lock (.mutex ch)) to verify atomicity of the count, if that level of correctness is required


@shader: correct me if I’m wrong but if there were 100 items unread but you were reading 50 a second, that would be fine right? the issue isn’t how many are in buffer, but whether things are actively being read


yeah; I just wanted to give a warning if there is an event that doesn't have any registered handlers


yeah, that’s an odd fit for csp, and count is only a loose indicator of that at best


maybe "registered" is the wrong word...


things in csp don’t ever have “registered handlers” - channels have reads and writes


Needless to say it's a really bad idea to read the implementation buf / puts


which means you can’t check as an abstract question “who reads this” - you can check if it’s been read, if you log such things, maybe you could even log who plans on reading, but you don’t have that status of being a dedicated reader


@ghadi depends on the use case


that can definitely be an intended behavior, but it’s not reified into the rules / semantics / guarantees in any way


quite a bit of writing clojure every day depends on relying on implementation details ... it's not perfect, but it's life. If you need to do something and there's no other way, sometimes it's a necessary tradeoff. Not good practice for a long term project, but when you need to get something done, sometimes it's the only way.


@shader maybe pub/sub are closer to the semantics you want? either core.async pub/sub or a more dedicated pub/sub system?
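
For comparison, a minimal sketch of core.async pub/sub, assuming JVM Clojure. The event shape and the `:type` topic key are invented for the example. Note that a publication silently drops values whose topic has no subscriber, so by itself it would not produce the "no handler" warning being asked about.

```clojure
(require '[clojure.core.async :as a])

(def events (a/chan))
(def publication (a/pub events :type)) ; the topic is the event's :type

(def clicks (a/chan 10))
(a/sub publication :click clicks)

;; Delivered to the :click subscriber.
(a/>!! events {:type :click :x 1})
(def received (a/<!! clicks))

;; Nobody subscribed to :scroll: the put succeeds and the value is dropped.
(def accepted? (a/>!! events {:type :scroll}))
```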


@joshjones you're guaranteed to have a bad time, and I'd never use a library that did that


is he writing a library? @ghadi


Not sure, but I wrote a bunch of core.async, just trying to give advice.


If you want to know whether a set of channels have been read recently, you can do it like this:


C1 C2 C3 C4 C5 <===== proxy go block that uses alts! to track recent read times / metadata in a map ====> output channel


you don't have to look inside C1 - C5 to know whether they've been touched. Now this makes the assumption that only the proxy go block has access to the channels
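
A sketch of what such a proxy might look like, assuming JVM Clojure. `tracking-proxy` and the `last-seen` atom are hypothetical names, not anything from core.async. The go-loop is the only reader of the inputs, records a timestamp per channel whenever it takes a value, and forwards the value to `out`.

```clojure
(require '[clojure.core.async :as a])

(defn tracking-proxy
  "Forward values from inputs to out, recording in the last-seen atom
  when each input channel was last read."
  [inputs out last-seen]
  (a/go-loop [open (set inputs)]
    (if (empty? open)
      (a/close! out)                      ; every input has closed
      (let [[v ch] (a/alts! (vec open))]
        (if (nil? v)
          (recur (disj open ch))          ; this input closed, stop watching it
          (do
            (swap! last-seen assoc ch (System/currentTimeMillis))
            (a/>! out v)
            (recur open)))))))

(def c1 (a/chan 1))
(def c2 (a/chan 1))
(def out (a/chan 10))
(def last-seen (atom {}))

(tracking-proxy [c1 c2] out last-seen)

(a/>!! c1 :a)
(def forwarded (a/<!! out))
```

A separate monitor could then periodically compare the entries in `@last-seen` with the current time and warn about channels that were never read or have gone quiet.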


(Also, whether you write a library or program I wouldn't advocate reacharounds into impl)


yes, we recently implemented our own ManyToManyChannel on top of a distributed messaging system to use a vanilla core.async channel to send distributed messages. In doing so, it was necessary to understand implementation details that otherwise one wouldn't care about, and this is just a fact of life. Much reading of the core.async code and rewriting a lot of it. Perfect encapsulation and data hiding is the realm of OOP. Separating concerns is good engineering, and I do it every day, but sometimes you need something to work that requires bending these rules.


I'm specifically advocating that in the original poster's question, the answer did not need to reach into impl.


> quite a bit of writing clojure every day depends on relying on implementation details
that's not my general experience


I'll bet you know how a vector is implemented, and how it differs from how a list is implemented...


and the ramifications of that, for example, knowing that (rest x) on a vector is slow, but the same operation on a list is near instantaneous


joshjones: FWIW rest on a vector is constant time and very fast


slower than on a list, sure, but not that much


hey this is veering off-topic


chunking of seqs ... 32 at a time ... it's all implementation specific, but stuff we need to know. I agree OT, just addressing your quoting me


I would like to dissuade people from reaching into a channel mutex when it's not minimal/necessary


so, I would definitely prefer not to dive into implementation details


you might look at a custom buffer type


you could write a buffer type that looks at the age of items in the buffer and does something if an item is too old


but the other options that I've thought of so far 1) a potentially complex set of intermediate channels, with alt! and timeout, etc, using backpressure from downstream to identify whether it's been read or not, or 2) some kind of ack-based system that copies all of the messages and discards the ones that are acknowledged, leaving a set of unhandled messages that can be checked with timeouts, etc.


@hiredman interesting option


you can also look at say tcp, and how it does reliable delivery (assuming that is the end goal of this)


reliable delivery is one of the goals. I'm basically trying to make an alternative to the re-frame event system that is more functional, using channels. re-frame kindly warns if you aren't listening for an event, but then doesn't keep it around for later processing. I would still like to have the benefit of the warnings, but also less ordering sensitivity while bootstrapping. Having said that, the event routing structure will probably have to be immutable to meet the fp design goal, so I'm probably not going to have any handlers "registering" via side-effects


I was working on a similar system based on most.js before I switched to clojurescript, which seems much better suited for it because the immutability and channels are part of the core


so what you want is something like what I have above, with an "unreliable" channel in the middle that drops messages that aren't subscribed to


the consumer will notice a missing sequence number, then signal the producer, at which point the producer can look in its buffer of unacked messages and signal that a given message was unhandled


that would be one way to do it I suppose, though it seems simpler if the messages were just buffered and I could detect that they had not been read - the warnings are really just for debugging purposes. If the system is operating as intended, the handlers should eventually be loaded, and it would be nice not to lose the events before then


right, so you have a debug flag in the producer that switches from drop and warn to retry forever (assuming handlers will all be loaded)


good point


though 'retry forever' would probably just be a channel with a blocking put


I think there are sort of two ways people approach core.async as a library 1. channels are a sort of interface, so if you want your widget to participate, make it like a channel 2. channels are message passing primitives you can use to build whatever machines that expose some kind of interface


the approaches are compatible, the main difference is where the complexity ends up


#1 ends up with sort of simple code that uses channels, but complex channel implementations #2 ends up with simpler channels and complex machines that use channels to communicate


interesting; I guess I don't normally think of providing alternate channel implementations


I usually think about providing wrappers to the interface functions. E.g., making them appear more functional by providing an equivalent to mix that 1) returns the out instead of taking it as input, 2) takes the desired input channels as arguments, plus a 'control' channel that can add new channels, or toggle their state via messages. That way the channel structure is immutable, while the contents provide the dynamics


this can be implemented as a wrapper around the existing mix that hides the messy side-effect details, though it may not be as efficient as a more complete custom solution
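
A possible shape for that wrapper, as a sketch assuming JVM Clojure. The function name `mixed` and the `[op ch]` control message format are invented here. It uses the existing `mix`, `admix`, `unmix` and `toggle` functions internally, but returns the output channel and takes all changes as messages on a control channel.

```clojure
(require '[clojure.core.async :as a])

(defn mixed
  "Return a channel carrying values from inputs. Messages on control,
  shaped [op ch], add, remove, mute or unmute an input."
  [control & inputs]
  (let [out (a/chan)
        m   (a/mix out)]
    (doseq [in inputs]
      (a/admix m in))
    (a/go-loop []
      (when-let [[op ch] (a/<! control)]
        (case op
          :add    (a/admix m ch)
          :remove (a/unmix m ch)
          :mute   (a/toggle m {ch {:mute true}})
          :unmute (a/toggle m {ch {:mute false}})
          nil)                            ; ignore unknown ops
        (recur)))
    out))

(def control (a/chan))
(def c1 (a/chan))
(def c2 (a/chan))
(def out (mixed control c1))

(a/>!! c1 :a)
(def first-value (a/<!! out))

;; Add a second input at runtime through the control channel.
(a/>!! control [:add c2])
(a/>!! c2 :b)
(def second-value (a/<!! out))
```

Closing `control` ends the control loop and leaves the mix running with whatever inputs it had at that point.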


that's one of the things I liked about most.js - higher-order streams


I have a side project where I created my own ReadPort because I a) wanted to ensure that clients didn't write to it and b) needed to do some special cleanup work when it was closed.