core-async

2024-09-17T18:25:08.785299Z

How can I write tests using core.async, so that I make my assertion only after my channel has finished receiving messages (or has been closed)?

2024-09-17T19:37:47.732959Z

Tests are just programs with assertions in them. If you are writing a program using core.async you can write a test with core.async.
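
A minimal sketch of that idea, assuming core.async 1.2 or later (for `onto-chan!`). The namespace, the `collect!!` helper and the test name are made up for illustration. The test thread is an ordinary thread, so it is allowed to block there until the output channel closes and then assert on what was collected:

```clojure
(ns example.async-test
  (:require [clojure.core.async :as a]
            [clojure.test :refer [deftest is]]))

(defn collect!!
  "Blocks the calling (non-go) thread until ch closes or timeout-ms elapses.
  Returns a vector of everything received, or ::timeout.
  Hypothetical helper, not part of core.async."
  [ch timeout-ms]
  (let [result   (a/into [] ch)                      ; yields one vector once ch closes
        [v port] (a/alts!! [result (a/timeout timeout-ms)])]
    (if (= port result) v ::timeout)))

(deftest doubles-every-input
  (let [in  (a/chan)
        out (a/chan)]
    (a/pipeline 2 out (map #(* 2 %)) in)             ; closes out once in is closed and drained
    (a/onto-chan! in (range 5))                      ; puts the items, then closes in
    (is (= [0 2 4 6 8] (collect!! out 1000)))))
```

The timeout is only there so that a bug which leaves the channel open fails the test instead of hanging it.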

2024-09-18T16:24:48.556709Z

(let [in (a/chan)
      out (a/chan)
      in-b (a/chan)
      out-b (a/chan)
      end (a/chan)
      {:keys [s3-client]
       :as   service} (-> utils/test-config
                          utils/start-ls!
                          utils/add-s3-client!
                          closeable-map)
      processors (.availableProcessors (Runtime/getRuntime))
      bucket-name "s3-bucket"
      data (gen/sample (s/gen ::my-data/my-data) 5000)
      filtered-data (keep process-not-completed data)
      xf (keep (comp (fn [{::error-report/keys [file-name]
                           :as                 data}]
                       (when data
                         {:bucket         bucket-name
                          :key            (str file-name "-" (random-uuid) ".json")
                          :stream-content (-> data
                                              (json/encode {:key-fn name})
                                              (.getBytes StandardCharsets/UTF_8)
                                              ByteArrayInputStream.)}))
                     process-not-completed))
      _ (.createBucket s3-client bucket-name)
      make-req (fn [continuation-token]
                 (if continuation-token
                   (-> (ListObjectsV2Request.)
                       (.withBucketName bucket-name)
                       (.withContinuationToken continuation-token))
                   (-> (ListObjectsV2Request.)
                       (.withBucketName bucket-name))))
      get-outside-data (fn [] (into []
                                    cat
                                    (iteration #(.listObjectsV2 ^AmazonS3Client
                                                                s3-client
                                                                (make-req %))
                                               :vf #(.getObjectSummaries %)
                                               :kf #(.getNextContinuationToken %))))]
  (is (s/valid? (s/coll-of ::my-data/my-data)
                filtered-data))
  #_(run! (partial utils/put-input-stream! service)
          (into []
                xf
                data))
  (a/go (a/pipeline processors
                    out
                    xf
                    in)
        (a/pipeline-blocking processors
                             out-b
                             (keep (partial utils/put-input-stream! service))
                             in-b)
        (a/go (doseq [d (conj data :end)]
                (a/>! in d)))
        (a/go-loop []
          (a/>! in-b (a/<! out))
          (recur)))
  (a/go-loop [i 0]
    (let [data (a/<!! out-b)]
      (if (= data :end)
        (a/>! end data)
        (do (println i)
            (println data))))
    (recur (inc i)))
  (a/go (when (a/<!! end)
          (a/<! (a/timeout 1000))
          (println (= (count filtered-data)
                      (count (get-outside-data))))
          (is (= (count filtered-data)
                 (count (get-outside-data))))
          (a/close! end))))
the things happening in this block:
(a/go (when (a/<!! end)
are not giving me any information, I can't print, etc.

2024-09-18T16:24:59.593459Z

why is it happening?

2024-09-18T16:31:37.850289Z

You are using !! ops inside go blocks, I recommend reading through https://www.braveclojure.com/core-async/ with particular attention to the section on blocking vs parking

2024-09-18T16:32:26.038549Z

Hmmm, actually that isn't a very good write up

2024-09-18T16:33:12.273059Z

You should never use !! ops in the dynamic extent of a go block

2024-09-18T16:47:50.500189Z

A single ! marks a "parking" operation in core.async terminology, and !! marks a blocking operation. A parking op looks like it blocks, but in fact the go block's continuation is attached to the channel as a callback and the thread is released, whereas !! ops actually block the thread
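
A small sketch of the distinction. The names `ch`, `from-go` and `result` are illustrative only. Single-! ops go inside the go block, and the !! op is used only from an ordinary thread such as the REPL or the test runner:

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 1))

;; Inside a go block: use the single-! parking ops.
(def from-go
  (a/go
    (a/>! ch :hello)   ; would park if the buffer were full; no thread is held
    (a/<! ch)))        ; parks until a value is available

;; Outside any go block: the !! ops are fine here.
(def result (a/<!! from-go))   ; blocks this ordinary thread until the go block finishes
```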

2024-09-18T16:50:37.124109Z

Channel callbacks (continuations of go blocks) run on a fixed-size thread pool, so if you have enough !! ops running at once on that thread pool it will just deadlock (the code that would unblock the blocking ops is stuck behind them in the work queue and can't run, so the blocking code can never unblock)
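
One common way to keep blocking work off that pool is `a/thread`, which runs its body on a separate thread and returns a channel carrying the result. A sketch, where `slow-call` is a hypothetical stand-in for blocking I/O such as an S3 upload:

```clojure
(require '[clojure.core.async :as a])

(defn slow-call
  "Hypothetical stand-in for a blocking call (network, disk, ...)."
  [x]
  (Thread/sleep 50)
  (inc x))

(defn call-async
  "Runs slow-call off the go-block pool; returns a channel with its result."
  [x]
  (a/thread (slow-call x)))

;; The go block only parks on the result channel; it never blocks a pool thread.
(def answer
  (a/<!! (a/go (a/<! (call-async 41)))))
```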

2024-09-18T16:52:14.397639Z

There is a system property you can set which I think makes core.async spit out warnings if it detects you using the blocking ops in a go block, it is a little overly conservative but might be useful

2024-09-18T16:52:29.683959Z

which is?
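
The name is never given in this thread. It is most likely `clojure.core.async.go-checking`, which as far as I know is read once when the core.async namespace loads and makes blocking ops called on go-block threads throw rather than just warn. A sketch of enabling it for development, assuming a deps.edn project; the `:dev` alias name is arbitrary:

```clojure
;; deps.edn (the :dev alias name is an assumption)
{:aliases
 {:dev {:jvm-opts ["-Dclojure.core.async.go-checking=true"]}}}
```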

2024-09-18T16:52:31.595249Z

😄

2024-09-18T16:52:57.548279Z

I was doing this bad stuff because I wanted to force print

2024-09-18T16:53:10.750289Z

I know that most of the stuff I did is bad.

2024-09-18T16:54:51.188059Z

Not sure what forcing is supposed to mean in this context?

2024-09-18T17:01:09.229329Z

Having a vague sense that something you did is bad is not the same as knowing exactly what you did that is bad: the former is useless, the latter tells you what you need to fix. When I talk about !! in go blocks causing thread starvation, that is not idle berating; it is very likely exactly what is wrong with the code and what you need to fix to make it work (until it hits whatever the next bug is)
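
For what it's worth, here is one way the test above could be restructured so that no !! op runs inside a go block. This is a sketch under assumptions, not the original author's fix: `run-pipeline!!` is a made-up name, `upload!` stands in for `(partial utils/put-input-stream! service)`, and `onto-chan!` needs core.async 1.2 or later. The two pipelines are chained directly, so the hand-written go-loops and the `:end` sentinel are not needed; channel closing signals completion instead.

```clojure
(require '[clojure.core.async :as a])

(defn run-pipeline!!
  "Pushes items through a compute stage and a blocking stage, then blocks
  the calling (non-go) thread until everything has drained.
  Returns a vector of the blocking stage's results, or ::timeout.
  upload! is a hypothetical stand-in for the real blocking upload fn."
  [items xf upload! timeout-ms]
  (let [in  (a/chan)
        mid (a/chan)
        out (a/chan)
        n   (.availableProcessors (Runtime/getRuntime))]
    (a/pipeline n mid xf in)                        ; CPU-bound transform
    (a/pipeline-blocking n out (keep upload!) mid)  ; blocking I/O on its own threads
    (a/onto-chan! in items)                         ; closes in when done; the close propagates
    (let [done     (a/into [] out)
          [v port] (a/alts!! [done (a/timeout timeout-ms)])]
      (if (= port done) v ::timeout))))

;; Example with toy stages in place of the S3 calls:
(run-pipeline!! (range 10) (filter even?) #(* 10 %) 5000)
```

In the real test, the `(is (= (count filtered-data) (count (get-outside-data))))` assertion would then run on the test thread right after `run-pipeline!!` returns, with no `timeout` sleep and no assertion hidden inside a go block.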