#missionary
2021-11-03
martinklepsch 19:11:23

(let [reader (.. response -body getReader)]
  (-> (.read reader)
      (.then (fn [result]
               ;; repeat the .read call when not done
               (tap> {:done? (.-done result)})
               ;; emit chunk to flow
               #(js/console.log :chunk (.decode decoder (.-value result)))))))
I’m trying to get a flow from a reader object like the one above. It requires me to repeatedly call .read, wait for the promise, and then call .read again. I’m struggling a bit to figure out how to turn that into a flow.

martinklepsch 19:11:33

I guess I can use observe 🤔

leonoel 19:11:30

Assuming you have a task to read the next chunk, you can use this pattern:

(m/ap
  (loop []
    (let [result (m/? (read! reader))]
      (if (.-done result) result (m/amb> result (recur))))))
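
The pattern above can be tried without a browser. What follows is a minimal sketch on JVM Clojure, assuming missionary is on the classpath. The names `read!` and `chunks` are hypothetical, and the atom-backed source is only a stand-in for a stream reader; it is not how the real `read!` task would be written.

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for a stream reader: an atom holding the remaining chunks.
;; Returns a task that pops the next chunk and mimics the {done, value} result shape.
(defn read! [source]
  (m/sp
    (let [[remaining _] (swap-vals! source rest)]
      (if-some [chunk (first remaining)]
        {:done false :value chunk}
        {:done true}))))

;; Same shape as the pattern above: emit each result, then loop for the next one.
(defn chunks [source]
  (m/ap
    (loop []
      (let [result (m/? (read! source))]
        (if (:done result)
          result
          (m/amb> result (recur)))))))

;; On the JVM, m/? outside of sp/ap blocks the calling thread until the task completes.
(def collected (m/? (m/reduce conj [] (chunks (atom ["a" "b"])))))
;; collected => [{:done false, :value "a"} {:done false, :value "b"} {:done true}]
```

The final `{:done true}` value is emitted as well, because the `if` returns `result` in the done branch; a consumer that only wants chunks would need to filter it out.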

martinklepsch 19:11:33

(defn read-chunks
  [reader emit!]
  (let [decoder (js/TextDecoder.)]
    (-> (.read reader)
        (.then (fn [result]
                 ;; repeat the .read call when not done
                 (js/console.log :read-result result)
                 (if (.-done result)
                   (emit! {:done? (.-done result)})
                   (do (emit! {:chunk (.decode decoder (.-value result))})
                       (read-chunks reader emit!))))))))
found this approach now

martinklepsch 19:11:02

what does amb> mean in the above?

leonoel 19:11:48

it means emit the result, then emit the result of the next loop
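
To see that ordering in isolation, here is a small sketch for a JVM Clojure REPL, assuming missionary is on the classpath. The flows `letters` and `countdown` are made-up examples and have nothing to do with streams.

```clojure
(require '[missionary.core :as m])

;; amb> evaluates its branches in order; each branch value becomes one emission.
(def letters
  (m/ap (m/amb> :a :b :c)))

;; With recur as the last branch, it reads as "emit this, then go around again".
(def countdown
  (m/ap
    (loop [n 3]
      (if (zero? n)
        :done
        (m/amb> n (recur (dec n)))))))

;; m/reduce turns a flow into a task; on the JVM, m/? blocks until it completes.
(def letters-seen (m/? (m/reduce conj [] letters)))
;; letters-seen => [:a :b :c]

(def countdown-seen (m/? (m/reduce conj [] countdown)))
;; countdown-seen => [3 2 1 :done]
```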

leonoel 19:11:18

this is very close to your approach in fact

leonoel 20:11:13

(defn read-chunks [reader]
  (m/ap
    (let [decoder (js/TextDecoder.)]
      (loop []
        (let [result (m/? (fn [s f] (.then (.read reader) s f) #()))]
          (js/console.log :read-result result)
          (if (.-done result)
            {:done? (.-done result)}
            (m/amb> {:chunk (.decode decoder (.-value result))}
              (recur))))))))
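
A flow like this does nothing until it is consumed. One possible way to run it from ClojureScript is sketched below, assuming a browser with `fetch` and streaming response bodies. `log-chunks!` and the `example.stream` namespace are hypothetical names, and `read-chunks` is assumed to be defined as in the message above.

```clojure
(ns example.stream
  (:require [missionary.core :as m]))

;; Assumes read-chunks from the message above is defined in this namespace.
(defn log-chunks!
  "Hypothetical helper: fetches url and logs every value emitted by read-chunks."
  [url]
  (-> (js/fetch url)
      (.then (fn [response]
               (let [reader (.. response -body getReader)
                     ;; m/reduce turns the flow into a task; nothing runs yet.
                     task   (m/reduce (fn [_ x] (js/console.log (pr-str x)))
                                      nil
                                      (read-chunks reader))]
                 ;; Calling the task with success and failure callbacks starts it
                 ;; and returns a function that cancels it.
                 (task (fn [_] (js/console.log "stream finished"))
                       (fn [err] (js/console.error err))))))))
```

Since the inline task in `read-chunks` returns a no-op canceller (`#()`), cancelling the running task would not interrupt a pending `.read`.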