#core-async
2021-06-29
souenzzo02:06:36

Hello. After studying core.async and some async APIs, I ended up with this "recipe" to turn a sync API into an async one (in terms of interface). I would like feedback on it: whether it's clearly wrong, or whether something could be done in a better way. Anything 🙂 https://gist.github.com/souenzzo/ee4f1ef4124d21993e0c2e55d4626ce9

Ben Sless10:06:55

I'm not convinced regarding the return channel. It's essentially like passing a promise to a function. It's passing by-ref instead of by value. Why not return a thread or go which are already channels? The exception handler is a good idea

souenzzo11:06:22

To be honest, IDK. I saw it somewhere and just reproduced the pattern. Many other places accept an xform. Tonight I will play and prototype with my sample APIs and try to remove the chan from the params.

Ben Sless11:06:31

An xform isn't a good fit here either, imo

Ben Sless12:06:03

You want simple composable APIs

souenzzo16:06:27

@UK0810AQ2 do you have an example of an API I could use as a reference?

Ben Sless16:06:43

An API call returns a thing. In what context is it appropriate to apply a transducer to one thing and not a sequence of things? In terms of converting synchronous to async calls, this post has a good example at the end: http://danboykis.com/posts/things-i-wish-i-knew-about-core-async/ When you return a channel you can naturally use stuff like async/merge
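
To make the "return a channel" suggestion concrete, here is a minimal sketch. `fetch-sync` and `fetch-async` are hypothetical names standing in for a real blocking API and its async wrapper; they do not come from the gist or the linked post.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical blocking call standing in for a real synchronous API.
(defn fetch-sync [id]
  (Thread/sleep 50)
  {:id id, :status :ok})

;; Async wrapper: a/thread runs the body on another thread and returns
;; a channel that receives the body's result and then closes.
(defn fetch-async [id]
  (a/thread (fetch-sync id)))

;; Because each call returns a channel, the results compose with a/merge.
(def results
  (a/<!! (a/into [] (a/merge [(fetch-async 1) (fetch-async 2)]))))

(sort-by :id results)
;;=> ({:id 1, :status :ok} {:id 2, :status :ok})
```

The caller never passes a channel in; it only consumes the one that comes back, which is what makes combinators like merge usable without extra plumbing.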

didibus06:07:47

The send API is expected to return a single value, correct? The result of the operation? So logically it would return a channel on which one element is put, after which the channel is closed

didibus06:07:39

That style of API is "promise like", so if you want to use core.async, the equivalent of a promise is a channel of 1 thing that closes immediately. In fact, core.async has exactly such a thing: https://clojuredocs.org/clojure.core.async/promise-chan
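
A small sketch of how a promise-chan behaves at the REPL. One detail worth knowing: rather than closing after a single take, it keeps only the first value put on it and hands that same value to every taker.

```clojure
(require '[clojure.core.async :as a])

(def pc (a/promise-chan))

(a/>!! pc :first)   ;; delivered
(a/>!! pc :second)  ;; dropped: a promise-chan keeps only the first value

[(a/<!! pc) (a/<!! pc)]
;;=> [:first :first]
```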

didibus06:07:39

But with core.async, you can model your async in other ways, you can use Processes instead of APIs, and Channels instead of API calls.

didibus06:07:18

So what you can do is have a send-process, which is an infinitely running process that sends each message added to some channel and returns the response for it on some other channel, with the channels closing only once the process stops.

didibus07:07:28

(defn send [msg]
  (let [pc (promise-chan)]
    (go
      (try (<! (timeout 1000)) ;; Here I simulate something with timeout, but your sending logic would go here
           (>! pc :success)
           (catch Throwable e
             (>! pc :error))))
    pc))

didibus07:07:24

Now send returns a promise channel which will either have the result or the error

souenzzo10:07:39

@U0K064KQV I see this approach in many places. Thinking in terms of "messages", it feels wrong for the same channel to return either a success value or an error message. I'm not saying that this does not work. I use it daily, but to use it I need a new stack of macros that automatically check the returned value and re-throw it. In particular I use https://github.com/wilkerlucio/wsscode-async I'm trying to do the exercise of using the core.async API as it was originally intended to be used
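
For readers unfamiliar with the "check the return and re-throw" macros mentioned here, the following is a minimal sketch of the general idea. `throw-err`, `<?`, and `failing-call` are hypothetical definitions written for this illustration; they are not claimed to match the API of wsscode-async or any other library.

```clojure
(require '[clojure.core.async :as a :refer [go <! <!!]])

;; Hypothetical helper: rethrow any Throwable that came off a channel.
(defn throw-err [v]
  (if (instance? Throwable v) (throw v) v))

;; Hypothetical <?: like <!, but throws if the value taken is an exception.
(defmacro <? [ch]
  `(throw-err (<! ~ch)))

;; The error travels over the channel as an ordinary value.
(defn failing-call []
  (go (ex-info "boom" {:code 42})))

(def result
  (<!! (go (try
             (<? (failing-call))
             (catch clojure.lang.ExceptionInfo e
               [:caught (:code (ex-data e))])))))

result
;;=> [:caught 42]
```

The cost is the one described above: every consumer has to use `<?` instead of `<!`, so the convention spreads through the whole code base.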

didibus18:07:52

That's the nature of async though. The error has happened somewhere else at a different time. If you look at Clojure's future and promise it works the same. The future or promise won't throw to the caller, they just return a value.

didibus23:07:36

The other way is to work with processes like I said, instead of a promise-like interaction. Something like:

(defn make-send-process [request-ch response-ch]
  (go-loop []
    (if-let [request (<! request-ch)]
      (do
        (try (<! (timeout 1000))
             (>! response-ch {:request-id (:request-id request), :result :success})
             (catch Throwable e
               (>! response-ch {:request-id (:request-id request), :result :error})))
        (recur))
      (close! response-ch))))


(def send-request-ch (chan))
(def send-response-ch (chan))

(make-send-process send-request-ch send-response-ch)

;; Now you can send requests to the process to send something by putting
;; the request on the request channel for it:
(offer! send-request-ch {:request-id 123})
;;=> true

;; And you can get the results for the requests from the response channel.
;; poll! never blocks, so it returns nil until the simulated 1s of work is done:
(poll! send-response-ch)
;;=> {:request-id 123, :result :success}

;; Keep in mind these are asynchronous, so the requests can execute out of order
;; and the response could be out of order as well, that's why you need a request-id
;; if you care to track each request you made for their response.

;; Finally when you are done with the process you can simply close the request channel:
(close! send-request-ch)

didibus23:07:54

In this model, it makes more sense why async errors work as they do. The calls are asynchronous, you can't just try/catch it, because the code will execute somewhere else at an undefined time in the future, you'll long be beyond the try/catch for it.

didibus23:07:59

In both cases though, I see your problem, and it would be okay to pass a handler for errors as well, if the caller wants to dictate how to handle an error in the process

didibus02:07:22

(defn send [msg ex-handler]
  (let [pc (promise-chan)]
    (go
      (try (<! (timeout 1000))
           (throw (ex-info "" {}))
           (>! pc :success)
           (catch Throwable e
             (>! pc (ex-handler msg e)))))
    pc))

(def sendc (send :wtv (fn[msg err] :error)))

;; poll! is non-blocking: it returns nil until the go block has finished (~1s)
(poll! sendc)
;;=> :error

I don't know if this is any nicer to use than having <?-like macros

didibus02:07:24

Though maybe it makes the processes approach cleaner:

(defn make-send-process [request-ch response-ch ex-handler]
  (go-loop []
    (if-let [request (<! request-ch)]
      (do
        (try (<! (timeout 1000))
             (throw (ex-info "" {}))
             (>! response-ch {:request-id (:request-id request), :result :success})
             (catch Throwable e
               (>! response-ch (ex-handler request e))))
        (recur))
      (close! response-ch))))

(def send-request-ch (chan))
(def send-response-ch (chan))

(make-send-process send-request-ch send-response-ch (fn[request err] {:error (:request-id request)}))

(offer! send-request-ch {:request-id 123})
(poll! send-response-ch)
;;=> {:error 123}

(close! send-request-ch)

didibus03:07:51

Oh, also, pipeline and co. are basically the same as my make-send-process. And like you mention, they take an optional close? and ex-handler as well. Normally I would use those over what I did. The difference is that the pipeline functions are serial, in that if you send 5 requests, the results on the result channel will be in the same order. Also they take a transducer, but with a map transducer you can pass any request handling function you want. And they let you control the concurrency level as well, which is nice.
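
A minimal sketch of that idea using `pipeline` with a map transducer. `handle` is a hypothetical request handler that fails for one request so the ex-handler gets exercised. Note that, of the pipeline family, it is `pipeline` and `pipeline-blocking` that accept the ex-handler argument; `pipeline-async` only takes the optional close? flag.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical request handler; throws for request 2 to exercise ex-handler.
(defn handle [{:keys [request-id]}]
  (if (= request-id 2)
    (throw (ex-info "send failed" {:request-id request-id}))
    {:request-id request-id, :result :success}))

(def request-ch (a/chan 10))
(def response-ch (a/chan 10))

(a/pipeline 4              ; concurrency level
            response-ch
            (map handle)   ; any function, wrapped in a map transducer
            request-ch
            true           ; close response-ch once request-ch is drained
            (fn [e] {:request-id (:request-id (ex-data e)), :result :error}))

(doseq [id [1 2 3]]
  (a/>!! request-ch {:request-id id}))
(a/close! request-ch)

(def responses (a/<!! (a/into [] response-ch)))

responses
;;=> [{:request-id 1, :result :success}
;;    {:request-id 2, :result :error}
;;    {:request-id 3, :result :success}]
```

The non-nil value returned by the ex-handler is placed on the output channel in the failing request's position, so ordering is preserved even for errors.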

didibus03:07:05

Hum... well I thought mine would potentially process requests out of order, but in my tests I always get them in order as well, hum not sure.
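
A likely explanation for the in-order results: a single go-loop is one logical process, so it finishes handling request N before it takes request N+1. The sketch below uses hypothetical per-request delays to make that visible; the slowest request goes in first and still comes out first.

```clojure
(require '[clojure.core.async :as a :refer [go-loop <! >! timeout]])

(def req-ch (a/chan 10))
(def resp-ch (a/chan 10))

;; One go-loop: each request is fully handled before the next one is taken.
(go-loop []
  (if-let [{:keys [request-id delay-ms]} (<! req-ch)]
    (do (<! (timeout delay-ms))
        (>! resp-ch request-id)
        (recur))
    (a/close! resp-ch)))

;; The first request is the slowest one.
(doseq [r [{:request-id 1, :delay-ms 100}
           {:request-id 2, :delay-ms 10}
           {:request-id 3, :delay-ms 1}]]
  (a/>!! req-ch r))
(a/close! req-ch)

(def order (a/<!! (a/into [] resp-ch)))

order
;;=> [1 2 3]
```

Out-of-order responses would only appear if each request were handed to its own go block or thread, which is where a request-id becomes necessary.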

didibus04:07:44

So something like this:

(defn make-pipeline-async-send
  [request-ch response-ch concurrency-amount ex-handler]
  (pipeline-async
   concurrency-amount
   response-ch
   (fn async [request response-ch]
     (thread
       (try (<!! (timeout 100))
            (>!! response-ch {:request-id (:request-id request), :result :success})
            (catch Throwable e
              (>!! response-ch (ex-handler request e)))
            (finally (close! response-ch)))))
   request-ch))