missionary

Absolute Negativity 2024-11-13T11:50:12.981489Z

Hi. Does the following pattern cause a memory leak when read> returns an error and we descend into recursive calls?

(defn read-with-retry
    ([delay-ms read>]
     (m/ap
       (let [read (try (m/?> read>)
                    (catch Exception e
                      :error))]
         (if (= :error read)
           (m/amb read
             (m/? (m/sleep delay-ms :retrying))
             (m/?> (read-with-retry delay-ms read>)))
           read))))
    ([read>] (read-with-retry 5000 read>)))
It's for a long-running process that I don't intend to shut down for the life of the app. I suspected it would be problematic, so I rewrote it using loop:
(defn read-with-retry-loop
    ([delay-ms read>]
     (m/ap
       (loop []
         (let [read (try (m/?> read>)
                      (catch Exception e
                        :error))]
           (if (not= :error read)
             read
             (m/amb
               (m/? (m/sleep delay-ms :retrying))
               (recur)))))))
    ([read>] (read-with-retry-loop 5000 read>)))
Is it fixed now, and was there a problem in the first place?

Absolute Negativity 2024-11-13T11:59:14.464449Z

Do I need to do:

(m/?> (m/eduction
          (medley/take-upto (comp (partial = :error) :status))
          (m/ap
            (try (m/?> read>)
              (catch Exception e
                :error)))))
to ensure that a read> flow that has already errored terminates?

leonoel 2024-11-13T12:00:16.284729Z

the first version grows indefinitely, the second doesn't
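
A minimal sketch of how the loop version could be exercised, assuming a hypothetical `flaky-read>` flow that fails on its first two runs and then emits `:ok`. The names `flaky-read>`, `attempts`, and `emitted`, and the 10 ms delay, are illustrative and not from the thread.

```clojure
(require '[missionary.core :as m])

;; Loop-based retry from the question, reduced to a single arity for brevity.
(defn read-with-retry-loop [delay-ms read>]
  (m/ap
    (loop []
      (let [read (try (m/?> read>)
                      (catch Exception _ :error))]
        (if (not= :error read)
          read
          (m/amb
            (m/? (m/sleep delay-ms :retrying))
            (recur)))))))

;; Hypothetical stand-in for read>: counts how many times the flow was run.
(def attempts (atom 0))

(def flaky-read>
  (m/ap
    (if (< (swap! attempts inc) 3)
      (throw (ex-info "read failed" {}))
      :ok)))

;; Collect everything the retrying flow emits.
;; On the JVM, m/? outside a process block parks the calling thread.
(def emitted
  (m/? (m/reduce conj [] (read-with-retry-loop 10 flaky-read>))))
```

Under these assumptions, `emitted` should contain one `:retrying` per failed attempt, followed by the successful read.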

leonoel 2024-11-13T12:02:21.167769Z

assuming the flow doesn't terminate spontaneously after error, yes you need to force it in the eduction stage
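
A minimal sketch of forcing termination in the eduction stage, assuming the error is represented by the bare keyword `:error`. To avoid depending on medley, `take-upto` is defined locally as a stand-in transducer, and `m/seed` stands in for a flow that would keep emitting after an error. Both are illustrative choices, not part of the thread.

```clojure
(require '[missionary.core :as m])

;; Local stand-in for medley/take-upto: passes items through up to and
;; including the first one matching pred, then stops the reduction.
(defn take-upto [pred]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result x]
       (let [result (rf result x)]
         (if (pred x)
           (ensure-reduced result)
           result))))))

;; Hypothetical source that does not terminate by itself after :error.
(def source> (m/seed [1 2 :error 3 4]))

;; The eduction stage cuts the flow right after the first :error.
(def values
  (m/? (m/reduce conj [] (m/eduction (take-upto #{:error}) source>))))
;; values => [1 2 :error]
```

Once the transducer returns a reduced value, the eduction flow ends, so nothing after the first `:error` reaches the consumer.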

Absolute Negativity 2024-11-13T12:03:46.576039Z

Perfect! Many thanks to you!