This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-06-21
Is there any way to add metadata to a chan? Using (with-meta …) throws: class clojure.core.async.impl.channels.ManyToManyChannel cannot be cast to class clojure.lang.IObj
In particular, I am trying to make chans implement a protocol via metadata. I can’t use extend-type, as implementations of this protocol may vary per producer.
You can’t, as the deftype for channels does not support metadata.
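A minimal sketch of the failure described above, assuming only that core.async is on the classpath (the var names `c` and `meta-failed?` are made up for illustration). `with-meta` needs its argument to implement `clojure.lang.IObj`, which the channel type does not:

```clojure
(require '[clojure.core.async :as a])

(def c (a/chan))

;; Channels are a deftype that does not implement IObj,
;; so they cannot carry metadata.
(def supports-meta? (instance? clojure.lang.IObj c))

;; with-meta casts its argument to IObj, so this throws ClassCastException.
(def meta-failed?
  (try
    (with-meta c {:some :meta})
    false
    (catch ClassCastException _ true)))
```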
You could create a defrecord that wraps a channel and implements the same protocols; however, this makes you depend on core.async internals, so it is not recommended.
I want to implement a layer of chans on top of internal producers, things like websocket connections, where you might do subscriptions etc. Then I want to be able to close such a connection, closing the chan but also releasing the internal resources used to produce values on that chan (ws subscription, etc).
There are other ways, like creating something that wraps the chan and implements such a protocol, but I was trying to keep it as simple as possible: you could ‘unsubscribe’ the channel directly, and that would close the needed connection on the producer and also close! the chan.
Just use a type/record that has a get-chan method?
I’d argue that wrapping/composing is simpler than extending protocols
So far I’m inclined to wrap the ch in something that also implements ReadPort (from clojure.core.async.impl.protocols), so you can use the stream as the channel
Not sure if I got all of this. You want to have a producer process that puts something on a channel. Then when this output channel is closed, you want to release resources? If so, why would you need a protocol for the producer at all?
I want the consumer to be the one closing the channel (and the resources needed to produce values like internal subscriptions)
mostly for realtime streams/subscriptions, using async ch’s as the delivery mechanism
I like what you suggested about wrapping the chan and having a protocol with get-chan and close. Although I’d have liked to extend the chan directly, I can still make implementations that also implement p/ReadPort and dispatch to the internal ch
> I want the consumer to be the one closing the channel (and the resources needed to produce values like internal subscriptions)
Rereading this, I think it’s not quite clear. I want the consumer to close the connection/subscription/stream; then it would be the producer who closes the ch
Putting functions like >! and put! give feedback on whether the put succeeded or the channel was already closed. So in theory, you could just let your consumers close their input channel; eventually producers will fail to put something on the channel and can react to that. Then you would not need any extra protocol.
putting on a closed channel returns false, so you can totally do something like (if (>! ch v) (continue) (cleanup))
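A rough sketch of that idea, not anyone's actual implementation: `start-producer`, `out`, and `cleaned-up` are hypothetical names, and the producer just emits increasing integers. One caveat from the `close!` docstring: a put that is already parked when the channel is closed stays parked until a taker releases it, so the consumer drains the channel after closing it.

```clojure
(require '[clojure.core.async :as a])

(defn start-producer
  "Hypothetical producer: puts 0, 1, 2, ... on out until a put returns false
  (out was closed), then calls the nullary cleanup fn once."
  [out cleanup]
  (a/go-loop [n 0]
    (if (a/>! out n)
      (recur (inc n))
      (cleanup))))

(def out (a/chan))
(def cleaned-up (promise))

(start-producer out #(deliver cleaned-up true))

(def first-value (a/<!! out))   ; consumer takes one value
(a/close! out)                  ; consumer signals it is done

;; Drain: a put parked before close! is still delivered; after that, takes
;; return nil and the producer's next put returns false.
(loop []
  (when (some? (a/<!! out))
    (recur)))

(def cleanup-result (deref cleaned-up 1000 :timeout))
```

The producer only notices the close on its next put, so cleanup can lag behind the consumer's close! by up to one value.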
I tried that in the past, but it makes the async pipelines inside the producer a bit more convoluted: it won’t work if you use that chan as a tap on a mult or as a sub on a pub, so you need a custom pipe and more chans to be the ones connected.
Yes, in general my API now takes buf-or-n as a param so that the producer creates the chan, and I also prefer the producer to be the one closing it
Thanks both for the ideas. I think I’ll go with wrapping the chan inside something and having a new protocol for closing, called by consumers to signal the producer that it must stop producing and close the ch
You can use the same channel to communicate back to the producer I think, or use a separate one
For example, have a cleanup channel and have the producer poll it; if it polls a :cleanup, it cleans up and stops producing.
This isn't a perfect stop, it's possible in the meantime the producer produced something more. If you use the same channel you can ping/pong them. Producer takes a :produce from the channel, consumer takes the produced result and puts a :produce back when it wants the next one. If it's done, it could put a :cleanup instead.
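A sketch of the separate cleanup channel, under some assumptions: the names `start-producer`, `stop`, and `done` are hypothetical, and instead of literally polling, this version uses `alts!` with `:priority true` so the producer watches the cleanup channel even while its put is parked.

```clojure
(require '[clojure.core.async :as a])

(defn start-producer
  "Hypothetical producer: puts 0, 1, 2, ... on out. Stops when anything
  arrives on stop (or stop is closed), or when out is closed; then calls
  cleanup and closes out."
  [out stop cleanup]
  (a/go-loop [n 0]
    ;; :priority true tries stop first, then the put [out n].
    (let [[v port] (a/alts! [stop [out n]] :priority true)]
      (if (and (= port out) v)      ; put succeeded, keep producing
        (recur (inc n))
        (do (cleanup)
            (a/close! out))))))

(def out (a/chan))
(def stop (a/chan 1))
(def done (promise))

(start-producer out stop #(deliver done true))

(def first-value (a/<!! out))    ; consume one value
(a/>!! stop :cleanup)            ; ask the producer to stop
(def done-result (deref done 1000 :timeout))
(def after-stop (a/<!! out))     ; nil once the producer has closed out
```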
For now I have gone with this:
;; Aliases used below: a = clojure.core.async, p = its impl protocols.
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as p])

(defprotocol IStream
  (close! [this] "Closes given stream")
  (ch [this] "Returns channel for this stream"))

(defrecord Stream [ch
                   on-close
                   closed?_]
  IStream
  (ch [_] ch)
  (close! [_]
    ;; reset-vals! returns [old new]; only the first caller sees old = false,
    ;; so on-close runs at most once.
    (let [[prev _] (reset-vals! closed?_ true)]
      (when-not prev (on-close) (a/close! ch))))
  p/ReadPort
  (take! [_ fnh] (p/take! ch fnh)))

(defn create
  "Creates a stream given a ch and nullary fn on-close, that will be called
  once when the Stream is closed.
  Experimental: The resulting Stream also implements core.async/ReadPort so you can
  consume the stream directly as you would a chan (take!/<!/<!!/...)."
  [ch on-close]
  (->Stream ch on-close (atom false)))
This solves my problem, and I think it’s quite simple. It’s not a perfect stop either, but that’s fine: what I’d usually do is call close! on the stream, then <!! the process consuming it to wait until everything is processed