This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-08-26
Hi all... first time here, I hope I don't break anything... I am a Clojure beginner evaluating core.async and I have a situation I cannot understand
trying to take all this in. if you reformat the example into a code block, it makes it easier to process.
:thumbsup:
(defonce server (atom nil))
(def flag (atom false))
(def rawreqchan (async/chan 1))
(def decreqchan (async/chan 1))
(def seye-auth-chan (async/chan 1))
(def reqpub (async/pub decreqchan (fn [arg] (:fid arg))))

(defn handle-msg [rawreqchan rawreq]
  (log/debug "handle-msg: " rawreq)
  (async/>!! rawreqchan rawreq))

(defn pump! [inchan outchan]
  (let [dict nil] ;;(build-dict)
    (async/go
      (loop []
        (log/debug "waiting for req...")
        (let [msg (async/<! inchan)]
          (if-not (some? msg)
            (do
              (log/debug "closing!...")
              (async/close! outchan))
            (do
              (log/debug "decoding req...")
              (let [decdata "data"] ;; (decode-msg! msg dict)
                (log/debug "forwarding decoded req...")
                (async/>! outchan decdata)
                (recur)))))))))

(defn start []
  (let [_ nil]
    (async/go
      (pump! rawreqchan decreqchan))
    (async/go
      (loop []
        (when-some [v (async/<! seye-auth-chan)]
          (log/debug "dec: " v)
          (recur))))
    (async/sub reqpub 2 seye-auth-chan)
    (future (async/>!! rawreqchan {:fid 2 :sid 1 :data "dummy"}))))
;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))
calling (start) gives this result:
asyncedge.bootstrap> (start)
#<Future@15349450: true>
19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
but...this produces the right effect:
asyncedge.bootstrap> (async/>!! decreqchan {:fid 2 :sid 1 :data "data"})
true
19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec: {:fid 2, :sid 1, :data data}
The pub & sub seems to be working OK but, somehow, the call to (async/>! outchan decdata) inside the pump! function does not produce the same result...
If, instead of (when-some [v (async/<! seye-auth-chan)] ..., I put (when-some [v (async/<! decreqchan)] ... and call (start), things work fine, so the "pipeline" between rawreqchan and decreqchan is working...
It's been 24 hours thinking about this and I cannot get it... some subtle detail of core.async internals??
do you have a high level description of what this code is trying to do?
what's the difference between pump! and the built-in pipe?
I am just trying to play a bit with core.async so doing the transformation and copy between channels "by hand"
TCP server -> msg put in rawreqchan -> pump -> decreqchan -> pub -> sub -> seye-auth-chan
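For comparison, here is a minimal sketch of the same pipeline built from async/pipe and a transducer on the channel instead of a hand-written loop. The names raw-ch, dec-ch, auth-ch, req-pub and decode-msg are hypothetical stand-ins (decode-msg is not the real decoder from the thread); the only property assumed of it is that it returns a map that still carries :fid, so the pub can route on it.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the real decoder: it keeps :fid and :sid so the
;; pub can still route on :fid, and only rewrites :data.
(defn decode-msg [msg]
  (assoc msg :data (str "decoded-" (:data msg))))

(def raw-ch (async/chan 1))
;; The transducer is applied to every value put on dec-ch.
(def dec-ch (async/chan 1 (map decode-msg)))
(def auth-ch (async/chan 1))

;; Route decoded messages by :fid and subscribe auth-ch to topic 2.
(def req-pub (async/pub dec-ch :fid))
(async/sub req-pub 2 auth-ch)

;; pipe copies raw-ch into dec-ch and closes dec-ch when raw-ch closes.
(async/pipe raw-ch dec-ch)

(async/>!! raw-ch {:fid 2 :sid 1 :data "dummy"})

;; Take with a timeout so a routing mistake shows up as nil instead of a hang.
(def result (first (async/alts!! [auth-ch (async/timeout 1000)])))
(println result)
```

Putting the transform on the channel keeps the copy step generic; whether that is preferable to a manual go loop depends on how much per-message control the decoder needs.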
the data is different: in one case, it's {:fid 2 :sid 1 :data "dummy"}, in the other, it's "data"
right, but isn't the pub/sub based on the :fid key?
umm, the map is {:fid :sid :data}; at this moment the relevant part is :fid. yes, this is the dispatch key, but the :data content is not being used
in pump, you're putting "data". try changing the let from
(let [decdata "data"] ...
to
(let [decdata {:fid 2 :sid 1 :data "dummy"}] ...
if you have a persistent bug, it's always hiding in the place where you assume everything is working
😄 yeah that's quite true... also our brain insists on telling you that you are right, and then there's the tunnel vision
so I think it is relevant
I would start by making sure you aren't shooting yourself in the foot by running multiple copies of this code in your repl all sharing the global channels
you only have a subscription for data that has an :fid value of 2
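The routing behaviour described above can be reproduced in isolation. This is a small sketch, not the original code: in-ch, sub-ch, p and take-or-timeout are names made up for the illustration. It relies on the fact that (:fid "data") evaluates to nil, so a bare string gets topic nil, and a pub drops values whose topic has no subscriber.

```clojure
(require '[clojure.core.async :as async])

(def in-ch (async/chan 1))
(def sub-ch (async/chan 1))

;; Same routing as in the thread: the topic is the :fid of each value.
(def p (async/pub in-ch :fid))
(async/sub p 2 sub-ch)

;; Hypothetical helper: take from ch, or give up with nil after 500 ms.
(defn take-or-timeout [ch]
  (first (async/alts!! [ch (async/timeout 500)])))

;; A bare string has no :fid, so its topic is nil and nobody is subscribed.
(async/>!! in-ch "data")
(def from-string (take-or-timeout sub-ch))

;; A map with :fid 2 matches the subscription and is delivered.
(async/>!! in-ch {:fid 2 :sid 1 :data "dummy"})
(def from-map (take-or-timeout sub-ch))

(println "string put ->" from-string)
(println "map put    ->" from-map)
```

The put of the string still returns true, which matches the thread: the "forwarding decoded req..." log line appears, but nothing ever arrives on the subscribed channel.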