Hi, I have a trouble working with pmap. There is the example
(def temp-stat (->> (build-range (t/inst from) (t/inst to) step-mins) ;; builds a long list about few thousans elements
(pmap
(fn [{:keys [from to]}]
(query-logs-statistics client index from to))))) ;; a call takes 0.1-10s so I want to parallelize it
Unfortunately, when I try to use the value it leads to
> (last temp-stat)
java.util.concurrent.ExecutionException: EOF reached while reading throttling-stats *cider-repl path:localhost:1667(clj)*:1:7
Definitely last tryies to use value that wasn't calculated yet. But why? I expect that it should actively process all the LazySeq returned by pmap one element after another and wait when a result not ready yet. Am I wrong?
With map I don't have that issue. Also, I run it on babashka, but don't think it's important here.I suspect that something stateful is going on. Is the file getting closed by one thread before the other has finished with it?
I'm not sure what do you mean by file . The queyr-logs-statistics does an API call, higly likely some calls are faster then others, but why it's a problem?
also I've tried to put doall after the pmap but got same error.
the exception says "end of file" in it's error message. So I guessed it was reading files.
I don't think it's a lazyness problem, I think it's more likely a running things in parallel problem, and something in query-logs-statistics is sharing a resource between calls? or something?
the exception has :file "*cider-repl path:localhost:1667(clj)*" that's my emac's repl.
what does *e say?
(that should be the value of the last exception thrown)
nope, at least explicitly, query-log-statistics does an API-call, then process a result (remove unnecessayr key, converts list of maps to a single map etc). no sharing resources.
I'll try to get *e but it takes some time, give me few minutes
#error {
:cause "EOF reached while reading"
:via
[{:type clojure.lang.ExceptionInfo
:message "java.io.IOException: EOF reached while reading"
:data {:type :sci/error, :line 13, :column 9, :message "java.io.IOException: EOF reached while reading", :sci.impl/callstack #object[clojure.lang.Volatile 0x4dd2ada7 {:status :ready, :val ({:line 1, :column 1, :ns #object[sci.lang.Namespace 0x3fa882db "throttling-stats"], :file "*cider-repl path/script:localhost:1667(clj)*", :special true} {:line 1, :column 14, :ns #object[sci.lang.Namespace 0x3fa882db "throttling-stats"], :file "*cider-repl path/script:localhost:1667(clj)*", :sci.impl/f-meta {:name build-stats, :ns #object[sci.lang.Namespace 0x3fa882db "throttling-stats"], :file "/home/user/path/script/throttling-stats.bb", :arglists ([client index step-mins] [client index from to step-mins]), :line 1, :column 1}} {:line 5, :column 6, :ns #object[sci.lang.Namespace 0x3fa882db "throttling-stats"], :file "/home/user/path/script/throttling-stats.bb", :sci.impl/f-meta {:name build-stats, :ns #object[sci.lang.Namespace 0x3fa882db "throttling-stats"], :file "/home/user/path/script/throttling-stats.bb", :arglists ([client index step-mins] [client index from to step-mins]), :line 1, :column 1}} {:line 13, :column 9, :ns #object[sci.lang.Namespace 0x3fa882db "throttling-stats"], :file "/home/user/path/script/throttling-stats.bb", :sci.impl/f-meta {:name reduce, :doc "f should be a function of 2 arguments. If val is not supplied,\n returns the result of applying f to the first 2 items in coll, then\n applying f to that result and the 3rd item, etc. If coll contains no\n items, f must accept no arguments as well, and reduce returns the\n result of calling f with no arguments. If coll has only 1 item, it\n is returned and f is not called. If val is supplied, returns the\n result of applying f to val and the first item in coll, then\n applying f to that result and the 2nd item, etc. If coll contains no\n items, returns val and f is not called.", :arglists ([f coll] [f val coll]), :sci/built-in true, :ns #object[sci.lang.Namespace 0x3a1854e0 "clojure.core"]}})}], :file "/home/user/path/script/throttling-stats.bb"}
:at [sci.impl.utils$rethrow_with_location_of_node invokeStatic "utils.cljc" 142]}
{:type java.util.concurrent.ExecutionException
:message "java.io.IOException: EOF reached while reading"
:at [java.util.concurrent.FutureTask report "FutureTask.java" 124]}
{:type java.io.IOException
:message "EOF reached while reading"
:at [jdk.internal.net.http.HttpClientImpl send "HttpClientImpl.java" 938]}
{:type java.io.EOFException
:message "EOF reached while reading"
:at [jdk.internal.net.http.Http2Connection$Http2TubeSubscriber onComplete "Http2Connection.java" 1903]}]
:trace
[[jdk.internal.net.http.Http2Connection$Http2TubeSubscriber onComplete "Http2Connection.java" 1903]
[jdk.internal.net.http.common.SSLTube$DelegateWrapper onComplete "SSLTube.java" 285]
[jdk.internal.net.http.common.SSLTube$SSLSubscriberWrapper complete "SSLTube.java" 447]
[jdk.internal.net.http.common.SSLTube$SSLSubscriberWrapper onComplete "SSLTube.java" 548]
[jdk.internal.net.http.common.SubscriberWrapper checkCompletion "SubscriberWrapper.java" 474]
[jdk.internal.net.http.common.SubscriberWrapper$DownstreamPusher run1 "SubscriberWrapper.java" 334]
[jdk.internal.net.http.common.SubscriberWrapper$DownstreamPusher run "SubscriberWrapper.java" 259]
[jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask run "SequentialScheduler.java" 182]
[jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask run "SequentialScheduler.java" 149]
[jdk.internal.net.http.common.SequentialScheduler$SchedulableTask run "SequentialScheduler.java" 207]
[jdk.internal.net.http.common.SequentialScheduler runOrSchedule "SequentialScheduler.java" 280]
[jdk.internal.net.http.common.SequentialScheduler runOrSchedule "SequentialScheduler.java" 233]
[jdk.internal.net.http.common.SubscriberWrapper outgoing "SubscriberWrapper.java" 232]
[jdk.internal.net.http.common.SSLFlowDelegate$Reader processData "SSLFlowDelegate.java" 540]
[jdk.internal.net.http.common.SSLFlowDelegate$Reader$ReaderDownstreamPusher run "SSLFlowDelegate.java" 283]
[jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask run "SequentialScheduler.java" 182]
[jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask run "SequentialScheduler.java" 149]
[jdk.internal.net.http.common.SequentialScheduler$SchedulableTask run "SequentialScheduler.java" 207]
[java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1090]
[java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 614]
[java.lang.Thread runWith "Thread.java" 1487]
[java.lang.Thread run "Thread.java" 1474]
[com.oracle.svm.core.thread.PlatformThreads threadStartRoutine "PlatformThreads.java" 832]
[com.oracle.svm.core.thread.PlatformThreads threadStartRoutine "PlatformThreads.java" 808]]}
@borkdude any idea? The stack trace indicates something in sci/bb failing here?
sorry - got distracted by work. Reading the error, it looks like you're using babashka, not straight clojure. Did you read this? https://github.com/babashka/sci?tab=readme-ov-file#futures
I think tomorrow I'll check it with clojure as well.
@l0st3d nice catch.I've missed it.
I've not played with bb for more than small tasks like builds and scripts, so I'm afraid I've not got much experience to share with that.
You may want to realize the result of pmap, perhaps itโs a laziness issue
The docs pointed to do not apply to bb, just vanilla SCI
https://clojurians.slack.com/archives/C053AK3F9/p1763137161610419?thread_ts=1763135511.559429&cid=C053AK3F9 - @igor.gaisin also said they'd tried adding doall to realise the result of pmap