#missionary
2021-10-06
Ben Sless15:10:07

Anyone ever tried using Kafka with missionary?

Ben Sless07:10:29

Figured I can return an Iterable which on next() will call the consumer's poll() then use it as an input to m/seed. Is that a good method or should I handle polling differently because it's blocking?

leonoel07:10:29

use `(m/via m/blk ,,,)` for blocking operations, `m/seed` is fine, you can also use `loop`/`recur` with `amb>`

Ben Sless08:10:17

Not 100% sure when I should use `via` here

(defn topic-iterable
  [consumer]
  (reify Iterable
    (iterator [_]
      (reify java.util.Iterator
        (next [_] (.poll ^KafkaConsumer consumer 2000))
        (hasNext [_] true)))))

(->> (m/seed (topic-iterable consumer))
     (m/eduction (comp cat (map -value)))
     (m/reduce (fn [_ x] (println 'consumed x)) nil)
     m/?)

Ben Sless08:10:51

I also need to get used to the terminology and that some code is not run sequentially anymore

leonoel18:10:48

(defn poll [^KafkaConsumer consumer]
  (m/via m/blk (.poll consumer Long/MAX_VALUE)))

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

(->> (forever (poll consumer))
  (m/eduction (comp cat (map -value)))
  (m/reduce (fn [_ x] (println 'consumed x)) nil)
  m/?)

Ben Sless18:10:47

Thank you! To make sure I understand the steps up the levels of abstraction:
• `poll` returns a task which will always be run on the blocking thread pool. Is an "infinite" timeout necessary and correct?
• `forever` creates an ambiguous process; `?>` forks on every new value. Does `?` park the process until it is preempted by someone pulling it?
`forever` might make a nice addition to the library.

leonoel19:10:37

The timeout is not necessary because `.poll` supports thread interruption: when the pipeline is cancelled, the `via` task will interrupt the thread running the poll.

leonoel19:10:04

`?` runs a single poll and returns the result when it's ready. Because it's the last expression of the `ap`, it makes the flow ready for transfer; when the value is transferred, control flow backtracks to `?>`, which takes the next item of the seed and passes it again to `?`, polling the next result.

leonoel19:10:03

This is strictly equivalent :

(m/ap
  (loop []
    (m/amb> (m/? (poll consumer))
      (recur))))

Prashant19:12:25

Hi Leonel, apologies for resurrecting a year-old thread. Your reply https://clojurians.slack.com/archives/CL85MBPEF/p1633890768036000?thread_ts=1633532647.034400&cid=CL85MBPEF works great for scenarios where the next `.poll` is called within the configured poll interval. If consumer-record processing takes time (e.g. a call to an external service, or retries with delay due to transient failures), Kafka may trigger a rebalance of the consumer group. To overcome this, the Kafka consumer offers a `.pause` method, after which `.poll` effectively just keeps the consumer group alive with Kafka; `.poll` returns records again only when `.resume` is called on the consumer instance. I am transitioning my app from Kafka Streams to a reactive Kafka consumer for this reason. I have tested:
• Alpakka Kafka Java DSL with interop
  ◦ Abstracts `.pause` and `.resume` in its DSL
  ◦ Had to battle my way through types and reification
  ◦ While it works great and is functional, the interop makes the code hard to follow for people unaware of the various Scala functional interfaces (e.g. Creator)
  ◦ Have to pull in org.scala-lang/scala-library apart from the Alpakka Kafka dependencies
  ◦ Very rough implementation: https://gist.github.com/fr33m0nk/21c3d986485fa560f6644c550a5ec391
• core.async
  ◦ This implementation was really imperative
I was wondering how to refactor your last post https://clojurians.slack.com/archives/CL85MBPEF/p1633890768036000?thread_ts=1633532647.034400&cid=CL85MBPEF to take `.pause` & `.resume` into account and let `.poll` run continuously within the poll interval. Maybe I can use `m/mbx` for signalling the poll fn to `.resume` and then call `.poll` :thinking_face:

leonoel21:12:58

Akka streams' backpressuring model is pretty close to missionary, it should be possible to write a similar functional stream adapter. The approach is explained here https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations. If you already have an imperative POC I can take a look at it

Prashant05:12:29

Thanks Leonel, here is the gist of a crude imperative implementation using core.async: https://gist.github.com/fr33m0nk/efdca7d5e77db9bc0cdcbf6a1ce38497#file-kafka-core_async-clj-L10

👀 1
Prashant14:12:02

Hi Leonel, took a stab at creating a self-contained Akka-like Kafka actor using Promesa. Here's the gist: https://gist.github.com/fr33m0nk/0958c61377d429e8690a8d7fe0ee9cc4 I will take a stab at creating something similar (and hopefully less imperative) using Missionary tomorrow. The example at https://github.com/leonoel/missionary/blob/master/src/missionary/core.cljc#L343 looks particularly useful.

leonoel19:12:24

This problem is trickier than I thought: all interactions with Kafka must be serialized, and you also need to poll frequently enough. I'm puzzled, I'll dig deeper next week.

Ben Sless10:12:07

Why is pausing required? Why not just poll repeatedly?

Prashant10:12:21

`.pause` does not pause polling per se: `.poll` after `.pause` just does not fetch records from Kafka, and keeps the consumer alive, preventing expensive consumer-group rebalancing. This ensures pull semantics for the consumer of consumer records. PS: `.poll` still needs to be called on the consumer instance after `.pause` is called. When the consumer of consumer records is ready for more, it signals the consumer object to call `.resume`, after which `.poll` again starts to fetch consumer records from Kafka. All reactive Kafka (stream) frameworks, e.g. Alpakka and Project Reactor, use the above. Native Kafka Streams unfortunately doesn't have this, at least not yet.

Ben Sless10:12:01

Yeah but why call pause at all? Why not just block calls to poll as long as there is back pressure? To prevent partition reassignment?

Prashant10:12:52

Yes, `.poll` after `.pause` is needed in order to prevent consumer-group rebalancing. Consumer-group rebalancing is especially taxing on high-throughput topics with 50+ partitions. The only other way is to have a large enough poll interval, which has its own implications.

Prashant10:12:04

`.poll` after `.pause` ensures respecting backpressure as well as preventing consumer-group rebalancing.

Ben Sless10:12:57

I'm asking the other way around. Let's imagine, in the core.async world, I have one thread with a consumer putting into a channel, and another thread pulling off that channel and processing. If the downstream thread is slower than the consumer, backpressure will propagate and the upstream thread won't poll. Isn't that sufficient?
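The setup described here could be sketched with core.async roughly as follows (a hedged sketch; `process!` is an assumed per-record handler, and the bounded channel is what carries the backpressure):

```clojure
(require '[clojure.core.async :as a])

;; One thread polls and puts record batches on a bounded channel;
;; >!! blocks when the buffer is full, so a slow downstream
;; transitively stops the polling thread.
(defn start-pipeline [^KafkaConsumer consumer process!]
  (let [ch (a/chan 1)]
    (a/thread
      (loop []
        (a/>!! ch (.poll consumer (java.time.Duration/ofSeconds 2)))
        (recur)))
    (a/thread
      (loop []
        (when-some [records (a/<!! ch)]
          (doseq [r records] (process! r))
          (recur))))
    ch))
```

Note that while `>!!` is blocked on a full buffer, `.poll` is not being called at all.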

Ben Sless10:12:31

The question is why is pause even necessary for back pressure

Prashant10:12:02

It has the same problem as push semantics: a slow consumer slows down `.poll` eventually and then triggers a Kafka consumer-group rebalance. In core.async, or any CSP setup, the same story applies. Native Kafka Streams also suffers from this same problem, hence the rise of reactive Kafka (stream) libraries. The consumer in question is slow for a myriad of reasons, including consumer-record enrichment with API calls (with retries on failure), slow Datomic write speeds (with retries on failure), etc. Repeated `.poll` after `.pause` lets consumers process records at their own pace while keeping the consumer group active with Kafka. I hope the above helps clarify how `.pause` contributes to better backpressure management and ensures pull semantics.

Prashant10:12:02

I guess an analogy with an event loop is a good example. Consider the `.poll` loop as an event loop: you don't want to pause it, as pausing it triggers consumer-group rebalancing, but you do want to control the rate at which `.poll` pulls records from Kafka so that consumers of consumer records can process them in their own time. `.pause` and `.resume` enable control over the rate at which the Kafka consumer pulls consumer records from Kafka.
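That event-loop shape could be sketched roughly like this (a hypothetical `demand?` predicate and `deliver!` callback are assumptions; a real adapter must also keep all consumer access on this single thread):

```clojure
;; Single thread owning the consumer: .poll is called on every
;; iteration to keep the group membership alive; partitions are
;; paused/resumed depending on whether downstream signals demand.
(defn poll-loop [^KafkaConsumer consumer demand? deliver!]
  (loop []
    (if (demand?)
      (.resume consumer (.paused consumer))
      (.pause consumer (.assignment consumer)))
    (let [records (.poll consumer (java.time.Duration/ofMillis 100))]
      (when-not (.isEmpty records)
        (deliver! records)))
    (recur)))
```

While paused, the `.poll` calls return empty batches but still service heartbeats and group coordination, which is the behaviour described above.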

Ben Sless11:12:41

Okay, so what you're saying is that not polling after a period triggers a rebalance?

Prashant11:12:39

Yes, and that period is controlled by configuration of Kafka consumer/stream.

Ben Sless11:12:23

Okay, so in theory, why not set it to a high number instead of pausing?

Prashant12:12:19

Increasing the poll interval and tweaking other Kafka Streams config values was the first thing I did. In my case it's already high enough for Kafka Streams and I still have rebalancing issues on high-throughput topics. A high value has an impact on Kafka group management, as internally Kafka uses this timeout for other things as well. On a side note, increasing timeouts is rarely a solution to such problems. At least for my case, a tested solution already exists (e.g. reactive Kafka).

leonoel10:12:05

I used plain atoms as a side-channel for backpressure and offset committing; `mbx` would work too but it's more difficult to get batching on top of it
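A rough sketch of the atom-as-side-channel idea (hypothetical names, not the actual implementation; the processing side mutates the atoms, the poll loop reads them each iteration):

```clojure
;; Two atoms shared between the poll loop and the processing flow:
;; one carries demand (backpressure), one accumulates offsets to commit.
(def demand    (atom 0))   ; records the downstream is willing to take
(def to-commit (atom {}))  ; {TopicPartition -> OffsetAndMetadata}

(defn poll-step [^KafkaConsumer consumer]
  ;; commit whatever the processors have acknowledged so far,
  ;; batched naturally by taking the whole map at once
  (let [[offsets _] (swap-vals! to-commit (constantly {}))]
    (when (seq offsets)
      (.commitAsync consumer offsets nil)))
  ;; pause when there is no demand, resume otherwise
  (if (pos? @demand)
    (.resume consumer (.paused consumer))
    (.pause consumer (.assignment consumer)))
  (.poll consumer (java.time.Duration/ofMillis 100)))
```

Because only the poll loop touches the consumer, the atoms carry signals across threads without violating the single-threaded consumer constraint.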

👍 2
Prashant04:01:04

Thank you Leonel 🙂 🙏
