This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-11-04
I'm not sure if this is the right place to ask, but is there a preferred lightweight solution for treating remote machines (e.g. those running separate JVM processes and accessible over a network) as core.async channels? What's the idiomatic way to send some long-running computation to another machine on the wire? Most of the stuff I've seen when looking up information (`onyx`, `dragonmark`, etc.) looks unmaintained.
the core.async semantics break with network fallibility, you don't get the same correctness guarantees or straightforward behaviors
I'd treat the network as an IO source/sink, not as a part of a larger async channel system
;; clj -J-Xmx32m -Sdeps '{:deps {org.jgroups/jgroups {:mvn/version "4.0.12.Final"} org.clojure/core.async {:mvn/version "0.4.490"}}}'
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as p])

(defn cluster-channel [cluster-name]
  (let [in (async/chan)
        out (async/chan)
        jchannel (org.jgroups.JChannel.)
        _ (.connect jchannel cluster-name)
        _ (.setReceiver
           jchannel
           (proxy [org.jgroups.ReceiverAdapter] []
             (receive [msg-batch]
               (doseq [msg (.array msg-batch)
                       :let [msg (.getObject msg)]]
                 (async/>!! in msg)))))]
    (async/go-loop []
      (let [msg (async/<! out)]
        (if (some? msg)
          (do
            (async/<!
             (async/thread
               (.send jchannel (org.jgroups.Message. nil msg))))
            (recur))
          (.close jchannel))))
    (reify
      p/ReadPort
      (take! [_ fn1-handler]
        (p/take! in fn1-handler))
      p/WritePort
      (put! [_ val fn1-handler]
        (p/put! out val fn1-handler))
      p/Channel
      (close! [_]
        (p/close! in)
        (p/close! out))
      (closed? [_]
        (or (p/closed? in)
            (p/closed? out))))))
Hello! Given a number of channels, each with a single collection, how do I create a single channel with the individual items concatenated? It is certainly trivial but I cannot see it... I mean, are there functions that can do that? Or do I need to make a low-level solution with `go-loop`? I guess `cat` is not usable here?
core.async/map
"Takes a function and a collection of source channels, and returns a
channel which contains the values produced by applying f to the set
of first items taken from each source channel, followed by applying
f to the set of second items from each channel, until any one of the
channels is closed, at which point the output channel will be
closed. The returned channel will be unbuffered by default, or a
buf-or-n can be supplied"
Thank you! But this is not what I want, I have ch1 with [1 2 3] and ch2 with [4 5 6] and want a channel with 1 2 3 4 5 6.
that will give me a single channel with [1 2 3] [4 5 6]
so it is a step in the right direction...
You mean `(pipe ch-with-colls (chan 1 (cat *rf*)))`? But what should be the `*rf*` function?
ah, ok, will try, thanks!!!
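A minimal sketch of the idea discussed above, assuming the earlier suggestion was `async/merge` (the thread does not show it). `cat` is itself a transducer, so it is passed to `chan` directly and no `*rf*` argument is needed. The helper name `merge-and-flatten` is hypothetical, and `merge` does not guarantee which source channel is read first, so the relative order of the collections may vary.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: merge the source channels into one channel of
;; collections, then pipe that into a channel whose `cat` transducer
;; puts each collection's items on the channel one by one.
(defn merge-and-flatten [chs]
  (async/pipe (async/merge chs)
              (async/chan 1 cat)))

;; Usage: each source channel holds a single collection.
(let [ch1 (async/to-chan [[1 2 3]])
      ch2 (async/to-chan [[4 5 6]])]
  ;; Collect everything into a vector once the output channel closes.
  ;; Items within a collection stay in order; which collection comes
  ;; first depends on `merge`.
  (async/<!! (async/into [] (merge-and-flatten [ch1 ch2]))))
```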
transducers are still a mystery to me even after having used them. And the docs are famously terse, for people that already know what they are doing
Thanks @jkr.sw but I am not sure what that would do. You would call `(concat 1st-elm-of-1st-ch 1st-elm-of-2nd-ch ..)` which would not work I think.
Let's take async out of the question. What you propose is similar to `(map concat [[1 2 3] [4 5 6]])`
You run `(concat chN)` for each channel, which does not make sense.
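For comparison, here is a small sketch of what `async/map` with `concat` would do, going by the docstring quoted earlier: `f` is applied across the first items taken from each source channel, which is closer to `(apply concat [[1 2 3] [4 5 6]])` than to `(map concat ...)`. The helper name `concat-chans` is hypothetical, and the sketch assumes each channel holds exactly one collection, as in the question.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: `async/map` takes one item from every source
;; channel and calls (concat item-1 item-2 ...), yielding one combined
;; sequence in source-channel order. Piping that into a channel with
;; the `cat` transducer emits the individual items.
(defn concat-chans [chs]
  (async/pipe (async/map concat chs)
              (async/chan 1 cat)))

;; Usage: ch1 holds [1 2 3] and ch2 holds [4 5 6].
(let [ch1 (async/to-chan [[1 2 3]])
      ch2 (async/to-chan [[4 5 6]])]
  (async/<!! (async/into [] (concat-chans [ch1 ch2]))))
;; => [1 2 3 4 5 6]
```

Unlike a `merge`-based version, the order here follows the order of the channels in the input vector, but `async/map` stops as soon as any source channel closes.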