#core-async
2019-10-24
Jakub Holý (HolyJak)08:10:00

I do not want to put the data into a coll first. The main reason I use async is that the input data is huge and I need to run it one item at a time through a transducer that extracts only the small part I really need; only then will it all fit into memory so that I can use into. And obviously I want to run the extraction transducer only on the data items, which is why I separate throwables into a separate channel. Not sure how cat applies here? Thank you all for your contributions!
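A minimal sketch of the setup described above, assuming the separation is done with `async/split` and that the extraction step is a simple `(map :id)`. The item shape, the buffer sizes, and the `:id` key are made up for illustration and are not from the thread.

```clojure
(require '[clojure.core.async :as async])

(defn throwable? [x] (instance? Throwable x))

(def result
  (let [in (async/chan 10)
        ;; split returns [matching-ch non-matching-ch]; both get a buffer of 10
        [errors-ch data-ch] (async/split throwable? in 10 10)
        ;; hypothetical extraction transducer: keep only the small :id part
        extracted (async/chan 10 (map :id))]
    (async/pipe data-ch extracted)
    (doseq [x [{:id 1 :payload "big"} (ex-info "boom" {}) {:id 2 :payload "big"}]]
      (async/>!! in x))
    (async/close! in)
    {:data   (async/<!! (async/into [] extracted))
     :errors (async/<!! (async/into [] errors-ch))}))
```

The extraction transducer only ever sees data items, because throwables are routed to `errors-ch` before the transducer channel.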

noisesmith16:10:46

I think I misinterpreted by missing some context. I debug something like that with an atom where I use (swap! debug conj x), then I can easily play with the data in the repl even as the process is running

noisesmith16:10:10

you don't need channel semantics to debug this, so using a channel just adds complexity - just swap the data in from the same code that handles the channel

noisesmith16:10:44

you can combine the conj with random-sample logic to avoid excess memory usage while still seeing representative data

noisesmith16:10:51

often a group-by on the intermediate values in the atom will be helpful for debugging, also consider a tuple [timestamp ::code-context-keyword value]

noisesmith16:10:10

because those are the two big differentiators when doing complex debugging of live data

noisesmith16:10:07

where you derive the keyword as (defn my-broken-fn [blah] (swap! debug conj [(now) ::my-broken-fn {:args {:blah blah}}]) ...)
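The debugging approach from the last few messages could be sketched roughly like this. `record!` is a hypothetical helper, `System/currentTimeMillis` stands in for `(now)`, and the `(inc blah)` body is a placeholder for the real function.

```clojure
(def debug (atom []))

(defn record!
  "Conj a [timestamp context value] tuple onto the debug atom for roughly
  `prob` of the calls, so memory use stays bounded. Returns nil."
  [prob context value]
  (when (< (rand) prob)
    (swap! debug conj [(System/currentTimeMillis) context value])
    nil))

(defn my-broken-fn [blah]
  ;; prob 1.0 records every call; lower it to sample a busy code path
  (record! 1.0 ::my-broken-fn {:args {:blah blah}})
  (inc blah))

(run! my-broken-fn (range 5))

;; Group the captured tuples by their code-context keyword.
(def by-context (group-by second @debug))
```

At the repl, `by-context` can then be inspected per keyword while the process keeps running.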

Jakub Holý (HolyJak)09:10:45

well, the input of the fn is a channel so I would need to wrap it with another channel with a transducer that puts stuff into the debug atom and use this wrapped channel further on, no?

noisesmith17:10:26

right, the transducer would be a noop in terms of channel data, but it would have the side effect of putting the data in a growing mutable collection

noisesmith17:10:59

(map (fn [x] (swap! debug conj {... ...}) x))
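As a rough illustration of wrapping the input channel, assuming an `async/pipe` into a second channel that carries the side-effecting transducer (the channel names and sample values are made up):

```clojure
(require '[clojure.core.async :as async])

(def debug (atom []))

;; Passes every item through unchanged, but records it in the atom.
(def tap-xf
  (map (fn [x] (swap! debug conj x) x)))

(def seen
  (let [in  (async/chan 3)
        ;; a channel needs a buffer in order to take a transducer
        out (async/chan 1 tap-xf)]
    (async/pipe in out)
    (doseq [x [1 2 3]] (async/>!! in x))
    (async/close! in)
    (async/<!! (async/into [] out))))
```

Downstream code reads from `out` instead of `in` and sees the same values, while `@debug` accumulates a copy.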

noisesmith17:10:15

or even an agent since async doesn't hurt here

👍 4
joaohgomes20:10:02

One thing you could do is to construct the resulting object {:result ... :errors ...} inside the reducing function. If the item is a throwable, you conj it to :errors, else you merge it into :result.

Jakub Holý (HolyJak)09:10:41

the reducing function does not see the errors, for it only runs on the data-ch, not the errors-ch. Or do you mean not splitting the input ch in 2 like this? But I want to differentiate errors coming in from errors happening during reducing

joaohgomes17:10:00

Maybe something like this

(a/reduce
    (fn [result curr]
      (if (throwable? curr)
        (update result :errors conj curr)
        (update result :data extract-and-combine-data curr)))
    {:data nil :errors nil}
    data-ch)

joaohgomes18:10:22

I’ve put together a working snippet:

(require '[clojure.core.async :as async])

(defn new-error []
  (ex-info "some error..." {:code 999}))

(defn new-data []
  (let [k (rand-nth [:a :b :c :d :e :f])
        v (rand-int 10)]
    {k v}))
    
(defn emulate-load [max]
  (->> (repeatedly #(if (> (rand) 0.05) (new-data) (new-error)))        
       (take max)))

(defn throwable? [x] (instance? Throwable x))

(let [from (async/chan)
      to (async/reduce
           (fn [result curr]
             (if (throwable? curr)
               (update result :errors conj curr)
               (update result :data (partial merge-with +) curr)))
           {:data nil :errors nil}
           from)]
  (async/onto-chan from (emulate-load 15))
  (println (async/<!! to)))
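One possible variation on the snippet above that keeps errors arriving on the channel apart from errors thrown while reducing, which was the concern raised earlier in the thread. The `:input-errors` and `:reduce-errors` keys and the sample items are assumptions for illustration only.

```clojure
(require '[clojure.core.async :as async])

(defn throwable? [x] (instance? Throwable x))

(defn step
  "Reducing step: throwables from the channel go to :input-errors,
  exceptions thrown while combining go to :reduce-errors."
  [result curr]
  (if (throwable? curr)
    (update result :input-errors conj curr)
    (try
      (update result :data (partial merge-with +) curr)
      (catch Exception e
        (update result :reduce-errors conj e)))))

(def outcome
  (let [from (async/chan 10)
        to   (async/reduce step
                           {:data nil :input-errors [] :reduce-errors []}
                           from)]
    ;; the last item makes + fail, which simulates an error during reducing
    (doseq [x [{:a 1} (ex-info "bad input" {}) {:a 2} {:a "oops"}]]
      (async/>!! from x))
    (async/close! from)
    (async/<!! to)))
```

When the combining step throws, the accumulated `:data` is left as it was before the failing item.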