Fork me on GitHub
#core-async
<
2020-04-29
>
abdullahibra19:04:09

I have general question, about consuming messages from Kafka topic, is it better to collect all messages into async channel and handle it by pure methods which take channel and process it and return new output channel and so on or what is the best approach to model that ?

noisesmith19:04:54

I responded already in the other channel, but more specifically about core.async here - what specifically would core.async give you when processing kafka- what feature are you looking for?

nikolavojicic19:04:49

I'd say use transducers instead of "pure" methods which take channel. Attach transducers to channels or pipelines.

noisesmith20:04:54

also you'll need to decouple the interaction between kafka and core.async, either can introduce lags that break the other

wilkerlucio20:04:58

hello, I just hit an issue while trying to use loop + try/catch, this is a code that breaks:

(defn x [some-chan]
  (go
    (loop []
      (try
        (when (<! some-chan)
          (recur))
        (catch :default e
          (recur))))))

wilkerlucio20:04:12

(reduced to make a point), but this code doesn't compile

wilkerlucio20:04:24

it does the error

------ ERROR -------------------------------------------------------------------
 File: /Users/wilkerlucio/Development/relemma/src/cljs/relemma/shared/logic/feature_flags.cljs:57:3
--------------------------------------------------------------------------------
  54 |     unlisten-cb))
  55 | 
  56 | (defn x [some-chan]
  57 |   (go
---------^----------------------------------------------------------------------
Encountered error when macroexpanding cljs.core/cond.
IllegalArgumentException: No implementation of method: :emit-instruction of protocol: #'cljs.core.async.impl.ioc-macros/IEmittableInstruction found for class: cljs.core.async.impl.ioc_macros.Jmp
	clojure.core/-cache-protocol-fn (core_deftype.clj:583)
	clojure.core/-cache-protocol-fn (core_deftype.clj:575)

wilkerlucio20:04:32

is there a way around it?

dpsutton20:04:31

you can recur outside of the try

dpsutton20:04:34

(defn x [some-chan]
  (go
    (loop []
      (try
        (a/<! some-chan))
      (recur))))

wilkerlucio20:04:03

yeah, but then when the channel is closed this would loop like crazy, right?

noisesmith20:04:52

that behavior would be shared by the original

wilkerlucio20:04:54

not really, the original has a when for no errors, and no recur in that case

dpsutton20:04:07

well my expanded not pasted version does too 🙂

wilkerlucio20:04:23

oh, you guys are right, because of the reduced code

noisesmith20:04:28

@wilkerlucio I don't see a when, and both the try and the catch recur to the same place

wilkerlucio20:04:31

but on the original I had, there was a when aroudn

dpsutton20:04:34

no worries.

wilkerlucio20:04:48

snippet updated

dpsutton20:04:54

you could just do an if-let and catch _ e nil

noisesmith20:04:55

err, they would recur to the same place if recur across try was valid

noisesmith20:04:03

which it isn't

wilkerlucio20:04:04

got it working with this:

(defn x [some-chan]
  (go
    (let [continue* (atom true)]
     (loop []
       (try
         (if (<! some-chan)
           (do-something)
           (reset! continue* false))
         (catch :default e
           (handle-error)))
       (if @continue*
         (recur))))))

noisesmith20:04:01

for that sort of thing, I like promise or delay, which convey the extra semantic that it's a one way switch

wilkerlucio20:04:19

in my case its a streaming thing

wilkerlucio20:04:36

agreed for single hit a promise would be great

noisesmith20:04:43

right, but nothing ever changes the atom to true - it's either not-done, or done

wilkerlucio20:04:59

it does, (snippet updated)

wilkerlucio20:04:09

when channel comes with nil, it stops

noisesmith20:04:40

I still don't see a case where it's assigned to true - you create a new one

wilkerlucio20:04:13

not sure if I get it, the point is just that one message can come as an error, in this case I want to report and wait for the next message

wilkerlucio20:04:28

the continue starts with true, but if the channel closes its set to false, and recur stops

noisesmith20:04:53

the point is that you have created a one-way-switch - once created it only changes once, from true to false

wilkerlucio20:04:09

yeah, that's when the channel gets closed, so nothing more to get from it

wilkerlucio20:04:30

after closing there is nothing else to do

noisesmith20:04:44

I'm not saying the atom is incorrect, I'm saying delay / promise are more specialized, and they are less error prone and more clear to a reader for that reason

noisesmith20:04:01

it's a style suggestion not a bug fix

noisesmith20:04:33

principle of least power

wilkerlucio20:04:41

but how I could use that since this is a stream channel?

wilkerlucio20:04:47

I expect multiple messages from it

noisesmith20:04:35

it literally replaces the atom, you create it in the same place, instead of using @ as the check, you use realized?, and instead of reset! you use deliver or force

wilkerlucio20:04:33

ah, gotcha, also I'm on CLJS land, you still thing would be better to use something else than atom there?

noisesmith20:04:37

(let [done? (promise)] (if (not (realized? done)) (try ... (catch ... (deliver done? :error-state)))

noisesmith20:04:53

oh, I'm forgetting cljs might not have those constructs

noisesmith20:04:45

cljs appears to have delay but not promise in the random cljs repl I checked

noisesmith20:04:54

then you could use (done (delay true)) then later (force done)

ghadi20:04:20

@wilkerlucio recur doesn't work across try boundaries, whether core.async or not

ghadi20:04:24

user=> (loop [] (try (recur) (catch Exception e)))
Syntax error (UnsupportedOperationException) compiling recur at (REPL:1:15).
Cannot recur across try