#aleph
2020-02-01
vincentjames501 02:02:57

Any manifold experts here? Is there something idiomatically wrong with this, or is there a better approach? We need to poll a stream for a new value and take an action on timeout. When this runs, we eventually hit 100% CPU and the garbage collector starts going crazy.

(let [shutdown (d/deferred)
      s (s/stream)]
  (d/loop []
    (d/chain (s/try-take! s nil 100 ::timeout)
             (fn [_]
               (when-not (d/realized? shutdown)
                 (d/recur)))))
  shutdown)
If you change the timeout from 100 to 1, the problem happens within a minute and you’ll also see this:
WARNING: excessive pending takes (> 16384), closing stream
java.lang.IllegalStateException
	at manifold.stream.default.Stream.take(default.clj:234)
	at foo.manifold_test$run_BANG_$this__2988__auto____4284$fn__4285.invoke(manifold_test.clj:10)
	at foo.manifold_test$run_BANG_$this__2988__auto____4284.invoke(manifold_test.clj:9)
	at clojure.lang.AFn.applyToHelper(AFn.java:154)
	at clojure.lang.AFn.applyTo(AFn.java:144)
	at clojure.core$apply.invokeStatic(core.clj:667)
	at clojure.core$apply.invoke(core.clj:660)
	at foo.manifold_test$run_BANG_$this__2988__auto____4284$fn__4289.invoke(manifold_test.clj:9)
	at manifold.deferred.Listener.onSuccess(deferred.clj:219)
	at manifold.deferred.Deferred$fn__2673.invoke(deferred.clj:398)
	...

mccraigmccraig 10:02:22

@vincentjames501 I can't say I've used try-take! much, but you could try repeating a deferred timeout on the deferred returned by the take, instead of repeating the try-take! call itself.
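
A minimal sketch of one way to read that suggestion, not a confirmed fix: keep a single pending take and repeat only the timer around it, so a timeout never registers another take on the stream (which appears to be what the "excessive pending takes" warning is about). The names `take-or-timeout` and `poll!`, the `on-value`/`on-timeout` callbacks, and the `::drained` sentinel are all hypothetical. The sketch assumes a default `s/stream` and uses `d/listener`, `d/add-listener!` and `d/cancel-listener!` so that a timed-out wait does not leave a listener behind on the take deferred.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn take-or-timeout
  "Hypothetical helper: yields the value of `pending`, or ::timeout if
  `pending` is still unrealized after `timeout-ms`. On timeout the listener
  is cancelled, so repeated waits do not pile up listeners on `pending`."
  [pending timeout-ms]
  (let [result (d/deferred)
        l      (d/listener #(d/success! result %) #(d/error! result %))]
    (d/add-listener! pending l)
    (d/chain (d/timeout! result timeout-ms ::timeout)
             (fn [v]
               (when (= v ::timeout)
                 (d/cancel-listener! pending l))
               v))))

(defn poll!
  "Hypothetical polling loop: calls `on-value` for each message and
  `on-timeout` when nothing arrives within `timeout-ms`. Stops once
  `shutdown` is realized or the stream is closed and drained."
  [stream shutdown timeout-ms on-value on-timeout]
  ;; Only one take is outstanding at any time; a new one is issued
  ;; only after the previous take has produced a value.
  (d/loop [pending (s/take! stream ::drained)]
    (d/chain (take-or-timeout pending timeout-ms)
             (fn [v]
               (if (= v ::drained)
                 ::drained
                 (let [timed-out? (= v ::timeout)]
                   (if timed-out? (on-timeout) (on-value v))
                   (if (d/realized? shutdown)
                     ::shutdown
                     ;; Reuse the same take after a timeout.
                     (d/recur (if timed-out?
                                pending
                                (s/take! stream ::drained))))))))))
```

Usage would look something like `(poll! s shutdown 100 handle-value handle-timeout)`, with `(d/success! shutdown true)` to stop the loop. One caveat of this sketch: if shutdown happens right after a timeout, the last take is still registered on the stream, so the next message put onto it would be consumed by that abandoned take.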