#data-science
2023-03-04
chucklehead17:03:10

I'd like to do streaming conversion of some gzipped-TSV-via-HTTP files to Parquet-on-S3 without persisting any intermediate copies to disk and without needing to materialize whole files in memory - are there good Clojure/Java options for this?

Rupert (All Street)19:03:05

I think this should be very doable in Clojure: streaming, large data sizes, and without touching the disk.
• HTTP: all the HTTP clients provide streaming responses as InputStreams.
• Gzip decoding can be done automatically by the HTTP client, or it's one line of Java interop (no dependencies).
• TSV parsing into a lazy seq is trivial - you can code it in about 6 lines or use a library. Note that type information is not preserved in TSV, so types may need to be applied manually in your code (e.g. Double/parseDouble). See the sketch below.
• S3: there is a good lightweight library from Cognitect, or the Amazonica library, which is much more heavyweight. Both should support streaming.
• Parquet: there should be some wrapper libraries about.
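
A minimal sketch of that pipeline using only the JDK (the URL is a placeholder; an HTTP client such as clj-http or hato would hand back an InputStream in the same way):

(require '[clojure.java.io :as io]
         '[clojure.string :as str])

;; Stream a gzipped TSV over HTTP into a lazy seq of row vectors - nothing
;; is written to disk and only a small buffer is resident at a time.
;; Sketch only: the reader is never closed, so a real version should manage
;; the stream's lifetime around consumption of the seq.
(defn gzipped-tsv-rows [url]
  (let [in  (java.util.zip.GZIPInputStream.
              (.openStream (java.net.URL. url)))
        rdr (io/reader in)]
    (map #(str/split % #"\t") (line-seq rdr))))

;; e.g. (take 5 (gzipped-tsv-rows "https://example.com/data.tsv.gz"))
;; TSV carries no type information, so parse values as needed,
;; e.g. (Double/parseDouble (nth row 2)).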

chucklehead20:03:13

I guess the piece I'm really missing is Parquet to S3 without requiring Hadoop, or really how to do much with Parquet in general without requiring Hadoop. csv->dataset-seq + ds-seq->parquet in tech.ml.dataset is almost what I need, but it seems to only support local file output.
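
For reference, a rough local-file version of that pipeline (the tech.v3.dataset.io.csv namespace and the :separator option are assumptions - check where csv->dataset-seq lives in your t.m.d version; the URL is a placeholder):

(require '[tech.v3.dataset.io.csv :as csv]
         '[tech.v3.libs.parquet :as parquet])

;; Stream the gzipped TSV into a lazy seq of datasets and write them out as
;; one Parquet file - only a batch at a time is resident in memory, but the
;; destination here is still a local path rather than S3.
(with-open [in (java.util.zip.GZIPInputStream.
                 (.openStream (java.net.URL. "https://example.com/data.tsv.gz")))]
  (parquet/ds-seq->parquet "output.parquet"
                           (csv/csv->dataset-seq in {:separator \tab})))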

chucklehead20:03:32

like you say, it all seems like it should be straightforward. I have a PyArrow solution that works fine on local or S3 just by swapping out the filesystem interface, but the gap in functionality between PyArrow and the Java Arrow libraries is pretty large

Rupert (All Street)20:03:51

Ah, I see - the Parquet bit might be more of a problem if the existing implementations only write to files. Since you are writing to S3, could you instead write lots of small Parquet files (i.e. write to disk, then send to S3, then delete from disk - see the sketch below)? Or could you try a different serialisation format (e.g. Nippy, BSON, or MessagePack)?
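
A sketch of that workaround using the Cognitect aws-api mentioned above (bucket and key are placeholders; upload-chunk! is a made-up helper name):

(require '[clojure.java.io :as io]
         '[cognitect.aws.client.api :as aws]
         '[tech.v3.libs.parquet :as parquet])

;; Requires the com.cognitect.aws/api, endpoints and s3 dependencies.
(def s3 (aws/client {:api :s3}))

;; Write one chunk of the data to a temp file, upload it, then delete the
;; temp file - only one small Parquet file is ever on disk at a time.
(defn upload-chunk! [ds bucket key]
  (let [tmp (java.io.File/createTempFile "chunk" ".parquet")]
    (try
      (parquet/ds-seq->parquet (.getPath tmp) [ds])
      (with-open [in (io/input-stream tmp)]
        (aws/invoke s3 {:op :PutObject
                        :request {:Bucket bucket
                                  :Key    key
                                  :Body   in}}))
      (finally
        (.delete tmp)))))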

Rupert (All Street)20:03:53

The reason Parquet is more likely to be disk-based than just InputStream/OutputStream-based is that it allows a certain amount of random access - jumping over columns that you don't need. If you are going to read (stream down) an entire file from S3, then the advantage of it being a Parquet file (vs a TSV) is more limited, since you have to read all the bytes either way.

Rupert (All Street)20:03:19

Parquet is also a more useful format in slower languages like Python, where you want as little data entering memory as possible (because it's a slow language). Clojure is fast to allocate, release, and iterate over data, so it's often less of a problem - unless you have really large datasets, e.g. multi-terabyte and beyond.

chucklehead21:03:50

It's a daily ingest of ~1100 files. Not particularly large, but around 1.5 billion rows altogether. I was hoping to do it using Lambda fanning out from an SQS queue, but some of the files are larger than Lambda's memory and temporary-storage limits. I'm finding Parquet useful because I can choose which columns to compress and dictionary-encode, and the Parquet files typically end up smaller than the gzipped TSVs, plus I can leverage the embedded statistics later. Right now I'm manually storing them in S3 using hive-style partitions and using Athena to query. Parquet in that case specifically gives me the ability to perform certain queries without Athena scanning the whole TSV.
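
For illustration, the hive-style layout is just a key-naming convention that Athena uses for partition pruning - something along these lines (the table name, dt column, and helper are made up):

;; Hypothetical key layout: Athena matches the dt= prefix against the
;; query's WHERE clause, so only the relevant objects are scanned.
(defn partitioned-key [table ^java.time.LocalDate day part]
  (format "%s/dt=%s/part-%05d.parquet" table day part))

;; (partitioned-key "ingest" (java.time.LocalDate/parse "2023-03-04") 17)
;; => "ingest/dt=2023-03-04/part-00017.parquet"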

chucklehead21:03:25

I wanted to play around with using t.m.d to generate some of the same results. It looks like there's a dataset-seq->stream! option for Arrow format, so I may give that a shot.
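
A rough sketch of the Arrow route; the argument order of dataset-seq->stream! and the :strings-as-text? option are assumptions to check against the tech.v3.libs.arrow docs:

(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])

;; A one-element "seq of datasets" just to exercise the API; in the real
;; pipeline this would come from csv->dataset-seq.
(def ds-seq [(ds/->dataset "test/data/stocks.csv")])

;; :strings-as-text? is assumed to write strings inline rather than as a
;; dictionary shared across batches.
(arrow/dataset-seq->stream! "output.arrow" {:strings-as-text? true} ds-seq)

;; Read it back lazily:
;; (arrow/stream->dataset-seq "output.arrow")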

Rupert (All Street)21:03:11

Sounds like you have a good use case for Parquet/Arrow. They certainly have some good features over TSV. Hopefully you can find a streaming solution.

chrisn12:03:36

Parquet compression is a bit better than TSV, especially when it detects run-length-encoding situations or that a column can use a dictionary. Additionally it is far cheaper to read than TSV, so you get a few wins in higher-performance scenarios, not simply jumping over columns or using slower languages. I would also like to write Parquet to a pure output stream, but the parquet writer requires a specific output interface - https://github.com/techascent/tech.ml.dataset/blob/master/src/tech/v3/libs/parquet.clj#L1016. Without a bit more reverse engineering I am not certain how much you would have to buffer to make that work with an output stream, and the codebase is incredibly painful to dig through. Arrow has columnwise compression and is used in some high-performance scenarios such as HFT - it may work for you. Parquet often gets the best compression in my experience. DuckDB can also scan/write Parquet files efficiently if you like an SQL interface 🙂.
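
A hedged sketch of the DuckDB route over JDBC (assuming the org.duckdb/duckdb_jdbc driver is on the classpath; file names are placeholders):

(import '(java.sql DriverManager))

;; "jdbc:duckdb:" opens an in-memory database; DuckDB reads and writes
;; Parquet directly from SQL.
(with-open [conn (DriverManager/getConnection "jdbc:duckdb:")
            stmt (.createStatement conn)]
  ;; Query a Parquet file in place.
  (with-open [rs (.executeQuery stmt "SELECT count(*) FROM read_parquet('test.parquet')")]
    (.next rs)
    (println "rows:" (.getLong rs 1)))
  ;; Rewrite/convert data back out to Parquet.
  (.execute stmt "COPY (SELECT * FROM read_parquet('test.parquet')) TO 'copy.parquet' (FORMAT PARQUET)"))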

chrisn13:03:56

hmm - on closer inspection that method is writing a subset of the data to the file channel. This could write to an output stream.

chrisn13:03:05

Small change to enable streams - appears to work -

user> (require '[tech.v3.libs.parquet :as p])
nil
user> (require '[tech.v3.dataset :as ds])
nil
user> (def ds (ds/->dataset "test/data/stocks.csv"))
#'user/ds
user> (def ios (clojure.java.io/output-stream "test.parquet"))
#'user/ios
user> (p/ds-seq->parquet ios [ds])
06:46:09.644 [nREPL-session-93da7685-fd28-436d-881e-86be86cfd13f] INFO org.apache.parquet.hadoop.InternalParquetRecordWriter - Flushing mem columnStore to file. allocated memory: 11643
:ok
user> (p/parquet->ds "test.parquet")
_unnamed [560 3]:

| symbol |       date |  price |
|--------|------------|-------:|
|   MSFT | 2000-01-01 |  39.81 |
|   MSFT | 2000-02-01 |  36.35 |
|   MSFT | 2000-03-01 |  43.22 |
|   MSFT | 2000-04-01 |  28.37 |
|   MSFT | 2000-05-01 |  25.45 |
|   MSFT | 2000-06-01 |  32.54 |
|   MSFT | 2000-07-01 |  28.40 |
|   MSFT | 2000-08-01 |  28.40 |
|   MSFT | 2000-09-01 |  24.53 |
|   MSFT | 2000-10-01 |  28.02 |
|    ... |        ... |    ... |
|   AAPL | 2009-05-01 | 135.81 |
|   AAPL | 2009-06-01 | 142.43 |
|   AAPL | 2009-07-01 | 163.39 |
|   AAPL | 2009-08-01 | 168.21 |
|   AAPL | 2009-09-01 | 185.35 |
|   AAPL | 2009-10-01 | 188.50 |
|   AAPL | 2009-11-01 | 199.91 |
|   AAPL | 2009-12-01 | 210.73 |
|   AAPL | 2010-01-01 | 192.06 |
|   AAPL | 2010-02-01 | 204.62 |
|   AAPL | 2010-03-01 | 223.02 |
user> 

👍 2
chrisn14:03:06

There is another, wilder option. You can serialise a dataset via transit - https://github.com/cnuernber/tmdjs/blob/master/src/tech/v3/libs/transit.clj (binary data is base-64 encoded). Then you can store it on S3 with the appropriate metadata and query it from your browser - we have used this with financial data. This doesn't get the compression of Parquet or Arrow, but being able to open/view a portion of your dataset in a browser may be interesting/useful. It's still significantly faster and gets better compression than gzipped TSV.

chucklehead17:03:24

thanks! I'll try to test out the new release with parquet streaming a little later. I tried poking around in the implementation before posting here, but wasn't sure if it would be as straightforward as swapping out that local-file-output-file call

chrisn23:03:21

I wasn't either - and I haven't tried this over a network socket 🙂. Enjoy.

phronmophobic19:03:20

I was trying to implement an exercise where you calculate the total area of a heterogeneous collection of shapes with dtype-next, but was getting an error loading the data. More details in the 🧵.

phronmophobic19:03:27

This code might be a little clunky, but it seems like it should work. I tried "9.033" and "10.000-beta-36" and got different errors.

(require '[tech.v3.datatype.struct :as dt-struct])

(def switch-shapes
  (into []
        (map (fn [_]
               {:type (rand-nth [:square
                                 :rectangle
                                 :triangle
                                 :circle])
                :width (* 10 (rand))
                :height (* 10 (rand))}))
        (range 1000)))

(dt-struct/define-datatype! :shape
    [{:name :type :datatype :int32}
     {:name :width :datatype :float32}
     {:name :height :datatype :float32}])
(def structs
  (dt-struct/new-array-of-structs :shape (count switch-shapes)))

(def type->int
  {:square 1
   :rectangle 2
   :triangle 3
   :circle 4})

(doseq [[struct m] (map vector structs switch-shapes)]
  (.put struct :type (int (type->int (get m :type))))
  (.put struct :width (float (get m :width)))
  (.put struct :height (float (get m :height))))

;; :mvn/version "9.033"
;;   Execution error (ArrayIndexOutOfBoundsException) at tech.v3.datatype.ArrayHelpers/aset (ArrayHelpers.java:11).
;; Index 12000 out of bounds for length 12000

;; :mvn/version "10.000-beta-36"
;; Execution error (IndexOutOfBoundsException) at ham_fisted.ChunkedList/indexCheck (ChunkedList.java:229).
;; Index out of range: 12 : 12

phronmophobic19:03:18

I was able to find a small failing example and filed an issue, https://github.com/cnuernber/dtype-next/issues/76.