#core-async
2020-04-17
Alexis Vincent 09:04:53

@hiredman I need it to work in cljs as well. Do you have any particular criticisms of this impl, or reasons why it's a bad idea? As far as I can tell it should work and be safe, though I'll need to run it through a larger test suite. The .lock, impl/active? and impl/commit calls are really just incantations to me, but intuitively this seems like it should work.

Alexis Vincent 10:04:04

The following seems to be a safe version:

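;; Assumed aliases (not shown in the original message):
;;   a        -> clojure.core.async (cljs.core.async in cljs)
;;   impl     -> clojure.core.async.impl.protocols
;;   dispatch -> clojure.core.async.impl.dispatch
(defn big-promise []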
  (let [p (a/promise-chan)
        c (a/chan)
        m (a/mult c)]

    (a/go
      (let [v (a/<! p)]
        (when v
          (a/>! c v))
        (a/close! c)))

    (reify
      impl/ReadPort
      (take! [this handler]
        (a/go
          (let [c' (a/chan)
                v (do
                    (a/tap m c')
                    (let [v (or (a/poll! p)
                                (a/<! c'))]
                      (a/untap m c')
                      (a/close! c')
                      v))]

            #?(:clj
               (.lock handler))
            (let [good (and (impl/active? handler)
                            (impl/commit handler))]
              #?(:clj
                 (.unlock handler))
              (when good
                (dispatch/run #(good v))))))
        nil)

      impl/WritePort
      (put! [this v handler]
        (impl/put! p v handler))

      impl/Channel
      (close! [this]
        (impl/close! p))
      (closed? [this]
        (impl/closed? p)))))

Alexis Vincent 10:04:26

But it doesn't support a/poll!, which it needs to do.
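For context, a rough JVM-only sketch of why a take! that always returns nil can't satisfy a/poll!: as far as I understand it, poll! calls impl/take! with a non-blocking handler and only yields a value if take! hands back something derefable right away. The names immediate-port, deferred-port and commit! are hypothetical helpers made up for this illustration; they are not part of core.async or of the code above.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])

;; Hypothetical helper: lock the handler, commit it if it is still active,
;; and return its callback (or a falsey value if it was already taken).
(defn- commit! [^java.util.concurrent.locks.Lock handler]
  (.lock handler)
  (try
    (and (impl/active? handler) (impl/commit handler))
    (finally (.unlock handler))))

;; Hypothetical port that answers take! synchronously by returning a
;; derefable box, the way a channel does when a value is already available.
(defn immediate-port [v]
  (reify impl/ReadPort
    (take! [_ handler]
      (when (commit! handler)
        (reify clojure.lang.IDeref
          (deref [_] v))))))

;; Hypothetical port that always defers: take! returns nil and the value is
;; delivered later through the handler's callback, like the version above.
(defn deferred-port [v]
  (reify impl/ReadPort
    (take! [_ handler]
      (future
        (when-let [cb (commit! handler)]
          (cb v)))
      nil)))

(a/poll! (immediate-port 42)) ;; => 42
(a/poll! (deferred-port 42))  ;; => nil, poll! only looks at take!'s return value
(a/<!! (deferred-port 42))    ;; => 42, a blocking take waits for the callback
```

So for poll! to work, take! has to return a box directly whenever the promise already holds a value, rather than always going through a go block.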

Alexis Vincent 11:04:34

(defn- box [val]
  #?(:clj
     (reify clojure.lang.IDeref
       (deref [_] val))
     :cljs
     (reify IDeref
       (-deref [_] val))))

(defn big-promise
  ([] (big-promise nil))
  ([max-pending-takes]
   (let [max-pending-takes (or max-pending-takes impl/MAX-QUEUE-SIZE)
         p (a/promise-chan)
         c (a/chan)
         m (a/mult c)
         takes (atom 0)]

     (a/go
       (let [v (a/<! p)]
         (when v
           (a/>! c v))
         (a/close! c)))

     (reify
       impl/ReadPort
       (take! [this handler]
         (if (not ^boolean (impl/active? handler))
           nil
           (if-let [v (a/poll! p)]
             (box v)
             (do
               (assert (< @takes max-pending-takes)
                       (str "No more than " max-pending-takes
                            " pending takes are allowed on this big-promise."))
               (a/go
                 (let [c' (a/chan)
                       v (do
                           (swap! takes inc)
                           (a/tap m c')
                           (let [v (or (a/poll! p)
                                       (a/<! c'))]
                             (a/untap m c')
                             (a/close! c')
                             v))]

                   (swap! takes dec)

                   #?(:clj (.lock handler))
                   (let [good (and (impl/active? handler)
                                   (impl/commit handler))]
                     #?(:clj (.unlock handler))
                     (when good
                       (dispatch/run #(good v))))))
               nil))))

       impl/WritePort
       (put! [this v handler]
         (impl/put! p v handler))

       impl/Channel
       (close! [this]
         (impl/close! p))
       (closed? [this]
         (impl/closed? p))))))