Fork me on GitHub
#code-reviews
<
2019-06-13
>
jjttjj17:06:47

Any thoughts on this? I'm looking for a way to add a new value to an atom subject to a transducer, sort of like how core.async chans can take an xf and apply them to incoming inputs. Except I don't really need any other features of core.async. Name suggestions welcome too 🙂

(defn xfswapper
  "Takes a transducer and a reducing function and returns a vector containing 1) a function that will swap! an atom with the reducing function subject to the transduer 2) the atom and 3) a promise which will be delivered to with the final dereferenced value if the becomes reduced."
  ([xf rf] (xfswapper xf rf (rf)))
  ([xf rf init]
   (let [a       (atom init)
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf      (xf swapper)
         done?   (promise)
         add!    #(let [result (rf a %)]
                    (if (reduced? result)
                      (do (deliver done? @@result) @result)
                      result))]
     [add! a done?])))


(let [[f a d] (xfswapper (comp (map inc) (filter odd?) (take 3))
                         conj
                         [])]
  (def a1 a)
  (def f1 f)
  (def d1 d))

(f1 1) ;;#<Atom@.. []>
(f1 2) ;;#<Atom@.. [3]>
(f1 3) ;;#<Atom@.. [3]>
(f1 4) ;;#<Atom@.. [3 5]>
(f1 5) ;;#<Atom@.. [3 5]>
(f1 6) ;;#<Atom@.. [3 5 7]>
(f1 7) ;;#<Atom@.. [3 5 7]>
@d1 ;;[3 5 7]

jjttjj18:06:15

I guess one issue is I can't control what's done with the atom once it's returned so maybe i need something slightly different that's not swap!able outside of the internal machinery

noisesmith18:06:52

I am skeptical of hiding side effects like atomic updates under abstractions - in my experience what's most reliable is to compose and abstract pure operations, and leave state changes explicit

noisesmith18:06:05

I assume the doto in swapper is there to return the container instead of the value?

jjttjj18:06:20

that makes sense. Is there another way to subject state changes to a transducer outside of core.async though? I basically have a bunch of big transducers I'd like to get some reuse out of in terms of being able to apply them to sequences, but also incoming data

noisesmith18:06:54

I think there's an example somewhere of a transducer on a queue, which I think directly matches this use case

jjttjj18:06:36

It'd be interesting to see, I'll try to find that! But I'm more interested in accumulating a result than "passing through" a queue. Like I'm basically looking for exactly (core.async/chan 1 xform) except something that can just be deref'd like an atom

jjttjj18:06:09

maybe it'd be better off as a deftype that implements deref and has it's own add method via a protocol?

noisesmith18:06:15

there are many xforms that use a volatile internally, perhaps similarly to the way you use an atom here

jjttjj18:06:08

yes but what I need is something that can accumulate state from values that don't exist yet

jjttjj18:06:22

like if I get a new random value every 5 minutes

jjttjj18:06:43

So I think I definitely need an actual transducible process

jjttjj18:06:02

because my goal is to be able to use the same big composed thread of transducers on this incoming data and also just via regular transduce on a regular collection of say the data for yesterday

noisesmith18:06:29

is sequence (lazy transducing context) not applicable here?

jjttjj18:06:10

no because it requires a coll argument, where I will have no coll for the live data

noisesmith18:06:13

I guess it wouldn't be if you needed to update / replace / remove a result that was already calculated, but I'm not certain you need or allow that here

noisesmith18:06:08

well, if that's the only blocker there's always (fn foo [] (lazy-seq (cons (some-side-effect) (foo)))

jjttjj18:06:10

i basically need an inspectable core.async chan with unlimited buffer and without the CSP

noisesmith18:06:36

why inspectable? n/m I think I see that now

jjttjj18:06:50

because i need to analyze the data as it's coming in

jjttjj18:06:41

the lazy-seq is interesting but I still need a way to subject the new values to the transducers

noisesmith18:06:43

you can always do the manual conversion to lazy-seq as above, or if the seq-ing is a bottleneck make a new transducible process

noisesmith18:06:19

@jjttjj oh my thought was that's a sufficient input to sequence, which gives a lazy collection result and allows a transducing function on the input

jjttjj18:06:17

ohhh let me wrap my head around this

noisesmith18:06:47

and, d'oh, that silly function above is just (repeatedly some-side-effect)

jjttjj18:06:33

hmmmm so i think the issue with that is the api i'm interacting with pushes data to me rather than allows for it to be pulled via a side effect function. Basically I have to implement methods on an object that are called when external events occur.

jjttjj19:06:34

so basically this

;;incoming data out of my control
(def new-random (atom nil))
(defn go []
  (future
    (reduce (fn [_ _]
              (reset! new-random (rand-int 100))
              (Thread/sleep 1000))
            (range 1000))))

;;my code
(def my-xf (comp (filter odd?) (map inc) (partition-all 2)))
(def result (atom []))
(add-watch new-random ::x
           (fn [_ _ _ new]
             ;;how do i conj this new value to the result atom subject to my-xf
             ))

jjttjj19:06:51

This is the current incarnation of what i'm working on that at least hides the internal atom:

(defprotocol IAdd
  (add! [this x]))

(deftype Xfer [rf underlying-atom prom]
  IAdd
  (add! [this x]
    (let [result (rf underlying-atom x)]
      (if (reduced? result)
        (do (deliver prom @@result) @result)
        result)))
  
  clojure.lang.IDeref
  (deref [this]
    (.deref underlying-atom))
  
  clojure.lang.IPending
  (isRealized [this] (realized? prom)))

(defn xfer
  ([xf rf] (xfer xf rf (rf)))
  ([xf rf init]
   (let [a       (atom init)
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf      (xf swapper)
         done?   (promise)]
     (->Xfer rf a done?))))

jjttjj19:06:01

so now I can do:

;;incoming data out of my control
(def new-random (atom nil))
(defn go []
  (future
    (reduce (fn [_ _]
              (reset! new-random (rand-int 100))
              (Thread/sleep 1000))
            (range 1000))))

;;my code
(def my-xf (comp (filter odd?) (map inc) (partition-all 2)))
;;(def result (atom []))

(def result (xfer my-xf conj []))

(add-watch new-random ::x
           (fn [_ _ _ new]
             (add! result new)))
@result => [[56 78] [70 86]] @result => [[56 78] [70 86] [50 24]] @result => [[56 78] [70 86] [50 24] [68 84]]