#aleph
2020-03-13
ramonp.rios08:03:45

Hello everyone. Has anyone already tried to handle errors while using stream/consume?

mccraigmccraig08:03:02

what do you want to happen when there are errors in your callback @ramonp.rios ?

ramonp.rios08:03:17

Let me give some code

ramonp.rios08:03:22

and then explain

ramonp.rios08:03:16

(let [handler (fn [msg]
                (log/info (str "creating customer into " http-url " : " msg))
                (->> @(add-customer-ror http-url msg)
                     (log/info "returned from server ")))
      stream (m/topic->stream conn topic)]
  (ms/consume handler stream))

ramonp.rios08:03:16

I'm connecting to ActiveMQ and getting messages from topics. Every time a message is sent, I receive it and save it to an HTTP endpoint

ramonp.rios08:03:51

I want my function to handle the cases where I can't connect, or get a 404 or some other HTTP error

ramonp.rios08:03:13

For example, by sending a message to another topic and/or restarting consumption

ramonp.rios08:03:03

I read that catch is the function I need, but it doesn't seem useful here because it works with deferreds, not streams.

mccraigmccraig08:03:08

if you use consume then the only error will be in your callback, not on the stream - you can use deferred/catch or just plain catch there (since you are derefing what i presume is a deferred from add-customer-ror)
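
A minimal sketch of the catch-in-the-callback idea, assuming Manifold is on the classpath and Clojure 1.10+ is not required. `add-customer` is a hypothetical stand-in for `add-customer-ror` (it just fails for the message `:bad`), and `consume-all` is an illustrative helper, not part of any library:

```clojure
(ns example.consume-errors
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

;; Hypothetical stand-in for add-customer-ror: returns a deferred that
;; fails for the message :bad and succeeds for anything else.
(defn add-customer [msg]
  (if (= msg :bad)
    (d/error-deferred (ex-info "HTTP request failed" {:msg msg}))
    (d/success-deferred {:status 200 :msg msg})))

(defn consume-all
  "Consumes every message in msgs and returns a vector recording
  [:ok msg] or [:error msg] for each one, in arrival order."
  [msgs]
  (let [results (atom [])
        handler (fn [msg]
                  ;; d/catch turns a failed deferred into a normal value,
                  ;; so the deref never throws inside the callback.
                  @(-> (add-customer msg)
                       (d/chain (fn [_resp] (swap! results conj [:ok msg])))
                       (d/catch (fn [_e] (swap! results conj [:error msg])))))]
    ;; s/consume returns a deferred that is realized once the source is exhausted.
    @(s/consume handler (s/->source msgs))
    @results))
```

The error branch is where a real handler might publish to another topic or schedule a retry; a plain `try`/`catch` around the deref would work in the same place.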

mccraigmccraig08:03:50

i would generally take a different approach - rather than using consume i would map the source stream with a fn which calls add-customer-ror , catches any error and stashes it in a marker record or variant etc... then you can [1] buffer the mapped stream to control your concurrency hitting the API, [2] get backpressure and [3] process the mapped stream to do anything you want with normal and error responses

mccraigmccraig08:03:03

(the final piece of the puzzle being to stream/reduce the mapped stream, to force consumption)

mccraigmccraig08:03:59

oh, and of course stream/realize-each to turn any deferred values on a stream into plain values
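
The map / buffer / realize-each / reduce pipeline described above might look roughly like this. It is a sketch under assumptions: `add-customer` is again a hypothetical stand-in for `add-customer-ror`, the marker is a plain map with a `:status` key, and the buffer size of 4 is arbitrary:

```clojure
(ns example.mapped-stream
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

;; Hypothetical stand-in for add-customer-ror: fails for :bad only.
(defn add-customer [msg]
  (if (= msg :bad)
    (d/error-deferred (ex-info "HTTP request failed" {:msg msg}))
    (d/success-deferred {:status 200 :msg msg})))

(defn attempt
  "Calls the API and returns a deferred that always succeeds, yielding
  a marker map tagged :ok or :error instead of propagating the failure."
  [msg]
  (-> (add-customer msg)
      (d/chain (fn [resp] {:status :ok :msg msg :response resp}))
      (d/catch (fn [e] {:status :error :msg msg :error e}))))

(defn process
  "Returns a deferred yielding a map of messages grouped by outcome."
  [source]
  (->> source
       (s/map attempt)     ; stream of deferreds, one per message
       (s/buffer 4)        ; bounds how far the map step can run ahead
       (s/realize-each)    ; deferreds become plain marker maps
       ;; reducing forces consumption of the whole stream
       (s/reduce (fn [acc {:keys [status msg]}]
                   (update acc status (fnil conj []) msg))
                 {})))
```

Calling `@(process (s/->source [:a :bad :b]))` blocks until the source is drained; the reducing function is the place to react to `:error` markers, for example by forwarding them to another topic.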

ramonp.rios10:03:44

Thank you @mccraigmccraig

ramonp.rios10:03:54

I used it and it really meets my needs

ramonp.rios10:03:14

I put a catch in my handler and now it works as expected