#aleph
2017-07-19
nha 16:07:50

Can someone tell me why I need to call realize-each here? I suppose it means that my requests are all realized and kept in memory until further processing, which is something I am trying to avoid.

(s/stream->seq
  (s/map (fn [res]
           (println (type res)) ;;  manifold.deferred.Deferred if not realize-each
           (select-keys res [:request-time :connection-time :status]))
         (s/realize-each ;; why do I need this?
           (s/map (fn [u]
                    (println "HTTP REQ TO " u)
                    (aleph.http/get u {:throw-exceptions? false}))
                  (s/buffer 50
                            (s/->source
                              [""
                               ""
                               ""]))))))
Also, I note that the following works:
@(d/chain (aleph.http/get "")
          #(select-keys % [:request-time :status]))

dm3 16:07:57

you have a stream of deferreds

dm3 16:07:44

so you need to block on them, or attach a callback, in order to get at their contents
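
(A minimal sketch of the two options on a single deferred; the URL below is a placeholder, not one from the conversation. Dereferencing blocks the calling thread, while d/chain attaches a callback that runs when the response is realized.)

(require '[aleph.http :as http]
         '[manifold.deferred :as d])

;; blocking: deref the deferred returned by aleph.http/get
(def resp @(http/get "http://example.com" {:throw-exceptions? false}))

;; non-blocking: attach a callback with d/chain
(d/chain (http/get "http://example.com" {:throw-exceptions? false})
         #(select-keys % [:status :request-time]))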

nha 16:07:29

so realize-each is the blocking approach, right? How do I use a callback?

dm3 16:07:24

you need to decide whether to parallelize a batch
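
(A rough sketch of the batch option, with placeholder URLs: fire every request in the batch at once and wait for the whole batch with d/zip.)

(require '[aleph.http :as http]
         '[manifold.deferred :as d])

;; fire all requests in one batch concurrently, then wait for all of them
(defn fetch-batch [urls]
  (apply d/zip
         (map #(d/chain (http/get % {:throw-exceptions? false})
                        (fn [res] (select-keys res [:status :request-time])))
              urls)))

;; @(fetch-batch ["http://example.com/a" "http://example.com/b"])
;; => a vector with one trimmed response map per URL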

nha 16:07:36

It looks like https://github.com/ztellman/manifold/blob/master/docs/stream.md has a hint:

The value returned by the callback for connect-via provides backpressure - if a deferred value is returned, further messages will not be passed in until the deferred value is realized.
but I don’t understand it
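
(A rough illustration of that sentence, with placeholder URLs: connect-via only takes the next message from the source once the deferred returned by its callback is realized, which here means once the previous request has completed and its result has been put on the output stream.)

(require '[aleph.http :as http]
         '[manifold.stream :as s]
         '[manifold.deferred :as d])

(def urls    (s/->source ["http://example.com/1" "http://example.com/2"]))
(def results (s/stream))

;; the deferred returned by this callback is realized only after the request
;; completes and s/put! succeeds, so the next URL is not consumed until then;
;; that is the backpressure the docs describe
(s/connect-via urls
               (fn [u]
                 (d/chain (http/get u {:throw-exceptions? false})
                          #(s/put! results (select-keys % [:status]))))
               results)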

nha 16:07:01

I actually want to throttle it

nha 16:07:12

That looks closer to what I want:

(time (doall (s/stream->seq
               (s/map (fn [res]
                        (select-keys res [:request-time :connection-time :status]))
                      (s/realize-each
                        (s/map (fn [u]
                                 (println "HTTP REQ TO " u)
                                 (aleph.http/get u {:throw-exceptions? false}))
                               (s/throttle 1
                                           (s/buffer 50
                                                     (s/->source
                                                        (map #(str "" %) (range 1 10))))))))))

nha 16:07:30

Actually you are right, I want a mix of parallelizing and throttling. How do I parallelize it?

dm3 18:07:08

@nha you can use deferred/loop to do it kinda manually
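
(A rough sketch of the deferred/loop approach, assuming url-source is a stream of URLs and results is a sink stream: each iteration takes one URL, issues the request, and only recurs once it has completed, so one request is in flight per loop. Running several of these loops against the same source is one way to bound the parallelism.)

(require '[aleph.http :as http]
         '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn consume-urls [url-source results]
  (d/loop []
    (d/chain (s/take! url-source ::drained)
             (fn [u]
               (if (identical? ::drained u)
                 (s/close! results)   ;; source exhausted, close the sink
                 (d/chain (http/get u {:throw-exceptions? false})
                          #(s/put! results (select-keys % [:status]))
                          (fn [_] (d/recur))))))))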

dm3 18:07:47

or pipe the batched stream into a (deferred/future-with my-single-thread-loop ..)
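
(Roughly, this second suggestion pins the consuming loop to a dedicated executor. A sketch assuming manifold.executor/fixed-thread-executor and the consume-urls loop sketched above; the my-single-thread-loop body in dm3's message is left unspecified.)

(require '[manifold.deferred :as d]
         '[manifold.executor :as ex])

;; a single-thread executor to run the consuming loop on
(def single-thread (ex/fixed-thread-executor 1))

;; run the loop (e.g. the consume-urls sketch above) on that executor
(d/future-with single-thread
  (consume-urls url-source results))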