Fork me on GitHub
#core-async
<
2020-08-26
>
seralbdev17:08:44

Hi all... first time here I hope I don't break anything...I am Clojure beginner evaluating core.async and I have a situation I cannot understand

phronmophobic18:08:54

trying to take all this in. if you reformat the example into a code block, it makes it easier to process.

seralbdev18:08:53

thanks so much...sorry but first time using Slack...how do I reformat it?

seralbdev17:08:48

(defonce server (atom nil)) (def flag (atom false)) (def rawreqchan (async/chan 1)) (def decreqchan (async/chan 1)) (def seye-auth-chan (async/chan 1)) (def reqpub (async/pub decreqchan (fn [arg] (:fid arg)))) (defn handle-msg [rawreqchan rawreq] (log/debug "handle-msg: " rawreq) (async/>!! rawreqchan rawreq)) (defn pump! [inchan outchan] (let [dict nil] ;;(build-dict) (async/go (loop [] (log/debug "waiting for req...") (let [msg (async/<! inchan)] (if-not (some? msg) (do (log/debug "closing!...") (async/close! outchan)) (do (log/debug "decoding req...") (let [decdata "data"];; (decode-msg! msg dict) (log/debug "forwarding decoded req...") (async/>! outchan decdata) (recur))))))))) (defn start [] (let [_ nil] (async/go (pump! rawreqchan decreqchan)) (async/go (loop [] (when-some [v (async/<! seye-auth-chan)] (log/debug "dec: " v) (recur)))) (async/sub reqpub 2 seye-auth-chan) (future (async/>!! rawreqchan {:fid 2 :sid 1 :data "dummy"})))) ;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))

seralbdev17:08:44

calling (start) gives this result asyncedge.bootstrap> (start) #<Future@15349450: true>19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req... 19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req... 19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req... 19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...

seralbdev17:08:31

but...this produces the right effect asyncedge.bootstrap> (async/>!! decreqchan {:fid 2 :sid 1 :data "data"}) true19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec: {:fid 2, :sid 1, :data data}

seralbdev17:08:09

The pub & sub seems to be working OK but, somehow, the call to (async/>! outchan decdata) inside the pump! function does not produce the same result...

seralbdev18:08:37

If instead of this (when-some [v (async/<! seye-auth-chan)] put this (when-some [v (async/<! decreqchan)] and call (start) things work fine, so the "pipeline" between rawreqchan and decreqchan is working...

seralbdev18:08:27

Its been 24 hours thinking about this and I cannot get it... some subtle detail from core.async internals??

seralbdev18:08:40

(defonce server (atom nil))
(def flag (atom false))
(def rawreqchan (async/chan 1))
(def decreqchan (async/chan 1))
(def seye-auth-chan (async/chan 1))
(def reqpub (async/pub decreqchan (fn [arg] (:fid arg))))

(defn handle-msg [rawreqchan rawreq]
  (log/debug "handle-msg: " rawreq)
  (async/>!! rawreqchan rawreq))

(defn pump! [inchan outchan]
  (let [dict nil] ;;(build-dict)
    (async/go
      (loop []      
        (log/debug "waiting for req...")
        (let [msg (async/<! inchan)]
          (if-not (some? msg)
            (do
              (log/debug "closing!...")
              (async/close! outchan))
            (do
              (log/debug "decoding req...")
              (let [decdata "data"];; (decode-msg! msg dict)
                (log/debug "forwarding decoded req...")
                (async/>! outchan decdata)
                (recur)))))))))


(defn start []
  (let [_ nil]

    (async/go
      (pump! rawreqchan decreqchan))

    (async/go
      (loop []
        (when-some [v (async/<! seye-auth-chan)]
          (log/debug "dec: " v)
          (recur))))  

    (async/sub reqpub 2 seye-auth-chan)

    (future (async/>!! rawreqchan {:fid 2 :sid 1 :data "dummy"}))))
    ;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))

seralbdev18:08:27

asyncedge.bootstrap> (start)
#<Future@15349450: true>19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...

seralbdev18:08:42

asyncedge.bootstrap> (async/>!! decreqchan {:fid 2 :sid 1 :data "data"})
true19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec:  {:fid 2, :sid 1, :data data}

phronmophobic18:08:17

do you have a high level description of what this code is trying to do?

phronmophobic18:08:17

what's the difference between pump! and the built in pipe?

seralbdev18:08:22

I am just trying to play a bit with core.async so doing the transformation and copy between channels "by hand"

seralbdev18:08:53

pump function will decode a request...

seralbdev18:08:10

and put the result in decreqchan

seralbdev18:08:26

a pub will publish it and a sub should receive it

seralbdev18:08:02

I can't get why a direct >!! in decreqchan works (pub & sub are triggered)

seralbdev18:08:43

but inside pump! not

seralbdev18:08:35

which ends up writing in decreqchan anyway...

seralbdev18:08:53

TCP server -> msg put in rawreqchan -> pump -> decreqchan -> pub -> sub -> seye-auth-chan

seralbdev18:08:11

TCP server is disabled now, just inserting directly in rawreqchan

phronmophobic18:08:55

the data is different, in one case, it's {:fid 2 :sid 1 :data "dummy"}, in the other, it's "data"

seralbdev18:08:35

yeah that is not relevant...all the decoding mechanism is now disabled...

phronmophobic18:08:52

right, but isn't the pub/sub based on the :fid key?

seralbdev18:08:31

umm the map is {:fid :sid :data} at this moment the relevant part is :fid yes, this is the dispatch key but the :data content is not being used

phronmophobic18:08:33

in pump, you're putting "data". try changing the let from

(let [decdata "data"] ...
to
(let [decdata {:fid 2 :sid 1 :data "dummy"}] ...

seralbdev18:08:18

OMG you are right

seralbdev18:08:28

this is stupid

phronmophobic18:08:18

if you have a persistent bug, it's always hiding in the place where you assume everything is working

seralbdev18:08:57

😄 yeah that's quite true...also our brain insists in saying you are right and tunnel vision

seralbdev18:08:35

thanks so much for taking time to help. very much appreciated!

👍 3
phronmophobic18:08:00

so I think it is relevant

hiredman18:08:24

I would start by making sure you aren't shooting your self in the foot by running multiple copies of this code in your repl all sharing the global channels

phronmophobic18:08:30

you only have a subscription for data that has an :fid value of 2

hiredman18:08:43

you can make sure of that by verifying the behavior in a fresh started repl

seralbdev18:08:29

yeah but in both cases (future & direct call) :fid = 2

seralbdev18:08:57

😄 yeah that's quite true...also our brain insists in saying you are right and tunnel vision