Hi, I might found a bug in new core.async.flow . I am using datomic and flow, and when I create simple step-fn like:
(defn handler
;; describe
([]
{:params {:datomic-entity ""}
:ins {:in "In"}
:outs {:out "Out"}
:workload :io})
;; init
([args] args)
;; transition
([state _transition] state)
;; transform
([state _in msg]
[state {:out [msg]}]))
Then I create flow like this (some-entity from datomic/entity and datomic/touch):
(flow/create-flow
{:procs {:handler {:args {:datomic-entity some-entity}
:proc (flow/process #'handler)}}
:conns []})
Then I am able to flow/start and flow/resume , until now everything is ok, but if I do flow/ping then I got this error message (on error-chan from flow/start:
Receiver class datomic.query.EntityMap does not define or inherit an implementation of the resolved method 'abstract clojure.lang.IPersistentCollection cons(java.lang.Object)' of interface clojure.lang.IPersistentCollection.
I got same error when I just ping handler process using flow/ping-proc .
Can someone help me if this is really bug in clojure.async or am I missing something? I tried to minimaze code so it will end in exception. Also, when I dissoc :datomic-entity in init, no error will occur.
Edit: datomic-entity support datafy
Edit 2: Using [com.datomic/peer "1.0.7187"] , [org.clojure/clojure "1.12.0"] , [org.clojure/core.async "1.9.808-alpha1"]Thanks for the repro - do you have a stack trace from that exception?
I do immediately wonder about datafy
("clojure.lang.RT.conj(RT.java:697)"
"clojure.core$conj__5474.invokeStatic(core.clj:87)"
"clojure.core$conj__5474.invoke(core.clj:84)"
"clojure.core.protocols$fn__8275.invokeStatic(protocols.clj:167)"
"clojure.core.protocols$fn__8275.invoke(protocols.clj:123)"
"clojure.core.protocols$fn__8229$G__8224__8238.invoke(protocols.clj:19)"
"clojure.core.protocols$seq_reduce.invokeStatic(protocols.clj:31)"
"clojure.core.protocols$fn__8262.invokeStatic(protocols.clj:74)"
"clojure.core.protocols$fn__8262.invoke(protocols.clj:74)"
"clojure.core.protocols$fn__8203$G__8198__8216.invoke(protocols.clj:13)"
"clojure.core$reduce.invokeStatic(core.clj:6965)"
"clojure.core$into.invokeStatic(core.clj:7038)"
"clojure.walk$walk.invokeStatic(walk.clj:50)"
"clojure.walk$postwalk.invokeStatic(walk.clj:53)"
"clojure.walk$postwalk.invoke(walk.clj:53)"
"clojure.core$partial$fn__5927.invoke(core.clj:2641)"
"clojure.walk$walk.invokeStatic(walk.clj:46)"
"clojure.walk$postwalk.invokeStatic(walk.clj:53)"
"clojure.walk$postwalk.invoke(walk.clj:53)"
"clojure.core$partial$fn__5927.invoke(core.clj:2641)"
"clojure.core$map$fn__5954.invoke(core.clj:2772)"
"clojure.lang.LazySeq.force(LazySeq.java:50)"
"clojure.lang.LazySeq.realize(LazySeq.java:89)"
"clojure.lang.LazySeq.seq(LazySeq.java:106)"
"clojure.lang.RT.seq(RT.java:555)"
"clojure.core$seq__5486.invokeStatic(core.clj:139)"
"clojure.core.protocols$seq_reduce.invokeStatic(protocols.clj:24)"
"clojure.core.protocols$fn__8262.invokeStatic(protocols.clj:74)"
"clojure.core.protocols$fn__8262.invoke(protocols.clj:74)"
"clojure.core.protocols$fn__8203$G__8198__8216.invoke(protocols.clj:13)"
"clojure.core$reduce.invokeStatic(core.clj:6965)"
"clojure.core$into.invokeStatic(core.clj:7038)"
"clojure.walk$walk.invokeStatic(walk.clj:50)"
"clojure.walk$postwalk.invokeStatic(walk.clj:53)"
"clojure.walk$postwalk.invoke(walk.clj:53)"
"clojure.core$partial$fn__5927.invoke(core.clj:2641)"
"clojure.walk$walk.invokeStatic(walk.clj:46)"
"clojure.walk$postwalk.invokeStatic(walk.clj:53)"
"clojure.walk$postwalk.invoke(walk.clj:53)"
"clojure.core$partial$fn__5927.invoke(core.clj:2641)"
"clojure.core$map$fn__5954.invoke(core.clj:2772)"
"clojure.lang.LazySeq.force(LazySeq.java:50)"
"clojure.lang.LazySeq.realize(LazySeq.java:89)"
"clojure.lang.LazySeq.seq(LazySeq.java:106)"
"clojure.lang.Cons.next(Cons.java:41)"
"clojure.lang.RT.next(RT.java:733)"
"clojure.core$next__5470.invokeStatic(core.clj:64)"
"clojure.core.protocols$fn__8275.invokeStatic(protocols.clj:168)"
"clojure.core.protocols$fn__8275.invoke(protocols.clj:123)"
"clojure.core.protocols$fn__8229$G__8224__8238.invoke(protocols.clj:19)"
"clojure.core.protocols$seq_reduce.invokeStatic(protocols.clj:31)"
"clojure.core.protocols$fn__8262.invokeStatic(protocols.clj:74)"
"clojure.core.protocols$fn__8262.invoke(protocols.clj:74)"
"clojure.core.protocols$fn__8203$G__8198__8216.invoke(protocols.clj:13)"
"clojure.core$reduce.invokeStatic(core.clj:6965)"
"clojure.core$into.invokeStatic(core.clj:7038)"
"clojure.walk$walk.invokeStatic(walk.clj:50)"
"clojure.walk$postwalk.invokeStatic(walk.clj:53)"
"clojure.walk$postwalk.invoke(walk.clj:53)"
"clojure.core.async.flow.impl$proc$reify__36010$run__36015$pong__36019.invoke(impl.clj:254)"
"clojure.core.async.flow.impl$handle_command.invokeStatic(impl.clj:188)"
"clojure.core.async.flow.impl$handle_command.invoke(impl.clj:182)"
"clojure.core$partial$fn__5929.invoke(core.clj:2649)"
"clojure.core.async.flow.impl$proc$reify__36010$run__36015$fn__36021.invoke(impl.clj:276)"
"clojure.core.async.flow.impl$proc$reify__36010$run__36015.invoke(impl.clj:260)"
"clojure.lang.AFn.applyToHelper(AFn.java:152)"
"clojure.lang.AFn.applyTo(AFn.java:144)"
"clojure.core$apply.invokeStatic(core.clj:667)"
"clojure.core$apply.invoke(core.clj:662)"
"clojure.core.async.flow.impl$futurize$fn__35839$fn__35840.invoke(impl.clj:34)"
"clojure.lang.AFn.call(AFn.java:18)"
"java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)"
"java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)"
"java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)"
"java.base/java.lang.Thread.run(Thread.java:1583)")Datomic EntityMaps extend IPersistentCollection but don't actually implement the cons method (which is really conj -confusing implementation detail). conj is used by into used by postwalk as it tries to datafy the state. you can repro this pretty simply with just (conj a-datomic-entity [:a 1]). will look at this further to see if this is something datomic should fix or flow should address.