#off-topic
2020-10-09
emccue23:10:43

does anyone have any experience using redis pubsub from clojure in a "system" sort of way

emccue23:10:12

Jedis has a subscribe method that blocks and keeps taking messages from redis

emccue23:10:42

but because it blocks I need to run it in a separate thread and my logic for making sure errors aren't fatal feels sloppy

emccue23:10:56

and also my "shutdown" function doesn't really work

emccue23:10:20

;; ----------------------------------------------------------------------------
(def post-notification-channel "ABC")

;; ----------------------------------------------------------------------------
(defn create-pub-sub-listener
  "Creates a listener for pub-sub messages. Borrows a connection
  from the pool and doesn't return it until it is shut down."
  [pool]
  (let [client ^Jedis (.getResource pool)
        logic (proxy [JedisPubSub] []
                (onMessage [channel message]
                  (log/info :channel channel :message message)))
        executor ^ExecutorService (Executors/newSingleThreadExecutor)]
    (.submit
      executor
      (reify Runnable
        (run [_]
          (while true
            (try
              (.subscribe client
                          logic
                          ^{:tag "[Ljava.lang.String;"}
                          (into-array String [post-notification-channel]))
              (catch InterruptedException e
                (throw e))
              (catch Exception e
                (log/error ::jedis-pub-sub-error e
                           ::action-to-take "retry subscription")))))))
    {:borrowed-client client
     :pub-sub logic
     :executor executor}))

;; ----------------------------------------------------------------------------
(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))

emccue23:10:55

after shutdown if i send a message to the channel i get

emccue23:10:59

:cause "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
 :via
 [{:type redis.clients.jedis.exceptions.JedisDataException
   :message "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"
   :at [redis.clients.jedis.Protocol processError "Protocol.java" 132]}]
 :trace ...

emccue23:10:14

which means it's still consuming somehow

emccue23:10:44

right now I am trying to structure my app in a "system, but doing it all manually" way

emccue23:10:03

so making sure i can start and stop the mechanism is important

hiredman23:10:41

the jedis connection is not multi-thread safe

hiredman23:10:09

you are trying to share the connection that the subscribe is getting messages on with your other code doing redis stuff

hiredman23:10:00

that error is what happens if you subscribe on a redis connection, and then on the same connection try to do something other than the listed operations

hiredman23:10:56

jedis comes with some connection pool stuff which might help you here, but I ended up writing our own because of the need to pull out connections for subscriptions like this

emccue23:10:02

i thought that if i used .getResource and just didn't return the connection to the pool that it would be fine

emccue23:10:26

since no other thread is accessing the connection

emccue23:10:35

except during the shutdown

hiredman23:10:47

ah, I didn't realize you are using the pool

emccue23:10:25

though i guess it makes sense

emccue23:10:43

i take a connection from the pool, and a thread is doing its busy wait loop on it

emccue23:10:00

and i try to call .close, but the connection is in an invalid state

emccue23:10:27

but that doesn't explain why messages are still being consumed when i shut down the executor

hiredman23:10:39

shutting down an executor doesn't stop it

hiredman23:10:55

or I should say, it doesn't stop already running tasks

hiredman23:10:05

it stops it from starting new ones
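To make the point above concrete, here is a minimal sketch using only the JDK and no Redis. The function name `shutdown-now-demo` and the `running?` flag are hypothetical, introduced for illustration. The task swallows the interrupt and loops again, much like a `while true` retry loop would, so `.shutdownNow` alone does not end it. The task only exits once it checks a flag it controls.

```clojure
(import '(java.util.concurrent CountDownLatch Executors TimeUnit))

(defn shutdown-now-demo
  "Returns a map showing whether the task stopped after .shutdownNow
  alone, and whether it stopped once its own flag was cleared."
  []
  (let [executor (Executors/newSingleThreadExecutor)
        running? (atom true)
        started  (CountDownLatch. 1)]
    (.submit executor
             (reify Runnable
               (run [_]
                 (.countDown started)
                 ;; Loop until told to stop; the interrupt is swallowed,
                 ;; so shutdownNow by itself cannot end this task.
                 (while @running?
                   (try
                     (Thread/sleep 20)
                     (catch InterruptedException _
                       nil))))))
    (.await started)
    ;; Interrupts the worker thread and rejects new tasks, nothing more.
    (.shutdownNow executor)
    (let [stopped-by-interrupt? (.awaitTermination executor 300 TimeUnit/MILLISECONDS)]
      ;; Cooperative stop: the loop condition now fails and the task returns.
      (reset! running? false)
      {:stopped-by-interrupt? stopped-by-interrupt?
       :stopped-by-flag?      (.awaitTermination executor 5 TimeUnit/SECONDS)})))
```

Calling `(shutdown-now-demo)` should show the task surviving the interrupt and exiting only after the flag is cleared, which is why a retry loop generally needs an explicit exit condition.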

emccue23:10:14

/**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

emccue23:10:27

the docs for shutdownNow say it tries to

hiredman23:10:43

This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that.

emccue23:10:12

awaitTermination just waits though, it doesn't do anything to the tasks

emccue23:10:18

so me physically waiting should be enough

hiredman23:10:20

you are not waiting

hiredman23:10:29

it may be that calling .close on the connection is what is getting you that error message

emccue23:10:33

(defn shutdown-pub-sub-listener!
  [pub-sub-listener]
  (let [{:keys [borrowed-client pub-sub executor]} pub-sub-listener]
    (.shutdownNow executor)
    (.awaitTermination executor 2 TimeUnit/MINUTES)
    (.unsubscribe pub-sub (into-array String [post-notification-channel]))
    (.close borrowed-client)))

emccue23:10:40

adding awaitTermination just makes it hang

emccue23:10:51

trying without a .close

hiredman23:10:34

because your task is not exiting

emccue23:10:37

without the .close it is still receiving messages

emccue23:10:07

okay so then the question is, how to kill the task

emccue23:10:25

I can't add a check to Thread.isInterrupted anywhere obvious

hiredman23:10:36

actually, looking at our connection pool what I do to kill the pubsub is just close the connection

hiredman23:10:43

and ignore any errors
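The "close the connection and ignore errors" approach can be sketched without Redis. This is an illustration under assumptions, not hiredman's actual pool code: a plain `java.net.Socket` stands in for the Jedis connection, and `close-to-unblock-demo` is a hypothetical name. A platform thread blocked in a socket read is typically not woken by `Thread.interrupt`, but closing the socket from another thread makes the read throw, which the task treats as its signal to exit.

```clojure
(import '(java.io IOException)
        '(java.net ServerSocket Socket)
        '(java.util.concurrent CountDownLatch Executors TimeUnit))

(defn close-to-unblock-demo
  "Blocks a worker in a socket read, then tries to stop it first with
  .shutdownNow and then by closing the socket."
  []
  (with-open [server (ServerSocket. 0)
              client (Socket. "127.0.0.1" (.getLocalPort server))
              peer   (.accept server)] ; silent peer, so the read blocks
    (let [executor (Executors/newSingleThreadExecutor)
          started  (CountDownLatch. 1)
          outcome  (promise)]
      (.submit executor
               (reify Runnable
                 (run [_]
                   (.countDown started)
                   (try
                     (deliver outcome [:read (.read (.getInputStream client))])
                     ;; A close from another thread surfaces here as an
                     ;; IOException; treat it as the shutdown signal.
                     (catch IOException _
                       (deliver outcome :socket-closed))))))
      (.await started)
      (.shutdownNow executor)
      (let [after-interrupt (deref outcome 300 :still-blocked)]
        ;; Closing the connection is what actually releases the reader.
        (.close client)
        {:after-interrupt after-interrupt
         :after-close     (deref outcome 5000 :still-blocked)
         :terminated?     (.awaitTermination executor 5 TimeUnit/SECONDS)}))))
```

By the same reasoning, a listener shutdown would likely close the borrowed connection first and only then await the executor, with the retry loop checking a stop flag so it does not resubscribe.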