@tolitius Thanks for the link, might look into using this when we upgrade Kafka again. 🙂
sure, hope it works out, I really like it so far: nothing extra, just kafka Java API in a more concise form
For whatever reason, Clojure Kafka bindings have been tough to maintain over the years.
@tolitius one thing I’ve seen few examples of is how to do the consumer polling loop. I’d like to use core.async so it’s not blocking. I really only need a consumer to dump messages onto a channel to push down a websocket connection. Any suggestions on how to tackle that?
scala "don't care about backwards compatibility" culture.. the bindings need to be rewritten almost completely for new(er) kafka versions..
@kingoftheknoll > I’d like to use core.async so it’s not blocking
can you have consumer thread(s) do the polling and put the messages onto channels? unless I misunderstand something. subscribing (which is what happens when polling) ties up a consumer thread (and uses a file descriptor)
you can control a polling timeout if consuming gets "jumpy", but usually it just flows under the steady flow of new data (produced)
So would you create a new thread on each poll, or could you reuse it? <- sorry, rather new to this side of the Java world, coming from a Python background.
i.e. if you have X partitions, you'd create X / Y consumers (threads), each handling Y partitions (could be just X, one thread per partition)
i.e. 100 partitions, you'd create 20 threads, each would get assigned to 5 partitions
simplifying a bit (with no error catching), this is what a single consumer thread would do:

```clojure
(let [consumer-records (kafka/poll consumer ms)]
  (process consumer-records)
  (kafka/commit-offsets! consumer))
```
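For context, a slightly fuller sketch of that consumer loop with the error catching added back in; `kafka/poll`, `kafka/commit-offsets!` and `process` here are hypothetical wrappers from the snippet above, not a real library API:

```clojure
;; sketch only: kafka/poll, kafka/commit-offsets! and process are
;; hypothetical wrappers, stand-ins for your Kafka client calls
(defn consume-loop [consumer running?]
  (while @running?                                       ;; running? is an atom used to stop the loop
    (try
      (let [consumer-records (kafka/poll consumer 1000)] ;; poll timeout in ms
        (process consumer-records)
        (kafka/commit-offsets! consumer))                ;; manual offset commit
      (catch Exception e
        (println "poll/process failed:" (.getMessage e))))))
```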
you don't have to commit offsets yourself though.. I just prefer it (gives me that 2PC feeling 🙂)
process in your case would place this batch onto a core.async channel (or break them down and place them on a channel one by one, whatever works for you)
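One way that `process` could look, breaking the batch down and putting records on a core.async channel one by one (the channel name and buffer size are made up for the example):

```clojure
(require '[clojure.core.async :as a])

(def records-ch (a/chan 1024))  ;; buffered channel; name and size are illustrative

(defn process [consumer-records]
  (doseq [record consumer-records]
    ;; blocking put: applies backpressure to the polling thread
    ;; when downstream (e.g. the websocket) falls behind
    (a/>!! records-ch record)))
```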
The flow makes sense, but dumb question: how does one create the thread and manage the lot of them? I know core.async provides go blocks, which utilize a thread pool. I’ve read that core.async also has a `thread` function, and there’s the Java interop.
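On that point: `clojure.core.async/thread` runs its body on a real thread (from a cached pool) and returns a channel, which suits a blocking poll loop — blocking calls inside `go` would starve the small fixed-size go-block pool. A sketch, where `make-consumer` and `consume-loop` are hypothetical helpers rather than a real API:

```clojure
(require '[clojure.core.async :as a])

;; one real thread per consumer; make-consumer and consume-loop
;; are hypothetical helpers standing in for your own code
(defn start-consumers [n]
  (let [running? (atom true)]
    {:stop!   #(reset! running? false)  ;; flips the flag the loops check
     :threads (doall
                (for [_ (range n)]
                  (a/thread
                    (consume-loop (make-consumer) running?))))}))
```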
Sounds like I am working on the same questions. Was just reading this, which might be helpful - https://github.com/clojure/core.async/wiki/Pub-Sub
@nrako I’m doing a websocket pub/sub with Aleph as a webserver. It uses manifold and it has a nice abstraction for an event bus. https://github.com/ztellman/manifold/blob/master/docs/stream.md you might find that useful.
Thanks for sending this. So I think I understand what you are doing with the Kafka piece and the websocket piece. Can you clarify what you are wanting to use core.async for?
@nrako not a problem. So a big part of it is my limited knowledge of the Kafka consumer model. All the toy examples make perfect sense, but once I dig a bit deeper I keep running into things I’ve never done before, like making threads for an ongoing process. Learning Clojure, questions around threading typically get me directed towards core.async, so my first inclination is to think managing the consumer polling with core.async is the right direction. But it seems that core.async's correct use in this example is passing messages to a channel from multiple while loops in spawned threads, and that I need to create and manage those threads myself. Trouble is, I’m not sure where to start learning about that. ¯\_(ツ)_/¯
@kingoftheknoll: you would create a threadpool (i.e. similar to this: https://github.com/tolitius/lasync or could be just Java's https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html), and submit your consumer threads there. give me about an hour, I am in a meeting, I'll show you a couple of examples.
@kingoftheknoll: a little more code that you might need, but just to be explicit https://gist.github.com/tolitius/cc968a2adcc9dccc24cf15386fc44345
I can’t begin to say how helpful this is, thanks! So it seems the main reason for the ThreadFactory is having our own exception handler? Otherwise would it crash the program or does java have a concept of a global handler?
ThreadFactory there is not required, but gives better exception logging + a default (i.e. last resort / unhandled exception) handler
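For illustration, a minimal ThreadFactory that installs its own uncaught-exception handler, handed to a fixed pool via plain Java interop — a sketch of the idea, not the gist's actual code:

```clojure
(import '(java.util.concurrent Executors ThreadFactory)
        '(java.lang Thread$UncaughtExceptionHandler))

(defn logging-thread-factory []
  (reify ThreadFactory
    (newThread [_ runnable]
      (doto (Thread. runnable)
        (.setUncaughtExceptionHandler
          (reify Thread$UncaughtExceptionHandler
            (uncaughtException [_ t e]
              ;; last-resort handler: log instead of dying silently
              (println "thread" (.getName t) "died:" (.getMessage e)))))))))

(def pool (Executors/newFixedThreadPool 20 (logging-thread-factory)))
;; submit consumer loops to it, e.g.:
;; (.submit pool ^Runnable my-consumer-loop-fn)
```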
Well, Distributed Masonry now has an office. We're keeping the Onyx in there. In case it, you know, runs out, and we need more.
His talk was about using spec to infer relations between input data (Kafka) and then using that information to generate materialized views in Postgres, which could trigger new events for other materialized views.
I had a short talk with him afterward to discover his cluster size, but at the moment they only ran it on a single node. What he really liked was the abstractions that Onyx enabled.
@zamaterian Thanks, that's very kind of you. 🙂 Gotta help everyone to get up and going if this is going to be collectively sustainable.
Here is the rough draft of the first level 7 challenge on how to make aggregations, feedback welcome! https://github.com/drewverlee/learn-onyx/tree/add-aggregation-challanges. The plan is to create 2-3 more depending on how many divergent concepts there are.
@michaeldrogalis is the hope to have the in-browser tutorial replace the JVM-based one?
Nah, we can cover the same content in a different way in the browser. Make things a little more interactive.