#missionary
2022-06-30
Richie 14:06:59

This is an example https://github.com/leonoel/missionary/wiki/Basic-Walkthrough:-Tasks-&-Flows#flow and it's giving me an error:

main.js:1426 Error: Unsupported operation.
    at Object.eval [as missionary$impl$Fiber$Fiber$park$arity$2] (Fiber.cljs:12:22)
    at Object.missionary$core$park [as park] (core.cljc:162:16)
    at Object.missionary_help$core$start_BANG_ [as start_BANG_] (core.cljs:11:3)
    at eval (shadow.module.main.append.js:4:28)
    at eval (<anonymous>)
    at goog.globalEval (main.js:472:11)
    at env.evalLoad (main.js:1534:12)
    at main.js:1700:12
https://github.com/rgkirch/missionary-help/blob/20744daa4a513f275358f8281d6a9675db4a2b96/src/missionary_help/core.cljs#L15 Error is only in ClojureScript. It works in Clojure.

leonoel 14:06:28

This is expected: ? outside of sp or ap blocks the thread, which is not possible in ClojureScript. The wiki is misleading.

Richie 14:06:41

Oh, right.

Richie 14:06:44

Ok, yea. This works:

((m/reduce
  (constantly nil)
  (m/eduction (map println)
              (m/seed (range 20))))
 prn prn)

👍 1
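
An alternative to the reduce form above is to keep ? but move it inside an sp block, where it parks instead of blocking. This is only a minimal sketch: the name print-all is made up for illustration, and in a ClojureScript file the require would normally live in the ns form.

```clojure
(require '[missionary.core :as m])

;; Inside m/sp, m/? parks the process rather than blocking a thread,
;; so this definition is valid in both Clojure and ClojureScript.
(def print-all
  (m/sp
    (let [xs (m/? (m/reduce conj (m/seed (range 5))))]
      (println "collected" xs)
      xs)))

;; A task is run by calling it with a success and a failure callback.
(print-all prn prn)
```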
Richie 14:06:19

I thought I could run a flow by calling it like a function: (flow prn prn). I don't know where I got that idea, but I guess that's wrong. I can run a reactor by calling it like that, and it will run any flows that I've "marked" as publishers with signal! or stream!. Outside of a reactor, though, I need to use reduce with (constantly nil) to get a task, and then I run that task by calling it.

Dustin Getz 14:06:36

you can run flows like that

Richie 14:06:30

This is the code that I'm trying to understand. I expected to get the same results from both.

Richie 14:06:47

Oh, is it just being lazy?

Richie 14:06:04

Is it "running" but there's no consumer so it blocks?

Dustin Getz 14:06:53

yeah, i believe this is backpressure

Richie 14:06:03

Ok, yea. I see that now.
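
To make the backpressure point concrete: per the flow specification, calling a flow with two zero-argument callbacks (a notifier and a terminator) starts it and returns an iterator, and values only move when the consumer dereferences that iterator. A rough sketch, with the printed strings chosen purely for illustration:

```clojure
(require '[missionary.core :as m])

;; Start the flow by hand. The first callback signals that a value is
;; ready to be transferred, the second that the flow has terminated.
(def it
  ((m/seed [1 2 3])
   #(println "value ready")
   #(println "terminated")))

;; Nothing further happens until the consumer transfers a value.
;; Each deref pulls exactly one value; calling (it) would cancel the flow.
(println "first:" @it)
(println "second:" @it)
```

Calling (flow prn prn) therefore does start the flow, but with nobody dereferencing the iterator it stays parked on its first value.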

leonoel 14:06:05

If you want to understand better how flows work under the hood, you may want to read the specification first https://github.com/leonoel/flow

Dustin Getz 14:06:44

fixed typos = -> :=

leonoel 15:06:35

the empty map should not be duplicated

Dustin Getz 15:06:13

that is the behavior on my machine

martinklepsch 16:06:20

Having a flow, can I somehow emit a delayed value for each value that comes through the flow? Specifically thinking of something like “things come in and need to be removed from state again after a while”

leonoel 16:06:41

(defn duplicate-delayed [d >x]
  (m/ap
    (let [x (m/?= >x)]
      (m/amb> x (m/? (m/sleep d x))))))

martinklepsch 16:06:22

Can you show me how that would work in context with something like observe? And does this apply to CLJS: does ap work just the same?

martinklepsch 16:06:07

Generally, is there any sort of mnemonic for things like ap, ?=, ?, etc.? Every time I see these functions I'm immediately a bit lost.

martinklepsch 16:06:44

Yesterday I was wishing for a more verbose API that reads a bit more like English 😅

leonoel 16:06:41

ap works exactly the same in clojurescript

leonoel 16:06:41

(->> (m/observe (fn [!] ,,,))
     (duplicate-delayed 1000)
     (m/reduce conj))

🙌 1
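
For trying this out without an event source, here is a hedged, self-contained variant that swaps m/observe for m/seed so the flow terminates. It assumes a missionary release that still provides ?= and amb> as used in the snippet above (newer releases may prefer other operators); the name collect-all and the 100 ms delay are arbitrary.

```clojure
(require '[missionary.core :as m])

;; Same definition as in the conversation: each input value is emitted
;; once immediately and once more after d milliseconds.
(defn duplicate-delayed [d >x]
  (m/ap
    (let [x (m/?= >x)]                     ; fork concurrently per value
      (m/amb> x (m/? (m/sleep d x))))))    ; now, then again after the sleep

;; m/seed stands in for (m/observe ...) so that the reduce can complete.
(def collect-all
  (m/reduce conj (duplicate-delayed 100 (m/seed [:a :b]))))

;; Run the task; the success callback receives the collected vector.
(collect-all prn prn)
```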
martinklepsch 17:06:41

So I guess >x is an idiom for a flow 🙂

martinklepsch 17:06:59

Another question 😁 After a reduce how can I call a callback for each reduced value? thinking of something like this:

(->> (m/observe ,,)
     (m/reduce ,,,)
     (m/eduction (map callback)))

leonoel 17:06:25

it is OK to perform side-effects in the reducing function of reduce, also check reductions if you need to emit intermediate states

martinklepsch 17:06:00

Ah — with reductions + side effect in eduction I think I can get what I want

martinklepsch 17:06:23

and yeah side effect in reduce also came to mind but felt a bit less idiomatic/right somehow

martinklepsch 17:06:32

Ah, actually maybe more reductions + reduce so the flow is actually consumed?
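
One way the reductions + reduce combination could look, as a sketch only: callback and run-states are hypothetical names, + stands in for the real state-update function, and m/seed replaces the observe source so the example terminates.

```clojure
(require '[missionary.core :as m])

;; Hypothetical callback invoked for every intermediate state.
(defn callback [state]
  (println "state:" state))

(def run-states
  (->> (m/seed [1 2 3])                  ; stand-in for (m/observe ,,,)
       (m/reductions + 0)                ; emits the init value, then each new state
       (m/reduce (fn [_ state]           ; consumes the flow, side effect per state
                   (callback state)
                   state)
                 nil)))

;; The flow is only consumed once the resulting task is run.
(run-states prn prn)
```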