#core-async
2020-10-23
otfrom16:10:34

so, I've been moving some of my design around based on the stateful transducer issue talked about above https://clojurians.slack.com/archives/C05423W6H/p1603202921084800 Does the following seem like a reasonable example? I've often got a large-ish collection of compressed data files (represented by the take 20 range here) that I want to run through some series of mapping, grouping, reducing, and mapping again. Is this a reasonable setup?

;; assumes (require '[clojure.core.async :as a])
(let [inc-chan    (a/chan 32 (map inc))
        reduce-eo   (a/reduce (fn [acc n] (if (even? n)
                                            (update acc :even conj n)
                                            (update acc :odd conj n)))
                              {:even []
                               :odd  []}
                              inc-chan)
        mapcat-chan (a/chan 8 (comp
                               (mapcat (fn [m] (map identity m)))
                               (mapcat (fn [[num-type nums]]
                                         (println "Numtype: " num-type " Nums: " nums)
                                         (into []
                                               (map (fn [n] [num-type n]))
                                               nums)))
                               (map (fn [[numtype n]] [numtype (inc n)]))))
        _           (a/pipe reduce-eo mapcat-chan)
        last-reduce (a/reduce (fn [acc [numtype n]]
                                (println "Numtype: " numtype " Num: " n)
                                (if (= :odd numtype)
                                  (update acc :used-to-be-odd conj n)
                                  (update acc :used-to-be-even conj n)))
                              {:used-to-be-even []
                               :used-to-be-odd  []}
                              mapcat-chan)]
    ;; Fire in events
    (a/onto-chan inc-chan (take 20 (range)))
    ;; collect events
    (a/<!! last-reduce))

otfrom16:10:25

I like using the higher-order core.async functions rather than doing something in a go block, unless I need to keep a lookup table up to date or similar

otfrom16:10:21

setting up the "machinery" in a let, then firing in the events, then retrieving the results from the channels is a pattern I'm following as well. It seems to be working for me, but I don't know if there is a better way for this kind of problem

otfrom16:10:47

I'm mostly using this for embarrassingly parallel problems so the pipe would be replaced with some kind of pipeline for a lot of things
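For the embarrassingly parallel case, a minimal sketch of swapping the pipe for `a/pipeline` might look like the following. The function name `parallel-inc`, the buffer sizes and the parallelism of 4 are assumptions chosen for illustration, and `a/onto-chan!` assumes core.async 1.2 or later (earlier versions call it `a/onto-chan`).

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: runs `inc` over xs on 4 parallel workers.
(defn parallel-inc
  [xs]
  (let [in  (a/chan 32)
        out (a/chan 32)]
    ;; a/pipeline applies the transducer in parallel but keeps the
    ;; output in input order; it closes `out` once `in` is closed
    ;; and fully processed.
    (a/pipeline 4 out (map inc) in)
    ;; onto-chan! closes `in` after the last value has been put.
    (a/onto-chan! in xs)
    ;; Collect everything from `out` into one vector.
    (a/<!! (a/into [] out))))

(parallel-inc (range 5))
;; => [1 2 3 4 5]
```

`a/pipeline` is meant for CPU-bound transducers; for blocking work such as file I/O, `a/pipeline-blocking` takes the same arguments.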

Alex Miller (Clojure team)16:10:25

is this different than a/pipeline?

otfrom16:10:01

the overall thing? or just the use of a/pipe?

otfrom16:10:21

I didn't think I could have a reducing stage in the middle of a/pipeline

otfrom16:10:06

this is a toy example of the kind of data processing I'd like to do

otfrom16:10:26

read in the records from a file, do scrubbing on each record, gather them up by some key (user id say), then process each data item grouped by a key and grab the results to put into some kind of output (csv, excel, database, api)
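The workflow described here (scrub each record, gather by key, process each group, collect results) can be sketched roughly as below. The record shape (`:user-id`, `:name`), the scrubbing step and the per-group summary are all hypothetical stand-ins, and `a/onto-chan!` assumes core.async 1.2 or later.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.string :as str])

;; Hypothetical job: records are maps with :user-id and :name keys.
(defn run-job
  [records]
  (let [;; stage 1: scrub each record as it passes through the channel
        scrubbed (a/chan 32 (map (fn [r] (update r :name str/trim))))
        ;; stage 2: gather records by key; a/reduce returns a channel
        ;; that yields one map after `scrubbed` closes
        grouped  (a/reduce (fn [acc r]
                             (update acc (:user-id r) (fnil conj []) r))
                           {}
                           scrubbed)
        ;; stage 3: `cat` fans the single map back out into
        ;; [user-id records] entries, then each group is summarised
        results  (a/chan 8 (comp cat
                                 (map (fn [[user-id rs]]
                                        {:user-id user-id
                                         :names   (mapv :name rs)}))))]
    ;; a/reduce cannot write to a supplied channel, so pipe its result on
    (a/pipe grouped results)
    ;; fire in the records; `scrubbed` is closed after the last one
    (a/onto-chan! scrubbed records)
    ;; collect the per-group results for output
    (a/<!! (a/into [] results))))
```

The order of the groups in the result follows map iteration order, so anything that depends on ordering should sort the output explicitly.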

otfrom16:10:51

I've not found good examples of people chaining together the higher level core.async things (my google-fu has failed me)

otfrom16:10:14

and I'm just wondering if I'm using the library in a way it wasn't really meant to be used

Alex Miller (Clojure team)16:10:19

Clojure Applied has a chapter on this but I think the key is - make components that are given the channels to use

Alex Miller (Clojure team)16:10:11

then whatever is assembling the units can make the channels and hand each one as the output of one unit and the input to another (and you don't even need pipe then)

Alex Miller (Clojure team)16:10:58

it also lets the assembler choose buffer policies and sizes etc
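One way to read this advice is sketched below. The component names `scrubber` and `collector`, the `assemble` function and the buffer sizes are hypothetical; they are not taken from Clojure Applied. `a/onto-chan!` assumes core.async 1.2 or later.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical components: each is handed its channels and creates none.
(defn scrubber
  "Reads from `in`, writes transformed values to `out`,
  and closes `out` when `in` closes."
  [in out]
  (a/pipeline 2 out (map inc) in))

(defn collector
  "Returns a channel that yields everything read from `in` as a vector."
  [in]
  (a/into [] in))

;; The assembler owns the channels, so buffer sizes and policies
;; (fixed, a/dropping-buffer, a/sliding-buffer) are decided here.
(defn assemble
  [xs]
  (let [raw     (a/chan 32)
        cleaned (a/chan 8)
        result  (collector cleaned)]
    ;; `cleaned` is the scrubber's output and the collector's input,
    ;; so no a/pipe is needed between the two.
    (scrubber raw cleaned)
    (a/onto-chan! raw xs)
    (a/<!! result)))

(assemble [0 1 2])
;; => [1 2 3]
```

Because the components never create channels, the same `scrubber` can be reused with a different buffer policy just by changing `assemble`.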

otfrom16:10:34

yeah, I think the pipe in here is a bit gratuitous looking at it again. That would probably be where I'd put in an a/pipeline if that was appropriate

otfrom16:10:52

I'll have a look again at the Clojure Applied chapter

otfrom16:10:00

re-reading it with that in mind should help

otfrom16:10:01

thx 🙂

Alex Miller (Clojure team)16:10:10

I think it's ch 6 but I don't have it in front of me

otfrom16:10:59

I'm looking about for my dead tree version. I might have to go to safari

Alex Miller (Clojure team)16:10:50

I assume you mean Programming Clojure ;)

otfrom16:10:22

I've only got one copy of that

otfrom16:10:42

I think I gave away my dead tree copy of Clojure Applied (unless it is in another room). I push it at people who want to go from noodling at the repl to actually building things

otfrom16:10:47

it is the best book for that

otfrom16:10:41

the authors of all of them are really friendly

otfrom16:10:02

mmm... vertical slices

otfrom16:10:45

sounds like things such as a/mult should then mostly live in the assembler namespaces
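A minimal sketch of what keeping `a/mult` in the assembler could look like follows. The function name `assemble-fanout` and the even/odd split are assumptions for illustration; `a/onto-chan!` assumes core.async 1.2 or later.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical assembler: fans one source out to two consumers.
(defn assemble-fanout
  [xs]
  (let [source   (a/chan 8)
        m        (a/mult source)
        ;; each tap channel filters the values it cares about
        evens-in (a/chan 8 (filter even?))
        odds-in  (a/chan 8 (filter odd?))
        evens    (a/into [] evens-in)
        odds     (a/into [] odds-in)]
    ;; Taps are attached before any value is put on `source`;
    ;; a mult drops values that arrive while it has no taps.
    (a/tap m evens-in)
    (a/tap m odds-in)
    ;; Closing `source` (done by onto-chan!) closes both taps.
    (a/onto-chan! source xs)
    {:even (a/<!! evens)
     :odd  (a/<!! odds)}))

(assemble-fanout (range 6))
;; => {:even [0 2 4], :odd [1 3 5]}
```

The consumers only ever see plain channels, so they do not need to know a mult is involved.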

otfrom16:10:37

If I had a component doing a reduce/transduce/into then the component would have a pipe at the end to put the data on the passed-in output channel, I suppose (as those functions don't take output channels, but just return them)
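A reducing component of that shape might be sketched as follows. The names `group-by-parity` and `run-parity` are hypothetical, and `a/onto-chan!` assumes core.async 1.2 or later.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical component: reduces everything on `in` into one map
;; and delivers that map on the caller-supplied `out` channel.
(defn group-by-parity
  [in out]
  ;; a/reduce returns its own result channel, so a/pipe forwards
  ;; the single result to `out` and then closes `out`.
  (a/pipe (a/reduce (fn [acc n]
                      (update acc (if (even? n) :even :odd) conj n))
                    {:even [] :odd []}
                    in)
          out))

;; Hypothetical assembler: makes the channels and wires the component.
(defn run-parity
  [xs]
  (let [in  (a/chan 8)
        out (a/chan 1)]
    (group-by-parity in out)
    (a/onto-chan! in xs)
    (a/<!! out)))

(run-parity (range 6))
;; => {:even [0 2 4], :odd [1 3 5]}
```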

otfrom16:10:22

@alexmiller thx for this. This makes me think that a little assembly function I wrote for some sugar is almost completely a bad idea

otfrom16:10:41

I was thinking that was true before, but didn't have a good way of thinking how things should be assembled