This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-10-09
does anyone have any experience using redis pubsub from clojure in a "system" sort of way?
but because it blocks I need to run it in a separate thread, and my logic for making sure errors aren't fatal feels sloppy
;; ----------------------------------------------------------------------------
(def post-notification-channel "ABC")
;; ----------------------------------------------------------------------------
(defn create-pub-sub-listener
  "Creates a listener for pub-sub messages. Borrows a connection
  from the pool and doesn't return it until it is shut down."
  [pool]
  (let [client ^Jedis (.getResource pool)
        logic (proxy [JedisPubSub] []
                (onMessage [channel message]
                  (log/info :channel channel :message message)))
        executor ^ExecutorService (Executors/newSingleThreadExecutor)]
    (.submit
     executor
     (reify Runnable
       (run [_]
         (while true
           (try
             (.subscribe client
                         logic
                         ^{:tag "[Ljava.lang.String;"}
                         (into-array String [post-notification-channel]))
             (catch InterruptedException e
               (throw e))
             (catch Exception e
               (log/error ::jedis-pub-sub-error e
                          ::action-to-take "retry subscription")))))))
    {:borrowed-client client
     :pub-sub logic
     :executor executor}))
;; ----------------------------------------------------------------------------
(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))
:cause "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
:via
[{:type redis.clients.jedis.exceptions.JedisDataException
:message "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
:at [redis.clients.jedis.Protocol processError "Protocol.java" 132]}]
:trace ...
right now I am trying to structure my app in a "system, but doing it all manually" way
you are trying to share the connection that the subscribe is getting messages on with your other code doing redis stuff
that error is what happens if you subscribe on a redis connection, and then on the same connection try to do something other than the listed operations
jedis comes with some connection pool stuff which might help you here, but I ended up writing our own because of the need to pull out connections for subscriptions like this
i thought that if i used .getResource
and just didn't return the connection to the pool that it would be fine
but that doesn't explain why messages are still being consumed when i shutdown the executor
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();
"This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that."
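The Javadoc point above can be checked without Redis. This is a minimal sketch, assuming a busy-wait loop as a stand-in for a blocking call that never checks the interrupt flag; `run-demo` and `stop?` are hypothetical names used only for illustration, and nothing here is Jedis API.

```clojure
(import '(java.util.concurrent Executors TimeUnit CountDownLatch))

(defn run-demo
  "Submits a task that ignores interrupts, then shows that shutdownNow
  alone does not stop it."
  []
  (let [executor (Executors/newSingleThreadExecutor)
        started  (CountDownLatch. 1)
        stop?    (atom false)
        task     (fn []
                   (.countDown started)
                   ;; Stand-in for a blocking call: loops until the flag
                   ;; flips and never looks at the thread's interrupt status.
                   (while (not @stop?)
                     (Thread/yield)))]
    (.submit executor ^Runnable task)
    (.await started)            ; make sure the task is actually running
    (.shutdownNow executor)     ; interrupts the worker, returns immediately
    (let [after-interrupt (.awaitTermination executor 200 TimeUnit/MILLISECONDS)]
      (reset! stop? true)       ; the only thing this task responds to
      {:after-interrupt after-interrupt
       :after-flag      (.awaitTermination executor 2 TimeUnit/SECONDS)})))

(run-demo)
;; => {:after-interrupt false, :after-flag true}
```

The interrupt sent by shutdownNow is only a request. A task that does not react to it keeps running, which would be consistent with messages still arriving after the executor is shut down.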
it may be that calling .close on the connection is what is getting you that error message
(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.awaitTermination executor 2 TimeUnit/MINUTES)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))
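One possible shape for the shutdown ordering, as a sketch rather than a tested Jedis recipe: replace `while true` with a flag, flip the flag, then unsubscribe so the blocked call returns, and only then wait for the worker. To stay runnable without Redis, `subscribe!` and `unsubscribe!` are hypothetical zero-argument functions standing in for the `.subscribe` and `.unsubscribe` calls in the code above. `start-listener!` and `stop-listener!` are also made-up names.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

(defn start-listener!
  "subscribe! is assumed to block until the subscription ends."
  [subscribe!]
  (let [running? (atom true)
        executor (Executors/newSingleThreadExecutor)
        task     (fn []
                   (while @running?
                     (try
                       (subscribe!)
                       (catch Exception e
                         ;; Non-fatal: report and let the loop retry,
                         ;; unless running? has been set to false.
                         (println "subscription failed:" (.getMessage e))))))]
    (.submit executor ^Runnable task)
    {:running? running? :executor executor}))

(defn stop-listener!
  "unsubscribe! is assumed to make the blocked subscribe! call return.
  Returns true if the worker thread terminated within the timeout."
  [{:keys [running? executor]} unsubscribe!]
  (reset! running? false)                 ; 1. stop the loop re-subscribing
  (unsubscribe!)                          ; 2. unblock the subscribe call
  (.shutdown ^ExecutorService executor)   ; 3. accept no further tasks
  (.awaitTermination ^ExecutorService executor 5 TimeUnit/SECONDS))

;; Usage with a fake subscription: a promise blocks until delivered.
(let [gate     (promise)
      listener (start-listener! #(deref gate))]
  (stop-listener! listener #(deliver gate :done)))
;; => true
```

With a real connection, the borrowed client would presumably be closed only after `stop-listener!` returns true, so the connection is no longer in subscribe mode when it goes back to the pool. The sketch has no retry backoff and does not handle an unsubscribe that arrives before the subscription is established.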