#core-async
2020-11-04
respatialized15:11:04

I'm not sure if this is the right place to ask, but is there a preferred lightweight solution for treating remote machines (e.g. those running separate JVM processes and accessible over a network) as core.async channels? What's the idiomatic way to send some long-running computation to another machine on the wire? Most of the stuff I've seen when looking up information (`onyx`, dragonmark, etc.) looks unmaintained.

noisesmith16:11:25

the core.async semantics break with network fallibility, you don't get the same correctness guarantees or straightforward behaviors

noisesmith16:11:56

I'd treat the network as an IO source/sink, not as a part of a larger async channel system

noisesmith16:11:16

because it has failure modes core.async doesn't have
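One way to read "treat the network as an IO source/sink" is to keep the blocking network call on its own thread and hand the outcome back as plain data, so failures show up as values instead of silently stalled channels. The sketch below assumes a hypothetical `send-job!` function standing in for whatever real transport is used; nothing in it comes from the conversation above.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for a real network call (HTTP, socket, queue client).
;; It throws when asked to, to simulate a dropped connection.
(defn send-job! [job]
  (if (:fail? job)
    (throw (java.io.IOException. "connection reset"))
    {:job job :result :done}))

;; Run the blocking call on a dedicated thread and report the outcome as data.
;; The returned channel yields exactly one map, whether the call worked or not.
(defn remote-call [job]
  (a/thread
    (try
      {:status :ok :value (send-job! job)}
      (catch java.io.IOException e
        {:status :error :message (.getMessage e)}))))

(def ok-result (a/<!! (remote-call {:id 1})))
(def failed-result (a/<!! (remote-call {:id 2 :fail? true})))
```

Here `ok-result` carries `:status :ok` and `failed-result` carries `:status :error`, so the consumer decides about retries and timeouts explicitly rather than relying on channel semantics to hide them.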

👍 12
hiredman18:11:07

;; clj -J-Xmx32m -Sdeps '{:deps {org.jgroups/jgroups {:mvn/version "4.0.12.Final"} org.clojure/core.async {:mvn/version "0.4.490"}}}'

(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as p])

(defn cluster-channel [cluster-name]
  (let [in (async/chan)
        out (async/chan)
        jchannel (org.jgroups.JChannel.)
        _ (.connect jchannel cluster-name)
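        ;; Deliver every message received from the cluster onto `in`
        ;; (a blocking put from the JGroups receiver thread).
        _ (.setReceiver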
           jchannel
           (proxy [org.jgroups.ReceiverAdapter] []
             (receive [msg-batch]
               (doseq [msg (.array msg-batch)
                       :let [msg (.getObject msg)]]
                 (async/>!! in msg)))))]
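    ;; Forward values put on `out` to the cluster. The blocking .send runs on
    ;; an async/thread so it does not tie up a go-block thread.
    (async/go-loop []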
      (let [msg (async/<! out)]
        (if (some? msg)
          (do
            (async/<!
             (async/thread
               (.send jchannel (org.jgroups.Message. nil msg))))
            (recur))
          (.close jchannel))))
    ;; Expose the pair as one channel: takes read from `in`, puts go to `out`.
    (reify
      p/ReadPort
      (take! [_ fn1-handler]
        (p/take! in fn1-handler))
      p/WritePort
      (put! [_ val fn1-handler]
        (p/put! out val fn1-handler))
      p/Channel
      (close! [_]
        (p/close! in)
        (p/close! out))
      (closed? [_]
        (or (p/closed? in)
            (p/closed? out))))))

😮 3
holyjak19:11:16

Hello! Given a number of channels, each with a single collection, how do I create a single channel with the individual items concatenated? It is certainly trivial but I cannot see it... I mean, are there functions that can do that? Or do I need to make a low-level solution with go-loop? I guess cat is not usable here?

ghadi19:11:01

core.async/map "Takes a function and a collection of source channels, and returns a channel which contains the values produced by applying f to the set of first items taken from each source channel, followed by applying f to the set of second items from each channel, until any one of the channels is closed, at which point the output channel will be closed. The returned channel will be unbuffered by default, or a buf-or-n can be supplied"

ghadi19:11:21

sounds simple, not a trivial impl though @holyjak

ghadi19:11:01

(core.async/map vector chs)

ghadi19:11:28

if I understand you correctly

holyjak19:11:48

Thank you! But this is not what I want, I have ch1 with [1 2 3] and ch2 with [4 5 6] and want a channel with 1 2 3 4 5 6.

ghadi19:11:58

core.async/merge

holyjak19:11:21

that will give me a single channel with [1 2 3] [4 5 6]

holyjak19:11:51

so it is a step in the right direction...

ghadi19:11:53

cat transducer

ghadi19:11:05

merge -> pipe -> ch with cat transducer

Jan K19:11:08

(clojure.core.async/map concat chans) ? (ah this doesn't unwrap the collection)

holyjak19:11:57

You mean (pipe ch-with-colls (chan 1 (cat *rf*))) ? But what should be the *rf* function?

ghadi19:11:31

(chan 1 cat)

holyjak19:11:39

ah, ok, will try, thanks!!!
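A minimal sketch of the `merge -> pipe -> (chan 1 cat)` suggestion, assuming each source channel holds one collection and is then closed (the names `ch1`, `ch2`, `out`, and `result` are illustrative only):

```clojure
(require '[clojure.core.async :as a])

;; Two source channels, each holding a single collection, then closed.
(def ch1 (doto (a/chan 1) (a/put! [1 2 3]) a/close!))
(def ch2 (doto (a/chan 1) (a/put! [4 5 6]) a/close!))

;; merge yields the collections as they arrive; the cat transducer on the
;; target channel unwraps each collection into its individual items.
(def out (a/pipe (a/merge [ch1 ch2]) (a/chan 1 cat)))

;; Collect everything until `out` closes.
(def result (a/<!! (a/into [] out)))
```

`result` contains the six numbers, but `merge` makes no promise about which source is read first, so the two groups may come out in either order. If the order of the sources matters, this approach needs something other than `merge`.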

holyjak19:11:12

transducers are still a mystery to me even after having used them. And the docs are famously terse, for people that already know what they are doing 🙂

holyjak19:11:22

Thanks @jkr.sw but I am not sure what that would do. You would call (concat 1st-elm-of-1st-ch 1st-elm-of-2nd-ch ..) which would not work I think.

Jan K19:11:49

if I understood well that would be (concat [1 2 3] [4 5 6]) => (1 2 3 4 5 6)

holyjak15:11:07

Let's take async out of the question. What you propose is similar to (map concat [[1 2 3] [4 5 6]]). You run (concat chN) for each channel, which does not make sense.

Jan K17:11:00

I don't think this is equivalent to what happens with clojure.core.async/map. Try this:

(require '[clojure.core.async :as a])

(def c1 (doto (a/chan 10) (a/put! [1 2 3])))
(def c2 (doto (a/chan 10) (a/put! [4 5 6])))
(a/<!! (a/map concat [c1 c2]))
; => (1 2 3 4 5 6)

👍 3
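A possible way to picture the difference discussed above: `clojure.core.async/map` behaves like multi-collection `map`, with each channel playing the role of one sequence argument. The sketch below also illustrates the caveat from the docstring quoted earlier, that the output closes as soon as any source closes. The names `plain`, `uneven-1`, `uneven-2`, and `paired` are made up for this example.

```clojure
(require '[clojure.core.async :as a])

;; Plain-sequence analogy: each channel is a sequence of the values put on it,
;; so two channels holding one collection each look like this.
(def plain (map concat [[1 2 3]] [[4 5 6]]))

;; Uneven sources: the second channel closes after one collection, so
;; [7 8 9] from the first channel never finds a partner and is not emitted.
(def uneven-1 (doto (a/chan 2) (a/put! [1 2 3]) (a/put! [7 8 9]) a/close!))
(def uneven-2 (doto (a/chan 2) (a/put! [4 5 6]) a/close!))

(def paired (a/<!! (a/into [] (a/map concat [uneven-1 uneven-2]))))
;; paired => [(1 2 3 4 5 6)]
```

So `(a/map concat chans)` concatenates positionally across channels and keeps the result wrapped as one collection per round, whereas the `cat` transducer approach unwraps every collection into individual items.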