Fork me on GitHub
#core-async
<
2019-12-13
>
roklenarcic12:12:40

I wanna move elements from channel1 to channel2 until I see a specific token, is there a way to do this?

roklenarcic12:12:14

I tried using pipe to move from channel1 to channel2 with transducer take-while but it drains 1 more element from the channel1 than it should

roklenarcic12:12:02

(let [cc (async/to-chan [0 1 2 3])
      cc2 (async/chan 1000 (take-while #(not= 0 %)))
      _ (async/pipe cc cc2)]
  (println (async/<!! cc2))
  (println (async/<!! cc)))

roklenarcic12:12:29

this will print nil and 2 respectively, but should print nil and 1

alexmiller12:12:03

Might be a good case for halt-when

roklenarcic12:12:22

didn’t know that existed

roklenarcic12:12:59

at first glance it just looks like inverse of take-while

alexmiller12:12:25

I guess you may still drain one more than desired

alexmiller12:12:12

Not sure, not at a repl to try it

fmjrey12:12:24

Could it be related to the time when transducers are executed, which could be either during take or put

roklenarcic12:12:28

still same problem

fmjrey13:12:00

I see it's not related to when the transducer is executed, the code would work ok for any sequence not starting with 0, but when it does, it just eats the next value... would think this is a bug

alexmiller13:12:45

This is just the nature of how transducers work

fmjrey14:12:33

actually I'm having a hard time getting a deterministic result with the following code, sometimes I get cc=2 and other times cc=1

(defn thread-name []
  (.getName (Thread/currentThread)))

(defn p [& args]
  (locking *out*
    (apply println (thread-name) ":" args)))

(defn log [msg v]
  (p msg v)
  v)

(let [cc (async/to-chan [-2 -1 0 1 2 3])
      cc2 (async/chan 1000 (take-while (fn [x] (log "transducing" x) (not= 0 x))))
      _ (async/pipe cc cc2)]
  (p "cc2=" (async/<!! cc2))
  (p "cc=" (async/<!! cc))) 

Steven Katz14:12:29

I’m trying to create a non-blocking/async service that for each inbound connection it makes several outbound calls and then combines the results of all those calls into its ultimate return. I’m using Pedestal to handle the inbound and HTTP-Kit for the outbound. Here is the code:

(ns pedrepl.fetch
  (:require [org.httpkit.client :as http]
            [clojure.core.async :as async]))
          

(defn fetcher [options url] 
  (let [c (async/chan 1)
        p (http/get url options
                     (fn [{:keys [status headers body error]}] ;; asynchronous response handling
                       (if error
                         (async/>!! c (str "Failed, exception is " error))
                         (async/>!! c body))
                       (async/close! c)))]
    c))

(defn fetch-bids [] 
  (let [urls ["" ""]
        options {:timeout 200 :as :text}
        results (mapv (partial fetcher options) urls)]
    (map async/<!! results)))

Steven Katz14:12:41

I would like for the “(map async/<!! results)” to be parking and not blocking, but I don’t see how to do this. Any advice would be appreciated.

jjttjj14:12:31

can you just use a go loop to loop-recur through the urls instead?

jjttjj14:12:05

oh i'm sorry I was looking at the wrong line

alexmiller14:12:59

parking ops only exist in go blocks

Steven Katz14:12:05

yes, but putting the “(map async/<!! results)” in a block as in (go (map async/<! results)) does not work

alexmiller14:12:53

you probably want something more like run! than map

alexmiller14:12:33

or use alts!

jjttjj14:12:15

With a parking op/go you're either gonna have to do something with each result as you get it or return a chan (which you could still do a blocking take from if you want)

Steven Katz14:12:43

I’ve tried using doseq, but cant figure out how to get the result back as the return of the “go” that the pedestal route is waiting on. The whole thing is started with:

(def my-interceptor
  {:name ::my-interceptor
   :enter (fn [context]
            (async/go
              (assoc context :response (ring-resp/response (fetch/fetch-bids)))))})

;; Tabular routes
(def routes #{["/" :get (conj common-interceptors my-interceptor)]
              ["/about" :get (conj common-interceptors `about-page)]})

jjttjj14:12:20

(<!!
 (go-loop [results (set results)
           x []]
   (if-let [[v ch] (and (not-empty results)
                        (alts! results))]
     (recur (disj results ch) (conj x v))
     results)))

jjttjj14:12:49

something like that, or you can do something stateful with each result in the loop

Steven Katz14:12:41

not sure I understand the “<!!” at the beginning. Isn’t that blocking? Also, I dont see what channel it is taking from?

jjttjj14:12:18

go-loop returns a chan that contains the result of the go loop

jjttjj14:12:48

there's no way to do a parking op without returning a chan so you have to either do something with the values or block somewhere

Steven Katz14:12:17

so there is still a thread blocking waiting on the go-loop…doesn’t that result in 1 thread per connection/request?

jjttjj14:12:48

there is due to the <!! but you could omit it and do something like this instead:

(go-loop [results (set results)
          x []]
  (when-let [[v ch] (and (not-empty results)
                         (alts! results))]
    (save-result-to-database! v)
    (recur (disj results ch) (conj x v))))

jjttjj14:12:59

although i guess that's not ideal either bc you're not really supposed to do blocking ops like database calls in the go-loop

Steven Katz14:12:56

I feel like there should be a way to have a pool of threads handling inbound requests and parking, what pedestal does and a pool of thread handling non-blocking outbound calls, what http-kit does and connect the two with a thread pool in the middles that is also non-blocking.

roklenarcic15:12:07

you do not understand

roklenarcic15:12:23

unless the outbound driver supports parking, you are always going to use threads

roklenarcic15:12:00

just because you ditch threads and are fully “async” doesn’t mean some library (like http-kit client) down the stack doesn’t use a threadpool

roklenarcic15:12:26

you just move the bottleneck from your code into the library that’s doing the calls

roklenarcic15:12:34

the only real way to get massive concurrency in terms of number of connections without bloating thread counts is to use IO that uses epoll, like java.nio

roklenarcic15:12:00

but the problem is that your libraries have to use it too, otherwise it doesn’t matter

roklenarcic15:12:57

I’ve seen dozens of people boast about making their code fully “async”, but then they use a standard JDBC driver which just eats a thread until query is completed, and then they have to make connection pool larger or the queues in front of connection pool get impossibly long

roklenarcic15:12:29

The reality of situation is that “async” is just snake oil 95% of the time. Most of the time even a fairly medium sized thread pool and synchronous processing saturates your CPU, GC or IO bandwidth, so making it async won’t magically make it faster, in fact it will decrease throughput

roklenarcic15:12:43

and code is simpler in synchronous

Steven Katz15:12:20

http-kit uses a threadpool, it works via callbacks and NIO.

roklenarcic15:12:52

http-kit uses http-client, which uses apache http client

Steven Katz15:12:30

regardless, it does not matter…I would like to connect its callbacks (either actual or apparent async) to pedestal

roklenarcic15:12:02

pedestal has a callback right?

roklenarcic15:12:28

that is, it has a function you call wth the result

Steven Katz15:12:56

pedestal expects a channel which will contain a “context”

roklenarcic15:12:13

you can reduce a sequence of channels in a go block into a result

roklenarcic15:12:49

each http-kit success callback pushes a value into result channel

roklenarcic15:12:26

you can create aggregate channel with async/into

Steven Katz15:12:52

so each callback still has its own channel, and I would combine them into one with async/into?

roklenarcic15:12:08

there’s a lot of ways to do it

Steven Katz15:12:17

http-kit is based on https://github.com/cch1/http.async.client (at least according to its project.clj)

roklenarcic15:12:21

if your http-kit code returns channel

roklenarcic15:12:37

then use async/merge

roklenarcic15:12:51

to merge all channels into a new channel that has all the results

Steven Katz15:12:15

does async/merge bock the underlying thread?

roklenarcic15:12:34

not it uses go-loop

roklenarcic15:12:42

it returns a channel

roklenarcic15:12:53

which then has a single collection value

Steven Katz15:12:59

i’ll give it a try, thanks!

roklenarcic15:12:13

and if you need to modify that

roklenarcic15:12:49

you can do (async/go {:body (async/<! merged-chan) :status 200})

roklenarcic15:12:02

which returns another chan that you can give to pedestal

roklenarcic15:12:07

this is all nonblocking

roklenarcic15:12:52

go and go-loop blocks return channel that has 1 value and that is the return/last value of the block

otfrom16:12:36

👋

👋 1
kwrooijen20:12:43

Hey, if you create a (chan) and don’t close it, but it’s no longer referenced anywhere; will that cause problems in the runtime or is it automatically garbage collected?

(let [my-chan (chan)]
  (close! chan)
  nil)

;; vs

(let [my-chan (chan)]
  nil)

hiredman20:12:25

It will be gc'ed

kwrooijen20:12:02

Nice, so if I don’t save it to an atom, or global definition, I don’t have to manually close it?

hiredman20:12:39

You don't have to

kwrooijen20:12:52

All right, thank you 🙂