#core-async
2022-11-11
kwladyka12:11:18

(async/pipeline-blocking 3 output (map ab->sync) (async/to-chan!! abs-to-sync)) What can I use if I don’t care about the output? I want to process N things at a time, but I am not interested in reading what the functions return. Let’s say the fn is like:

(defn ab->sync [...]
  (do-something-and-return-nil))

kwladyka18:11:35

(defn sync-db [abs-to-sync]
  (let [pool-size (.getMaximumPoolSize (get-in dc-integrant/state [:db/postgresql :pools :historical-data]))
        output (async/chan 1)
        fns-to-run (mapcat
                     (fn [{:keys [a b]}]
                       [#(sync-db-spot-trades-from-csv a b)
                        #(sync-db-spot-candlesticks-1m-from-csv a b)])
                     abs-to-sync)
        ;; channels can't carry nil, so return a keyword as a marker
        ab->sync (fn [f] (f) :chan-cant-get-nil)]
    (async/pipeline-blocking pool-size output (map ab->sync) (async/to-chan!! fns-to-run))
    (loop []
      (when (async/<!! output)
        (recur)))))
This is the extended version, but I don’t like it. What can I do to make it simpler / better? First of all, output is not needed here at all; I only need to control the number of functions running at a time. And (fn [f] (f) :chan-cant-get-nil) is so ugly
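(One way to avoid the manual drain loop, as a sketch reusing the names from the code above: async/into collects everything from output into a vector, and blocking on its result channel waits until pipeline-blocking closes output. The values still can’t be nil, so the :chan-cant-get-nil marker stays.)

(async/pipeline-blocking pool-size output (map ab->sync) (async/to-chan!! fns-to-run))
;; drains output and blocks until the pipeline closes it
(async/<!! (async/into [] output))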

hiredman18:11:08

Just stop trying to force a square peg into a round hole; if it doesn't match pipeline, don't use pipeline

kwladyka18:11:38

What can I use? Nothing good comes to mind

hiredman18:11:40

Use dotimes to spin up threads, use a fixed pool executor

hiredman18:11:04

Use doseq to spin up a thread per function or whatever

kwladyka18:11:30

But how would I control running only N threads at a time then?

kwladyka18:11:08

consider there can be 200 or 500 functions to run

kwladyka18:11:25

I can’t run all of them at once

kwladyka18:11:33

it will throw OOM

kwladyka18:11:42

or rather exceed the memory limit on the cloud machine

kwladyka18:11:51

because the hard drive is in memory

kwladyka18:11:24

controlling the number of threads running at the same time will help avoid all kinds of complications

kwladyka18:11:02

hmm, can Java 19 virtual threads control the number of threads processing at once?
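(A sketch of the virtual-thread idea, assuming Java 19+ where Thread/startVirtualThread is available — as a preview feature in 19. Virtual threads themselves are unbounded; a java.util.concurrent.Semaphore is what caps how many run at once. pool-size and fns-to-run are reused from the code above. Note this bounds concurrency but does not wait for the last tasks to finish.)

(import '(java.util.concurrent Semaphore))

(let [permits (Semaphore. pool-size)]
  (doseq [f fns-to-run]
    (.acquire permits)              ; blocks submission once pool-size are in flight
    (Thread/startVirtualThread
      #(try (f) (finally (.release permits))))))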

kwladyka18:11:22

Unless you mean doing (repeatedly 4 ...) to make a 4-thread worker pool and then queueing all these functions. Still not ideal.

kwladyka18:11:37

But maybe the solution I am looking for doesn’t exist in core.async

kwladyka18:11:40

@U0NCTKEV8 > use a fixed pool executor Does core.async have it? What would the code look like for that?

kwladyka18:11:56

Or do you mean Java interop?

kwladyka19:11:32

I found only this for core.async: https://stackoverflow.com/questions/18779296/clojure-core-async-any-way-to-control-number-of-threads-in-that-go-thread but it isn’t so good, because it affects all async code in the app

kwladyka19:11:39

To sum this up: as I understand it, there is no function in core.async to run at most N functions at once, like a thread pool size. So starting N workers reading from 1 chan is the best idea I have. Alternatively, Java interop with a thread pool executor.
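(A sketch of the Java-interop alternative, assuming java.util.concurrent and the pool-size / fns-to-run bindings from the code above: a fixed thread pool never runs more than pool-size tasks at once, and awaitTermination blocks until everything submitted has finished.)

(import '(java.util.concurrent Executors TimeUnit))

(let [pool (Executors/newFixedThreadPool pool-size)]
  (doseq [f fns-to-run]
    (.execute pool f))                        ; queued; at most pool-size run concurrently
  (.shutdown pool)                            ; stop accepting tasks, let queued ones finish
  (.awaitTermination pool 1 TimeUnit/DAYS))   ; block until all tasks complete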

kwladyka20:11:47

(let [pool-size (.getMaximumPoolSize (get-in dc-integrant/state [:db/postgresql :pools :historical-data]))
      fns-to-run (mapcat (fn [{:keys [a b]}]
                           [#(sync-db-spot-candlesticks-1m-from-csv a b)
                            #(sync-db-spot-trades-from-csv a b)])
                         abs-to-sync)
      fns-chan (async/to-chan!! fns-to-run)]
  (dotimes [_ pool-size]
    (async/go-loop []
      (when-let [f (async/<! fns-chan)]
        (f)
        (recur)))))
Finally, this.

bortexz14:11:28

Looks like you will be doing I/O inside that go-loop (the fns-to-run seem to be doing I/O); it’s probably better to use (a/thread (loop [] …))

bortexz06:11:02

go-loops run on a fixed thread pool, and blocking I/O should not be done on that thread pool. If you want to get the desired parallelism, you want to create pool-size threads, not go-loops
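(A sketch of that suggestion applied to the code above: pool-size real threads via async/thread instead of go-loops. Each async/thread call returns a channel that closes when its loop exits, so the caller can block until all workers are done.)

(let [fns-chan (async/to-chan!! fns-to-run)
      workers (doall (repeatedly pool-size
                                 #(async/thread
                                    (loop []
                                      (when-let [f (async/<!! fns-chan)]
                                        (f)
                                        (recur))))))]
  ;; each worker channel closes when its loop finishes
  (doseq [w workers]
    (async/<!! w)))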

kwladyka11:11:55

yeah, this system is only about I/O and nothing else, and some processes in between take a little CPU, so the choice is not as obvious as it usually is