Fork me on GitHub
Jakub Holý (HolyJak)17:07:31

Hello, what happens when the transducer inside pipeline-blocking throws? Is it possible the output chan will never be closed? How to fix? With this:

(let [output-chan (a/chan concurrency)]
     (do (pipeline-blocking
           (map #(do-something-that-can-throw %))
           (a/to-chan accid->invnrs)
           ;; Put the error onto the channel for later reading
           (fn error-handler [throwable] throwable)))
The channel should be closed when it throws, no?

Alex Miller (Clojure team)17:07:58

there is an additional arg on chan for error handling

👀 3
Jakub Holý (HolyJak)17:07:21

Hm, from my experiments it seems ☝️ works fine and returns a channels with three throwables which then gets closed.

Jakub Holý (HolyJak)18:07:52

If somebody is willing to spend a little time, I would very much appreciate any help figuring out why my catching-transduce can freeze indefinitely on line 26, when trying to read the collection from errors-ch Briefly: It reads from a channel, splits into two (one for data, one for errors), runs transduce on the data, then checks its results for exceptions and the error channel for exceptions, returning either the data result or throwing. According to the tests (included), the function works correctly. So the only way I can see how this could happen is if the input channel never closes. But the input is produced by the pipeline-blocking above, which also seems to work correctly even if there is an exception. So I have no idea how to troubleshoot this further 😞 🙏


one escape-hatch for debugging is to use tap> which sends data to all functions registered by add-tap, and you can have a tapping function like #(swap! events conj %) to see the touchpoints in order as data

👀 3

since that doesn't use channels (but is still async), you can see traces of what happened and in what order, a bit more elegantly than prints

Jakub Holý (HolyJak)19:07:36

I was lucky 🙂 I discovered that changing one test slightly - so that the transformation fn throws for an input that is then followed by at least two inputs that either cause an exception or not - then I can replicate the problem. Now on to the solution... I should really setup some generative tests for this...

Jakub Holý (HolyJak)20:07:09

FYI The problem seems to be that when there is an exception during the transduction, I drain the input channel but it does not do what I expect (does not close the derived errors channel). Here is drain:

(defn drain
  (a/close! ch)
  (a/go-loop []
    (when (a/<! ch) (recur)))

Jakub Holý (HolyJak)20:07:34

Here is a replication of the issue:

(let [in-ch             (a/to-chan [2 4 6])
        [odd-ch even-ch] (a/split odd? in-ch)]
    (drain even-ch)
    {:even-ch (chan-status!! even-ch)
     :odd-ch (chan-status!! odd-ch)
     :in-ch  (chan-status!! in-ch)})
; => {:even-ch :closed, :odd-ch :ch-open-no-value-ready, :in-ch :ch-open-no-value-ready}
(chan-status!! does (or (first (a/alts!! [ch] :default :ch-open-no-value-ready)) :closed) )

Jakub Holý (HolyJak)20:07:30

It seems I have to drain both the input channel and the branch with any data (here even-ch).


What do I need to consider when using core async for chunking? Can I be notified on channel close so I can abort requests?


sounds like a good use case for a shutdown-channel that you can poll! - if it has data ready, you are done


you can also use a promise plus realized? as a cheap one-way latch indicating shutdown


if the requests are long-running, you can have a construction like

   done-ch :wrap-up
   (async/thread (get-batch ...)) ([result] ...)


where done-ch can be a chan that is either closed or a promise-chan with a value pushed to indicate you are wrapping up


you shouldn't need to poll anything