
Is trying to model an SQS queue with a core.async channel a good idea?


The obvious difference I can see is that SQS requires the consumer to explicitly delete the message after processing, so that requires a second “channel” of communication (could be a callback function, or could be just a “processed” channel where the consumer puts the message).


The only reason I bring core.async into this is to get an abstraction of sorts for testing.


we've done similar in the past and used a core.async channel to model a kafka topic for testing


Good to know — seems a little bit low level for my tastes, figuring out threads and errors etc. I wonder if there’s a library on top of core.async that has some good opinions on all this.


iirc we hid all the detail of interacting with the particular transport behind a protocol, and it was really happy-path testing rather than attempting to make the core.async transport fully equivalent


Ah — so instead of trying to actually pull items off a SQS queue and putting them into a core.async channel, make a protocol with a few relevant functions, and implement it once for SQS for production, once for core.async for testing?


OK, I guess this frees me from the mental gymnastics of trying to merge the two worlds together. Making a protocol is actually quite a bit easier than trying to wrap my head around all the possible ways I could screw up with core.async.


@orestis We have done something kind of similar to this as well. We have a generic multi-threaded "message processor". The message processor takes a function for fetching messages and a function for processing messages (and some other thread-related stuff). The SQS wrapper implementation simply polls SQS for the fetch-message fn. The process-message function is passed a map of the SQS message and a function to delete the current message.
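A rough sketch of that shape, for what it's worth — all the names here (`fetch-fn`, `process-fn`, `delete-fn`, `start-processor`) are illustrative, not their actual code:

```clojure
(import '[java.util.concurrent Executors TimeUnit])

;; Generic multi-threaded message processor, as described above.
;; `fetch-fn` blocks (ideally with a receive timeout, so workers can
;; notice shutdown) and returns a batch of messages, possibly nil/empty.
;; `process-fn` gets each message plus a zero-arg delete thunk.
(defn start-processor
  [{:keys [fetch-fn process-fn delete-fn n-threads]}]
  (let [pool    (Executors/newFixedThreadPool n-threads)
        running (atom true)
        worker  (fn []
                  (while @running
                    (doseq [msg (fetch-fn)]
                      (process-fn msg #(delete-fn msg)))))]
    (dotimes [_ n-threads]
      (.submit pool ^Runnable worker))
    ;; return a stop function
    (fn stop! []
      (reset! running false)
      (.shutdown pool)
      (.awaitTermination pool 5 TimeUnit/SECONDS))))
```

An SQS-backed version would just plug in an SQS long-poll as `fetch-fn` and the delete-message call as `delete-fn`.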


I think there is some interesting stuff to do in directly extending the ReadPort and WritePort (the two halves of a channel) protocols to things like message queues. (off the dome, never actually run code) Most projects seem to avoid that kind of thing though, and instead have something that reads from a message queue and writes to a channel.


implementing back pressure properly (without introducing unbounded queues) is the tricky bit


the bounds of sqs are pretty large


But yeah, there is a bit of subtlety, which is maybe part of the reason most people don't do it. Another thing is determining when to ack a message: the example code above acks the message when it is given to whoever is waiting for a message, but depending on your system you might want to withhold acks until some downstream process has completed
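One way to leave that decision to the consumer is to hand it an explicit ack thunk instead of acking on delivery. A tiny sketch (the names `receive-message`, `delete-message`, and `handle` are stand-ins, not from any code above):

```clojure
;; Instead of acking when the message is handed over, pass the consumer
;; an ack function so a downstream process decides when to ack.
(defn handle-with-deferred-ack [receive-message delete-message handle]
  (let [msg (receive-message)]
    ;; `handle` gets the message and a thunk; it calls the thunk
    ;; only once downstream processing has actually succeeded.
    (handle msg (fn ack! [] (delete-message msg)))))
```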


(defprotocol Dispatcher
  (receive [this] "Receive a message, blocking.")
  (delete [this msg] "Delete a message, indicate successful processing.")
  (msg-body [this msg] "Given an opaque message, retrieve the message body."))

(defn SQSDispatcher [client queue-url]
  (reify Dispatcher
    (receive [_] (sqs/receive-one-message client queue-url {}))
    (delete [_ msg] (sqs/delete-message client queue-url msg))
    (msg-body [_ msg] (:Body msg))))

(defn observe-dispatcher
  "Blocks, observing a `Dispatcher` until the atom `stop` is reset to true.
  Every message is processed through `process-fn`; errors are passed to `err-fn`
  and otherwise ignored."
  [dispatcher process-fn err-fn stop]
  (loop []
    (when-not @stop
      (try
        (when-let [msg (receive dispatcher)]
          (process-fn (msg-body dispatcher msg))
          (delete dispatcher msg))
        (catch Exception e
          (err-fn e)))
      (recur))))


Here’s what I have so far. No core.async machinery at this level, but the receive call is blocking with timeout.


process-fn should throw if it can’t process the message, otherwise the message is ack’ed. Not sure if this is generic enough, but for this use case (email sending) seems ok to me.
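For the testing side, a core.async-backed implementation of that same protocol might look something like this (a sketch; the separate `deleted` channel is just one way to let a test observe acks):

```clojure
(require '[clojure.core.async :as a])

;; Test double for the Dispatcher protocol above: messages come from an
;; ordinary channel, and deletes are recorded on a second channel so a
;; test can assert that processing acked the right messages.
(defn chan-dispatcher [in deleted]
  (reify Dispatcher
    (receive [_]
      ;; block with a timeout, like the SQS implementation
      (first (a/alts!! [in (a/timeout 100)])))
    (delete [_ msg]
      (a/>!! deleted msg))
    (msg-body [_ msg]
      (:body msg))))
```

A test can then `>!!` a fake message onto `in`, run the observe loop, and `<!!` from `deleted` to check it was acked.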


I need to figure out how to deal with threads etc, probably some ExecutorService — any pointers on resources there? Would like to have a small bounded threadpool to deal with more than one msg at a time.
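A minimal bounded-pool setup along those lines might be (a sketch; `start-workers` is my name, and it leans on `observe-dispatcher` above):

```clojure
(import '[java.util.concurrent Executors TimeUnit])

;; Run the blocking observe loop on a small, fixed-size pool.
;; Each worker runs its own receive/process loop; the shared `stop`
;; atom shuts them all down. Because receive blocks with a timeout,
;; workers will notice `stop` within one timeout interval.
(defn start-workers [dispatcher process-fn err-fn n]
  (let [pool (Executors/newFixedThreadPool n)
        stop (atom false)]
    (dotimes [_ n]
      (.submit pool ^Runnable
               (fn [] (observe-dispatcher dispatcher process-fn err-fn stop))))
    (fn stop! []
      (reset! stop true)
      (.shutdown pool)
      (.awaitTermination pool 10 TimeUnit/SECONDS))))
```

With SQS this also gives you per-message concurrency up to `n` without any unbounded queueing in the process itself.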


Some other out-of-band system would scale up in the case of a big SQS backlog; this code doesn’t care much.

Travis Martin 19:11:14

Does anybody know the rationale for the core.async/pipeline behavior change in the following commit? This seems to contradict the documentation for pipeline vs pipeline-blocking.


it sounds like @alexmiller's position is that the previous behavior was in error

Travis Martin 19:11:43

ok, thanks @hiredman! that link answered my implicit question as well, which was what in particular was erroneous about the old implementation


by the by, I am currently writing some core.async docs (better 6 years late than never, right?). starting to look unlikely I'm going to finish pre-Thanksgiving, but something to look for soon.


the first is from Rich's original announcement, the latter is something I wrote long ago that's embedded in the repo but hard to find (have not revisited the content, just reformatted). The reference is new.


So I'm having trouble reproducing a minimal example of this. This is in ClojureScript. The following code works fine, but I've been debugging a real, more complicated example, which seems to be the same code in essence, and I can't figure this out:

(def ch1 (a/chan 1))
(def m (a/mult ch1))
(def ch2 (a/tap m (a/chan 1 (comp (filter odd?)
                                  ;;(take 1)
                                  ))))

(doseq [x (range 10)]
  (a/go (a/>! ch1 x)))

(a/go
  (when-let [x (a/<! ch2)]
    (println x))
  (a/close! ch2))
This works fine and prints 1. In the real code however, nothing prints as it is, unless I remove the a/close! line. With that line removed it works, but if I (equivalently?) uncomment the (take 1) transducer it fails again, while changing it to (take 2) works fine. Any tips? I've been staring at this wayyyy too long


I'm just confused how closing the chan after it's already been taken from could possibly prevent it from doing the take in the first place


the close of ch2 is allowed to happen before the put on ch1


I guess that doesn't really explain anything here


wait so ch2 can close before the put on ch1, even though it's after the take in the go block?


if there was anything to read on ch2, sure


but in that example, something would first be printed then right?


also the put of 1 could happen before the put of 0 (not likely, but allowed)


yes, a print would happen before the close, so that alone doesn't explain anything


yeah. It's probably something else i'm doing that the example above doesn't include. Just feel the need to brainstorm. (the go block is in fact basically identical)


also, one should usually use put! instead of >! inside go


(if that is all the go is doing)
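For the snippet above that would mean something like this (assuming `ch1` from the earlier example):

```clojure
;; Fire-and-forget puts: no go block needed when the put is all it does.
;; a/put! enqueues the value asynchronously (up to 1024 pending puts
;; per channel before it throws).
(doseq [x (range 10)]
  (a/put! ch1 x))
```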


do you happen to know if it's possible for clojurescript go blocks to cause a deadlock?


Or is it the thread pool in clj that makes that a problem and cljs just "rewrites callbacks"?


in cljs the scheduling will be more predictable iirc, and a blocking op in a go block will block the entire vm


js has a threadpool, the pool has 1 thread, and everything happens on it


so if you are doing something on the js main thread, and never yielding, no core.async work (or much of anything else you would want js to do) will happen


a channel is a meeting place, where two threads of control can meet and exchange values


if one thread of control is at a channel to exchange a value, but another never meets it, it will be there indefinitely, which looks similar to deadlock
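A contrived illustration of that: the take below parks forever because nothing ever puts on the channel.

```clojure
(require '[clojure.core.async :as a])

;; This go block parks at the take and never resumes, because no one
;; ever puts on `ch` -- it looks like deadlock, but nothing is locked;
;; the thread of control is simply waiting at the meeting place.
(let [ch (a/chan)]
  (a/go
    (println "got" (a/<! ch)))  ; parks here indefinitely
  ;; the enclosing thread of control continues normally
  :continues)
```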