Consuming process via coroutines


#1

I want to launch an external process and create a suspending function that consumes its output and waits for it to finish.

I wrote something like this:

suspend fun consumeProcess(proc: Process, outputConsumer: (String) -> Unit, errConsumer: (String) -> Unit) {
    proc.inputStream.bufferedReader().useLines {
        it.forEach(outputConsumer)
    }

    proc.errorStream.bufferedReader().useLines {
        it.forEach(errConsumer)
    }

    proc.waitFor()
}

But this is probably not the best way to do it. waitFor is blocking, so it is probably not wise to call it inside a coroutine. Any recommendations? By the way, such a function would be a great addition to the coroutine extension library.


#2

Hi @darksnake,
useLines consumes inputStream completely before errorStream is touched, so if the process writes to errorStream before closing inputStream, a deadlock can occur: the reader waits on an empty inputStream while the process waits on a full errorStream buffer.
You must consume both inputStream and errorStream at the same time, probably using two dedicated threads, because unfortunately InputStream exposes only a blocking API.
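A minimal sketch of the two-thread idea, using only the standard library. The name consumeBothStreams is hypothetical and not part of any library. Note that the two consumers are called from different threads, so they must be thread-safe if they share state.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: drains stdout and stderr concurrently, then returns the exit code.
fun consumeBothStreams(
    proc: Process,
    outputConsumer: (String) -> Unit,
    errConsumer: (String) -> Unit
): Int {
    // One dedicated thread per stream, because InputStream only offers blocking reads.
    val outThread = thread {
        proc.inputStream.bufferedReader().useLines { it.forEach(outputConsumer) }
    }
    val errThread = thread {
        proc.errorStream.bufferedReader().useLines { it.forEach(errConsumer) }
    }
    // Each reader finishes when its stream reaches end-of-file.
    outThread.join()
    errThread.join()
    // Blocks the calling thread until the process has exited.
    return proc.waitFor()
}
```

This still blocks the caller, so it only addresses the deadlock, not the question of waiting without blocking.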

On Java 9 or later you can await process termination using Process.onExit:

https://docs.oracle.com/javase/9/docs/api/java/lang/Process.html#onExit--
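As a rough sketch of how onExit could be bridged to a suspending call: onExit returns a CompletableFuture<Process>, and kotlinx.coroutines provides an await extension for futures in the kotlinx.coroutines.future package. The extension name awaitExit below is hypothetical, and the example assumes Java 9+ and a kotlinx.coroutines dependency that includes the future integration.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

// Hypothetical helper: suspends, without blocking a thread, until the process exits.
// Requires Java 9+ because of Process.onExit().
suspend fun Process.awaitExit(): Int =
    onExit().await().exitValue()

fun main() = runBlocking {
    // inheritIO() forwards the child's output to this process, so no pipe can fill up.
    val proc = ProcessBuilder("java", "-version").inheritIO().start()
    println("exit code: ${proc.awaitExit()}")
}
```

This only covers waiting for termination. The output streams still have to be consumed separately if they are not inherited or redirected.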


#3

onExit is an interesting solution. I missed it. But it requires Java 9, and I am still on Java 8, mostly due to TornadoFX's incompatibility with JDK 9.
I was wondering whether there is any other way to wait for the process to end without blocking the thread.


#4

My (really old) trick is to detect process termination by watching for the end of both inputStream and errorStream.
If the process has nothing more to write, then it has terminated (however, always call waitFor before reading exitValue).


#5

How about this:

suspend fun consumeProcess(proc: Process, outputConsumer: (String) -> Unit, errConsumer: (String) -> Unit = outputConsumer) {
    val outReader = proc.inputStream.bufferedReader()

    val outChannel = produce {
        try {
            send(outReader.readLine())
        } catch (ex: IOException) {
            close(ex)
        }
    }

    val errReader = proc.errorStream.bufferedReader()

    val errChannel = produce {
        try {
            send(errReader.readLine())
        } catch (ex: IOException) {
            close(ex)
        }
    }

    launch {
        proc.waitFor()
        outChannel.consumeEach(outputConsumer)
        errChannel.consumeEach(errConsumer)
    }.join()
}

I have not tested it yet.


#6

outChannel blocks a CommonPool thread to send only the first line, and errChannel does the same.
Moreover, I suspect that the launch block doesn't work as expected.


#7

Oops, forgot the loop. Will test it later.
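For reference, here is one way the looping version might look on Java 8. It is an untested sketch under assumptions, not the code from this thread: it uses Dispatchers.IO and coroutineScope from a recent kotlinx.coroutines release instead of CommonPool and produce, and the helper forEachLineBlocking is a hypothetical name. It reads both streams concurrently until end-of-file (the trick from post #4) and only then calls waitFor.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.io.InputStream

// Hypothetical helper: reads the stream line by line until end-of-file (blocking).
fun InputStream.forEachLineBlocking(consumer: (String) -> Unit) =
    bufferedReader().useLines { lines -> lines.forEach(consumer) }

suspend fun consumeProcess(
    proc: Process,
    outputConsumer: (String) -> Unit,
    errConsumer: (String) -> Unit = outputConsumer
): Int {
    coroutineScope {
        // Both readers run concurrently on a dispatcher intended for blocking I/O.
        launch(Dispatchers.IO) { proc.inputStream.forEachLineBlocking(outputConsumer) }
        launch(Dispatchers.IO) { proc.errorStream.forEachLineBlocking(errConsumer) }
    } // coroutineScope returns only after both readers have finished
    // waitFor is still a blocking call, so it is kept off the caller's thread.
    return withContext(Dispatchers.IO) { proc.waitFor() }
}
```

The reads themselves remain blocking, so cancelling the coroutine will not interrupt a reader that is waiting for output. With the default errConsumer, the same consumer is called from two threads and should be thread-safe.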