This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2021-10-06
Figured I can return an Iterable which on `next()` will call the consumer's `poll()`, then use it as an input to `m/seed`. Is that a good method, or should I handle polling differently because it's blocking?
you may find inspiration here https://github.com/leonoel/missionary/issues/23
use `(m/via m/blk ,,,)` for blocking operations, `m/seed` is fine, you can also use `loop`/`recur` with `amb>`
Not 100% sure when I should use `via` here
```clojure
(defn topic-iterable [consumer]
  (reify Iterable
    (iterator [_]
      (reify java.util.Iterator
        (next [_] (.poll ^KafkaConsumer consumer 2000))
        (hasNext [_] true)))))

(->> (m/seed (topic-iterable consumer))
     (m/eduction (comp cat (map -value)))
     (m/reduce (fn [_ x] (println 'consumed x)) nil)
     m/?)
```
I also need to get used to the terminology, and to the fact that some code is not run sequentially anymore.
```clojure
(defn poll [^KafkaConsumer consumer]
  (m/via m/blk (.poll consumer Long/MAX_VALUE)))

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

(->> (forever (poll consumer))
     (m/eduction (comp cat (map -value)))
     (m/reduce (fn [_ x] (println 'consumed x)) nil)
     m/?)
```
Thank you
To make sure I understand the steps up the levels of abstraction:
• `poll` returns a task which will always be run on the blocking thread pool. Is an "infinite" timeout necessary and correct?
• `forever` creates an ambiguous process; `?>` forks on every new value, and `?` parks the process until it is preempted by someone pulling on it?
`forever` might make a nice addition to the library.
The timeout is not necessary because `.poll` supports thread interruption: when the pipeline is cancelled, the `via` will interrupt the thread running the poll.
`?` runs a single poll and returns the result of the poll when it's ready. Because it's the last expression of the `ap`, it will make the flow ready for transfer; when the value is transferred, the control flow will backtrack to `?>`, which will take the next item of the `seed` and pass it again to `?`, polling the next result.
This is strictly equivalent:
```clojure
(m/ap
  (loop []
    (m/amb> (m/? (poll consumer))
            (recur))))
```
Hi Leonel, apologies for resurrecting a year-old thread.
Your reply https://clojurians.slack.com/archives/CL85MBPEF/p1633890768036000?thread_ts=1633532647.034400&cid=CL85MBPEF works great for scenarios where the next `.poll` is called within the required interval. If consumer-record processing logic takes time (e.g. a call to an external service, or a retry with delay due to a transient failure), Kafka may trigger a rebalance of the consumer group.
To overcome this, the Kafka consumer offers a `.pause` method, after which `.poll` effectively just keeps the consumer group alive with Kafka. `.poll` returns records only when `.resume` is called on the consumer instance.
I am transitioning my app from Kafka Streams to a reactive Kafka consumer for this reason. I have tested:
• Alpakka Kafka Java DSL with interop
◦ Abstracts `.pause` and `.resume` in its DSL
◦ Had to battle my way through types and reification
◦ While it works great and is functional, the interop makes the code hard for people unaware of various Scala functional interfaces (e.g. Creator)
◦ Have to pull in `org.scala-lang/scala-library` apart from the Alpakka Kafka dependencies
◦ Very rough implementation: https://gist.github.com/fr33m0nk/21c3d986485fa560f6644c550a5ec391
• core.async
◦ This implementation was really imperative
I was wondering how to refactor your last post https://clojurians.slack.com/archives/CL85MBPEF/p1633890768036000?thread_ts=1633532647.034400&cid=CL85MBPEF to take `.pause` & `.resume` into account and let `.poll` run continuously within the required interval.
Maybe I can use `m/mbx` for signalling the `poll` fn to `.resume` and then call `.poll` :thinking_face:
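For illustration, the `m/mbx` idea above could be sketched roughly like this. This is an untested, hypothetical sketch, not code from the thread's gists: `demand` and `poll-when-ready` are illustrative names, and it assumes the downstream processor pauses the partitions after taking a batch and posts to the mailbox when it is ready for more.

```clojure
(defn poll-when-ready
  [^KafkaConsumer consumer demand]   ; demand is an (m/mbx)
  (m/ap
    (loop []
      (m/? demand)                   ; park until the processor signals readiness
      (m/amb>
        (m/? (m/via m/blk
               ;; resume whatever the processor paused, then fetch
               (.resume consumer (.paused consumer))
               (.poll consumer (java.time.Duration/ofSeconds 1))))
        (recur)))))
```

Note the caveat raised further down the thread: while parked on the mailbox this loop stops calling `.poll` entirely, so it does not keep the consumer group alive between demands.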
Akka Streams' backpressure model is pretty close to missionary's; it should be possible to write a similar functional stream adapter. The approach is explained here https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations. If you already have an imperative POC I can take a look at it
Thanks Leonel. Here is the gist https://gist.github.com/fr33m0nk/efdca7d5e77db9bc0cdcbf6a1ce38497#file-kafka-core_async-clj-L10 of a crude imperative implementation using core.async
Hi Leonel, took a stab at creating a self-contained Akka-like Kafka actor using Promesa. Here's the gist: https://gist.github.com/fr33m0nk/0958c61377d429e8690a8d7fe0ee9cc4. I will take a stab at creating something similar (and hopefully less imperative) using Missionary tomorrow. The https://github.com/leonoel/missionary/blob/master/src/missionary/core.cljc#L343 example looks particularly useful.
This problem is trickier than I thought, all interactions to kafka must be serialized and you also need to poll frequently enough. I'm puzzled, I'll dig deeper next week.
`.pause` does not pause polling per se: `.poll` after `.pause` just does not fetch records from Kafka, and keeps the consumer alive, preventing expensive consumer-group rebalancing. This ensures pull semantics for the consumer of consumer records.
PS: `.poll` still needs to be called on the consumer instance after `.pause` is called. When the consumer of consumer records is ready for more, it signals the consumer object to call `.resume`, after which `.poll` again starts to fetch consumer records from Kafka.
All reactive Kafka (stream) frameworks (Alpakka, Project Reactor) use the above. Native Kafka Streams unfortunately doesn't have this, at least not yet.
Yeah but why call pause at all? Why not just block calls to poll as long as there is back pressure? To prevent partition reassignment?
Yes, `.poll` after `.pause` is needed in order to prevent consumer-group rebalancing, which is especially taxing on high-throughput topics with 50+ partitions. The only other way is to have a large enough timeout, which has its own implications. `.poll` after `.pause` respects backpressure while also preventing consumer-group rebalancing.
https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations should give you a better idea.
I'm asking the other way around, let's imagine in the core async world, I have one thread with a consumer putting into a channel, another thread pulling off that channel and processing. If the downstream thread is slower than the consumer, backpressure will propagate and the upstream thread won't poll. Isn't that sufficient?
It has the same problem as push semantics: a slow consumer eventually slows down `.poll` and then triggers a Kafka consumer-group rebalance. The same story applies to core.async or any CSP setup. Native Kafka Streams also suffers from this problem, hence the rise of reactive Kafka (stream) libraries.
The consumer in question is slow for a myriad of reasons, including consumer-record enrichment with API calls (with retries on failure), slow Datomic write speeds (with retries on failure), etc.
Repeated `.poll` after `.pause` lets consumers process records at their own pace while keeping the consumer group active with Kafka. I hope the above helps clarify how `.pause` contributes to better backpressure management and ensures pull semantics.
I guess an analogy with an event loop is a good example. Consider the `.poll` loop as an event loop: you don't want to pause it, as pausing it triggers consumer-group rebalancing, but you do want to control the rate at which `.poll` pulls records from Kafka so that consumers of consumer records can process them in their own time. `.pause` and `.resume` enable that control over the rate at which the Kafka consumer pulls consumer records from Kafka.
Okay, so what you're saying is that not polling after a period triggers a rebalance?
Increasing the timeout and tweaking other Kafka Streams config values was the first thing I did. In my case it's already high enough for Kafka Streams, and I still have rebalancing issues on high-throughput topics. A high timeout has an impact on Kafka group management, as internally Kafka uses this timeout for other things as well.
On a side note, increasing timeouts is rarely a solution to such problems. At least in my case, a tested solution already exists (e.g. reactive Kafka).
Here is my attempt https://gist.github.com/leonoel/b4ec06fe2df30764217977918084fda9
I used plain atoms as a side channel for backpressure and offset committing; mbx would work too, but it's more difficult to get batching on top of it
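The atom-as-side-channel shape might be sketched like this (a hedged sketch, not the linked gist; `ready?` is an illustrative name for a boolean atom flipped by the downstream processor). The key point is that `.poll` runs on every iteration regardless of the flag:

```clojure
(defn poll-loop [^KafkaConsumer consumer ready?]  ; ready? is an atom of boolean
  (m/ap
    (loop []
      (m/amb>
        (m/? (m/via m/blk
               (if @ready?
                 (.resume consumer (.paused consumer))
                 (.pause consumer (.assignment consumer)))
               ;; .poll runs every iteration, paused or not,
               ;; so the consumer group never rebalances
               (.poll consumer (java.time.Duration/ofMillis 1000))))
        (recur)))))
```

Empty batches produced while paused flow through harmlessly once a `cat` transducer is applied downstream, as in the earlier `m/eduction` examples.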