
Any thoughts on this? I'm looking for a way to add a new value to an atom subject to a transducer, sort of like how core.async chans can take an xf and apply them to incoming inputs. Except I don't really need any other features of core.async. Name suggestions welcome too 🙂

(defn xfswapper
  "Takes a transducer and a reducing function and returns a vector containing 1) a function that will swap! an atom with the reducing function subject to the transducer, 2) the atom and 3) a promise which will be delivered with the final dereferenced value if the result becomes reduced."
  ([xf rf] (xfswapper xf rf (rf)))
  ([xf rf init]
   (let [a       (atom init)
         ;; bottom reducing fn: update the atom, then return the atom itself
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf      (xf swapper)
         done?   (promise)
         add!    #(let [result (rf a %)]
                    (if (reduced? result)
                      (do (deliver done? @@result) @result)
                      result))]
     [add! a done?])))

(let [[f a d] (xfswapper (comp (map inc) (filter odd?) (take 3)) conj)]
  (def a1 a)
  (def f1 f)
  (def d1 d))

(f1 1) ;;#<Atom@.. []>
(f1 2) ;;#<Atom@.. [3]>
(f1 3) ;;#<Atom@.. [3]>
(f1 4) ;;#<Atom@.. [3 5]>
(f1 5) ;;#<Atom@.. [3 5]>
(f1 6) ;;#<Atom@.. [3 5 7]>
(f1 7) ;;#<Atom@.. [3 5 7]>
@d1 ;;[3 5 7]


I guess one issue is I can't control what's done with the atom once it's returned, so maybe I need something slightly different that's not swap!able outside of the internal machinery


I am skeptical of hiding side effects like atomic updates under abstractions - in my experience what's most reliable is to compose and abstract pure operations, and leave state changes explicit


I assume the doto in swapper is there to return the container instead of the value?


that makes sense. Is there another way to subject state changes to a transducer outside of core.async though? I basically have a bunch of big transducers I'd like to get some reuse out of in terms of being able to apply them to sequences, but also incoming data


I think there's an example somewhere of a transducer on a queue, which I think directly matches this use case


It'd be interesting to see, I'll try to find that! But I'm more interested in accumulating a result than "passing through" a queue. Like I'm basically looking for exactly (core.async/chan 1 xform) except something that can just be deref'd like an atom


maybe it'd be better off as a deftype that implements deref and has its own add method via a protocol?


there are many xforms that use a volatile internally, perhaps similarly to the way you use an atom here
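For context, a stateful transducer typically keeps its state in a volatile that is created when the transducer is applied to a reducing function. A minimal sketch of that pattern follows; `running-sum` is a made-up name for illustration, not something from clojure.core or from this thread.

```clojure
;; Hypothetical stateful transducer: emits the running total of its inputs.
(defn running-sum []
  (fn [rf]
    (let [total (volatile! 0)]           ; fresh state per application of the transducer
      (fn
        ([] (rf))                        ; init
        ([result] (rf result))           ; completion
        ([result x]                      ; step: update the state, pass the new total down
         (rf result (vswap! total + x)))))))

(def sums (into [] (running-sum) [1 2 3 4]))
;; sums => [1 3 6 10]
```

The volatile lives inside the step function, so it only helps while something keeps calling that step function; it does not by itself give you a place to deref the accumulated result.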


yes but what I need is something that can accumulate state from values that don't exist yet


like if I get a new random value every 5 minutes


So I think I definitely need an actual transducible process


because my goal is to be able to use the same big composed thread of transducers on this incoming data and also just via regular transduce on a regular collection of say the data for yesterday
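The reuse described here can be sketched as follows: the same transducer is handed to `transduce` for a finished collection, and applied by hand to a reducing function to get a step function that can be called one value at a time. The names `shared-xf`, `batch`, `step` and `live` are illustrative only.

```clojure
(def shared-xf (comp (map inc) (filter odd?) (take 3)))

;; Batch: yesterday's data is already a collection.
(def batch (transduce shared-xf conj [] [1 2 3 4 5 6 7]))
;; batch => [3 5 7]

;; Incremental: applying the transducer to conj gives a step function.
;; Each application gets its own fresh internal state (here, take's counter).
(def step (shared-xf conj))

;; Thread the accumulator through by hand, one input per call.
(def live (-> [] (step 1) (step 2) (step 4)))
;; live => [3 5]
```

The manual version leaves three things to the caller: where the accumulator lives between calls, what to do when a step returns a `reduced` value, and when to call the one-argument completion arity.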


is sequence (lazy transducing context) not applicable here?


no because it requires a coll argument, where I will have no coll for the live data


I guess it wouldn't be if you needed to update / replace / remove a result that was already calculated, but I'm not certain you need or allow that here


well, if that's the only blocker there's always (fn foo [] (lazy-seq (cons (some-side-effect) (foo))))


i basically need an inspectable core.async chan with unlimited buffer and without the CSP


why inspectable? n/m I think I see that now


because i need to analyze the data as it's coming in


the lazy-seq is interesting but I still need a way to subject the new values to the transducers


you can always do the manual conversion to lazy-seq as above, or if the seq-ing is a bottleneck make a new transducible process


@jjttjj oh my thought was that's a sufficient input to sequence, which gives a lazy collection result and allows a transducing function on the input


ohhh let me wrap my head around this


and, d'oh, that silly function above is just (repeatedly some-side-effect)
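A small sketch of that suggestion, assuming the data can be pulled by calling a function. `next-value` is a hypothetical stand-in for `some-side-effect`; it just bumps a counter so the result is predictable.

```clojure
(def counter (atom 0))

;; Hypothetical stand-in for some-side-effect: returns 1, 2, 3, ...
(defn next-value [] (swap! counter inc))

(def pull-xf (comp (map inc) (filter odd?)))

;; sequence accepts the infinite lazy input and applies the transducer lazily.
(def firsts (doall (take 3 (sequence pull-xf (repeatedly next-value)))))
;; firsts => (3 5 7)
```

As far as I know, `sequence` realizes its output in chunks, so `next-value` may be called more often than the three consumed results would suggest. That matters if the side effect is expensive or blocking.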


hmmmm so i think the issue with that is the api i'm interacting with pushes data to me rather than allows for it to be pulled via a side effect function. Basically I have to implement methods on an object that are called when external events occur.


so basically this

;;incoming data out of my control
(def new-random (atom nil))
(defn go []
  (reduce (fn [_ _]
            (reset! new-random (rand-int 100))
            (Thread/sleep 1000))
          (range 1000)))

;;my code
(def my-xf (comp (filter odd?) (map inc) (partition-all 2)))
(def result (atom []))
(add-watch new-random ::x
           (fn [_ _ _ new]
             ;;how do i conj this new value to the result atom subject to my-xf
             ))


This is the current incarnation of what i'm working on that at least hides the internal atom:

(defprotocol IAdd
  (add! [this x]))

(deftype Xfer [rf underlying-atom prom]
  IAdd
  (add! [this x]
    (let [result (rf underlying-atom x)]
      (if (reduced? result)
        (do (deliver prom @@result) @result)
        result)))
  clojure.lang.IDeref
  (deref [this]
    @underlying-atom)
  clojure.lang.IPending
  (isRealized [this] (realized? prom)))

(defn xfer
  ([xf rf] (xfer xf rf (rf)))
  ([xf rf init]
   (let [a       (atom init)
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf      (xf swapper)
         done?   (promise)]
     (->Xfer rf a done?))))


so now I can do:

;;incoming data out of my control
(def new-random (atom nil))
(defn go []
  (reduce (fn [_ _]
            (reset! new-random (rand-int 100))
            (Thread/sleep 1000))
          (range 1000)))

;;my code
(def my-xf (comp (filter odd?) (map inc) (partition-all 2)))
;;(def result (atom []))

(def result (xfer my-xf conj []))

(add-watch new-random ::x
           (fn [_ _ _ new]
             (add! result new)))
@result ;;=> [[56 78] [70 86]]
@result ;;=> [[56 78] [70 86] [50 24]]
@result ;;=> [[56 78] [70 86] [50 24] [68 84]]
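One detail worth checking with `partition-all`: it buffers inputs and only emits a trailing partial batch when the completion (one-argument) arity is called, which the code above never does. The sketch below is a hypothetical variant, not the code from this thread; `accumulate-with` and its `:add!`, `:done!` and `:value` keys are made-up names. It uses fixed inputs so the result is predictable, and it ignores early termination (`reduced`) to stay short.

```clojure
(defn accumulate-with
  "Hypothetical helper: keeps the accumulator in an atom and exposes
  step, completion and current value as plain functions."
  [xf rf init]
  (let [a    (atom init)
        step (xf (fn
                   ([acc] acc)                            ; completion: nothing extra to do
                   ([acc x] (doto acc (swap! rf x)))))]   ; step: update the atom, return it
    {:add!  (fn [x] (step a x) nil)   ; feed one value through the transducer
     :done! (fn [] (step a) nil)      ; completion arity: flushes buffered state
     :value (fn [] @a)}))

(def acc (accumulate-with (comp (filter odd?) (map inc) (partition-all 2)) conj []))

(doseq [x [1 2 3 4 5]] ((:add! acc) x))
(def before-flush ((:value acc)))
;; before-flush => [[2 4]]      (the 6 produced from 5 is still buffered)

((:done! acc))
(def after-flush ((:value acc)))
;; after-flush => [[2 4] [6]]
```

As with any stateful transducer, the internal buffer is not synchronized, so this sketch assumes values are added from one thread at a time.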