#clojure
2023-04-24
Stpn18:04:36

Hello! Anyone using ring + reitit who can help, please? I am trying to create a route with an optional path parameter, so that POST works on /test and GET works on /test/123 (where 123 is the parameter value). But when I do this, it does not work: ["test" {:post post-handler} ["/:opt-param" {:get get-handler}]]

p-himik18:04:42

How about

["test"
 ["" {:post ...}]
 ["/:param" {:get ...}]]

p-himik19:04:52

(not saying that it'll work BTW - I myself don't use reitit, so it's just a guess based on other code that I've seen)

Stpn19:04:09

it worked like a charm 🙂 I still can't get used to such Clojure tricks; too much Java in my head and too little time spent coding in Clojure so far

👍 2
Nasiru Ibrahim12:04:15

From your initial code, /test will definitely run the POST request, and so will /test/param, because /test must match before /test/param. But if you make it as @U2FRKM4TW suggested, the /test route (the "" child) will run separately from /test/:param. That's how reitit routing works.
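A minimal, runnable sketch of the suggested route table (assuming reitit is on the classpath; the keyword handler names are placeholders):

```clojure
(require '[reitit.core :as r])

;; Nested routes: "" matches /test itself, "/:param" matches /test/123.
(def router
  (r/router
   ["/test"
    ["" {:post :post-handler}]
    ["/:param" {:get :get-handler}]]))

;; (r/match-by-path router "/test")     -> match carrying :post-handler
;; (r/match-by-path router "/test/123") -> match with {:param "123"}
;;                                          in :path-params
```

The empty-string child is the trick: it gives the parent path `/test` its own route data, separate from the `/:param` sibling.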

stopa20:04:59

Hey team, I want to create a "partitioned worker pool" of sorts. Here's the idea:
• I have a set of N workers
• I have jobs that come with a partition-id
There are two constraints:
• I want to make sure only 1 job ever runs for a partition-id at a time
• I want to distribute work evenly, so no one partition-id can hog all the workers
To solve for this, I thought I could create a "queue of queues". To sketch it out:

(import '(java.util.concurrent ConcurrentHashMap LinkedBlockingQueue)
        '(java.util.function BiFunction))

(def id->items (ConcurrentHashMap.))
(def pending-ids (LinkedBlockingQueue.))
(def active-ids (atom #{}))

(defn- save-item [id item]
  (.compute id->items
            id
            (reify BiFunction
              (apply [_this _k q]
                (let [q (or q (LinkedBlockingQueue.))]
                  (.add q item)
                  q)))))

(defn- update-schedule [id]
  (let [already-scheduled? (or (.contains pending-ids id)
                               (@active-ids id))]
    (when-not already-scheduled?
      (.add pending-ids id))))

(defn enq [id item]
  (save-item id item)
  (update-schedule id))

(defn work [f]
  (let [id (.take pending-ids)
        _ (swap! active-ids conj id)
        q (.get id->items id)
        item (.take q)]
    (f item)
    (swap! active-ids disj id)
    (when-not (.isEmpty q)
      (update-schedule id))))
The rough outline:
• enq
  ◦ Adds the job to a partition-level queue
  ◦ Adds the partition into a pending queue, unless it's already pending or being worked on
    ▪︎ (this ensures that only one worker can ever work on a partition at a time)
• work
  ◦ Takes the id from the pending queue
  ◦ Sets it in active
    ▪︎ (this is again to guarantee that only one worker can ever work on a partition at a time)
  ◦ Takes the appropriate item
  ◦ Does the work
  ◦ Removes it from the active set
  ◦ If there is any more work left, it "reschedules" it
    ▪︎ (this ensures that work is distributed evenly)
This is just a rough outline -- I am pretty sure I would need to add locking on update-schedule, and when taking from pending-ids and transferring to active-ids. But I wanted to get it out there. Is this the way you'd solve it? Would you do something different?

hiredman20:04:41

I would add more queues 🙂

hiredman20:04:21

or sort of flip from push to pull

hiredman20:04:07

a big question is if the partition ids are all known statically or if partitions can be added dynamically

stopa21:04:14

> a big question is if the partition ids are all known statically or if partitions can be added dynamically
In this case it is dynamic!
> I would add more queues
> or sort of flip from push to pull
Oo, I'm intrigued! Would you mind explaining a bit more? Thanks @U0NCTKEV8!

hiredman21:04:35

dynamic is much trickier

👍 2
hiredman21:04:32

basically it is a kind of pubsub with a single consumer for each topic (a topic being a partition id)

❤️ 2
hiredman21:04:09

if you have some pub/sub stuff, you can stick something that starts a single worker for a given topic between the input queue and the pubsub (copying from the input queue to pubsub, starting a worker for a partition if absent)

hiredman21:04:17

but removing workers from that is tricky (if you don't want idle workers hanging around because you saw some partition id once hours ago)

👍 2
stopa21:04:36

That is interesting! I could do something like start a go-loop dynamically for each partition. I could also have the go-loop "shut itself off" if its queue is empty. Is something like that what you had in mind?

hiredman21:04:08

the thing with doing it via workers with multiple queues and locks is that it is hard to keep all your workers running, and you end up doing things like having them poll each queue and then sleep for some amount of time if there is nothing on the queues (instead of blocking on a single queue), which works, you just feel bad doing it

👍 2
hiredman21:04:57

if you don't do non-blocking polling from the worker, they get stuck reading from Q1 with no work on it while Q2 has work to do

hiredman21:04:49

(require '[clojure.core.async :as async])

;; part-id and do-stuff are assumed helpers: part-id extracts the
;; partition id from a message, do-stuff processes a message.

(def input (async/chan))
(def next-input (async/chan))
(def work-pub (async/pub next-input part-id))
(def shutting-down (async/chan))

(defn start-worker [the-part-id]
  (let [ch (async/sub work-pub the-part-id (async/chan))]
    (async/go-loop [ack-ch nil]
      (let [to (async/timeout 1000)
            [the-val the-chan] (async/alts! (concat (when-not ack-ch
                                                      [to])
                                                    [ch]
                                                    (when ack-ch
                                                      [ack-ch])))]
        (cond (= the-chan to)
              ;; idle for a second: ask the launcher for permission to die
              (let [ack (async/chan)]
                (async/>! shutting-down [the-part-id ack])
                (recur ack))
              (= the-chan ch)
              ;; got work: cancel any pending shutdown and keep going
              (do
                (do-stuff the-val)
                (when ack-ch (async/close! ack-ch))
                (recur nil))
              (= the-chan ack-ch)
              ;; launcher acknowledged the shutdown: exit
              nil)))))

;; worker launcher
(async/go-loop [workers {} buf nil]
  (when (some? buf)
    (async/>! next-input buf))
  (async/alt!
    shutting-down ([[part-id ack]]
                   (if (async/>! ack true)
                     (recur (dissoc workers part-id) buf)
                     (recur workers buf)))
    input ([msg]
           (let [p (part-id msg)]
             (if (contains? workers p)
               (recur workers msg)
               (recur (assoc workers p (start-worker p)) msg))))))

hiredman21:04:13

oh, the worker should shut down its pubsub channel when it exits

hiredman21:04:56

and there are resource management issues with async/pub leaking mults and not removing closed channels until a message comes in for the topic it is subscribed to

hiredman21:04:28

(we have an alternative async/pub at work that avoids all that)

stopa23:04:34

I'll sit tomorrow morning with fresh eyes and think on this with full focus. Really appreciate the demo code, thanks @U0NCTKEV8!

Ben Sless04:04:10

This is essentially a group-by of streams. Both more.async and #CL85MBPEF have those building blocks. Remember you also want to GC idle partitions after a while to avoid OOM
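The "group-by of streams" idea, reduced to its essence in plain Clojure (no more.async; names here are made up just to show the shape, including the idle-partition GC Ben mentions):

```clojure
;; part-id -> {:items [...] :last-seen ms}
(def partitions (atom {}))

(defn route!
  "Route an incoming item into its partition, creating the
  partition on first sight (the streaming analogue of group-by)."
  [part-id item]
  (swap! partitions update part-id
         (fn [{:keys [items]}]
           {:items     (conj (or items []) item)
            :last-seen (System/currentTimeMillis)})))

(defn gc-idle!
  "Drop partitions that have been idle longer than max-idle-ms,
  so old partition ids don't accumulate and OOM the process."
  [max-idle-ms]
  (let [now (System/currentTimeMillis)]
    (swap! partitions
           (fn [m]
             (into {}
                   (remove (fn [[_ {:keys [last-seen]}]]
                             (> (- now last-seen) max-idle-ms)))
                   m)))))
```

A real version would hold channels or queues per partition instead of vectors, but the create-on-demand plus GC-on-idle structure is the same.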

stopa20:04:46

I've been playing with both solutions today, and I learned a lot. Thank you @U0NCTKEV8 @UK0810AQ2!

stopa20:04:11

@UK0810AQ2 I have a curious question for you, reading the implementation of group-by! in more.async https://github.com/bsless/more.async/blob/master/src/main/clojure/more/async.clj#L270-L272
Would there be a race condition of sorts between delayed-death! and get-chan, where the constraint of "one consumer is active for a given partition at a time" fails? My thinking is:
• There may be a split second when delayed-death closes a channel, though f is still running
• get-chan would return a new channel and run f, which would now mean two fs would run at once for a particular partition-id
I realize group-by doesn't specifically support the constraint of "1 f running at a time for a partition id" -- just wanted to ask to make sure I am understanding the code properly. I would guess I would need to do something more like what hiredman did with the ack-ch, is that right?

Ben Sless20:04:54

I think you're right regarding the race condition; it should be transactional. You could do some locking at the f level to avoid it, too

👍 2
stopa20:04:23

Gotcha, thanks Ben!

hiredman20:04:54

the ack channel in my example above is to avoid that race

🔥 2
stopa20:04:29

Yes! Really cool.

hiredman20:04:02

but there is a potential for a deadlock, because the worker launcher is waiting forever for a write to the ack channel

hiredman20:04:41

ah, no, the worker closes the ack channel if it decides to keep going

❤️ 2
stopa20:04:52

What do you think about this:

(require '[clojure.core.async :as a])

(defn- start-worker! [n work-ch done-ch f]
  (loop []
    (let [[k item] (a/<!! work-ch)]
      (when-not (nil? item)
        (f item)
        (a/>!! done-ch [k item])
        (recur)))))

(defn- process-from [{:keys [running] :as state}
                     [k item :as v]]
  (if (running k)
    [nil (update-in state [:pending-work k] (fnil conj []) item)]
    [v (update state :running conj k)]))

(defn- process-done [{:keys [pending-work] :as state} [k item :as v]]
  (let [[next-item & remaining] (pending-work k)]
    (if next-item
      [[k next-item] (assoc-in state [:pending-work k] remaining)]
      [nil (update state :running disj k)])))

(defn- start-scheduler! [state-atom from-ch work-ch done-ch]
  (loop []
    (let [[v ch] (a/alts!! [from-ch done-ch])
          state @state-atom]
      (when-not (nil? v)
        (let [[to-work new-state] (condp = ch
                                    from-ch (process-from state v)
                                    done-ch (process-done state v))]
          (reset! state-atom new-state)
          (when to-work (a/>!! work-ch to-work))
          (recur))))))

(defn start!
  [num-workers from-ch f]
  (let [work-ch (a/chan num-workers)
        done-ch (a/chan)
        workers (->> (range num-workers)
                     (mapv #(future (start-worker! % work-ch done-ch f))))
        sched-state (atom {:pending-work {} :running #{}})
        scheduler (future (start-scheduler!
                           sched-state
                           from-ch
                           work-ch
                           done-ch))]
    {:workers workers
     :scheduler scheduler
     :schedule-state sched-state}))

(comment
  (def num-workers 4)
  (def from-ch (a/chan))
  (defn f [x] (println "got!" x))
  (def ret (start! num-workers from-ch f)))
This is an attempt to marry everything I learned today. The scheduler is inspired by you guys' feedback. The main change I am trying to make is to use a constant set of workers, rather than creating new go-loops per partition-id. I am oscillating on this choice. The upside to this version, I think, would be that it's simpler to manage N workers than potentially thousands of go-loops. The downside is that it may be harder to manage in the long term. How would you think about it?

hiredman20:04:39

I would go with an unrestricted set of go workers that queue work on some executor (possibly on a fixed pool) and wait for it to finish

hiredman20:04:57

possibly with additional feedback, like watching the queue of the executor and, if it gets too large, waiting for some amount of time (using a timeout channel, to yield the go executor thread to service other gos) before trying again
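The executor-handoff half of this suggestion can be sketched with plain Java interop (no core.async; `run-job!` and the pool size are made-up names): each logical worker submits its job to a fixed pool and blocks on the returned future, so the concurrency of actual work is bounded by the pool no matter how many workers exist.

```clojure
(import '(java.util.concurrent Executors ExecutorService Callable))

;; The fixed pool bounds how many jobs actually run at once.
(def ^ExecutorService pool (Executors/newFixedThreadPool 4))

(defn run-job!
  "Submit the job to the fixed pool and wait for it to finish.
  A per-partition worker calling this blocks until its job is done,
  so no partition can occupy more than one pool thread at a time."
  [job]
  (let [fut (.submit pool ^Callable (fn [] (job)))]
    (.get fut)))
```

In the go-worker version, the wait would happen via a channel delivered from the submitted task rather than `.get`, so the go thread pool isn't blocked; this sketch only shows the handoff itself.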

stopa20:04:27

Gotcha. For my own learning, one more curious question: what would be your reasoning? Is it because it makes things simpler down the road? (error handling etc)

hiredman20:04:52

it breaks the problem down into two smaller subproblems to solve (and those solutions can be reused) instead of trying to solve one big problem at once

❤️ 2
stopa20:04:28

That makes sense to me, thanks for all the help @U0NCTKEV8!

Brian Beckman22:04:31

SOLVED Hello — I have certain sets of symbols, for instance (def my-set #{'Required 'Optional}) and I want to intern some vars that evaluate to those symbols (it’s for a compatibility layer in an intermediate representation for a compiler), e.g.

(def Required 'Required)
(def Optional 'Optional)
I’d like to automate these definitions with something like the following (which obviously won’t work):

(doseq [e my-set] (def e 'e))

But I can’t quite see how to do this properly. I’ve tried a few crappy tricks with eval and backtick, but I’m outfoxed! Eventually, I’d even like to write a macro like the following (which just as obviously won’t work):
(defmacro symbolate [a-set]
  `(doseq [e# ~a-set] (def e# 'e#)))
(symbolate my-set)
Any advice, please & thanks?

🏁 2
phronmophobic23:04:45

On JVM Clojure you could muck around with intern, but I would just do:

(defmacro symbolate [a-set]
  `(do
     ~@(for [sym a-set]
         `(def ~sym (quote ~sym)))))

👍 2
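A quick sanity check of this version (note that it expects a literal set of bare symbols at the call site, since the macro sees the unevaluated form rather than the value of a def'd set):

```clojure
(defmacro symbolate [a-set]
  `(do
     ~@(for [sym a-set]
         `(def ~sym (quote ~sym)))))

;; Called with a literal set of bare symbols:
(symbolate #{Required Optional})

;; Required now evaluates to the symbol Required, and likewise Optional.
```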
Brian Beckman13:04:05

It took me (too much) trial-and-error to get it to work, but your idea sent me down the right path

(def my-set #{'Required 'Optional})

(defmacro symbolate [a-set-sym]
  (let [aset (eval a-set-sym)
        cmds (for [e aset] (list 'def e `'~e))]
    (do cmds)))

(defn -main
  "I don't do a whole lot ... yet."
  [& args]
  (symbolate my-set)
  (println Required)
  (println Optional))

🏁 2
jkxyz13:04:39

I don’t think the macro is doing quite what you expect. In particular the call to do is not doing anything, as it just returns a sequence like ((def A 'A) (def B 'B)) which achieves the correct result because the seq and defs get evaluated. The last line could be (into '(do) cmds)

Brian Beckman13:04:01

yes, you’re right. I modified it to `(list ~@cmds)

Brian Beckman13:04:10

getting there drop-by-drop 🙂

jkxyz13:04:35

That returns the same thing as it did before

Brian Beckman13:04:44

well it works for bigger lists

Brian Beckman13:04:00

(def my-set
  #{'Local 'In 'Out 'InOut 'ReturnVar 'Unspecified
    })

(defmacro symbolate [a-set-sym]
  (let [a-set (eval a-set-sym)
        cmds (for [e a-set] (list 'def e `'~e))]
    `(list ~@cmds)))

(defn -main
  "I don't do a whole lot ... yet."
  [& args]
  (symbolate my-set)

  (println Local)
  (println In)
  (println Out)
  (println InOut)
  (println ReturnVar)
  (println Unspecified)
  )

jkxyz13:04:10

The code you want to return is (do (def A 'A) (def B 'B)), but that returns ((def A 'A) ,,,)

Brian Beckman13:04:23

ok lemme try it

jkxyz13:04:43

You can also check it with macroexpand

Brian Beckman13:04:14

(defmacro symbolate [a-set-sym]
  (let [a-set (eval a-set-sym)
        cmds (for [e a-set] (list 'def e `'~e))]
    (into '(do) cmds)))

(defn -main
  "I don't do a whole lot ... yet."
  [& args]
  (symbolate my-set)

  (println Local)
  (println In)
  (println Out)
  (println InOut)
  (println ReturnVar)
  (println Unspecified))

=> Syntax error compiling at (macro_question/core.clj:11:12).
Unable to resolve symbol: do in this context

Brian Beckman13:04:27

I’ll try macroexpand

jkxyz13:04:49

Ah sorry, backtick instead of quote: (into `(do) cmds)

jkxyz13:04:42

Also a correction on your previous example, it's returning (list (def A 'A)) which then evaluates to a list of vars -- which may be what you want!

👍 2
Brian Beckman13:04:43

same result : unable to resolve symbol: do …

Brian Beckman13:04:11

yes, i want the side-effects of the defs as well!

Brian Beckman13:04:27

so i think the println’s test that the defs have happened

Brian Beckman13:04:15

wife calling … back later 🙂 thanks for your kind attention to this!

Brian Beckman14:04:12

The list of vars is exactly what i want; this method survived integration with the rest of my compiler IR 🙂
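For reference, the single-`do` expansion the thread was circling can be produced by consing `do` onto the commands; `into` doesn't work here because it conjes onto the front of a list, leaving `do` at the end (hence the "Unable to resolve symbol: do" error). A minimal sketch of that variant:

```clojure
(def my-set #{'Required 'Optional})

(defmacro symbolate [a-set-sym]
  (let [a-set (eval a-set-sym)
        cmds  (for [e a-set] (list 'def e (list 'quote e)))]
    ;; (cons 'do cmds) expands to (do (def Required 'Required) ...)
    (cons 'do cmds)))

(symbolate my-set)

;; Required now evaluates to the symbol Required.
```

The `(list ~@cmds)` version works too; it just additionally returns the seq of defined vars, which turned out to be useful here.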

shaunlebron23:04:59

Has anyone had success in making their clojure app faster by just sprinkling transducers everywhere?

shaunlebron23:04:39

I used YourKit to try to find low-hanging fruit, but all the reports are obfuscated by anonymous function calls without line numbers, and call-graphs are lost at thread boundaries

phronmophobic23:04:31

I sprinkle transducers everywhere just out of personal preference. As far as speeding things up, I've found https://github.com/clojure-goes-fast/clj-async-profiler to be very helpful. Depending on the use case, benchmark tools like https://github.com/hugoduncan/criterium are also helpful. Transducers should be generally faster, but it's hard to say how much of a win it will be without profiling. You might also find https://github.com/johnmn3/injest to be helpful.

🙏 2
respatialized23:04:03

https://github.com/BrunoBonacci/mulog is another great logging library that lets you capture event-level logging data (including duration) while preserving more context than call graphs alone

🙏 4
Alex Miller (Clojure team)00:04:50

In general, unless you have big collections or deep transformation stacks, this is unlikely to help much. Because of chunking, sequences can be faster than you expect, and if you’re not using the entire result of a lazy seq transformation, they can do a lot less work than an eager reduce with transducers. But you know your app - there are probably a small number of places that make the biggest difference, so just think hard or try both in those
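As a concrete comparison of the two styles (a sketch; the actual win depends entirely on your data): both pipelines below compute the same result, but the transducer version fuses the steps into one eager reduce with no intermediate sequences, while the lazy version builds chunked intermediate seqs and can stop early if you only consume part of the result.

```clojure
(def xs (vec (range 1000)))

;; Lazy seq pipeline: builds intermediate (chunked) lazy seqs.
(def lazy-result
  (->> xs (map inc) (filter odd?) (reduce +)))

;; Transducer pipeline: one pass, no intermediate seqs.
(def xf (comp (map inc) (filter odd?)))
(def xduce-result
  (transduce xf + 0 xs))

;; Where laziness wins: something like
;;   (take 5 (map expensive-fn huge-seq))
;; only forces a chunk or so of huge-seq, while an eager
;; transduce over huge-seq would process all of it.
```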

🙏 2
Alex Miller (Clojure team)00:04:07

In general, using your brain and your tools to find and solve actual problems is going to be more efficient and impactful than randomly doing anything

shaunlebron01:04:05

Thanks yall! It's not my code, just being tasked with optimizing someone else's, which is hard to approach without the context of having written it

Ben Sless04:04:38

In that case async profiler is great. Look for the meatiest stack

2
Mark Wardle19:04:58

I started doing this and got frustrated when they were slower than the conventional sequence operations… even those with lots of steps… probably because I wasn’t dealing with very large sequences. However, it taught me how sometimes breaking up logic into transducers made things more composable… and so I could have greater reuse and legibility… so it wasn’t a wasted effort at all.