#aleph
2019-03-28
kachayev05:03:37

Well, technically anything that you run on the JVM “consumes” a thread in some meaning of the word “consume”. Even “non-blocking” calls. @igrishaev the code you’ve shown is actually doing redundant work by moving the computation to manifold.executor/execute-pool. If you just remove d/future it should give you the same final result.
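
A minimal sketch of the point above (the URL is a placeholder; the equivalence of the final result is what kachayev asserts, not something verified here). aleph’s http/get already returns a deferred, so wrapping it in d/future only adds a hop through the execute-pool:

```clojure
(require '[aleph.http :as http]
         '[manifold.deferred :as d])

;; redundant: dispatches to manifold.executor/execute-pool just to
;; produce a deferred wrapping another deferred
(d/future (http/get "http://example.com"))

;; same final result once chained, without the extra hop:
(http/get "http://example.com")
```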

igrishaev09:03:27

one more example: when a URL is nil, I get an exception immediately, like this:

(-> (http/get nil) 
    (d/chain 
     (fn [result] :good)))
;; throws Execution error (URISyntaxException) at java.net.URI$Parser/fail
But with the following approach, I’ll capture any exception with d/catch:
(-> nil 
    
    (d/chain 
     #(http/get %) 
     (fn [result] :good)) 
    
    (d/catch 
        (fn [e] :error)))
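
A hedged sketch of why the two snippets behave differently (an inference from manifold’s semantics, not something stated in the thread): functions passed to d/chain run inside the deferred machinery, so an exception they throw becomes an errored deferred that d/catch sees, whereas calling (http/get nil) directly throws on the calling thread before any deferred exists:

```clojure
(require '[aleph.http :as http]
         '[manifold.deferred :as d])

@(-> nil
     (d/chain #(http/get %))      ; URISyntaxException thrown in here...
     (d/catch (fn [e] :error)))   ; ...arrives as an errored deferred
;; => :error

;; by contrast, (-> (http/get nil) (d/chain ...) (d/catch ...))
;; throws synchronously before d/chain or d/catch ever run
```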

igrishaev09:03:57

I wonder if the second variant is alright?

igrishaev09:03:45

so the idea is, I’d like to have only one entry point for all the exceptions.

mccraigmccraig10:03:02

@igrishaev we do that with a macro - our real code would be confusing, 'cos it's got a lot of cross-platform stuff in, but something like this should do it

mccraigmccraig10:03:07

(defmacro safe-catch
  [pr-form error-handler]
  `(manifold.deferred/catch
       (try
         ~pr-form
         (catch Exception x#
             (manifold.deferred/error-deferred x#)))
       ~error-handler))

mccraigmccraig10:03:33

(def r (-> (manifold.deferred/success-deferred 1)
           (manifold.deferred/chain inc)
           (manifold.deferred/chain
            (fn [& args] (throw (ex-info "boo" {}))))
           (safe-catch identity)))

mccraigmccraig10:03:43

(def r (-> (throw (ex-info "boo" {}))
           (safe-catch identity)))

mccraigmccraig10:03:14

and you get a promise out the end even if the exception happens before the first callback

igrishaev10:03:07

yes, that approach is clear to me. I’m wondering what might be wrong in the second block of code I posted above?

igrishaev10:03:37

when you just pass a value and the main function is under a chain

alexyakushev10:03:26

Sorry, missed the threading.

mccraigmccraig10:03:08

@alexyakushev no, it throws immediately, but the error is caught and returned as an errored promise

igrishaev10:03:29

yes, so would be great to hear your thoughts on that

mccraigmccraig10:03:48

@igrishaev i like the "always use safe-catch rather than catch" approach because it's easy to find violations with ag... but chaining a simple value should also work fine

mccraigmccraig10:03:46

and when you get into more complex scenarios with some other promise which you need to compose on to... having a catch which works in all situations is nice

igrishaev10:03:40

ok I made my own version of your macro:

(defmacro with-catch
  [& body]
  `(try
     ~@body
     (catch Throwable e#
       (d/error-deferred e#))))
so the chain body looks like:
(->
 (with-catch
   (http/get url http-options))

 (d/chain
  (fn [response]
    (let [{:keys [body headers]} response
          content-length (get headers "content-length")]
...

mccraigmccraig10:03:30

well that will work - but you will have an uncaught errored promise at the end... so you'll perhaps want a d/catch too ? i like the "you don't have to think about it anymore" aspect of the safe-catch macro approach, and the way it has the same shape as the d/catch fn and plays the same way with threading

igrishaev10:03:42

There is a d/catch at the bottom, I didn’t put it here

igrishaev10:03:09

I always finish the chain with a common d/catch handler

mccraigmccraig10:03:19

with safe-catch then you can just replace the d/catch with the safe-catch and you get everything

igrishaev10:03:40

the whole pipeline in my example would be:

(->
 (with-catch
   (http/get url http-options))

 (d/chain
  (fn [response]
    (:body response))
  (fn [body]
    (process-the-body body)))

 (d/catch
     (fn [^Exception e]
       (report-error e))))

igrishaev10:03:21

And also, can anyone help me with d/loop? I’m a bit confused by it. What I’m going to do is poll Kafka and feed messages from it into a stream. That stream should be processed by several workers. I’m not sure how to build the d/loop logic.
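
A hedged sketch of one way to structure that loop (consumer/poll!, poll-timeout, and the worker handler are names from the snippets in this thread or assumptions, not a tested implementation):

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn start-poll-loop!
  [consumer poll-timeout stream]
  (d/loop []
    (d/chain
     ;; d/future keeps the blocking poll off the caller's thread
     (d/future (consumer/poll! consumer poll-timeout))
     (fn [messages]
       (if (seq messages)
         (s/put-all! stream messages)   ; deferred, true when accepted
         true))
     (fn [ok?]
       (when ok? (d/recur))))))

;; workers then consume the stream independently, e.g.
;; (dotimes [_ n-workers]
;;   (s/consume-async handle-message stream))
```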

igrishaev10:03:08

Inside d/loop, I’ve got a d/chain clause that polls a consumer. However, this approach is blocking.

igrishaev10:03:04

(d/loop []
  (->
   nil

   (d/chain
    (fn [& _]
      (consumer/poll! consumer poll-timeout))
    (fn [messages]
      (when-not (empty? messages)
        (s/put-all! stream messages))))

   (d/catch
       (fn [^Throwable e]
         (report-exception e {:topic topic})))

   (d/chain
    (fn [& _]
      (d/recur)))))

igrishaev10:03:39

But when I move polling into d/future, that works:

igrishaev10:03:05

(d/loop []
  (->
   (d/future
     (consumer/poll! consumer poll-timeout))

   (d/chain
    (fn [messages]
      (when-not (empty? messages)
        (s/put-all! stream messages))))

   (d/catch
       (fn [^Throwable e]
         (report-exception e {:topic topic})))

   (d/chain
    (fn [& _]
      (d/recur)))))

igrishaev10:03:58

So my question is, why does the first variant block the main thread?
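
A hedged answer sketch (an inference from manifold’s execution model, not a reply from the thread): chaining off a plain value or already-realized deferred runs the callbacks synchronously on the calling thread, so the blocking poll runs inline and d/recur keeps it there; d/future realizes its value on the execute pool, so downstream callbacks run on pool threads instead:

```clojure
(require '[manifold.deferred :as d])

;; callbacks on a plain value run inline, on the calling thread
(def t1 (promise))
(d/chain nil (fn [_] (deliver t1 (.getName (Thread/currentThread)))))
@t1 ; same name as the calling thread

;; callbacks downstream of d/future run on the execute pool
(def t2 (promise))
(d/chain (d/future :x)
         (fn [_] (deliver t2 (.getName (Thread/currentThread)))))
@t2 ; a pool thread's name (exact name depends on the executor)
```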

igrishaev11:03:44

btw @mccraigmccraig thank you for sharing your macro. Only now have I got what’s behind it, I’m a bit slow.

igrishaev14:03:10

anyone familiar with aleph-http stuff, could you take a look please? https://github.com/ztellman/aleph/issues/500

mccraigmccraig16:03:59

@igrishaev hmm - i've not seen this, and we often upload mobile video files which can get quite large - i wonder why your connection is being closed ?

igrishaev16:03:45

so do I, really. I even managed to reproduce it in the repl

igrishaev16:03:58

first, get the stream from http/get, then wrap it with an AWS stream that counts the number of bytes consumed

igrishaev16:03:30

then read that wrapped stream in a loop into a byte-array

igrishaev16:03:50

and at some point the exception will arise

mccraigmccraig16:03:08

one thing i remember from way back... we don't stream directly to S3, we go via a tmpfile first - and we had terrible trouble (with the sort of random truncation errors you are seeing) until we started using the :raw-stream? option and doing things async https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/http.clj#L177
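
A hedged sketch of the :raw-stream? + tmpfile approach described above (shaped after the linked aleph example; the byte-streams conversion and helper name are assumptions, not mccraigmccraig’s actual code):

```clojure
(require '[aleph.http :as http]
         '[manifold.deferred :as d]
         '[manifold.stream :as s]
         '[byte-streams :as bs])
(import '[java.io FileOutputStream]
        '[io.netty.util ReferenceCountUtil])

(defn download-to-tmpfile!
  [url tmpfile]
  (d/chain
   (http/get url {:raw-stream? true})
   :body                                ; a stream of Netty ByteBufs
   (fn [body]
     (let [out (FileOutputStream. tmpfile)]
       (d/chain
        (s/consume
         (fn [buf]
           (try
             (.write out (bs/to-byte-array buf))
             (finally
               ;; with :raw-stream? the consumer must release ByteBufs
               (ReferenceCountUtil/release buf))))
         body)
        (fn [_] (.close out) tmpfile))))))
```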

mccraigmccraig16:03:29

but we've got #yada in between us and raw #aleph so it may have been some other problem

hiredman16:03:27

if you haven't considered / are not aware of multipart uploads to S3, I would suggest checking those out. Instead of having to figure out the size from a stream, you can upload fixed-size chunks
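
A hedged sketch of the multipart idea with the AWS Java SDK v1 (class names are from that SDK; the chunking strategy and helper name are assumptions, and each part except the last must be at least 5 MB):

```clojure
(import '[com.amazonaws.services.s3 AmazonS3ClientBuilder]
        '[com.amazonaws.services.s3.model
          InitiateMultipartUploadRequest UploadPartRequest
          CompleteMultipartUploadRequest]
        '[java.io ByteArrayInputStream])

(defn upload-chunks!
  "chunks is a seq of byte arrays; each but the last must be >= 5MB."
  [bucket key chunks]
  (let [s3   (AmazonS3ClientBuilder/defaultClient)
        up   (.initiateMultipartUpload
              s3 (InitiateMultipartUploadRequest. bucket key))
        id   (.getUploadId up)
        tags (doall
              (map-indexed
               (fn [i ^bytes chunk]
                 (.getPartETag
                  (.uploadPart
                   s3 (-> (UploadPartRequest.)
                          (.withBucketName bucket)
                          (.withKey key)
                          (.withUploadId id)
                          (.withPartNumber (inc i))
                          (.withPartSize (count chunk))
                          (.withInputStream
                           (ByteArrayInputStream. chunk))))))
               chunks))]
    ;; complete with the collected part ETags
    (.completeMultipartUpload
     s3 (CompleteMultipartUploadRequest. bucket key id (vec tags)))))
```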

igrishaev17:03:23

@mccraigmccraig I’m a bit confused about how to turn a raw-stream into the InputStream that AWS needs?

mccraigmccraig17:03:40

@igrishaev i haven't tried - we just wrote the bytes from the raw-stream to a tmpfile and then gave the whole tmpfile to the s3 uploader

igrishaev17:03:30

@mccraigmccraig I see, but it won’t apply in my case. I’ve got to do the whole stuff in memory

igrishaev17:03:01

but I put a comment in the issue; I figured it out with clj-http

igrishaev17:03:50

when you pass {:as :stream}, the connection won’t close until you consume it till the end

hiredman17:03:28

also, I am pretty sure using multipart uploading is the only way to get true streaming. I think (I am not 100% sure) the Cognitect aws-api clients turn InputStreams into byte arrays (read them all in as one big one) when passed