#missionary
2021-12-05
Dustin Getz19:12:20

(defn await-promise "Returns a task completing with the result of given promise"
  [p]
  (let [v (m/dfv)]                   ; dataflow "atom"
    (.then p
           #(v (fn [] %))            ; wrap result in closure and put closure in atom
           #(v (fn [] (throw %))))   ; delayed throw
    (m/absolve v)))                  ; run whichever closure once it is set, yielding an async value or async exception
Why does it need to work like this? What is m/absolve for? I understand the types but not the "why"

Dustin Getz19:12:00

This was difficult for me to unpack - it would help if atom-like types implemented reset!, deref etc

ribelo19:12:17

i guess, because of error handling

ribelo19:12:56

we want to throw an error only when dereferencing data

Dustin Getz19:12:24

I see how it works but I don't understand the general pattern of m/absolve, unless it is for exactly this

Dustin Getz19:12:01

define: absolve To pronounce clear of guilt or blame. To relieve of a requirement or obligation.

ribelo19:12:41

in fact, attempt and absolve are a mystery to me

Ben Sless19:12:54

I was just looking at it, I think I'm missing something basic, let's say I have this little wrapper:

(defn http-task
  [req opts]
  (let [v (m/rdv)]
    (http/send-async
     req
     opts
     #(v (fn [] %))
     #(v (fn [] (throw %))))
    (m/absolve v)))
I call it and get back a task, but how do I get the result of running it?

leonoel19:12:40

how is it different from any other task ?

Ben Sless19:12:07

I get back an Event from the rdv, and I want to get its result

leonoel19:12:20

oh I misread I thought it was a mbx

leonoel19:12:09

is there a reason why you need rdv here ?

Ben Sless19:12:55

That's based on your suggestion from when I tried to convert callback-based functions, like promise or completable future, to a task

leonoel19:12:50

I don't remember exactly what I suggested but this is clearly a bad usage of rdv

leonoel19:12:38

rdv with 1 argument is pure and returns a task that puts the value and completes when it's read

leonoel19:12:44

here I suggest dfv

Ben Sless19:12:46

Same semantics?

Ben Sless19:12:32

Ah, no delay

Ben Sless20:12:47

Here we go

(defn forever [task]
  (m/ap (m/? (m/?> (m/seed (repeat task))))))

(defn http-task
  [opts req]
  (fn [s f] (http/send-async opts req s f)))

(def dad-joke-task
  (http-task
   {:uri ""
    :headers {"accept" "text/plain"}
    :method :get}
   nil))

(defn flower
  [t]
  (fn [s f]
    (let [v (m/dfv)]
      (t
       (fn [r] (v r))
       (fn [e] (v (throw e))))
      (v s f))))

(def flow-dad (flower dad-joke-task))

(comment
  (m/?
   (m/reduce
    (fn
      ([])
      ([_])
      ([_ x] (println x)))
    (m/eduction
     (comp (take 2) (map :body))
     (forever flow-dad)))))

Ben Sless20:12:03

Using [java-http-clj.core :as http]

Ben Sless20:12:11

I'm working on building an end to end example with missionary in a web programming context

leonoel21:12:21

I'd love to see that

leonoel21:12:03

For http I think in practice it's OK to simply wrap the callback API and ignore cancellation, assuming the timeouts are reasonably low

(def flow-dad
  (fn [s f]
    (http/send-async
      {:uri     ""
       :headers {"accept" "text/plain"}
       :method  :get}
      nil s f) #()))

Ben Sless21:12:35

I'm assuming a context where errors have to be handled, but I'll try this as well to figure out what I missed. For some reason this didn't work for me, but maybe something small was off

Ben Sless09:12:27

oh, was it because I was returning something I shouldn't have been?

Ben Sless09:12:40

I needed to return a cancelling function?

leonoel09:12:21

yes, it's mandated by the task protocol
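
The shape being described here is that a task is a function taking a success callback and a failure callback, and it must return a zero-argument function that cancels it. Below is a minimal JVM Clojure sketch of that shape. `fake-send-async`, `callback-task` and `run-task` are hypothetical names made up for illustration, with `fake-send-async` standing in for a real HTTP client; the canceller is a no-op, in line with the suggestion above to ignore cancellation.

```clojure
(defn fake-send-async
  "Hypothetical callback API: replies on another thread."
  [req on-success on-failure]
  (let [work (fn []
               (if (some? req)
                 (on-success {:status 200 :body (str "echo: " req)})
                 (on-failure (ex-info "missing request" {}))))]
    (.start (Thread. ^Runnable work))))

(defn callback-task
  "Wraps the callback API as a task."
  [req]
  (fn [success failure]
    (fake-send-async req success failure)
    ;; The return value matters: the caller expects a zero-argument
    ;; canceller, not whatever the wrapped API happened to return.
    (fn [] nil)))

(defn run-task
  "Runs a task, checks that it returned a canceller, waits up to 1s."
  [task]
  (let [result (promise)
        cancel (task #(deliver result [:ok %])
                     #(deliver result [:error %]))]
    (assert (fn? cancel) "a task must return a canceller")
    (deref result 1000 [:timeout nil])))

(run-task (callback-task "hi"))
;; => [:ok {:status 200, :body "echo: hi"}]
```

If `callback-task` returned the result of `fake-send-async` instead of a function, the assertion in `run-task` would fail, which is the kind of mistake being discussed.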

Ben Sless10:12:24

That's what I forgot. Good, now I'm at:

(defn prn-rf
  ([])
  ([_])
  ([_ x] (println x)))

(m/?
 (m/reduce
  prn-rf
  (m/eduction
   (comp (take 2) (map :body))
   (forever dad-joke-task))))

(->>
 (m/seed ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"])
 (m/eduction (map (comp m/? joke-task)))
 (m/eduction (map :body))
 (m/reduce prn-rf)
 m/?)

Ben Sless10:12:27

Both work nicely

Ben Sless10:12:46

next step is fork/joining over a seed of tasks and collecting the results

leonoel10:12:06

(m/eduction (map (comp m/? joke-task)))
m/? is blocking the thread here, is that your intent?

Ben Sless10:12:30

I didn't know how to "flatmap" the tasks

leonoel10:12:34

I would put everything in an ap

Ben Sless10:12:40

Everything = eduction or even the reduce?

leonoel10:12:54

(m/ap
  (-> (m/?> (m/seed ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"]))
    joke-task m/? :body println))

Ben Sless10:12:32

ack, I forget that inside the ap the context changes to a single value from flow

leonoel10:12:40

println in the reducing function is OK and maybe more modular

Ben Sless10:12:32

Here we go!

(->> ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"]
     m/seed
     m/?>
     joke-task
     m/?
     :body
     m/ap
     (m/reduce conj)
     m/?)

Ben Sless10:12:29

(defn map-reduce
  [rf f flow]
  (->> flow m/?> f m/? m/ap (m/reduce rf)))

(->> ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"]
     m/seed
     (map-reduce ((map :body) conj) joke-task)
     m/?)

Ben Sless10:12:33

To the cookbook?

leonoel10:12:29

maybe ap in the threading macro is a bit too much for beginners

leonoel10:12:27

but this is the right way

Ben Sless10:12:22

Yeah it needs to be organized such that the boundaries of macro magic are clearer

Ben Sless11:12:04

Slightly more readable

(defn map-reduce
  [rf f flow]
  (->>
   (m/ap (->> flow m/?> f m/?))
   (m/reduce rf)))

leonoel17:12:15

I like this one

Ben Sless17:12:21

Is this only correct for f which takes data and returns a task?

Ben Sless17:12:00

So this is mapcat ?

(m/ap (->> flow m/?> f m/?))

leonoel17:12:56

not exactly, because there are 2 type constructors here

Ben Sless17:12:04

Can you please elaborate? Also, is it more correct to say this is monadic bind?

leonoel17:12:54

the signature is (A -> Task[B]) -> Flow[A] -> Flow[B]

leonoel17:12:29

monadic bind would be (A -> Flow[B]) -> Flow[A] -> Flow[B]
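
The two signatures can be written side by side as missionary code. This is a sketch that assumes `missionary.core` is on the classpath and that it runs on the JVM, where `m/?` may block at the top level. `map-task` and `flat-map` are hypothetical names chosen for illustration, not library functions.

```clojure
(require '[missionary.core :as m])

(defn map-task
  "(A -> Task[B]) -> Flow[A] -> Flow[B]: runs one task per value."
  [f flow]
  (m/ap (m/? (f (m/?> flow)))))

(defn flat-map
  "(A -> Flow[B]) -> Flow[A] -> Flow[B]: splices one flow per value."
  [f flow]
  (m/ap (m/?> (f (m/?> flow)))))

;; f returns a task: one output per input
(m/? (m/reduce conj (map-task (fn [x] (m/sp (inc x))) (m/seed [1 2 3]))))
;; => [2 3 4]

;; f returns a flow: zero or more outputs per input
(m/? (m/reduce conj (flat-map (fn [x] (m/seed [x x])) (m/seed [1 2]))))
;; => [1 1 2 2]
```

The only difference between the two bodies is the inner operator: `m/?` awaits a task, `m/?>` forks over a flow.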

Ben Sless17:12:57

It's just an aesthetic preference but I dislike the names in Rx. Is the transformation of A -> T[B] to A -> F[B] a Lift?

leonoel17:12:38

that's not how I understand lift, but I may be wrong

Ben Sless17:12:39

more likely I am wrong, my cat theory is weak

Ben Sless17:12:44

Looking in the wiki: (>>=) : (M T, T → M U) → M U, so if `mx : M T` and `f : T → M U`, then `(mx >>= f) : M U`. Then the transition from T[B] to F[B] is a bind

ribelo19:12:22

All I know is that attempt wraps the task and its result in functions, and absolve unpacks

Dustin Getz19:12:32

absolve unpacks that extra layer of closure

ribelo19:12:17

I seem to understand how it works, but it doesn't want to click in my head why it works that way and why it doesn't work without it

ribelo19:12:08

without encapsulating it in a function and unpacking it at the end, the exception is thrown inside and no error is passed as a value

ribelo19:12:18

and the task is interrupted

Ben Sless19:12:08

Instead of putting it in terms of packing it in functions, think about it as a delay or thunk. You delay evaluation
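
One way to see the thunk idea concretely is to model a task as a plain function of a success and a failure callback that returns a canceller, and to hand-roll the two operators on top of that model. The sketch below is JVM Clojure. `attempt*`, `absolve*`, `succeed-with`, `fail-with` and `run` are hypothetical stand-ins written for illustration; they follow the behaviour described in this thread and are not missionary's implementation.

```clojure
;; Hand-rolled model for illustration only: a task is a function of a success
;; callback and a failure callback that returns a zero-argument canceller.

(defn succeed-with [v] (fn [success _failure] (success v) (fn [] nil)))
(defn fail-with    [e] (fn [_success failure] (failure e) (fn [] nil)))

(defn attempt*
  "Stand-in for m/attempt: always succeeds, with a thunk that returns the
  value or rethrows the error when called."
  [task]
  (fn [success _failure]
    (task (fn [v] (success (fn [] v)))
          (fn [e] (success (fn [] (throw e)))))))

(defn absolve*
  "Stand-in for m/absolve: calls the thunk and routes its outcome to the
  success or the failure callback."
  [task]
  (fn [success failure]
    (task (fn [thunk]
            (let [[ok? result] (try [true (thunk)]
                                    (catch Throwable e [false e]))]
              (if ok? (success result) (failure result))))
          failure)))

(defn run
  "Runs a task and returns [:ok value] or [:error exception]."
  [task]
  (let [out (promise)]
    (task #(deliver out [:ok %]) #(deliver out [:error %]))
    @out))

(def boom (ex-info "boom" {}))

(run (absolve* (attempt* (succeed-with 42))))         ;; => [:ok 42]
(first (run (attempt* (fail-with boom))))             ;; => :ok
(first (run (absolve* (attempt* (fail-with boom)))))  ;; => :error
```

The middle call shows the point of the delay: after `attempt*` the failure travels as an ordinary value, and nothing is thrown until `absolve*` (or any other consumer) calls the thunk.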

ribelo19:12:43

:thumbsup:

ribelo19:12:34

btw, @ben.sless, I'd love to read more about this but I don't even know where to start

Ben Sless19:12:49

Depends, how comfortable are you with monads?

ribelo19:12:15

I've never been able to understand monads 😐

Ben Sless19:12:37

Cool, they're still pretty fuzzy for me, too, don't feel bad, read the Micro Kanren paper

Ben Sless19:12:55

You can read it in an hour

Ben Sless19:12:06

The concept of delayed streams is similar to how you want to delay tasks

Dustin Getz19:12:14

I am wanting to write fdef specs on params that are task, flow etc, anyone find a hack to do this

ribelo19:12:34

you have a similar problem to me, only in a different place

Dustin Getz19:12:53

yeah i recall your q

ribelo19:12:58

sp and ap are fns and there's not much you can do about it

Ben Sless19:12:10

(s/def ::task
  (s/fspec :args (s/cat :success (s/fspec ,,,)
                        :fail (s/fspec ,,,))
           :ret ,,,
           ))

ribelo19:12:41

I'm going to read the paper about μKanren

ribelo21:12:19

I can't figure out how mi/buffer works

leonoel21:12:46

how it works or what it's for ?

ribelo21:12:43

probably both

ribelo21:12:22

the tests are also very sparse and tell me very little

leonoel21:12:35

do you understand the docstring ?

ribelo21:12:25

i think so

ribelo21:12:17

basically it works like take-nth ?

ribelo21:12:44

((mi/reduce conj (mi/ap
                   (let [x (mi/?< (mi/buffer 3 (mi/seed (range 12))))]
                     x))) prn prn)
;=> [0 3 6 9]

ribelo21:12:47

is identical to

((mi/reduce conj (mi/ap
                   (let [x (mi/?< (mi/eduction (take-nth 3) (mi/seed (range 12))))]
                     x))) prn prn)

leonoel21:12:35

do you understand what backpressure is about ?

leonoel21:12:12

buffer is a pipeline stage that doesn't change the semantics of the flow, it just consumes more memory.

ribelo21:12:19

I've tried combining buffer with relieve somehow, and using it where the input is faster than the output, but I haven't reached any conclusion about what it does

ribelo21:12:09

It is probably possible to do dropping buffer and sliding buffer like in core.async, but how?

leonoel21:12:05

that is a different concern

leonoel21:12:02

buffer doesn't solve that, it is just allocating more storage to absorb temporary fast inputs

leonoel21:12:26

it is the same as core.async channel capacity
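
A quick way to check that claim, assuming `missionary.core` is on the classpath and the code runs on the JVM: reducing a buffered flow gives the same values, in the same order, as reducing the unbuffered one. The var names `source`, `plain` and `buffered` are made up for this sketch.

```clojure
(require '[missionary.core :as m])

;; A flow is a reusable description, so it can be consumed twice.
(def source (m/seed (range 12)))

(def plain    (m/? (m/reduce conj source)))
(def buffered (m/? (m/reduce conj (m/buffer 3 source))))

(= plain buffered (vec (range 12)))
;; => true
```

Nothing is dropped or skipped by the buffer stage itself; the only thing that changes is how much upstream overflow can be held while the consumer is busy.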

ribelo21:12:44

:thinking_face: