Fork me on GitHub
#clojure-italy
<
2019-05-28
>
andrea.crotti05:05:50

grazie @reborg, vabbeh ho risolto mettendo in src/cljc alla fine per adesso

andrea.crotti05:05:52

un po' di altre menate con i paths in docker ma ora va tutto, era cmq una soluzione temporanea visto che dovro' prenderli da S3 invece (con una cache magari)

mdallastella14:05:21

Quesito della sera: ho una struttura come la seguente

[{:a 1 :b 1 :c 10} {:a 2 :b 1 :c 5} {:a 1 :b 1 :c 20}]

mdallastella14:05:52

e voglio sommare e deduplicare i record con :a e :b uguali

mdallastella14:05:23

ottenendo [{:a 1 :b 1 :c 30} {:a 2 :b 1 :c 5}]

bronsa14:05:55

user=> (vals (clojure.set/index d #{:a :b}))
(#{{:a 1, :b 1, :c 20} {:a 1, :b 1, :c 10}} #{{:a 2, :b 1, :c 5}})
dovrebbe darti un buono spunto :)

4
mdallastella14:05:55

Attualmente abbiamo fatto:

(-> (reduce 
                         (fn [acc elem]
                           (let [k (select-keys elem [:a :b])
                                 v (:c elem)
                                 r (get acc k)]
                             (if r
                               (update-in acc [k :c] + v)
                               (assoc acc k elem))))
                         {}
                         ms)
                      vals)

mdallastella14:05:05

(Scusate la formattazione)

mdallastella14:05:07

Cioè, raggruppo per chiavi in una mappa e l'aggiorno... ma mi sembra un po' contorto

Andrea Imparato17:05:45

domanda, come faccio a “consumare” una reduce che prende una lazy seq?

Andrea Imparato17:05:06

che do devo usare?

Andrea Imparato17:05:22

hmm mi sa che devo semplicemente usare do 😄

alan18:05:47

doall in teoria

reborg19:05:25

reduce consuma di suo, hai un esempio?

Andrea Imparato20:05:33

@reborg questa schifezza qua:

(defn insert-data
  "multi inserts data and returns number of inserted rows"
  [data]
  (do
   (reduce +
           (for [d (partition-all config/multi-inserts-number data)]
             (:updated (insert-rows d))))))

Andrea Imparato20:05:59

se non metto do il programma si “blocca”

Andrea Imparato20:05:45

data è una lazy-seq

reborg20:05:17

sto parsando… dev’essere qualcos’altro

Andrea Imparato20:05:03

qua è dove mi arriva data:

(def handle-data
  "handles data sent by an async channel"
  (async/go-loop []
    ;; get the data from the channel and the db
    (let [channel-data (async/<!! config/chan)
          data (set (:data channel-data))
          bar (:progress-bar channel-data)
          count-data (count (:data channel-data))
          db-persons (set (select-persons data))
          ;; splitting it between to insert and to update data
          to-insert-data (clj.set/difference data db-persons)
          to-update-data (clj.set/intersection data db-persons)]
      (do
        ;; keeping in memory how much data we handled
        (swap! handled-data + count-data)
        (swap! to-insert clj.set/union to-insert-data)
        (swap! to-update clj.set/union to-update-data)
        ;; printing the progress bar
        (pr/print (pr/tick bar @handled-data))
        ))
    (recur))
  )

Andrea Imparato20:05:30

e qui mando data:

(defn run []
  (let [xml-list (load-xml)
        xml-list-partitioned (partition-all 100 xml-list)
        xml-count (count xml-list)
        progress-bar (pr/progress-bar xml-count)]
    (swap! db/total-rows + xml-count)
    (println "start processing" xml-count "xml members...")
    (doseq [data xml-list-partitioned]
      (async/>!! config/chan {:data data :progress-bar progress-bar}))))

Andrea Imparato20:05:53

(defn load-xml []
  (println "loading xml")
  (with-open [r (io/reader config/file-update-xml)]
    (xml/parse-xml (first (line-seq r)))))

reborg21:05:49

non capisco (per ora) come il do possa influenzare la cosa

Andrea Imparato21:05:08

neanch’io 😄 ¯\(ツ)

Andrea Imparato21:05:31

cmq si può considerare async come single threaded oppure no? perchè a quanto ho capito go spawna un processo quindi non è come fare async su nodejs giusto?

reborg21:05:35

partiamo dalle basi. perche’ insert-data non e’ chiamata da nessuna parte?

Andrea Imparato21:05:28

perchè mi sono dimenticato di pastare questo:

(def finish-processing
  (async/go-loop []
    (println @handled-data " " @total-rows)
    (when (and (not= @handled-data 0) (= @handled-data @total-rows))
      (let [num-to-insert (count @to-insert)
            num-to-update (count @to-update)
            num-inserted (insert-data @to-insert)
            num-updated (update-data @to-update)]
        (println "finished processing")
        ;;(println "total lines db:" num-rows)
        (println "to insert:" num-to-insert)
        (println "inserted:" num-inserted)
        (println "to update:" num-to-update)
        (println "updated:" num-updated)
        (println "total new rows:" (count-rows))
        (close-connection)))
    (recur)))

reborg21:05:25

dunque, finish-processing esegue ad evaluation time, che normalmente e’ un no-no. Sei sicuro che non debba essere connesso ad un main?

Andrea Imparato21:05:14

hmm cosa intendi per no-no?

Andrea Imparato21:05:33

io pensavo che così “partisse subito” no?

Andrea Imparato21:05:19

e dato che il canale è vuoto ad evaluation time quello si blocca e rimane in ascolto fino a quando qualcosa viene messo nel canale

Andrea Imparato21:05:26

almeno questo è quello che penso io 🙂

Andrea Imparato21:05:59

hmm sembra funzionare invece, c’era il db un po’ impaltanato

Andrea Imparato21:05:48

vabbè intanto vado a letto