
I have a couple of urls (api endpoints) that I want to POST to using the body of one request as the payload for the next. Assuming post below returns the body

(->> (post "url:9001" mybody)
     (post "url:9002")
     (post "url:9003"))
how do I generalize this to n urls? I only want to keep the body of the last url in the chain


@slack.jcpsantiago You can use reduce or loop to handle a number of urls I guess?

bananadance 1

(reduce (fn [body url] (post url body)) my-body urls)
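To illustrate, a minimal runnable sketch — `post` is stubbed here (the real one would do an HTTP POST and return the response body), and the urls are made up:

```clojure
;; Stub `post` for illustration only -- the real one does an HTTP POST
;; and returns the response body.
(defn post [url body]
  {:url url :echoed body})

(def urls ["url:9001" "url:9002" "url:9003"])

;; Feed the body of each response in as the payload of the next request;
;; reduce returns only the final body, as requested.
(reduce (fn [body url] (post url body)) {:n 1} urls)
```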

Milan Munzar 10:09:29

Hey, 🙂 It seems to me that there is some problem with name resolution in defrecord when defining a method named delete. In the following code it fails with a type error saying that delete is not a function. However, when I rename it to del or something else it works as expected. Also I am able to call it like ((:delete foo)). Does anyone know what is going on? I am using ClojureScript. Thx 🙂

(defn make [ddb-client]
  (map->DBClient {:get    #(-> ddb-client .get .promise)
                  :put    #(-> ddb-client .put .promise)
                  :query  #(-> ddb-client .query .promise)
                  :delete #(-> ddb-client .delete .promise)}))

(def foo (make client))
(.delete foo)


Probably a name clash with javascript delete?

Milan Munzar 11:09:30

Could be, in JS delete is an operator so it makes sense that it complains about not a function. Thx :thumbsup:
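For what it's worth, a sketch of the difference (using a hypothetical minimal DBClient record, not the one above): (.delete foo) compiles to JS method interop, where delete is a reserved word, while keyword access simply looks up the stored function:

```clojure
;; Hypothetical minimal record for illustration
(defrecord DBClient [get put query delete])

(def foo (map->DBClient {:delete (fn [] :deleted)}))

;; (.delete foo)   ;; JS interop call -- collides with the `delete` operator
((:delete foo))    ;; keyword lookup, then call the stored fn -- works
```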


Might still be worth mentioning in #clojurescript as maybe it should work, I don't know

👍 1
Mark Wardle 14:09:56

Hi. I’m getting started with core.async and am creating two channels: one with a list of files to be processed (files-c), and one with batches of data from those files (out-c). It looks as if I have a race condition when the number of worker threads I create exceeds the number of files to be processed, because I think the channel gets closed before all the files can be processed. Is there a better approach? This code creates n worker threads that are meant to keep watch on the files channel and process as required, batching files and sending the batches off on another channel.

(defn process-files
  "Processes files from the files-c channel, sending data in batches to out-c.
  The threads will end when the files-c channel is closed."
  [files-c out-c & {:keys [batchSize nthreads]}]
  (dotimes [i (if nthreads nthreads 2)]
    (thread
      (loop [f (<!! files-c)]
        (if-not f
          (close! out-c)
          (do
            (println "Queuing file for import: " (.getPath f))
            (process-file (.getPath f) out-c (if batchSize batchSize 1000))
            (recur (<!! files-c))))))))

Mark Wardle 14:09:26

I would solve this in golang using a waitgroup with the number of worker threads - and I’m copying the design pattern from what I did with that - so that is why I’m probably doing it wrong!


see my reply below - close doesn't discard messages that were already available to read


FYI there is a gotcha with doing IO in go blocks, usually the right thing is to use async/thread for anything that might block or be CPU intensive


minor suggestion: (if batchSize batchSize 1000) is better written as (or batchSize 1000)

👍 1

you can even use {:keys [batchSize nthreads] :or {batchSize 1000}} in the function arglist


though beyond a certain point destructures just get messy and hard to read


closing the out-c should not prevent reading messages that were already available

Clojure 1.10.1
user=> (require '[clojure.core.async :as >])
nil
user=> (def c (>/chan))
#'user/c
user=> (>/put! c :a)
true
user=> (>/put! c :b)
true
user=> (>/put! c :c)
true
user=> (>/close! c)
nil
user=> (>/<!! c)
:a
user=> (>/<!! c)
:b
user=> (>/<!! c)
:c
user=> (>/<!! c)
nil

Mark Wardle 14:09:59

That’s what I thought, but I wondered if the loop creates a thread that decides to close the channel before another thread can get on and add work to it (because it uses IO to read the disk I assumed it was a race condition)


you are creating two threads, loop does not create threads implicitly

Mark Wardle 14:09:41

Yes - that’s intentional as I wanted worker threads that would drain a channel and send work to another channel - but I also wanted it to properly close sequentially if I closed the first channel. I think what is happening is:

(def c (chan))  ;; => #'
(put! c :a)     ;; => true
(put! c :b)     ;; => true
(close! c)      ;; => nil
(put! c :c)     ;; => false
(<!! c)         ;; => :a
(<!! c)         ;; => :b
(<!! c)         ;; => nil

Mark Wardle 14:09:10

ie I’m looking to build a fan-out/fan-in type pattern but I think I need to look at some more examples!

Mark Wardle 16:09:29

I fixed it by storing each worker thread’s channel (as returned from async/thread) and merging those channels. It works, but in the meantime I spotted pipeline and its ilk so will explore higher order abstractions there. Thanks for your help.

(defn file-worker
  [files-c out-c batchSize]
  (loop [f (<!! files-c)]
    (when f
      (println "Queuing file for import: " (.getPath f))
      (process-file (.getPath f) out-c (or batchSize 1000))
      (recur (<!! files-c)))))

(defn create-workers
  "Creates a number of worker threads, returning a channel merging their result channels"
  [n f & args]
  (loop [i 0 chans []]
    (if (= i n)
      (async/merge chans)
      (recur (inc i) (conj chans (thread (apply f args)))))))
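As a rough sketch of the pipeline approach mentioned above: pipeline-blocking runs the work on n threads and closes the output channel automatically once the input channel closes and drains, which sidesteps the manual close! coordination. Here read-batches is a hypothetical function turning one file into a sequence of batches:

```clojure
(require '[clojure.core.async :as async])

;; `read-batches` is hypothetical: file -> seq of batches.
(defn start-processing [files-c out-c n]
  (async/pipeline-blocking
    n          ;; number of worker threads
    out-c      ;; closed automatically when files-c closes and drains
    (mapcat read-batches)
    files-c))
```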

Michaël Salihi 14:09:45

Hi! I found it interesting to try to transpose this JavaScript destructuring snippet, and here is what I got:

(let [data {:a 1 :b 2 :c 3}
      remove-prop :b
      my-remove (remove-prop data)
      my-rest (dissoc data remove-prop)]
  (println my-remove)
  (println my-rest))

;; Result
=> 2
=> {:a 1, :c 3}

Michaël Salihi 14:09:11

Now I'm pretty sure there is a more idiomatic way. Any ideas?


you could use clojure’s destructuring

(let [{b :b :as arg} {:a 1 :b 2 :c 3}
      rest (dissoc arg :b)]
  [b rest])

Michaël Salihi 14:09:05

Perfect, very nice! I didn't know about aliases in a destructuring block. I did well to ask! 🙂

👍 2

additionally, extra data in a hashmap usually isn't a problem, so depending on the context it might not be necessary to dissoc b.

👍 1

One-liner: [(:b data) (dissoc data :b)]

👍 2

Is there a canonical way to construct a function that returns subsequent elements from a lazy seq, on subsequent calls?

=> (let [f (yield-elems (cycle [:foo :bar]))]
     [(f) (f) (f)])
[:foo :bar :foo]
Is there something like yield-elems here?


This works, but seems somewhat cumbersome:

(defn yield-elems [xs]
  (let [xs' (atom xs)]
    (fn []
      (let [[x & rst] @xs']
        (reset! xs' rst)
        x))))


I'd probably do this:

(defn yield-elems [s]
  (let [s (atom s)]
    (fn [] (ffirst (swap-vals! s rest)))))


There we go! 👏


swap-vals! is fairly new and it returns both the old and new values of the atom being swapped.
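A tiny example of the return shape, for reference:

```clojure
(def a (atom 0))
(swap-vals! a inc) ;; => [0 1] -- a vector of [old-value new-value]
@a                 ;; => 1
```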


It's definitely new to me 🙂


Thanks for that


Added in 1.9. I'm only just getting used to it and still forget it exists and write something more verbose with swap! etc.


Any critique on my version?


It has a race condition if two threads called f at the same time.

👍 1

Both calls to f could read @xs' and get the same value, then both could call reset! so you'd only get one element consumed from two calls.


Pretty much any time you have both deref (`@`) and reset! in the same chunk of code, you can run into problems.


Makes sense. Just for completeness sake, would you fix that race condition with a locking, or is there a better way?


Even mixing deref and swap! can be problematic -- hence the addition of swap-vals!.


The better way is swap-vals! 🙂


Anything else isn't going to be atomic.


Noted, thanks! 👍


Also, remember that the function applied to an atom (in swap! or swap-vals!) can be called more than once if the atom needs to retry under contention.

👍 1

(so, avoid side-effects in f)


I'm not sure if this is a good idea, but I realized this was essentially wanting a java.util.Iterator from a seq

(import (java.util Iterator
                   NoSuchElementException))
(defn seq-iterator
  "eagerly consumes coll
  if cycle? is true, will cycle all items"
  ([coll] (seq-iterator coll false))
  ([coll cycle?]
   (let [i (atom (.listIterator coll))]
     (reify Iterator
       (hasNext [_] (or cycle? (.hasNext @i)))
       (next [_]
         (if-not cycle?
           (.next @i)
           (try (.next @i)
                (catch NoSuchElementException _
                  (reset! i (.listIterator coll))
                  (.next @i)))))
       (remove [_] (.remove @i))))))


actually if you don't need cyclic collections, all you need is (.listIterator coll)


and for arbitrary / indefinite lazy seqs you'd want something else (probably using reify on Iterator)


Sequences do have iterators (.iterator (cycle coll))

💯 1

And just to complete the circle, there's a built-in iterator-seq that returns a sequence from an Iterator 🙂
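A minimal round trip, for illustration:

```clojure
;; seq -> Iterator -> seq
(def it (.iterator [1 2 3]))
(iterator-seq it) ;; => (1 2 3)
```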


@U051SS2EU based on your last function :

(defn seq-iterator
    ([coll] (seq-iterator coll false))
    ([coll cycle?] (.iterator (if cycle? (cycle coll) coll))))


with .iterator you don't need the distinction, I only split the two in my reify because a cycle can't be represented that way (the List method ends up trying to force the entire coll)


I thought you specifically wanted to make a coll cyclic, but indeed, that handles everything