
@kingoftheknoll: to your kafka clj bindings collection 🙂 I use it for a couple of projects, much simpler than others I used before. (have not used kinsky though)


@tolitius Thanks for the link, might look into using this when we upgrade Kafka again. 🙂


@tolitius thanks I’ll check it out


sure, hope it works out, I really like it so far: nothing extra, just kafka Java API in a more concise form


For whatever reason, Clojure Kafka bindings have been tough to maintain over the years.


@tolitius one thing I’ve seen few examples of is how to do the consumer polling loop. I’d like to use core.async so it’s not blocking. I really only need a consumer to dump messages onto a channel to push down a websocket connection. Any suggestions on how to tackle that?


scala "don't care about backwards compatibility" culture.. the bindings need to be rewritten almost completely for new(er) kafka versions..


@kingoftheknoll > I’d like to use core.async so it’s not blocking. can you have consumer thread(s) do the polling and put the messages onto channels? unless I misunderstand something. subscribing (what happens when polling) ties up a consumer thread (uses a file descriptor)


you can control a polling timeout if consuming gets "jumpy", but usually it just flows under the steady flow of new data (produced)


So would you create a new thread on each poll or could you reuse it. <- sorry rather new to this side of the java world coming from a python background.


ah.. you reuse a consumer thread


i.e. if you have X partitions, you'd create X / Y (could be just X) consumers (threads)


i.e. 100 partitions, you'd create 20 threads, each would get assigned to 5 partitions


simplifying a bit (with no error catching), this is what a single consumer thread would do:

(let [consumer-records (kafka/poll consumer ms)]
  (process consumer-records)
  (kafka/commit-offsets! consumer))


you don't have to commit offsets yourself though.. I just prefer that ( gives me that 2PC feeling 🙂 )


process in your case would place this batch onto a core.async channel (or break them down and place them on a channel one by one, whatever works for you)
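a minimal sketch of that loop, assuming the same `kafka/poll` / `kafka/commit-offsets!` names from the snippet above (they are placeholders for whatever your bindings actually provide) plus core.async:

```clojure
(require '[clojure.core.async :as async])

(defn consume!
  "Run one consumer's poll loop until `running?` (an atom) is set to false.
   `consumer`, `kafka/poll`, and `kafka/commit-offsets!` stand in for
   your client library's equivalents."
  [consumer out-chan poll-timeout-ms running?]
  (while @running?
    (let [consumer-records (kafka/poll consumer poll-timeout-ms)]
      ;; >!! blocks when out-chan is full, which naturally
      ;; back-pressures the polling loop
      (doseq [record consumer-records]
        (async/>!! out-chan record))
      (kafka/commit-offsets! consumer))))
```

note the channel's buffer size becomes your back-pressure knob: a small buffer slows polling when the websocket side can't keep up.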


The flow makes sense but dumb question, how does one create the thread and manage the lot of them. I know core.async provides go blocks which utilize a thread pool. I’ve read that core.async also has a thread function and there’s the java interop Thread..


Sounds like I am working on the same questions. Was just reading this, which might be helpful -


but am interested to hear the answer to your question 🙂


@nrako I’m doing a websocket pub/sub with Aleph as a webserver. It uses manifold and it has a nice abstraction for an event bus. you might find that useful.


Thanks for sending this. So I think I understand what you are doing with the Kafka piece and the websocket piece. Can you clarify what you are wanting to use core.async for?


Nm. I think I got it. Didn't mean to interrupt your question thread.


@nrako not a problem. So a big part of it is my limited knowledge of the Kafka consumer model. All the toy examples make perfect sense, but once I dig a bit deeper I keep running into things I’ve never done before, like making threads for an ongoing process. Learning Clojure, questions around threading typically get me directed towards core.async, so my first inclination is to think managing the consumer polling with core.async is the right direction. But it seems that core.async’s correct use in this example is passing messages to a channel from multiple while loops in gen’d threads, and that I need to create and manage those threads myself. Trouble is, I’m not sure where to start learning about that. ¯\_(ツ)_/¯


@kingoftheknoll: you would create a threadpool (i.e. similar to this: or it could be just Java's) and submit your consumer threads there. give me about an hour, I am in a meeting, I'll show you a couple of examples.
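a sketch of the "just Java's" option via interop, using `java.util.concurrent.Executors`; `make-consumer` and `run-loop!` are hypothetical names for your own consumer constructor and poll-loop function:

```clojure
(import '(java.util.concurrent Executors ExecutorService))

(defn start-consumers!
  "Create a fixed pool of n threads and start one long-running
   consumer loop on each. Returns the pool so it can be shut down."
  [n make-consumer run-loop!]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
    (dotimes [_ n]
      ;; each submitted fn occupies one pool thread for its lifetime
      (.execute pool (fn [] (run-loop! (make-consumer)))))
    pool))

;; later, on shutdown: (.shutdownNow pool) interrupts the loops
```

a fixed pool fits here because the tasks are long-lived loops, not short jobs: one thread per consumer, sized to your partition count.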


@tolitius you sir are a gentleman and a scholar.


@kingoftheknoll: a little more code that you might need, but just to be explicit


Taking a look, just finished reading through the readme of lasync


I can’t begin to say how helpful this is, thanks! So it seems the main reason for the ThreadFactory is having our own exception handler? Otherwise would it crash the program or does java have a concept of a global handler?


Also AtomicInteger needs to be the name of a band


yea, ThreadFactory there is not required, but gives a better exception logging + a default (i.e. last resort / unhandled exception) handler
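to be clear about what happens without one: an uncaught exception kills only that thread, not the whole program, and the JVM does have a global default (`Thread/setDefaultUncaughtExceptionHandler`). a per-factory handler just scopes it to your pool. a sketch, assuming nothing beyond java.lang and java.util.concurrent:

```clojure
(import '(java.util.concurrent ThreadFactory)
        '(java.util.concurrent.atomic AtomicInteger)
        'java.lang.Thread$UncaughtExceptionHandler)

(defn logging-thread-factory
  "A ThreadFactory that names its threads and installs a last-resort
   handler, so an escaping exception gets logged instead of silently
   killing the thread."
  [name-prefix]
  (let [counter (AtomicInteger. 0)]
    (reify ThreadFactory
      (newThread [_ runnable]
        (doto (Thread. ^Runnable runnable
                       (str name-prefix "-" (.incrementAndGet counter)))
          (.setUncaughtExceptionHandler
            (reify Thread$UncaughtExceptionHandler
              (uncaughtException [_ t e]
                (println "uncaught exception in" (.getName ^Thread t)
                         ":" (.getMessage ^Throwable e))))))))))

;; usage: (Executors/newFixedThreadPool 4 (logging-thread-factory "kafka-consumer"))
```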


Well, Distributed Masonry now has an office. We're keeping the Onyx in there. In case it, you know, runs out, and we need more.


take care of onyx - and they will multiply like rabbits 🙂


Discovered at EuroClojure: at least one presentation using onyx in a slide 🙂


@zamaterian That's great to hear. Anything interesting?


His talk was about using spec to infer relations between input data (kafka) and then using that information to generate materialized views in Postgres, which could trigger new events for other materialized views.


I had a short talk with him afterward to discover his cluster size, but at the moment they only ran it on a single node. What he really liked was the abstractions that onyx enabled.


Good timing for onyx-local-rt, then. That's cool.


@zamaterian Thanks, that's very kind of you. 🙂 Gotta help everyone to get up and going if this is going to be collectively sustainable.

Drew Verlee 23:10:27

Here is the rough draft of the first level 7 challenge on how to make aggregations, feedback welcome! The plan is to create 2-3 more depending on how many divergent concepts there are.


Thanks @drewverlee. If the number of challenges for that stage is small, this might be a good thing to slot under the in-browser Onyx tutorial that @colinhicks is working on.

Drew Verlee 23:10:18

@michaeldrogalis is the hope to have the in browser tutorial replace the jvm based one?


Nah, we can cover the same content in a different way in the browser. Make things a little more interactive.

Drew Verlee 23:10:00

I suppose I’m not sure what you meant by “slot under”.