Fork me on GitHub
Ramon Rios08:03:45

Hello everyone. Does someone already tried to handle erros while your using stream/consume ??


what do you want to happen when there are errors in your callback @ramonp.rios ?

Ramon Rios08:03:17

Let me give some code

Ramon Rios08:03:22

and then explain

Ramon Rios08:03:16

(let [handler (fn [msg]
                    (log/info (str "creating customer into " http-url " : " msg))
                    (->> @(add-customer-ror http-url msg)
                         (log/info "returned from server ")))
          stream (m/topic->stream conn topic)]
      (ms/consume handler stream))

Ramon Rios08:03:16

I'm connecting to activemq and getting messages from topics. Every time i send a message , i get this message and save into a http endpoint

Ramon Rios08:03:51

I test that, when i do not connect or got some 404 or another error in http, my function will handle it

Ramon Rios08:03:13

Sending a message to another topic and/or restarting to consume

Ramon Rios08:03:03

I read that catch is my function, but not useful because it's with deferred and not stream.


if you use consume then the only error will be in your callback, not on the stream - you can use deferred/catch or just plain catch there (since you are derefing what i presume is a deferred from add-customer-ror)


i would generally take a different approach - rather than using consume i would map the source stream with a fn which calls add-customer-ror , catches any error and stashes it in a marker record or variant etc... then you can [1] buffer the mapped stream to control your concurrency hitting the API, [2] get backpressure and [3] process the mapped stream to do anything you want with normal and error responses


(the final piece of the puzzle being to stream/reduce the mapped stream, to force consumption)


oh, and of course stream/realize-each to turn any deferred values on a stream into plain values

Ramon Rios10:03:54

I use it and really supply my needs

Ramon Rios10:03:14

I put a catch in my handler and now it works as expected