This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2017-05-26
re runtests - no, don’t think
re script, yes - but need to check with @dnolen
can we split the contributing changes into a separate ticket/patch too?
I can apply that one now
I am bad at mentioning
yeah, the enhancement separated from the contributing changes
well these will be applied on much different time scales :)
I need to get a thumbs up from Rich on the enhancement but I can apply the contributing change now
Well, in terms of the docs, can we loop in @dnolen about cleaning up those misleading (?) scripts as well. And maybe a tiny change to the test runner to show something in the window, not just the console. Doesn't have to be fancy!
sure, I suspect David probably won’t be available till tomorrow but we are planning to do a core.async release tomorrow
@hlship you can indeed remove `script/test`, that can’t really work since à la carte JS engine builds won’t have an async dispatch mechanism
so, i know that closing a channel propagates down (that is, if you close a `from` channel, usually the `to` channel will be closed as well), but what happens when you turn that around? does closing a `to` channel by default close the chain upwards?
@lmergen no -- perhaps there are others reading the `from` channel still, so you don't want to close it. As the doc says for `pipe` and `pipeline`: "Will stop consuming the from channel if the to channel closes"
as the source for `pipe` shows, `from` is only referenced one place, and that's `(<! from)`, so it's never closed:
  ([from to close?]
     (go-loop []
       (let [v (<! from)]
         (if (nil? v)
           (when close? (close! to))
           (when (>! to v)
             (recur)))))
     to)
@lmergen you're welcome -- note that you would probably not have anyone taking from `from` if you intend to pipe it to `to`, as your takes from the `to` channel would not be consistent then, but I guess it's good to know that you can take a previously piped channel and, after closing the `to`, still continue using it, if for some reason you wanted to; I'm curious as to your use case, as I think about why you'd care about this?
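A minimal sketch of the behaviour described above, assuming JVM Clojure with `clojure.core.async` required as `a`; the channel sizes and values are made up for illustration. Closing `to` leaves `from` open, though the first value put on `from` afterwards is still taken by the pipe and lost when its put onto the closed `to` fails:

```clojure
(require '[clojure.core.async :as a])

(def from (a/chan 1))
(def to (a/chan))

(a/pipe from to)   ; starts a go-loop that takes from `from` and puts onto `to`
(a/close! to)      ; close the downstream end only

;; `from` is still open, so this put succeeds. The pipe takes :a, finds that
;; `to` is closed (>! returns false), and stops consuming `from`.
(def first-put (a/>!! from :a))

;; The pipe no longer reads `from`, so this value stays in its buffer...
(def second-put (a/>!! from :b))

;; ...and can be taken directly.
(def taken (a/<!! from))
```

Both puts return `true` and `taken` is `:b`; the `:a` value was consumed by the pipe and never delivered anywhere.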
so, in my searching I've seen this question asked several times before but usually without any particularly clear answers - is there a way to identify whether a channel has an item waiting to be consumed without consuming said item? a.k.a. peek? I'm trying to create an event system with routing/subscription using core.async channels, but I want to be able to monitor the feeds for debugging purposes, to warn if there is an event stream that never had a handler attached
the only ideas I've had so far are 1) a peek based monitor, that periodically checks to see if any events have not been handled 2) an ack model, where all events are copied into a collection and aren't removed until an explicit ack message is received on another channel
you are using the standard core.async channels and buffers, correct? in other words, you have not implemented your own `ManyToManyChannel` etc? @shader
when you say "waiting to be consumed," if you mean "would a `<!!` block?" then that means that either the channel's buffer is not empty, or that there are pending puts onto the channel. you can check for these two as such, though you're digging into implementation details which may not be the best thing to do.
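The code originally posted with this message is not part of this log. A rough sketch of the idea, assuming the JVM `ManyToManyChannel` with its `buf` and `puts` fields (implementation details, not public API, and liable to change between releases); `pending-counts` is a hypothetical name:

```clojure
(require '[clojure.core.async :as a])

(defn pending-counts
  "Hypothetical helper: counts buffered values and parked puts on ch.
  Reads ManyToManyChannel fields directly (reflective field access), so it
  depends on core.async internals. The counts are a snapshot taken without
  holding the channel's lock, so treat them as approximate."
  [ch]
  {:buffered     (count (.buf ch))     ; buf is nil for unbuffered channels -> 0
   :pending-puts (count (.puts ch))})  ; puts waiting for buffer room or a taker

(def buffered-ch (a/chan 2))
(a/put! buffered-ch :x)                ; goes straight into the buffer

(def unbuffered-ch (a/chan))
(a/put! unbuffered-ch :y)              ; no taker yet, so the put is parked
```

Nothing is taken from the channel, but the numbers only say that values are waiting, not whether anyone intends to read them.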
yes; I have not implemented any custom channels or buffers. Since I would be using the standard core.async implementation only, I suppose relying on implementation details isn't too bad
@joshjones thanks for the code
@hlship I removed the script/test script, added a hint to runtests.html, and updated the readme with the cljs test instructions
@shader @joshjones why not just use `clojure.core.async/peek!`?
I mean, that’s what it is for
it doesn’t answer the “would reading from this channel block?” question because CSP can never make any such promise, but it returns nil or the item
still don't; not in the docs on http://clojure.github.io/core.async/
oh I’m sorry, it’s called `poll!` - my bad
oh and it’s not peek, because it actually gets the item - so might be wrong for your case
but that’s a better behavior because CSP can never make the promise “reading this would never block” per se, that’s just not a thing it can express, without making a special construct like poll! that maybe reads and maybe doesn’t
well, poll! wouldn’t be the right thing for that either
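For reference, a small sketch of what `poll!` does, assuming `clojure.core.async` is required as `a`. It shows why it is not a peek: a successful poll removes the item.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 1))

(def empty-poll (a/poll! ch))    ; nothing available right now, so nil
(a/offer! ch :event)             ; non-blocking put; the buffer has room
(def first-poll (a/poll! ch))    ; returns :event and removes it from the channel
(def second-poll (a/poll! ch))   ; nil again: the item was consumed, not peeked
```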
xform on the chan that updates a mutable “read time”
maybe…
There is not a "selective receive" like erlang or "peek" in core.async, and my guess is that there will never be.
@ghadi right - what I was trying to say but much better put
it’s not an operation that CSP can do correctly
There's probably an analogy to coupling in there. Peeking into another process's internals, rather than relying upon communication
to the poster's actual original concern (now that I understand it) a combination of “count of the channel’s buffer” with “time of last read from the channel” seems reasonable if all you are doing is raising an alarm about backlog or something
You can always introduce an intermediate/proxy process to read from some channels and mediate the distribution of values
just going on channel count would false positive on high throughput
instead of a stateful transducer that tells you how long it's been since the last read from the chan, you could bundle each message as a tuple with a submission time, and track the deltas on each read (with an alarm condition for large deltas)
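A minimal sketch of the timestamp-tuple idea, assuming JVM Clojure; `put-stamped!` and `take-checked!!` are hypothetical helper names and the warning is just a `println`:

```clojure
(require '[clojure.core.async :as a])

(defn put-stamped!
  "Hypothetical helper: puts [submission-time-ms msg] onto ch."
  [ch msg]
  (a/put! ch [(System/currentTimeMillis) msg]))

(defn take-checked!!
  "Hypothetical helper: blocking take that returns {:msg .. :age-ms ..} and
  prints a warning when the message waited longer than max-age-ms.
  Returns nil if ch is closed and empty."
  [ch max-age-ms]
  (when-some [[submitted-at msg] (a/<!! ch)]
    (let [age-ms (- (System/currentTimeMillis) submitted-at)]
      (when (> age-ms max-age-ms)
        (println "WARN: message waited" age-ms "ms before being read"))
      {:msg msg :age-ms age-ms})))
```

The alarm can only fire when something eventually reads the channel, so a channel that is never read stays silent under this scheme.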
that's one of the directions I've been thinking; though in my particular case I'm more concerned with mistaken identifiers that result in a channel never being read from, rather than slow processing
the ack-based solution might be better, and affords additional flexibility in some ways, but also puts a greater burden on the recipient to implement the pattern
So what @shader originally asked was "is there a way to identify whether a channel has an item waiting to be consumed without consuming said item" -- checking the `buf` and pending `puts` is a way to verify whether or not this is the case. It doesn't modify state, doesn't receive anything, just takes a count. I'm not sure where the issue would be there.
but the root thing is knowing whether it’s being consumed
is there a way to check if there are any listeners/consumers associated with a channel?
core.async is based on CSP, but it's implemented with plain ol' data structures. I suppose one could `(.lock (.mutex ch))` to verify atomicity of the count, if that level of correctness is required
@shader: correct me if I’m wrong but if there were 100 items unread but you were reading 50 a second, that would be fine right? the issue isn’t how many are in buffer, but whether things are actively being read
yeah; I just wanted to give a warning if there is an event that doesn't have any registered handlers
yeah, that’s an odd fit for csp, and count is only a loose indicator of that at best
things in csp don’t ever have “registered handlers” - channels have reads and writes
which means you can’t check as an abstract question “who reads this” - you can check if it’s been read, if you log such things, maybe you could even log who plans on reading, but you don’t have that status of being a dedicated reader
that can definitely be an intended behavior, but it’s not reified into the rules / semantics / guarantees in any way
quite a bit of writing clojure every day depends on relying on implementation details ... it's not perfect, but it's life. If you need to do something and there's no other way, sometimes it's a necessary tradeoff. Not good practice for a long term project, but when you need to get something done, sometimes it's the only way.
@shader maybe pub/sub are closer to the semantics you want? either core.async pub/sub or a more dedicated pub/sub system?
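A small sketch of core.async pub/sub, assuming `clojure.core.async` is required as `a`; the event shape (`:type` key) and topic names are invented for illustration:

```clojure
(require '[clojure.core.async :as a])

(def events (a/chan))
(def by-type (a/pub events :type))   ; route on the :type key of each event

(def clicks (a/chan 1))
(a/sub by-type :click clicks)

(a/>!! events {:type :click :x 1})
(def received (a/<!! clicks))

;; Nobody has subscribed to :scroll, so the pub takes this event and drops it.
(def unrouted-put (a/>!! events {:type :scroll}))
(def nothing-there (a/poll! clicks))
```

Note that the pub drops events for topics with no subscriber without any warning, so a "no handler registered" alert would still need to be built around it.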
@joshjones you're guaranteed to have a bad time, and I'd never use a library that did that
If you want to know whether a set of channels have been read recently, you can do it like this:
C1 C2 C3 C4 C5 <===== proxy go block that uses alts! to track recent read times / metadata in a map ====> output channel
you don't have to look inside C1 - C5 to know whether they've been touched. Now this makes the assumption that only the proxy go block has access to the channels
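A rough sketch of such a proxy, assuming JVM Clojure and that only the proxy reads the input channels; `tracking-proxy` and the `last-read` atom are hypothetical names, not part of core.async:

```clojure
(require '[clojure.core.async :as a])

(defn tracking-proxy
  "Hypothetical helper: forwards values from every channel in inputs to out.
  Each time a value is taken from an input, records the time (ms) under that
  channel in the last-read atom. Closes out once all inputs are closed."
  [inputs out last-read]
  (a/go-loop [open (set inputs)]
    (if (empty? open)
      (a/close! out)
      (let [[v ch] (a/alts! (vec open))]
        (if (nil? v)
          (recur (disj open ch))            ; that input closed; stop watching it
          (do
            (swap! last-read assoc ch (System/currentTimeMillis))
            (when (a/>! out v)              ; stop if out has been closed
              (recur open)))))))
  out)
```

Inputs that never appear in `@last-read` have never delivered a value through the proxy, which can be checked without looking inside the channels themselves.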
(Also, whether you write a library or program I wouldn't advocate reacharounds into impl)
yes, we recently implemented our own ManyToManyChannel on top of a distributed messaging system to use a vanilla core.async channel to send distributed messages. In doing so, it was necessary to understand implementation details that otherwise one wouldn't care about, and this is just a fact of life. Much reading of the core.async code and rewriting a lot of it. Perfect encapsulation and data hiding is the realm of OOP. Separating concerns is good engineering, and I do it every day, but sometimes you need something to work that requires bending these rules.
I'm specifically advocating that in the original poster's question, the answer did not need to reach into impl.
> quite a bit of writing clojure every day depends on relying on implementation details
that's not my general experience
I'll bet you know how a vector is implemented, and how it differs from how a list is implemented...
and the ramifications of that, for example, knowing that `(rest x)` on a vector is slow, but the same operation on a list is near instantaneous
chunking of seqs ... 32 at a time ... it's all implementation specific, but stuff we need to know. I agree OT, just addressing your quoting me
I would like to dissuade people from reaching into a channel mutex when it's not minimal/necessary
you could write a buffer type that looks at the age of items in the buffer and does something if an item is too old
but the other options that I've thought of so far 1) a potentially complex set of intermediate channels, with alt! and timeout, etc, using backpressure from downstream to identify whether it's been read or not, or 2) some kind of ack-based system that copies all of the messages and discards the ones that are acknowledged, leaving a set of unhandled messages that can be checked with timeouts, etc.
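A minimal sketch of option 2 (the ack-based approach), assuming JVM Clojure; the helper names, the `{:id .. :msg ..}` message shape, and the separate `acks` channel are all hypothetical:

```clojure
(require '[clojure.core.async :as a])

(defn start-ack-tracker
  "Hypothetical helper: returns {:pending atom, :done chan}. Every id received
  on acks is removed from the pending map; done closes once acks is closed."
  [acks]
  (let [pending (atom {})
        done    (a/go-loop []
                  (when-some [id (a/<! acks)]
                    (swap! pending dissoc id)
                    (recur)))]
    {:pending pending :done done}))

(defn send-tracked!
  "Hypothetical helper: records the message as unacknowledged, then puts it on ch."
  [pending ch id msg]
  (swap! pending assoc id {:msg msg :sent-at (System/currentTimeMillis)})
  (a/put! ch {:id id :msg msg}))

(defn unacked-older-than
  "Ids of messages sent more than max-age-ms ago that were never acknowledged."
  [pending max-age-ms]
  (let [now (System/currentTimeMillis)]
    (for [[id {:keys [sent-at]}] @pending
          :when (> (- now sent-at) max-age-ms)]
      id)))
```

A periodic check of `unacked-older-than` would supply the debugging warning, while the messages themselves stay available for handlers that load later.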
https://github.com/hiredman/roundabout/blob/master/src/com/manigfeald/roundabout/reliable.clj
you can also look at say tcp, and how it does reliable delivery (assuming that is the end of goal of this)
reliable delivery is one of the goals. I'm basically trying to make an alternative to the re-frame event system that is more functional, using channels. re-frame kindly warns if you aren't listening for an event, but then doesn't keep it around for later processing. I would still like to have the benefit of the warnings, but also less ordering sensitivity while bootstrapping. Having said that, the event routing structure will probably have to be immutable to meet the fp design goal, so I'm probably not going to have any handlers "registering" via side-effects
I was working on a similar system based on most.js before I switched to clojurescript, which seems much better suited for it because the immutability and channels are part of the core
so what you want is something like what I have above, with an "unreliable" channel in the middle that drops messages that aren't subscribed to
the consumer will notice a missing sequence number, then signal the producer, at which point the producer can look in its buffer of unacked messages and signal that a given message was unhandled
that would be one way to do it I suppose, though it seems simpler if the messages were just buffered and I could detect that they had not been read - the warnings are really just for debugging purposes. If the system is operating as intended, the handlers should eventually be loaded, and it would be nice not to lose the events before then
right, so you have a debug flag in the producer that switches between drop-and-warn and retry-forever (assuming handlers will all be loaded)
I think there are sort of two ways people approach core.async as a library 1. channels are a sort of interface, so if you want your widget to participate, make it like a channel 2. channels are message passing primitives you can use to build whatever machines that expose some kind of interface
#1 ends up with sort of simple code that uses channels, but complex channel implementations #2 ends up with simpler channels and complex machines that use channels to communicate
interesting; I guess I don't normally think of providing alternate channel implementations
I usually think about providing wrappers to the interface functions. E.g., making them appear more functional by providing an equivalent to `mix` that 1) returns the out instead of taking it as input, 2) takes the desired input channels as arguments, plus a 'control' channel that can add new channels, or toggle their state via messages. That way the channel structure is immutable, while the contents provide the dynamics
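One way such a wrapper might look, as a sketch only: `mix-of` and the `[op ch state]` control message format are hypothetical, built on the existing `mix`, `admix`, `unmix`, and `toggle` functions:

```clojure
(require '[clojure.core.async :as a])

(defn mix-of
  "Hypothetical wrapper around mix: takes a control channel and the initial
  input channels, and returns the output channel. Control messages are vectors:
  [:add ch], [:remove ch], or [:toggle ch state-map] (e.g. {:mute true})."
  [control & ins]
  (let [out (a/chan)
        m   (a/mix out)]
    (doseq [in ins]
      (a/admix m in))
    (a/go-loop []
      (when-some [[op ch state] (a/<! control)]
        (case op
          :add    (a/admix m ch)
          :remove (a/unmix m ch)
          :toggle (a/toggle m {ch state})
          nil)                              ; ignore unknown operations
        (recur)))
    out))
```

The caller only ever holds `out` and `control`, so changes to the set of inputs travel as messages rather than as direct calls on the mix.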