Fork me on GitHub
#code-reviews
<
2020-08-14
>
Jeremy Cronk01:08:17

I’d love to see any feedback on making this more Clojure-ish: I want to read data from a file that may or may not have a weird, useless compression algorithm applied to it. The compression algorithm is just to compress repeating characters, and it works like this: 0xFA signals that the next three bytes are special - the first two get converted to a string, and this is (in hex) the number of times to repeat the character, and then the third byte is the character to be repeated. So if I’m reading a record that’s supposed to have 176 bytes, I want it to read it in, then expand any compressed sequences, up to 176 bytes and return a collection. Here’s what I came up with:

(declare read-bytes)
(defn inflate-compressed
  [is nbr bvec]
  (let [code (repeatedly 3 #(.read is))
        len (Integer/parseInt (->> code
                                   butlast
                                   (map char)
                                   st/join) 16)
        chr (last code)
        exp (vec (concat
                   bvec
                   (repeat len chr)))
        remain (- nbr len)]
    (fn [] (read-bytes is (if (neg? remain) 0 remain) exp))))

(defn compressed?
  [b]
  (= 250 b))

(defn read-bytes
  ([is nbr bvec]
   (if (zero? nbr) bvec
     (let [b (.read is)]
       (cond
         (= -1 b) (when-not (empty? bvec) bvec)
         (compressed? b) (fn [] (inflate-compressed is nbr bvec))
         :else
           (fn [] (read-bytes is (dec nbr) (conj bvec b)))))))
  ([is nbr]
   (trampoline read-bytes is nbr [])))
I’m using io/input-stream because even though most of the data is text, the 0xFA character (250) screws thing up if I use io/reader - I don’t know enough about Java to know why, but it changes the character to 65533 (`0xFFDD`) when I try to read it. I feel like there are more conditionals than I’d like, but I’m not sure where to start getting rid of them, or if I’ve overthought this whole thing and there’s a simpler way to do it.

jaihindhreddy21:08:47

In inflate-compressed, I'd destructure the 2 chars as opposed to using butlast and last. And instead of vec over the concat, you can use into. And (if (neg? remain) 0 remain) becomes (max remain 0) which can be done in the let itself. And in read-bytes, I'd write the arities in increasing order of number of args, and replace the cond with case. is can be typehinted to avoid reflection. And finally, in both inflate-compressed and read-bytes, you could use reader syntax for the returned anonymous fns. Applying all of the above, we get this:

(declare read-bytes)

(defn inflate-compressed [^java.io.InputStream is nbr bvec]
  (let [[n1 n2 chr] (repeatedly 3 #(.read is))
        len         (Integer/parseInt (str (char n1) (char n2)) 16)
        exp         (into bvec (repeat len chr))
        remain      (max 0 (- nbr len))]
    #(read-bytes is remain exp)))

(defn read-bytes
  ([is nbr] (trampoline read-bytes is nbr []))
  ([^java.io.InputStream is nbr bvec]
   (if (zero? nbr) bvec
     (let [b (.read is)]
       (case b
         -1  (not-empty bvec)
         250 #(inflate-compressed is nbr bvec)
         #(read-bytes is (dec nbr) (conj bvec b)))))))
This is of course, a surface level refactoring of your code. read-bytes still does too much IMO. It's dealing with the InputStream, making a vector, making sure we only get nbr number of bytes, and doing the decompression. Of all of these things, we can separate and implement just the decompression algorithm as a https://clojure.org/reference/transducers:
(defn decompress [xf]
  (let [filled (volatile! -1)
        buf    (volatile! 0)]
    (fn
      ([] (xf))
      ([result] (xf result))
      ([result b]
        (let [n @filled]
          (case (int n)
            -1 (if (= b 250)
                 (do (vreset! filled 0)
                     result)
                 (xf result b))
             0 (do (vreset! filled 1)
                   (vreset! buf (* 16 (Character/digit (char b) 16)))
                   result)
             1 (do (vreset! filled 2)
                   (vreset! buf (+ @buf (Character/digit (char b) 16)))
                   result)
             2 (do (vreset! filled -1)
                   (reduce xf result (repeat @buf b)))))))))
Of course, this doesn't look very readable, and that's probably because of my incompetence, but this doesn't do all of that other stuff, which inherently makes it more understandable. This way, we can make take do the job of ensuring we stop when we've consumed nbr bytes. And my take on doing that, is this:
(defn reducible ^clojure.lang.IReduceInit [^java.io.InputStream is]
  (reify
    clojure.lang.IReduceInit
    (reduce [_ f init]
      (loop [state init]
        (let [b (.read is)]
          (if (= b -1)
            state
            (let [result (f state b)]
              (if (reduced? result)
                @result
                (recur result)))))))))

(defn read-bytes [^java.io.InputStream is nb]
  (let [xf (comp decompress (take nb))]
    (into [] xf (reducible is))))
The reducible fn here only deals with the InputStream and returns a thing we can reduce over. And we do that using into, and as that reduction happens, we transform the bytes using the composition of decompress and (take nb) transducers.

Jeremy Cronk21:08:10

I’m blown away by the transducer. I don’t think that ever would have occurred to me. In fact, the density of new concepts here is amazing and I’m going to have to save this to refer back to. Thanks so much for the feedback!

Jeremy Cronk01:08:53

@U883WCP5Z After trying to implement this I have one issue - if I try to read n bytes and a compressed sequence happens to end at n, for some reason the input stream reads a byte and throws it away, but I’m not sure why. What should I be looking at to prevent that from happening? I’m confused about this, because take calls ensure-reduced when it’s done, and I thought that was supposed to stop all processing and return the result. When the decompression happens in the middle of a group of n bytes, everything works correctly, but I wonder if that’s coincidental.

Jeremy Cronk01:08:08

You know what, I think this is because I’m trying to access the input stream between calls. So the “extra” byte is there in the reducer ready to go, but if I mess with the input stream, everything gets screwed up and I start losing bytes. Of course I realize this shortly after I get frustrated enough to come back and ask. 😐 I guess the takeaway here is that accessing something mutable in multiple places without synchronization is asking for trouble, which I should already know…

seancorfield03:08:57

(when-not (empty? bvec) bvec) => (not-empty bvec)

seancorfield03:08:28

I found the if in read-bytes hard to read because bvec is on the same line as the condition -- don't do that: it looks like a single-arm if and my first thought was "Use when!" then I was puzzled about it returning nil... until I saw that bvec in there!

seancorfield03:08:38

The use of trampoline here feels weird. I would try to rewrite it without that, and just use a loop instead maybe?

Jeremy Cronk13:08:41

Thank you! not-empty was exactly what I was looking for there. I see your point about the formatting on the if - that part began life as part of cond until I realized that I needed to check for zero and get out before I read a byte that I might potentially just lose. This whole thing actually started as a loop, and then when I added support for the “compression”, it looked like that needed its own function, and then it looked like two functions that call each other, so that’s why I went with the trampoline. But since there’s no way to exit inflate-compressed without returning a function, it’s true that it doesn’t seem to quite fit. Seems to me that a trampoline properly goes between two functions that can each either return either a function or a value. Now that I’ve got it in a form that’s working, I’ll see about how I might get that back into a loop.

ndonolli20:08:44

Curious if about thoughts regarding improving something like this, or other alternatives to implementing a stateful iterator in clojure. I know that philosophically it could be considered at odds with the language, but if you had to do it, what’s the best way?

seancorfield20:08:48

@ndonolli As it stands, that isn't thread-safe. Also (iter!) is not valid (I think you mean (iter! 1)?)

ndonolli20:08:51

@seancorfield Thanks, would you mind elaborating on why it isn’t thread safe? I was playing with this code in a clojurescript context so I might not see that behavior

seancorfield20:08:56

(def iter! (stateful-seq (range)))
(future (iter! 1)) (future (iter! 1)) ; could both return (take 1 @cursor) before either swap! occurs

seancorfield20:08:21

That could give you 0 0 or 0 1 or even 1 0

seancorfield20:08:22

Since you want it to be atomic/thread-safe, I'd probably use split-at and use a pair in the atom.

seancorfield20:08:10

(defn stateful-seq [s]
  (let [s (atom [nil s])]
    (fn [n]
      (first (swap! s #(split-at (if (and (number? n) (pos? n)) n 1) (second %)))))))

seancorfield20:08:13

Seems to work.

ndonolli20:08:18

thanks, that makes sense. Thank you @seancorfield!

phronmophobic21:08:19

you can also use the lesser known swap-vals! :

(defn stateful-seq [s]
  (let [cursor (atom s)]
    (fn
      iter!
      ([]
       (iter! 1))
      ([n]
       (let [[old _] (swap-vals! cursor #(drop n %))]
         (take n old))))))

phronmophobic21:08:44

and on principle, I think it's better to crash on an invalid input than to silently coerce the invalid input into some valid one

jaihindhreddy21:08:33

Is this a good way to make a reducible thing from a .InputStream?

(defn reducible ^clojure.lang.IReduceInit [^java.io.InputStream is]
  (reify
    clojure.lang.IReduceInit
    (reduce [_ f init]
      (loop [state init]
        (let [b (.read is)]
          (if (= b -1)
            state
            (let [result (f state b)]
              (if (reduced? result)
                @result
                (recur result)))))))))

phronmophobic21:08:15

when you have something like an InputStream that might be holding onto system resources, you want to make it easy to manage resources. I'm not sure what making an input stream reducible buys you. check out https://juxt.pro/blog/ontheflycollections-with-reducible#_it_gets_useful_when in that example, the reducible wraps the resource and makes sure that the resources get released.

seancorfield22:08:02

@smith.adriane Ah, yes, I think I like that better (and I agree about just letting it crash on a bad n value).

phronmophobic22:08:55

adding an assert or :pre condition wouldn't hurt

seancorfield22:08:08

Also, both take and drop have reasonable behavior for negative n.

seancorfield22:08:09

user=> (def iter! (stateful-seq (range)))
#'user/iter!
user=> (iter! -2)
()
user=> (iter! -2)
()
user=> (iter! 1)
(0)
user=> (iter! 10)
(1 2 3 4 5 6 7 8 9 10)
user=> (iter!)
(11)
user=> 

seancorfield22:08:57

(even for an infinite range it still works nicely)