This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2021-12-05
(defn await-promise "Returns a task completing with the result of given promise"
[p]
(let [v (m/dfv)] ; dataflow "atom"
(.then p
#(v (fn [] %)) ; wrap result in closure and put closure in atom
#(v (fn [] (throw %)))) ; delayed throw
(m/absolve v))) ; run whichever closure once it is set, yielding an async value or async exception
Why does it need to work like this? What is m/absolve for? I understand the types but not the "why"
This was difficult for me to unpack - it would help if atom-like types implemented reset!, deref etc
I see how it works but I don't understand the general pattern of m/absolve, unless it is for exactly this
define: absolve To pronounce clear of guilt or blame. To relieve of a requirement or obligation.
I was just looking at it, I think I'm missing something basic, let's say I have this little wrapper:
(defn http-task
[req opts]
(let [v (m/rdv)]
(http/send-async
req
opts
#(v (fn [] %))
#(v (fn [] (throw %))))
(m/absolve v)))
I call it and get back a task, but how do I get the result of running it?
That's based on your suggestion from when I tried to convert callback-based functions to tasks, like a promise or completable future
rdv with 1 argument is pure and returns a task that puts the value and completes when it's read
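On the question above of how to get a result out of a task: as the snippets in this thread suggest, a task is a function that takes a success callback and a failure callback, so running it means calling it with both. The following is a minimal plain-Clojure sketch with no missionary dependency. The names `fake-task`, `failing-task` and `run-blocking` are hypothetical and exist only for illustration; inside missionary, `m/?` is what plays this role.

```clojure
;; Hypothetical stand-in for a callback-based call that succeeds
;; from another thread. Returns a no-op canceller.
(defn fake-task [result]
  (fn [success _failure]
    (.start (Thread. (fn [] (success result))))
    (fn [] nil)))

;; Hypothetical stand-in for a call that fails from another thread.
(defn failing-task [ex]
  (fn [_success failure]
    (.start (Thread. (fn [] (failure ex))))
    (fn [] nil)))

;; Run a task and block the calling thread until one callback fires.
;; The outcome is tagged so that a failure can be rethrown here.
(defn run-blocking [task]
  (let [p (promise)]
    (task (fn [v] (deliver p [:ok v]))
          (fn [e] (deliver p [:err e])))
    (let [[tag x] @p]
      (if (= tag :ok) x (throw x)))))

(run-blocking (fake-task {:status 200 :body "hi"}))
```

Blocking like this is only reasonable at the REPL or at the edge of a program; the sketch ignores cancellation entirely.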
Here we go
(defn forever [task]
(m/ap (m/? (m/?> (m/seed (repeat task))))))
(defn http-task
[opts req]
(fn [s f] (http/send-async opts req s f)))
(def dad-joke-task
(http-task
{:uri " "
:headers {"accept" "text/plain"}
:method :get}
nil))
(defn flower
[t]
(fn [s f]
(let [v (m/dfv)]
(t
(fn [r] (v r))
(fn [e] (v (throw e))))
(v s f))))
(def flow-dad (flower dad-joke-task))
(comment
(m/?
(m/reduce
(fn
([])
([_])
([_ x] (println x)))
(m/eduction
(comp (take 2) (map :body))
(forever flow-dad)))))
I'm working on building an end to end example with missionary in a web programming context
For http I think in practice it's OK to simply wrap the callback API and ignore cancellation, assuming the timeouts are reasonably low
(def flow-dad
(fn [s f]
(http/send-async
{:uri " "
:headers {"accept" "text/plain"}
:method :get}
nil s f) #()))
I'm assuming a context where errors have to be handled, but I'll try this as well to figure out what I missed. For some reason this didn't work for me, but maybe something small was off
That's what I forgot
Good, now I'm at:
(defn prn-rf
([])
([_])
([_ x] (println x)))
(m/?
(m/reduce
prn-rf
(m/eduction
(comp (take 2) (map :body))
(forever dad-joke-task))))
(->>
(m/seed ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"])
(m/eduction (map (comp m/? joke-task)))
(m/eduction (map :body))
(m/reduce prn-rf)
m/?)
(m/eduction (map (comp m/? joke-task)))
m/? is blocking the thread here, is that your intent?
(m/ap
(-> (m/?> (m/seed ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"]))
joke-task m/? :body println))
Here we go!
(->> ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"]
m/seed
m/?>
joke-task
m/?
:body
m/ap
(m/reduce conj)
m/?)
(defn map-reduce
[rf f flow]
(->> flow m/?> f m/? m/ap (m/reduce rf)))
(->> ["R7UfaahVfFd" "q4hVKRnOmjb" "xHQucUvszd"]
m/seed
(map-reduce ((map :body) conj) joke-task)
m/?)
Slightly more readable
(defn map-reduce
[rf f flow]
(->>
(m/ap (->> flow m/?> f m/?))
(m/reduce rf)))
it is called concatMapSingle in RxJava http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#concatMapSingle-io.reactivex.rxjava3.functions.Function-
It's just an aesthetic preference but I dislike the names in Rx. Is the transformation of A -> T[B] to A -> F[B] a Lift?
Looking in wiki
(>>=) : (M T, T → M U) → M U
so if `mx : M T` and `f : T → M U`, then `(mx >>= f) : M U`
Then the transition from T[B] to F[B] is a bind
All I know is that attempt wraps the task and its result in functions, and absolve unpacks
absolve unpacks that extra layer of closure
I seem to understand how it works, but it doesn't want to click in my head why it works that way and why it doesn't work without it
without wrapping it in a function and unpacking it at the end, the exception is thrown inside the callback and no error is passed along as a value
Instead of putting it in terms of packing it in functions, think about it as a delay or thunk. You delay evaluation
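One way to read the thunk explanation above: a single-slot container can hold only one value, so success and failure are both encoded as a zero-argument function, and unpacking means calling it and routing the outcome to the right callback. Below is a plain-Clojure sketch of that idea. `attempt*` and `absolve*` are hypothetical re-implementations written for illustration, not missionary's actual code, and a "task" here is simply a function of a success and a failure callback.

```clojure
;; Tiny synchronous tasks, for illustration only.
(defn succeed [x] (fn [s _f] (s x) (fn [] nil)))
(defn fail    [e] (fn [_s f] (f e) (fn [] nil)))

;; attempt*: never fails. It always succeeds with a thunk that,
;; when called later, returns the value or rethrows the error.
(defn attempt* [task]
  (fn [s _f]
    (task (fn [x] (s (fn [] x)))
          (fn [e] (s (fn [] (throw e)))))))

;; absolve*: expects a task that yields a thunk, calls the thunk,
;; and routes the outcome to the success or the failure callback.
(defn absolve* [task]
  (fn [s f]
    (task (fn [thunk]
            (let [[tag x] (try [:ok (thunk)]
                               (catch Throwable e [:err e]))]
              (if (= tag :ok) (s x) (f x))))
          f)))

;; Helper to observe which callback fired.
(defn outcome [task]
  (let [p (promise)]
    (task (fn [v] (deliver p [:ok v]))
          (fn [e] (deliver p [:err (.getMessage e)])))
    @p))

(outcome (absolve* (attempt* (succeed 42))))
;; => [:ok 42]
(outcome (absolve* (attempt* (fail (ex-info "boom" {})))))
;; => [:err "boom"]
```

Writing `(v (throw e))` without the enclosing `fn` throws immediately, before `v` is ever called, which is why the delayed form `(v (fn [] (throw e)))` is needed.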
btw, @ben.sless, I'd love to read more about this but I don't even know where to start
Cool, they're still pretty fuzzy for me, too, don't feel bad, read the Micro Kanren paper
I want to write fdef specs on params that are tasks, flows, etc. Has anyone found a hack to do this?
what's fspec
yeah i recall your q
(s/def ::task
(s/fspec :args (s/cat :success (s/fspec ,,,)
:fail (s/fspec ,,,))
:ret ,,,
))
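A coarser alternative to a full `s/fspec`, offered only as a sketch: treat a task as "some function" and check nothing about its callbacks. Validating a nested `fspec` relies on generative testing, while a plain predicate does not. The spec names and the `with-timeout` function below are hypothetical, made up for this example.

```clojure
(require '[clojure.spec.alpha :as s])

;; Assumption: a task is spec'd as "any function"; the shapes of its
;; success and failure callbacks are deliberately left unchecked.
(s/def ::task fn?)
(s/def ::timeout-ms pos-int?)
(s/def ::with-timeout-args (s/cat :task ::task :ms ::timeout-ms))

;; Hypothetical function taking a task as a parameter.
(defn with-timeout [task ms] task)

(s/fdef with-timeout
  :args ::with-timeout-args
  :ret  ::task)

(s/valid? ::with-timeout-args [(fn [s f] (s 1) (fn [])) 100])
;; => true
(s/valid? ::with-timeout-args [:not-a-task 100])
;; => false
```

The trade-off is that any two-argument mix-up inside the task goes unnoticed; the spec only catches passing a non-function where a task is expected.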
((mi/reduce conj (mi/ap
(let [x (mi/?< (mi/buffer 3 (mi/seed (range 12))))]
x))) prn prn)
;=> [0 3 6 9]
is identical to
((mi/reduce conj (mi/ap
(let [x (mi/?< (mi/eduction (take-nth 3) (mi/seed (range 12))))]
x))) prn prn)
buffer is a pipeline stage that doesn't change the semantics of the flow, it just consumes more memory.
I've tried attaching a buffer to relieve somehow, or to an input that is faster than the output, but I haven't reached any conclusions about what this does
It is probably possible to do dropping buffer and sliding buffer like in core.async, but how?