#pedestal
2020-06-22
Ivar Refsdal 12:06:30

Hi. I tried to create a simple async route, returning a channel for :body:

(defn respond-async [request]
  (let [c (async/chan)]
    (async/go-loop [[x & xs] (range 10)]
      (println "putting value..." x)
      (async/>! c x)
      (if xs
        (recur xs)
        (async/close! c)))
    {:status 200
     :body   c}))
This causes the following error message:

No implementation of method: :default-content-type of protocol: #'io.pedestal.http.impl.servlet-interceptor/WriteableBody found for class: clojure.core.async.impl.channels.ManyToManyChannel

Am I going about this the wrong way? Or did I miss something else?

Edit: I'd like the response to be a (JSON/EDN) array of [0 1 2 ... 9].

Full code and stacktrace here: https://gist.github.com/ivarref/b57cf40ea4739dacc4bbe7bbcaa8aec0

Thanks and kind regards.

souenzzo 12:06:40

@ivar.refsdal Without looking closely at your async logic: pedestal.chain expects that the context may be async, not the [:response :body]. So you will need something like this:

(def respond-async
  {:name  ::respond-async
   :enter (fn [{:keys [request] :as ctx}]
            (let [c (async/chan)]
              (async/go-loop [[x & xs] (range 10)]
                (println "putting value..." x)
                (async/>! c x)
                (if xs
                  (recur xs)
                  (async/close! c)))
              (async/go
                (assoc ctx :response {:status 200
                                      :body   (async/<! c)}))))})

Ivar Refsdal 12:06:49

Thanks @souenzzo. That works, but only for a single value.

souenzzo 12:06:53

Now it's more about #core-async than Pedestal:

(def respond-async
  {:name  ::respond-async
   :enter (fn [{:keys [request] :as ctx}]
            (let [c (async/chan)]
              (async/go-loop [[x & xs] (range 10)]
                (println "putting value..." x)
                (async/>! c x)
                (if xs
                  (recur xs)
                  (async/close! c)))
              (async/go
                (assoc ctx :response {:status 200
                                      :body   (async/<! (async/into [] c))}))))})


=> #'user/respond-async
(async/<!! ((:enter respond-async) {}))
putting value... 0
putting value... 1
putting value... 2
putting value... 3
putting value... 4
putting value... 5
putting value... 6
putting value... 7
putting value... 8
putting value... 9
=> {:response {:status 200, :body [0 1 2 3 4 5 6 7 8 9]}}
It works

souenzzo 12:06:57

In Pedestal it's more idiomatic to turn [1 2 3] into JSON in a separate interceptor:

["/async-seq"
 :get
 [body->json 
  respond-async]
 :route-name ::async-seq]
Where body->json is something like :leave (fn [ctx] (update-in ctx [:response :body] json/to-string))
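
A minimal sketch of such a `:leave` interceptor, written as a plain map. The names `body->json` (here a function taking an encoder) and `numbers->json` are hypothetical and only for illustration. The toy encoder handles a flat collection of numbers; in real use you would pass a real JSON encoder function instead.

```clojure
;; Sketch only: an interceptor map whose :leave step encodes the response body.
;; `encode` is any function from a Clojure value to a JSON string (assumption).
(defn body->json
  [encode]
  {:name  ::body->json
   :leave (fn [ctx]
            (-> ctx
                ;; Replace the Clojure value in [:response :body] with its encoded string.
                (update-in [:response :body] encode)
                ;; Tell the client what the body contains.
                (assoc-in [:response :headers "Content-Type"] "application/json")))})

;; Toy encoder for illustration: only handles a flat collection of numbers.
(defn numbers->json
  [xs]
  (str "[" (apply str (interpose "," xs)) "]"))

;; Calling the :leave function directly, without a running server:
(def example-ctx
  ((:leave (body->json numbers->json))
   {:response {:status 200 :body [0 1 2]}}))
```

With this shape, the route would list `(body->json some-encoder)` before `respond-async`, so the `:leave` step runs after the handler has put its data in `[:response :body]`.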

Ivar Refsdal 13:06:54

The reason I'm (trying) to use async is that I don't want all of the data to stay in memory at once. async/into and such will do that, no?

Ivar Refsdal 13:06:43

The following does (almost) what I want:

(def respond-async
  {:name  ::respond-async
   :enter (fn [{:keys [request] :as ctx}]
            (let [c (async/chan)]
              (async/go-loop [[x & xs] (range 1e5)]
                (println "putting value..." x)
                (when (async/>! c (str x))
                  (if xs
                    (recur xs)
                    (async/close! c))))
              (assoc ctx :response {:status 200
                                    :body   (fn [^ServletOutputStream sout]
                                              (async/<!! ; this makes it work, but makes async pointless
                                                (async/go-loop [v (async/<! c)]
                                                  (if v
                                                    (do (.println sout (str v))
                                                        (recur (async/<! c)))
                                                    (do (println "closing!")
                                                        (.close sout))))))})))})

Ivar Refsdal 13:06:26

That is: I'm not keeping all of the data in memory. Edit: It works/starts for, for example, (range 1e9).

Ivar Refsdal 13:06:55

I figured that pedestal could/would do something like this out of the box

souenzzo 13:06:19

https://gist.github.com/stathissideris/8659706

If you need to "stream" something that does not fit in memory, you may need that lazy async seq thing, plus writing directly to the body:

(defn handler [req]
  (let [lazy-sequence (range 1eHUGE)]
    {:status 200
     :body   (fn [w] (json/write lazy-sequence w))}))

souenzzo 13:06:31

AKA: you may not need async, just laziness
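
A minimal sketch of the "just laziness" idea, assuming nothing beyond `java.io.Writer`: walk a (possibly lazy) sequence one element at a time and write each element as soon as it is realized. The names `write-json-array` and `encode` are hypothetical. `encode` stands in for a real JSON encoder, and `str` is only adequate for plain numbers.

```clojure
(import '[java.io Writer StringWriter])

;; Sketch only: writes xs as a JSON array to w, element by element.
;; `encode` is any function from one element to its JSON string (assumption).
(defn write-json-array
  [^Writer w encode xs]
  (.write w "[")
  (loop [xs     (seq xs)
         first? true]
    (when xs
      ;; Separator goes before every element except the first,
      ;; so there is no trailing comma.
      (when-not first? (.write w ","))
      (.write w ^String (encode (first xs)))
      (recur (next xs) false)))
  (.write w "]")
  (.flush w))

;; Usage with an in-memory writer, for illustration:
(def example-output
  (let [sw (StringWriter.)]
    (write-json-array sw str (range 5))
    (str sw)))
```

Each `.write` call can still block if the underlying stream is slow, so this avoids building the whole payload in memory but does not make the writing non-blocking.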

Ivar Refsdal 15:06:15

Thanks again @souenzzo! Can the lazyseq approach block while writing? If the writer is full or the consumer cannot keep up? This was the reason I wanted to go for an async solution: I thought that pedestal would check the underlying stream for me and juggle the writing to client and reading from my chan.

souenzzo 15:06:39

write is a blocking operation. Yes, you can "block waiting" and "block writing". Something like:

(defn write-json-array-from-channel
  [^Writer w c]
  (.write w "[")
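  ;; Block on each take; write a separator before every value except the
  ;; first, so the array has no trailing comma.
  (loop [first? true]
    (when-some [v (async/<!! c)]
      (when-not first? (.write w ", "))
      (json/write v w)
      (recur false)))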
  (.write w "]")
  ;; I don't know much about `flush` / `close` but i think that 
  ;; you should do something after "finish" write.
  (.flush w)
  (.close w))
Probably there is already some JSON library that supports core.async channels (AKA you don't need to write this function).

Ivar Refsdal 15:06:16

Right... But I think this defeats the purpose of using async in the first place? That is: this code uses async/<!! and that may block the thread

Ivar Refsdal 15:06:21

I see that javax.servlet.ServletOutputStream has setWriteListener:

/**
     * Instructs the <code>ServletOutputStream</code> to invoke the provided
     * {@link WriteListener} when it is possible to write
     *
     *
     * @param writeListener the {@link WriteListener} that should be notified
     *  when it's possible to write
     *
     * @exception IllegalStateException if one of the following conditions is true
     * <ul>
     * <li>the associated request is neither upgraded nor the async started
     * <li>setWriteListener is called more than once within the scope of the same request.
     * </ul>
     *
     * @throws NullPointerException if writeListener is null
     *
     * @since Servlet 3.1
     */
    public abstract void setWriteListener(WriteListener writeListener);

Ivar Refsdal 15:06:33

I think that would be the correct approach to use. But that method is only called for immutant/undertow in pedestal as far as I can see: https://github.com/pedestal/pedestal/blob/aa71a3a630dd21861c0682eeeebec762cbf3f85c/immutant/src/io/pedestal/http/immutant/container.clj#L54

Ivar Refsdal 15:06:43

It's my first time looking at async in pedestal, so apologies if I'm totally off.

souenzzo 15:06:28

Now you are way beyond what I know 🙂

Ivar Refsdal 15:06:08

Thanks for the input, I would not have made it without it :-) Seems the "perfect" way is not available in pedestal yet though (as far as I can tell).

Ivar Refsdal 15:06:47

I may poke further at it tomorrow

souenzzo 15:06:31

Pedestal makes the simple case simple and lets you tweak or develop your complex case. If your handler returns a simple HTML string, you're done. If your handler needs to lazily write in a specific format where the data comes from X, Y, Z, you can access the internals needed to implement that.

souenzzo 15:06:47

Also take a look at the "examples" directory in the Pedestal source 🙂

Ivar Refsdal 08:06:24

I didn't find an async example in the "samples" directory that does something similar to the code above. The closest thing was a server-sent events server.

Ivar Refsdal 08:06:21

Hi again @souenzzo. If you are interested, the following code solved the use case: https://github.com/pedestal/pedestal/issues/665#issuecomment-651652532 It's not perfect, but it works.

souenzzo 13:06:59

I think we are a bit confused about async and lazy things. If you are sending a huge payload, your client probably will not handle it "as a JSON array"; it will prefer "a stream of JSON values".

Ivar Refsdal 09:07:58

Then you are inventing a new content type. I don't think that is correct.