Why would using coroutines be slower than sequential for a big file?

I have large CSV files (~2 GB each). I’d like to read each line, parse it, and do stuff with the parsed objects.

Initially I used a very straightforward approach, which takes 21 seconds per file. Given that just counting the lines in the file takes ~4 seconds, that leaves roughly 17 seconds for parsing the lines. Not bad.

val total = File("data").walk().filter { it.isFile && it.toString().endsWith("csv") }.take(1).map { file ->
    println("Processing file ${file}")
    file.inputStream()
            .bufferedReader()
            .lineSequence()
            .map(Sample.Companion::parse)
}.flatMap { it }
.filterNotNull()
.sumBy { it.matchCount }

But I think I’m CPU bound, because out of the 8 cores on my old laptop, I see 1 at 100% with the java process for those 21 seconds. Enter coroutines to handle the line parsing:

.lineSequence()
.map { line ->
    async {
        Sample.parse(line)
    }
}

But this makes it take foooooorreeever. (I quit it after 5+ minutes). Why would that hurt instead of help? Wouldn’t distributing the parsing among all 8 cores be a huge speedup?


Coroutines won’t help you if you need to parse everything. async returns a Deferred, which you need to await later, and I do not see where you call await. It is very useful if you want to parse only some lines: then you can just await those when you need them. But for all the data, coroutines will just add (small) overhead for creating the coroutine context. For this I would recommend the Stream API from Java 8: you can convert your sequence to a stream and then use parallel() to split the work. I think the answer here has some good recommendations.
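A minimal sketch of the parallel-stream suggestion, using only the JDK. The function parseMatchCount is a hypothetical stand-in for Sample.parse, and the assumption that the match count sits in the third tab-separated column is taken from the code posted later in this thread. Whether a stream backed by a reader actually splits well enough to give a speedup is not guaranteed and would need measuring.

```kotlin
import java.io.BufferedReader
import java.io.StringReader

// Hypothetical stand-in for Sample.parse: returns the third tab-separated
// field as an Int, or null when the line is malformed.
fun parseMatchCount(line: String): Int? =
    line.split("\t").getOrNull(2)?.toIntOrNull()

// Sums the parsed values over all lines of the reader using a parallel stream.
fun parallelSum(reader: BufferedReader): Long =
    reader.lines()      // java.util.stream.Stream<String>, read lazily
        .parallel()     // let the common fork-join pool split the work
        .mapToLong { (parseMatchCount(it) ?: 0).toLong() }
        .sum()

fun main() {
    // In-memory data instead of a real file, so the sketch runs anywhere.
    val data = "the\t1990\t3\t1\t1\nbad line\ncat\t1991\t4\t1\t1\n"
    println(parallelSum(BufferedReader(StringReader(data))))
}
```

For a real file the reader would come from something like file.bufferedReader(), wrapped in use { } so it gets closed.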

I do need to parse everything.

I thought that unless I specifically indicated “lazy” that async would start immediately, and was an easy and relatively cheap way to do parallel ops, like in this answer from the link you indicated. My hope was that coroutine overhead was relatively small, compared to getting access to all 8 cores.

You still need to await the result somewhere.

I do, I have a function that loops through all the results and awaits each one.

The subtle trap for those who want to use coroutines is that, by default, they use a common thread pool with as many threads as there are processors. It means that if you accidentally start, say, 8 blocking operations that for some reason depend on each other, you will cause a deadlock that is almost impossible to detect. So you need to post the whole code so we can localize the problem.

Absolutely. Main.kt · GitHub

I’m starting to wonder if it has something to do with Sequences. When I create a sequence and speed through the file adding up parsed lines, it is doing each one-at-a-time, so it can reuse stuff.

But when I try to do it in parallel, I’m trying to “eagerly” kick off all the async jobs with a toList, which may bloat out the memory. Or I don’t eagerly kick them off, they stay a sequence with lazy eval, and each async gets started as soon as I try to read the results, which means I’ve just eliminated any parallelism.
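The ordering described above can be seen without any coroutines at all. In this small sketch the "start" event is only a stand-in for the place where async would be called inside map; the function name traceOrder is made up for the illustration.

```kotlin
// Records the order in which the lazy map step and the consumer run.
// "start X" marks the point where an async call would be issued for item X.
fun traceOrder(eager: Boolean): List<String> {
    val events = mutableListOf<String>()
    val mapped = sequenceOf("a", "b").map { events += "start $it"; it }
    // toList() runs the map step for every element right away;
    // asIterable() keeps it lazy until the loop below pulls each element.
    val items: Iterable<String> = if (eager) mapped.toList() else mapped.asIterable()
    events += "consuming"
    for (item in items) events += "consume $item"
    return events
}

fun main() {
    println(traceOrder(eager = false))
    println(traceOrder(eager = true))
}
```

In the lazy case each "start" happens just before the matching "consume", so nothing would overlap. In the eager case every "start" happens before consumption begins, which gives parallelism but also keeps one pending result per line in memory.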

To parallelize parsing you should use a dedicated thread for fetching data from disk, i.e.:

val blockingDispatcher = ...
produce(blockingDispatcher, capacity = 1024) {
    file.inputStream()
            .bufferedReader()
            .lineSequence()
            .forEach { send(it) }
}

then use this channel as a base for further operations.

Otherwise for a fully non-blocking chain please refer to java.nio.channels.FileChannel or some better API.


Aha, the problem is that you are using a lazy sequence. The line you want to read is actually read only when you call await. I can’t clearly see why it would cause a deadlock, but it probably does. So you need either to read all the lines, collect them in a list and then transform them the way you want, or to use the fully asynchronous solution proposed by @fvasco.

I believe you, but darnit, that goes against the hope I had for coroutines making parallel processing easier.

At the core of it, I’m surprised that there isn’t an easier (best-practice?) way to do this, I would guess that what I’m doing is very common:

  1. Iterate through the rows of a data-source that is bigger than what fits in memory.
  2. Do an action on each row that takes some non-negligible amount of CPU.
  3. Collect the results

… and be sure to balance 1-3 in such a way that all resources (cores, disk IO, etc) are used as much as possible to speed it up.
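One way to sketch these three steps with coroutines is to create one task per chunk of lines instead of one per line, and to cap how many chunks are in flight so memory stays bounded. This assumes a current kotlinx-coroutines-core on the classpath (the API differs from the older one used elsewhere in this thread); parseCount, chunkSize and inFlight are hypothetical names and values, not tuned recommendations.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real parser: third tab-separated field, or null.
fun parseCount(line: String): Int? = line.split("\t").getOrNull(2)?.toIntOrNull()

// Reads lines lazily, groups them into chunks, and parses a bounded number
// of chunks in parallel before reading further.
fun sumInChunks(lines: Sequence<String>, chunkSize: Int = 10_000, inFlight: Int = 8): Long =
    runBlocking {
        var total = 0L
        lines.chunked(chunkSize)     // one task per chunk, not per line
            .chunked(inFlight)       // at most inFlight chunks held in memory at once
            .forEach { batch ->
                val partials = batch.map { chunk ->
                    async(Dispatchers.Default) {
                        chunk.sumOf { (parseCount(it) ?: 0).toLong() }
                    }
                }
                total += partials.awaitAll().sum()
            }
        total
    }

fun main() {
    // In-memory lines instead of a real file, so the sketch is self-contained.
    val lines = sequenceOf("the\t1990\t3\t1\t1", "bad line", "cat\t1991\t4\t1\t1")
    println(sumInChunks(lines, chunkSize = 2, inFlight = 2))
}
```

With a real file the sequence could come from file.useLines { sumInChunks(it) }. Note that this version stops reading while a batch is being parsed, so disk and CPU work do not fully overlap; a channel-based pipeline would be needed for that.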

I tried using a producer with 2 threads and 10k capacity that would produce lines, then parsing each line through async, and… still slow.

fun main(args: Array<String>) {
    val totalMs = measureTimeMillis {
        val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with enough threads
        val lineProducer = produce(mtContext, capacity = 100_000) {
            File("data")
                    .walk()
                    .filter { it.isFile && it.toString().endsWith("csv") }
                    .take(1)
                    .map { file ->
                        file.inputStream()
                                .bufferedReader()
                                .lineSequence()
                    }.flatMap { it }
                    .forEach { send(it) }
            println("Finished producing lines.")
            close()
        }
        println("Started producing to lineProducer<String>")

        val deferredParseProducer = produce(capacity = 100_000) {
            for(line in lineProducer) {
                send(async{ parseLine(line)})
            }
            println("Finished producing Deferred<Sample>s.")
            close()
        }

        val total = AtomicInteger()
        runBlocking {
            for(sampleDef in deferredParseProducer) {
                val sample = sampleDef.await()
                if(sample != null) {
                    total.addAndGet(sample.count)
                }
            }
        }

        println("Total: ${total.get()}")
    }
    println("lineSequence and parse min: ${totalMs.toDouble()/(1000 * 60)}")
}

You can try to parallelize your computation over files, not lines. Like:

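  // Sketch only: Something, doSomethingWithLine and the reduction are placeholders.
  val channel = Channel<Something>(Channel.UNLIMITED)
  val jobs = File("data")
    .walk()
    .filter { it.isFile && it.toString().endsWith("csv") }
    .map { file ->
        launch {
            // useLines is inline, so the suspending send() can be called inside it;
            // forEachLine takes a regular lambda and would not compile here.
            file.useLines { lines -> lines.forEach { channel.send(doSomethingWithLine(it)) } }
        }
    }
    .toList() // force the lazy sequence so a coroutine is started for every file

  // Close the channel once every file is done, otherwise the reduction never finishes.
  launch { jobs.forEach { it.join() }; channel.close() }

  channel.sumBy{some reduction operation}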

But coroutines are just not made for that. They do not give you a performance boost for super-short operations like line splitting. What you want is a classic map-reduce workflow. You can do that with coroutines, but you would need to have all the data read into a channel first, and reading is the performance bottleneck in this case.

Makes sense, I’ll try that!

Unless I’m missing something, that is actually incorrect. async’s body starts executing immediately by default (see the docs).

I haven’t profiled it, but my guess is that the overhead is caused by scheduling so many tasks and moving data between threads in shared memory. So the advice to parallelize based on files or big chunks, as opposed to based on lines, sounds reasonable. Still, it would be interesting to compare the overhead of parallelizing based on lines with coroutines to a similar approach without coroutines.
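For such a comparison, a coroutine-free baseline could look roughly like the sketch below, which uses a plain ExecutorService from the JDK. The names matchCountOf and sumWithExecutor are invented for the example; setting chunkSize to 1 gives one task per line, larger values give one task per chunk, so the scheduling overhead of both can be timed with the same code.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical per-line work: third tab-separated field as an Int, 0 if malformed.
fun matchCountOf(line: String): Int = line.split("\t").getOrNull(2)?.toIntOrNull() ?: 0

// Submits one task per chunk of lines to a fixed thread pool and sums the results.
fun sumWithExecutor(lines: Sequence<String>, chunkSize: Int, threads: Int): Long {
    val pool = Executors.newFixedThreadPool(threads)
    try {
        val futures = lines.chunked(chunkSize)
            .map { chunk -> pool.submit(Callable { chunk.sumOf { matchCountOf(it).toLong() } }) }
            .toList() // submits every chunk up front; acceptable for a sample, not for a whole 2 GB file
        var total = 0L
        for (future in futures) total += future.get()
        return total
    } finally {
        pool.shutdown()
    }
}

fun main() {
    val lines = List(100_000) { "word\t2000\t1\t1\t1" }
    for (chunkSize in listOf(1, 10_000)) {
        val start = System.nanoTime()
        val total = sumWithExecutor(lines.asSequence(), chunkSize, threads = 4)
        println("chunkSize=$chunkSize total=$total ms=${(System.nanoTime() - start) / 1_000_000}")
    }
}
```

The timings printed by main depend entirely on the machine and JVM warm-up, so they are only a rough indication.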

Indeed, but the problem is that the sequence element containing async call is executed when it is called, not when it is declared. This could cause some bizarre order-of-execution problems.

Basically, it is easy to check just by looking at processor load. Deadlock would mean near zero load.

Are you referring to this code?

    val ms5 = measureTimeMillis {
        var total = 0
        var listOfAsync = file.inputStream()
                .bufferedReader()
                .lineSequence()
                .map { line->
                    async {
                        val (ngram, _year, match_count, _, _) = line.toLowerCase().split("\t")
                        match_count.toInt()
                    }
                }.toList() // to start all the asyncs.  Maybe this slows it down?
        println("Asyncs started")
        runBlocking {
            listOfAsync.forEach {
                total += it.await()
            }
        }
        println(total)
    }
    println("lineSequence and async parse ms: $ms5")

All asyncs in the given code start executing when toList() is called. I don’t see any possible way it can deadlock.

It could. One needs to remember that the lines produced are not really sequential; they come from different files. One can’t read a line from a file before reading the previous line. So one thread reads one line from a file, and another one reads the next line from the same file. The second thread is blocked until the first one completes. That could cause a deadlock, and even if it doesn’t, it does not really save time, since you still need to wait for all the lines to be read in a single thread.

Again, disagreed. async here only starts executing when the corresponding line has already been read. It does not block waiting for its line.

What happens is toList reads a line, then feeds it to async, which starts executing asynchronously, while toList reads another line, feeds it to the next async, etc. So while reading the file is sequential, per-line work is parallel. And I still can’t see any opportunity for deadlock.

Maybe you are right. Lines are being read outside the async call, so it probably should not cause a deadlock.

Creating a coroutine context for each line could use a tremendous amount of memory and processor time.