#core-async
2022-06-21
bortexz09:06:21

Is there any way to add metadata to a chan? It throws ; class clojure.core.async.impl.channels.ManyToManyChannel cannot be cast to class clojure.lang.IObj when using (with-meta …). In particular, I am trying to make chans implement a protocol via metadata. I can’t use extend-type, as implementations of this protocol may vary per producer

Ferdinand Beyer10:06:57

You can’t, as the deftype for channels does not support metadata. You could create a defrecord that wraps a channel and implements the same protocols, however this makes you depend on core.async internals. So this is not recommended.
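A quick REPL check of the point above, as a minimal sketch: `with-meta` needs its argument to implement `clojure.lang.IObj`, and the channel type does not. The var names (`c`, `chan-supports-meta?`, `map-supports-meta?`) are only illustrative.

```clojure
(require '[clojure.core.async :as a])

(def c (a/chan))

;; with-meta casts its argument to clojure.lang.IObj. The channel type
;; does not implement that interface, hence the ClassCastException.
(def chan-supports-meta? (instance? clojure.lang.IObj c))

;; Persistent collections such as maps do implement IObj.
(def map-supports-meta? (instance? clojure.lang.IObj {}))
```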

Ben Sless10:06:28

What use case are you aiming at?

bortexz11:06:07

I want to implement a layer of ch’s on top of internal producers, things like websocket connections, where you might do subscriptions etc. Then I want to be able to close such a connection, closing the chan but also releasing the internal resources used to produce values on that chan (ws subscription, etc).

bortexz11:06:34

There are other ways like creating something that wraps the chan inside and that it implements such protocol, although I was trying to make it as simple as possible, and that you could ‘unsubscribe’ the channel directly, and that would close the needed connection on the producer and also close! the chan

Ferdinand Beyer11:06:39

Just use a type/record that has a get-chan method?

Ferdinand Beyer11:06:44

I’d argue that wrapping/composing is simpler than extending protocols

bortexz11:06:07

So far I’m inclined to wrap the ch in something that also implements the async/ReadPort, so you can use the stream as the channel

bortexz11:06:16

Even if it relies on internals

bortexz11:06:04

I’d prefer to have as few layers as possible, but yes that’s also a solution

Ferdinand Beyer12:06:55

Not sure if I got all of this. You want to have a producer process that puts something on a channel. Then when this output channel is closed, you want to release resources? If so, why would you need a protocol for the producer at all?

bortexz12:06:02

I want the consumer to be the one closing the channel (and the resources needed to produce values like internal subscriptions)

bortexz12:06:10

mostly for realtime streams/subscriptions, using async ch’s as the delivery mechanism

bortexz12:06:58

I’m liking what you suggested about wrapping the chan and have a protocol with get-chan and close, although I’d like to extend the chan directly, I can still make implementations that also implement p/ReadPort and dispatch to the implementation on the internal ch

bortexz12:06:55

> I want the consumer to be the one closing the channel (and the resources needed to produce values like internal subscriptions)
Rereading this, I think it’s not quite clear: I want the consumer to close the connection/subscription/stream, and then it would be the producer who closes the ch

Ferdinand Beyer13:06:59

Putting functions like >! and put! give feedback on whether the put succeeded or the channel was already closed. So in theory, you could just let your consumers close their input channel, and eventually producers will fail to put something on the channel and can react to that. Then you would not need any extra protocol.

Ben Sless13:06:18

putting on a closed channel returns false, so you can totally do something like (if (>! ch v) (continue) (cleanup))
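A minimal sketch of that pattern, assuming a hypothetical producer that emits increasing integers and a nullary `cleanup` fn (both invented for illustration). One caveat as far as I understand the channel implementation: a put that was already parked when the channel gets closed stays pending until something takes it, so the consumer drains once after closing.

```clojure
(require '[clojure.core.async :as a])

(defn start-producer
  "Hypothetical producer: puts increasing integers on `out` until a put
   returns false (channel closed), then calls the nullary `cleanup` fn."
  [out cleanup]
  (a/go-loop [n 0]
    (if (a/>! out n)
      (recur (inc n))
      (cleanup))))

(def out (a/chan))
(def cleaned-up (promise))
(start-producer out #(deliver cleaned-up true))

(def first-value (a/<!! out)) ; consumer takes one value
(a/close! out)                ; consumer decides it is done
(a/<!! out)                   ; drain: releases a put parked before the close, if any

;; Wait up to one second for the producer to notice and clean up.
(def cleanup-ran? (deref cleaned-up 1000 false))
```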

Ben Sless13:06:39

Basically what Ferdinand said 🙂

bortexz14:06:17

I tried that in the past, but it makes the async pipelines inside the producer a bit more convoluted, as that won't work if you use that chan to connect to a tap or sub to a pub, needing a custom pipe and more chans to be the ones connected.

Ben Sless14:06:07

It's easier to conceptually have producers own channels instead of consumers

bortexz14:06:17

Yes, in general my api is now with buf-or-n as param to make the producer create the chan, and also I prefer the producer to be the one closing it

bortexz14:06:15

thanks both for the ideas, I think I'll go with wrapping the chan inside something and have a new protocol for closing, which consumers call to signal the producer that it must stop producing and close the ch

didibus20:06:39

You can use the same channel to communicate back to the producer I think, or use a separate one

didibus20:06:44

For example, have a cleanup channel and have the producer poll it; if it polls a :cleanup, it cleans up and stops producing.

didibus20:06:03

Then the consumer when done can just put to that channel

didibus20:06:51

This isn't a perfect stop, it's possible in the meantime the producer produced something more. If you use the same channel you can ping/pong them. Producer takes a :produce from the channel, consumer takes the produced result and puts a :produce back when it wants the next one. If it's done, it could put a :cleanup instead.
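The separate cleanup channel idea could look roughly like this. It is a sketch only: instead of literally polling, it uses `alts!` with `:priority true` so the producer checks the stop channel before each put, and the names `start-producer`, `out`, `stop` and `cleanup` are made up for the example.

```clojure
(require '[clojure.core.async :as a])

(defn start-producer
  "Hypothetical producer: emits increasing integers on `out` until anything
   arrives on `stop` (or `stop` is closed), then runs `cleanup` and closes `out`."
  [out stop cleanup]
  (a/go-loop [n 0]
    ;; With :priority true, alts! tries the take on `stop` before the put on `out`.
    (let [[_ port] (a/alts! [stop [out n]] :priority true)]
      (if (= port stop)
        (do (cleanup) (a/close! out))
        (recur (inc n))))))

(def out (a/chan))
(def stop (a/chan))
(def cleaned-up (promise))
(start-producer out stop #(deliver cleaned-up true))

(def first-value (a/<!! out)) ; consumer takes one value
(a/>!! stop :cleanup)         ; consumer signals that it is done

;; Wait up to one second for the producer to react.
(def cleanup-ran? (deref cleaned-up 1000 false))
```

As noted above, this is not a perfect stop in general: with a buffered `out`, values produced before the signal arrives can still be waiting in the buffer.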

bortexz20:06:41

For now I have gone with this:

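(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as p]) ; internal namespace

(defprotocol IStream
  (close! [this] "Closes given stream")
  (ch [this] "Returns channel for this stream"))

(defrecord Stream [ch
                   on-close
                   closed?_]
  IStream
  (ch [_] ch)
  (close! [_]
    ;; reset-vals! returns [old new]; only the first caller sees old = false
    (let [[prev _] (reset-vals! closed?_ true)]
      (when-not prev (on-close) (a/close! ch))))
  p/ReadPort
  (take! [_ fnh] (p/take! ch fnh)))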

(defn create
  "Creates a stream given a ch and nullary fn on-close, that will be called
   once when the Stream is closed. 
   Experimental: The resulting Stream also implements core.async/ReadPort so you can
   consume the stream directly as you would a chan (take!/<!/<!!/...)."
  [ch on-close]
  (->Stream ch on-close (atom false)))
which solves my problem and I think it’s quite simple. This solution is not a perfect stop either, but that’s fine: what I’d usually do is call close! on the stream, then <!! the process consuming it to wait until everything is processed

bortexz20:06:28

two chans for communicating with the producer seems a bit more complex for my needs right now, also when coding producers I’d just connect it to a pub internally, and unsub when close! is called, etc..
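A rough sketch of that last idea, using only the public core.async API. It is an assumption-laden illustration: the producer is reduced to a `source` channel behind a `pub` keyed on `:topic`, and `subscribe` returns a plain map with a `:close!` fn rather than the Stream record above. All of these names are hypothetical.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical producer side: everything put on `source` is routed by :topic.
(def source (a/chan))
(def publication (a/pub source :topic))

(defn subscribe
  "Creates a chan with `buf-or-n`, subscribes it to `topic`, and returns a map
   holding the chan and a nullary :close! fn that unsubscribes and closes it."
  [topic buf-or-n]
  (let [ch (a/chan buf-or-n)]
    (a/sub publication topic ch)
    {:ch     ch
     :close! (fn []
               (a/unsub publication topic ch)
               (a/close! ch))}))

(def subscription (subscribe :prices 1))
(a/>!! source {:topic :prices :value 42})
(def received (a/<!! (:ch subscription)))

((:close! subscription))
(def after-close (a/<!! (:ch subscription))) ; nil once closed and drained
```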