#core-async
2018-03-02
mbjarland21:03:56

anything unholy about using core.async for stuff like this:

(defn trace-stream [prefix stream c]
	(let [lines (line-seq (BufferedReader. (InputStreamReader. stream)))]
		(go
			(doseq [line lines]
				(>! c (str prefix line))))))

(defn trace-process [p]
	(let [c (chan 1000 (keep println))]
		(trace-stream "out> " (.getInputStream p) c)
		(trace-stream "err> " (.getErrorStream p) c)))

(<!! (trace-process (.start (ProcessBuilder. ["ls" "-la"])))) ; to out>
(<!! (trace-process (.start (ProcessBuilder. ["ls" "-0bogus"])))) ; to err>
? Not sure if there are better ways of doing this in clojure but I wanted a) custom handling of out/err for the process b) no intra-line interleaving of output between out/err

hiredman21:03:10

you are doing io in a go block

hiredman21:03:31

in that line-seq is lazy, you do io to realize it, and you are realizing it in a go block

hiredman21:03:56

you should definitely put a with-open around that buffered reader

hiredman21:03:07

and then put that whole thing in async/thread

mbjarland21:03:42

duh with-open I should have seen…what’s evil about io in go blocks?

hiredman21:03:24

imagine there is an event loop like javascript has, and go blocks are all callbacks being executed there

hiredman21:03:49

if you do anything that blocks in that event loop, it stalls everything
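
A minimal sketch of the split hiredman describes, assuming core.async is on the classpath: blocking work goes into `async/thread`, and the go block only parks on the channel that `thread` returns. `slow-read` is a hypothetical stand-in for real blocking io, not something from the conversation.

```clojure
(require '[clojure.core.async :as async])

(defn slow-read
  "Hypothetical stand-in for blocking io: sleeps, then returns a value."
  []
  (Thread/sleep 50)
  :line)

;; Blocking work runs on its own thread; `thread` returns a channel
;; that receives the body's result.
(def result-ch (async/thread (slow-read)))

;; The go block parks on <! instead of blocking a go pool thread.
(def done-ch (async/go (str "got " (async/<! result-ch))))

;; Block the calling thread until the go block has finished.
(def result (async/<!! done-ch))
```

Calling `(slow-read)` directly inside the `go` body would instead hold one of the limited go pool threads for the whole sleep.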

mbjarland21:03:55

ah so the go block becomes long running, think I’m catching on

mbjarland21:03:55

ok thanks for pointing me in the right direction - had a feeling there was something rotten there

hiredman21:03:13

you know there is a method on ProcessBuilder to have it redirect io from the process to the JVM's io streams

noisesmith21:03:18

@mbjarland: also that doseq can be replaced by onto-chan

hiredman21:03:02

(if you just want to see the io from the process)
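
The method being referred to is presumably `ProcessBuilder.inheritIO` (an assumption, the chat does not name it). A minimal sketch, where `inheriting-builder` is a hypothetical helper name:

```clojure
(defn inheriting-builder
  "Hypothetical helper: builds a ProcessBuilder whose stdin, stdout and
  stderr are inherited from the current JVM process."
  [& args]
  (doto (ProcessBuilder. ^java.util.List (vec args))
    (.inheritIO)))

;; Usage, assuming an `ls` binary is available on the path:
;; (.waitFor (.start (inheriting-builder "ls" "-la")))
```

This gives no hook for prefixing lines, which is why it does not cover the use case described next.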

mbjarland21:03:26

@hiredman, yes I know but I want to be able to prefix lines to know which process and stream this is coming from

mbjarland21:03:41

to distinguish this output from other stuff in the logs

noisesmith21:03:43

@mbjarland: also, even if you don't switch the doseq out for onto-chan, closing the channel when done tends to be a good pattern

hiredman21:03:53

and you aren't consuming from c, so it will fill up after 1000 lines and block

hiredman21:03:24

I think you would be better off spinning up a thread per stream and having it read and print out

mbjarland21:03:56

@hiredman but then you get intra-line interleaving

mbjarland21:03:32

i.e. two readable lines get garbled into an unholy union of unreadability

mbjarland21:03:05

well…is that really better at that point?

hiredman21:03:10

I sort of recall if you print a single string it will be printed atomically

mbjarland21:03:51

I tried it, wasn’t printed atomically in my somewhat convoluted sample code at least…I can re-check

mbjarland21:03:54

@hiredman as for consuming, doesn’t the (keep println) consume everything fairly fast?

hiredman21:03:06

transducers do not consume

hiredman21:03:21

they transform data as it passes through

mbjarland21:03:30

but filter throws things away

hiredman21:03:45

that is not consuming

noisesmith21:03:48

as a transducer, it doesn't drive consumption

hiredman21:03:59

that is throwing things away as they move through

mbjarland21:03:10

ok, my noobness in core.async is starting to hurt…perhaps I should go rtfm

mbjarland21:03:01

hmm…so somebody would still have to pull on the channel even though nothing will actually get let through?

noisesmith21:03:03

@hiredman: won't a transducer that drops inputs keep the channel from backing up though?

hiredman21:03:53

ugh, it would, but that is a gross hack

noisesmith21:03:34

especially considering all it takes to be correct is try to take a single value off the channel
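
A small sketch of the two behaviours discussed above, using `offer!` so that nothing blocks. The channel names and buffer size are illustrative assumptions. `(keep (constantly nil))` stands in for `(keep println)`, since `println` also returns nil and `keep` drops nil results.

```clojure
(require '[clojure.core.async :as async])

;; A transducer that lets values through: with no consumer, the
;; 2-slot buffer fills and the third offer is refused.
(def passing (async/chan 2 (map inc)))
(def passing-offers
  (mapv #(boolean (async/offer! passing %)) [1 2 3]))
;; passing-offers => [true true false]

;; A transducer that drops every value: nothing ever reaches the
;; buffer, so offers keep succeeding even though nobody takes.
(def dropping (async/chan 2 (keep (constantly nil))))
(def dropping-offers
  (mapv #(boolean (async/offer! dropping %)) (range 10)))
;; every entry in dropping-offers is true
```

The second channel never backs up, but only because the side effect happens inside the put, which is the hack being objected to.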

hiredman21:03:44

(defn trace-stream [prefix stream]
  (async/thread
    (with-open [rdr (BufferedReader. (InputStreamReader. stream))]
      (doseq [line (line-seq rdr)]
        (println prefix line)))))

(defn trace-process [p]
  (async/into [] [(trace-stream "out> " (.getInputStream p))
                  (trace-stream "err> " (.getErrorStream p))]))

(<!! (trace-process (.start (ProcessBuilder. ["ls" "-la"])))) ; to out>
(<!! (trace-process (.start (ProcessBuilder. ["ls" "-0bogus"])))) ; to err>

mbjarland21:03:28

@hiredman but that still leaves us with interleaving right? (assuming that is actually a problem)

hiredman21:03:46

(defn trace-stream [prefix stream]
  (async/thread
    (with-open [rdr (BufferedReader. (InputStreamReader. stream))]
      (doseq [line (line-seq rdr)]
        (locking #'*out*
          (println prefix line))))))

(defn trace-process [p]
  (async/into [] [(trace-stream "out> " (.getInputStream p))
                  (trace-stream "err> " (.getErrorStream p))]))

(<!! (trace-process (.start (ProcessBuilder. ["ls" "-la"])))) ; to out>
(<!! (trace-process (.start (ProcessBuilder. ["ls" "-0bogus"])))) ; to err>

hiredman21:03:30

I dunno, it has been a while, but I think if you called (println (str prefix " " line)) it would also take care of interleaving

hiredman21:03:49

I think if you pass println a single string it is printed atomically

mbjarland21:03:27

(future 
	(while true
		(println "foo")))
(future 
	(while true
		(println "bar")))
produces:
foo
barfoo
foo
foo
foo

foo
foo
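
A sketch of the locking approach applied to this experiment. It uses a dedicated lock object rather than `#'*out*` (an assumption made for illustration), and `println-locked` and `print-from-two-threads` are hypothetical names. Output is captured with `with-out-str` so the result can be inspected.

```clojure
(require '[clojure.string :as str])

;; One shared lock for every printing thread.
(def print-lock (Object.))

(defn println-locked
  "Like println, but holds print-lock while the text and the trailing
  newline are written, so other callers cannot interleave."
  [& args]
  (locking print-lock
    (apply println args)))

(defn print-from-two-threads
  "Prints n lines of foo and n lines of bar from two futures and
  returns everything that was written as one string."
  [n]
  (with-out-str
    (let [a (future (dotimes [_ n] (println-locked "foo")))
          b (future (dotimes [_ n] (println-locked "bar")))]
      @a
      @b)))

;; Every captured line should be exactly "foo" or "bar".
(def lines (str/split-lines (print-from-two-threads 200)))
```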

mbjarland21:03:50

ok, so the lock would be a better solution than using channels for this then. In the interest of learning something of the core.async fu, would you care to elaborate why?

mbjarland21:03:53

I mean aside from all the other issues my code had : P

hiredman21:03:00

if you use a lock you are less likely to do something like (keep println)

hiredman21:03:36

(defn trace-stream [prefix stream out]
  (async/thread
    (with-open [rdr (BufferedReader. (InputStreamReader. stream))]
      (doseq [line (line-seq rdr)]
        (async/>!! out (str prefix " " line))))))

(defn trace-process [p]
  (let [c (async/chan)]
    (async/into [] [(trace-stream "out> " (.getInputStream p) c)
                    (trace-stream "err> " (.getErrorStream p) c)
                    (async/thread
                      (when-let [line (async/<!! c)]
                        (println line)))])))

(<!! (trace-process (.start (ProcessBuilder. ["ls" "-la"])))) ; to out>
(<!! (trace-process (.start (ProcessBuilder. ["ls" "-0bogus"])))) ; to err>

hiredman21:03:58

hrrm, needs a loop recur around the when-let

mbjarland21:03:31

well if nothing else I found this example instructive for playing around with core.async

noisesmith21:03:09

@hiredman: any reason to use doseq instead of onto-chan and a transducer there?

hiredman21:03:19

I forgot about it

mbjarland21:03:20

will onto-chan close the channel when the seq runs out…i.e. we have two seqs and the first to finish might kill it

noisesmith21:03:28

mbjarland: it's optional

hiredman21:03:43

that would be a good case for a transducer, transforming the data as it passes through
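
A sketch of the `onto-chan` plus transducer idea, including the optional close flag. It uses in-memory collections on purpose: `onto-chan` copies items from inside a go block, so feeding it a lazy `line-seq` would bring back the io-in-go-block problem from earlier. `prefixed-chan` is a hypothetical helper.

```clojure
(require '[clojure.core.async :as async])

(defn prefixed-chan
  "Hypothetical helper: returns a channel whose transducer prefixes
  each item of coll. onto-chan closes the channel when coll runs out."
  [prefix coll]
  (let [c (async/chan 1 (map #(str prefix %)))]
    (async/onto-chan c coll)
    c))

;; One channel per source, merged; the merged channel closes only
;; after both sources have closed.
(def merged-lines
  (async/<!! (async/into []
                         (async/merge [(prefixed-chan "out> " ["a" "b"])
                                       (prefixed-chan "err> " ["x"])]))))

;; Shared-channel variant: pass false as the third argument so the
;; first source to finish does not close the channel for the other.
(def shared (async/chan 10))
(async/<!! (async/onto-chan shared [1 2] false))
(async/<!! (async/onto-chan shared [3] false))
(async/close! shared)
(def shared-items (async/<!! (async/into [] shared)))
```

Order between the two merged sources is not guaranteed; order within each source is kept.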

noisesmith21:03:56

@hiredman: cool - you know enough core.async stuff that I halfway suspected you knew some downside I was missing

hiredman21:03:58

no, I forget about those combinators

mbjarland21:03:54

@hiredman how does the async/into call in the above work, isn’t the second param supposed to be a channel?

hiredman22:03:00

ah, yeah, that should be merge
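
Putting the two corrections from the chat together (`merge` instead of `into`, and a loop around the take), one possible shape is sketched below. It is not the version anyone posted: it assumes each stream gets its own channel that is closed when the stream ends, so the printing loop can terminate, and `trace-lines` is a hypothetical helper split out so the logic can be tried without a real process.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])
(import '(java.io BufferedReader InputStreamReader ByteArrayInputStream))

(defn trace-stream
  "Reads stream line by line on its own thread and returns a channel
  of prefixed lines. The channel is closed when the stream ends."
  [prefix stream]
  (let [out (async/chan)]
    (async/thread
      (try
        (with-open [rdr (BufferedReader. (InputStreamReader. stream))]
          (doseq [line (line-seq rdr)]
            (async/>!! out (str prefix line))))
        (finally
          (async/close! out))))
    out))

(defn trace-lines
  "Merges both line channels and prints from a single thread, so two
  lines cannot interleave. Returns a channel that closes when done."
  [out-stream err-stream]
  (let [lines (async/merge [(trace-stream "out> " out-stream)
                            (trace-stream "err> " err-stream)])]
    (async/thread
      (loop []
        (when-some [line (async/<!! lines)]
          (println line)
          (recur))))))

(defn trace-process [^Process p]
  (trace-lines (.getInputStream p) (.getErrorStream p)))

;; Usage, assuming an `ls` binary is available:
;; (async/<!! (trace-process (.start (ProcessBuilder. ["ls" "-la"]))))
```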