#core-async
2017-05-26
Alex Miller (Clojure team)00:05:26

re runtests - no, don’t think

Alex Miller (Clojure team)00:05:33

re script, yes - but need to check with @dnolen

Alex Miller (Clojure team)00:05:09

can we split the contributing changes into a separate ticket/patch too?

Alex Miller (Clojure team)00:05:18

I can apply that one now

hlship00:05:43

Ok, so you want a patch that's everything but CONTRIBUTING.md?

Alex Miller (Clojure team)00:05:02

yeah, the enhancement separated from the contributing changes

hlship00:05:28

I find that somewhat ridiculously fine-grained, but will put it together.

Alex Miller (Clojure team)00:05:51

well these will be applied on much different time scales :)

Alex Miller (Clojure team)00:05:15

I need to get a thumbs up from Rich on the enhancement but I can apply the contributing change now

hlship00:05:22

Well, in terms of the docs, can we loop in @dnolen about cleaning up those misleading (?) scripts as well. And maybe a tiny change to the test runner to show something in the window, not just the console. Doesn't have to be fancy!

hlship00:05:46

Boy Scout: Leave everything better than you found it.

Alex Miller (Clojure team)00:05:40

sure, I suspect David probably won’t be available till tomorrow but we are planning to do a core.async release tomorrow

dnolen13:05:51

@hlship you can indeed remove script/test, that can’t really work since à la carte JS engine builds won’t have an async dispatch mechanism

dnolen13:05:31

showing something in the window would be a nice enhancement - I agree

lmergen14:05:06

so, i know that closing a channel propagates down (that is, if you close a from channel, usually the to channel will be closed as well), but what happens when you turn that around ? does closing a to channel by default close the chain upwards ?

lmergen14:05:21

i should probably test this and figure out for myself… 🙂

joshjones14:05:21

@lmergen no -- perhaps there are others reading the from channel still, so you don't want to close it. As the doc says for pipe and pipeline: "Will stop consuming the from channel if the to channel closes"

joshjones14:05:13

as the source for pipe shows, from is only referenced one place, and that's (<! from), so it's never closed:

([from to close?]
     (go-loop []
      (let [v (<! from)]
        (if (nil? v)
          (when close? (close! to))
          (when (>! to v)
            (recur)))))
     to)
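
A minimal sketch of that behavior, assuming JVM Clojure with core.async on the classpath. The channel names and the buffer size are made up for illustration: after the to channel is closed, puts onto the from channel are still accepted, because pipe never closes it.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical channels; `from` is buffered so the puts below cannot block.
(def from (a/chan 10))
(def to (a/chan))

(a/pipe from to)
(a/close! to)

;; pipe will take :first, fail to put it on the closed `to`, and stop looping.
;; Note that :first is lost in the process.
(def first-put (a/>!! from :first))

;; `from` itself was never closed, so it still accepts puts.
(def second-put (a/>!! from :second))
```

Both puts return true, since >!! only returns false when the channel it puts onto is already closed.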

lmergen14:05:13

yeah, that makes sense

lmergen14:05:20

ok, in that case, i’ll have to tailor this to my specific use case

lmergen14:05:32

thanks for the info!

joshjones14:05:03

@lmergen you're welcome -- note that you would probably not have anyone else taking from the from channel if you intend to pipe it to the to channel, as your takes from the to channel would not be consistent then. Still, I guess it's good to know that after closing the to channel you can continue using the previously piped from channel, if for some reason you wanted to. I'm curious about your use case: why do you care about this?

shader14:05:51

so, in my searching I've seen this question asked several times before but usually without any particularly clear answers - is there a way to identify whether a channel has an item waiting to be consumed without consuming said item? a.k.a. peek? I'm trying to create an event system with routing/subscription using core.async channels, but I want to be able to monitor the feeds for debugging purposes, to warn if there is an event stream that never had a handler attached

shader14:05:47

the only ideas I've had so far are 1) a peek based monitor, that periodically checks to see if any events have not been handled 2) an ack model, where all events are copied into a collection and aren't removed until an explicit ack message is received on another channel

joshjones14:05:51

you are using the standard core.async channels and buffers, correct? in other words, you have not implemented your own ManyToManyChannel etc? @shader

joshjones14:05:00

when you say "waiting to be consumed," if you mean "would a <!! block?" then that means that either the channel's buffer is not empty, or that there are pending puts onto the channel. you can check for these two as such, though you're digging into implementation details which may not be the best thing to do.

joshjones14:05:55

@shader

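(defn can-take
  "True if ch has buffered items or pending puts.
  Reads ManyToManyChannel internals (.puts, .buf), so it may break between versions."
  [ch]
  (or (> (count (.puts ch)) 0)
      (> (count (.buf ch)) 0)))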

shader14:05:11

yes; I have not implemented any custom channels or buffers. Since I would be using the standard core.async implementation only, I suppose relying on implementation details isn't too bad

shader14:05:43

@joshjones thanks for the code

Alex Miller (Clojure team)14:05:50

@hlship I removed the script/test script, added a hint to runtests.html, and updated the readme with the cljs test instructions

hlship15:05:56

ok, I was just starting work on a patch for that.

noisesmith16:05:05

@shader @joshjones why not just use clojure.core.async/peek! ?

noisesmith16:05:13

I mean, that’s what it is for

shader16:05:34

I didn't see that when I was looking...

noisesmith16:05:50

it doesn’t answer the “would reading from this channel block?” question because CSP can never make any such promise, but it returns nil or the item

noisesmith16:05:29

oh I’m sorry, it’s called poll! - my bad

noisesmith16:05:00

oh and it’s not peek, because it actually gets the item - so might be wrong for your case

noisesmith16:05:40

but that’s a better behavior because CSP can never make the promise “reading this would never block” per se, that’s just not a thing it can express, without making a special construct like poll! that maybe reads and maybe doesn’t

shader16:05:03

I don't really want to prove that reading won't block

shader16:05:11

I just want to warn if a particular channel hasn't been read in a period of time

noisesmith16:05:32

well, poll! wouldn’t be the right thing for that either

noisesmith16:05:42

xform on the chan that updates a mutable “read time”

ghadi16:05:05

There is not a "selective receive" like erlang or "peek" in core.async, and my guess is that there will never be.

ghadi16:05:44

I think it would lead to terrible errors to expose something like that.

noisesmith17:05:42

@ghadi right - what I was trying to say but much better put

noisesmith17:05:51

it’s not an operation that CSP can do correctly

ghadi17:05:36

There's probably an analogy to coupling in there. Peeking into another process's internals, rather than relying upon communication

noisesmith17:05:48

to the poster's actual original concern (now that I understand it) a combination of “count of the channel’s buffer” with “time of last read from the channel” seems reasonable if all you are doing is raising an alarm about backlog or something

ghadi17:05:05

You can always introduce an intermediate/proxy process to read from some channels and mediate the distribution of values

noisesmith17:05:20

just going on channel count would false positive on high throughput

noisesmith17:05:05

instead of a stateful transducer that tells you how long it's been since the last read from the chan, you could bundle each message as a tuple with a submission time, and track the deltas on each read (with an alarm condition for large deltas)

ghadi17:05:57

that seems fruitful ^ (adding metadata to the submission)

ghadi17:05:13

(not clojure metadata but just a plain map around the submission)
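
A rough sketch of the plain-map idea, assuming JVM Clojure. The names stamp, take-with-age!!, :submitted-at and :age-ms are hypothetical and not part of core.async.

```clojure
(require '[clojure.core.async :as a])

(defn stamp
  "Wraps a value in a plain map recording when it was submitted."
  [v]
  {:value v :submitted-at (System/currentTimeMillis)})

(defn take-with-age!!
  "Blocking take that returns the value plus how long it waited in
  milliseconds, or nil if the channel is closed and drained."
  [ch]
  (when-let [{:keys [value submitted-at]} (a/<!! ch)]
    {:value value
     :age-ms (- (System/currentTimeMillis) submitted-at)}))

;; Hypothetical usage: the producer stamps, the consumer measures the delta.
(def events (a/chan 10))
(a/>!! events (stamp :clicked))
(def result (take-with-age!! events))
```

A consumer could compare :age-ms against a threshold of its choosing and raise the alarm when the delta is large.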

shader18:05:38

that's one of the directions I've been thinking; though in my particular case I'm more concerned with mistaken identifiers that result in a channel never being read from, rather than slow processing

shader18:05:32

the ack-based solution might be better, and affords additional flexibility in some ways, but also puts a greater burden on the recipient to implement the pattern

joshjones18:05:10

So what @shader originally asked was "is there a way to identify whether a channel has an item waiting to be consumed without consuming said item" -- checking the buf and pending puts is a way to verify whether or not this is the case. It doesn't modify state, doesn't receive anything, just takes a count. I'm not sure where the issue would be there.

shader18:05:15

yep; I'll probably start there honestly

noisesmith18:05:27

but the root thing is knowing whether it’s being consumed

joshjones18:05:43

what do you mean?

shader18:05:22

is there a way to check if there are any listeners/consumers associated with a channel?

joshjones18:05:33

core.async is based on CSP, but it's implemented with plain ol' data structures. I suppose one could (.lock (.mutex ch)) to verify atomicity of the count, if that level of correctness is required

noisesmith18:05:38

@shader: correct me if I’m wrong but if there were 100 items unread but you were reading 50 a second, that would be fine right? the issue isn’t how many are in buffer, but whether things are actively being read

shader18:05:12

yeah; I just wanted to give a warning if there is an event that doesn't have any registered handlers

noisesmith18:05:32

yeah, that’s an odd fit for csp, and count is only a loose indicator of that at best

shader18:05:53

maybe "registered" is the wrong word...

noisesmith18:05:01

things in csp don’t ever have “registered handlers” - channels have reads and writes

ghadi18:05:40

Needless to say it's a really bad idea to read the implementation buf / puts

noisesmith18:05:47

which means you can’t check as an abstract question “who reads this” - you can check if it’s been read, if you log such things, maybe you could even log who plans on reading, but you don’t have that status of being a dedicated reader

joshjones18:05:15

@ghadi depends on the use case

noisesmith18:05:32

that can definitely be an intended behavior, but it’s not reified into the rules / semantics / guarantees in any way

joshjones18:05:50

quite a bit of writing clojure every day depends on relying on implementation details ... it's not perfect, but it's life. If you need to do something and there's no other way, sometimes it's a necessary tradeoff. Not good practice for a long term project, but when you need to get something done, sometimes it's the only way.

noisesmith18:05:55

@shader maybe pub/sub are closer to the semantics you want? either core.async pub/sub or a more dedicated pub/sub system?
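
For reference, a small sketch of core.async pub/sub, assuming JVM Clojure. The :type key and the channel names are made up for illustration. Note that pub drops items whose topic has no subscriber, so on its own it would not keep unhandled events around.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical event source; events are routed by their :type key.
(def events (a/chan 10))
(def publication (a/pub events :type))

;; Subscribe a channel to the :click topic.
(def clicks (a/chan 10))
(a/sub publication :click clicks)

(a/>!! events {:type :click :x 1})
(a/>!! events {:type :scroll :y 5}) ; no :scroll subscriber, so pub drops it

(def received (a/<!! clicks))
```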

ghadi18:05:56

@joshjones you're guaranteed to have a bad time, and I'd never use a library that did that

joshjones18:05:06

is he writing a library? @ghadi

ghadi18:05:28

Not sure, but I wrote a bunch of core.async, just trying to give advice.

ghadi18:05:11

If you want to know whether a set of channels have been read recently, you can do it like this:

ghadi18:05:59

C1 C2 C3 C4 C5 <===== proxy go block that uses alts! to track recent read times / metadata in a map ====> output channel

ghadi18:05:41

you don't have to look inside C1 - C5 to know whether they've been touched. Now this makes the assumption that only the proxy go block has access to the channels
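
One way the proxy idea might look, as a sketch only and assuming JVM Clojure. tracking-proxy and its last-read atom are hypothetical names, and, as noted above, it only works if nothing else takes from the input channels.

```clojure
(require '[clojure.core.async :as a])

(defn tracking-proxy
  "Forwards values from the input channels to out, recording the time of the
  most recent take per channel. Returns the atom holding those times."
  [inputs out]
  (let [last-read (atom {})]
    (a/go-loop [chans (set inputs)]
      (if (empty? chans)
        (a/close! out)                        ; every input closed: close out
        (let [[v ch] (a/alts! (vec chans))]
          (if (nil? v)
            (recur (disj chans ch))           ; this input closed: stop watching it
            (do (swap! last-read assoc ch (System/currentTimeMillis))
                (when (a/>! out v)            ; stop if out has been closed
                  (recur chans)))))))
    last-read))

;; Hypothetical usage with two inputs, only one of which is ever written to.
(def c1 (a/chan 1))
(def c2 (a/chan 1))
(def out (a/chan 1))
(def last-read (tracking-proxy [c1 c2] out))

(a/>!! c1 :hello)
(def forwarded (a/<!! out))
```

After this runs, the map in last-read has an entry for c1 but none for c2, which is the "never touched" signal without looking inside either channel.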

ghadi18:05:19

(Also, whether you write a library or program I wouldn't advocate reacharounds into impl)

joshjones18:05:45

yes, we recently implemented our own ManyToManyChannel on top of a distributed messaging system to use a vanilla core.async channel to send distributed messages. In doing so, it was necessary to understand implementation details that otherwise one wouldn't care about, and this is just a fact of life. Much reading of the core.async code and rewriting a lot of it. Perfect encapsulation and data hiding is the realm of OOP. Separating concerns is good engineering, and I do it every day, but sometimes you need something to work that requires bending these rules.

ghadi18:05:16

I'm specifically advocating that in the original poster's question, the answer did not need to reach into impl.

ghadi18:05:24

> quite a bit of writing clojure every day depends on relying on implementation details
that's not my general experience

joshjones18:05:01

I'll bet you know how a vector is implemented, and how it differs from how a list is implemented...

joshjones18:05:27

and the ramifications of that, for example, knowing that (rest x) on a vector is slow, but the same operation on a list is near instantaneous

bronsa18:05:47

joshjones: FWIW rest on a vector is constant time and very fast

bronsa18:05:58

slower than on a list, sure, but not that much

ghadi18:05:41

hey this is veering off-topic

joshjones18:05:24

chunking of seqs ... 32 at a time ... it's all implementation specific, but stuff we need to know. I agree OT, just addressing your quoting me

ghadi18:05:59

I would like to dissuade people from reaching into a channel mutex when it's not minimal/necessary

shader18:05:11

so, I would definitely prefer not to dive into implementation details

hiredman18:05:33

you might look at a custom buffer type

hiredman18:05:25

you could write a buffer type that looks at the age of items in the buffer and does something if an item is too old

shader18:05:15

but the other options that I've thought of so far are 1) a potentially complex set of intermediate channels, with alt! and timeout, etc, using backpressure from downstream to identify whether it's been read or not, or 2) some kind of ack-based system that copies all of the messages and discards the ones that are acknowledged, leaving a set of unhandled messages that can be checked with timeouts, etc.

shader18:05:37

@hiredman interesting option

hiredman18:05:32

you can also look at say tcp, and how it does reliable delivery (assuming that is the end goal of this)

shader18:05:36

reliable delivery is one of the goals. I'm basically trying to make an alternative to the re-frame event system that is more functional, using channels. re-frame kindly warns if you aren't listening for an event, but then doesn't keep it around for later processing. I would still like to have the benefit of the warnings, but also less ordering sensitivity while bootstrapping. Having said that, the event routing structure will probably have to be immutable to meet the fp design goal, so I'm probably not going to have any handlers "registering" via side-effects

shader19:05:25

I was working on a similar system based on most.js before I switched to clojurescript, which seems much better suited for it because the immutability and channels are part of the core

hiredman19:05:54

so what you want is something like what I have above, with an "unreliable" channel in the middle that drops messages that aren't subscribed to

hiredman19:05:03

the consumer will notice a missing sequence number, then signal the producer, at which point the producer can look in its buffer of unacked messages and signal that a given message was unhandled

shader19:05:00

that would be one way to do it I suppose, though it seems simpler if the messages were just buffered and I could detect that they had not been read - the warnings are really just for debugging purposes. If the system is operating as intended, the handlers should eventually be loaded, and it would be nice not to lose the events before then

hiredman19:05:24

right, so you have a debug flag in the producer that switches between drop-and-warn and retry forever (assuming handlers will all be loaded)

shader19:05:21

good point

shader19:05:47

though 'retry forever' would probably just be a channel with a blocking put

hiredman19:05:27

I think there are sort of two ways people approach core.async as a library 1. channels are a sort of interface, so if you want your widget to participate, make it like a channel 2. channels are message passing primitives you can use to build whatever machines that expose some kind of interface

hiredman19:05:49

the approaches are compatible, the main difference is where the complexity ends up

hiredman19:05:35

#1 ends up with sort of simple code that uses channels, but complex channel implementations #2 ends up with simpler channels and complex machines that use channels to communicate

shader20:05:13

interesting; I guess I don't normally think of providing alternate channel implementations

shader20:05:56

I usually think about providing wrappers to the interface functions. E.g., making them appear more functional by providing an equivalent to mix that 1) returns the out instead of taking it as input, 2) takes the desired input channels as arguments, plus a 'control' channel that can add new channels, or toggle their state via messages. That way the channel structure is immutable, while the contents provide the dynamics

shader20:05:54

this can be implemented as a wrapper around the existing mix that hides the messy side-effect details, though it may not be as efficient as a more complete custom solution
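
A sketch of what such a wrapper might look like, assuming JVM Clojure. The function name mixed and the [:add ch] / [:toggle state-map] message shapes are invented here for illustration; only mix, admix and toggle come from core.async.

```clojure
(require '[clojure.core.async :as a])

(defn mixed
  "Returns a new out channel fed by a mix of the given input channels.
  Messages on control adjust the mix: [:add ch] adds an input, and
  [:toggle state-map] is passed through to core.async's toggle."
  [control & inputs]
  (let [out (a/chan)
        m   (a/mix out)]
    (doseq [ch inputs] (a/admix m ch))
    (a/go-loop []
      (when-let [[op arg] (a/<! control)]
        (case op
          :add    (a/admix m arg)
          :toggle (a/toggle m arg)
          nil)                                ; ignore unknown messages
        (recur)))
    out))

;; Hypothetical usage: start with one input, add a second via the control channel.
(def control (a/chan 1))
(def c1 (a/chan 1))
(def c2 (a/chan 1))
(def out (mixed control c1))

(a/>!! c1 :from-c1)
(def first-value (a/<!! out))

(a/>!! control [:add c2])
(a/>!! c2 :from-c2)
(def second-value (a/<!! out))
```

The mutable mix stays hidden inside the function, so callers only see channels going in and a channel coming out.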

shader20:05:19

that's one of the things I liked about most.js - higher-order streams

hlship22:05:23

I have a side project where I created my own ReadPort because I a) wanted to ensure that clients didn't write to it and b) needed to do some special cleanup work when it was closed.