Why would using coroutines be slower than sequential for a big file?


#21

Actually, from what I can tell from the source code, creating a new context with default arguments is more or less a no-op in async. I still suspect that the real overhead comes from scheduling, coroutines state machinery and perhaps Job bookkeeping.


#22

With all of your help I think I figured out my issue(s): I mistakenly assumed that I wouldn’t have to deal with a bunch of stuff because coroutines would allocate resources. They don’t (they do exactly what I ask them to do) so stuff gets bogged down.

Detailed version:

No matter what every buffer between steps ends up banging against empty or full.

  • In the case where file-reading is too slow (not enough resources, or too much context switching), everything downstream is slowed down. (of course)
  • But if reading the file to strings is too fast, then you blow out the memory usage as the reader runs ahead of any consumers, and GC slows everything down.
  • And the same goes for each subsequent processing step - either a step doesn’t have enough resources and the inbox is filling up, or it has too much and it is sitting around twiddling it’s thumbs and processing each item as it arrives, eliminating any chance of batch efficiency.
  • Or in the case where everything gets evaluated in a lazy manner because of Sequence, everything goes in the same destination, and there isn’t any sort of incremental reduce part of the map-reduce plan.

That plus some potentially dumb data structure choices on my part (is a Set.find on a 50m entry Set as fast as a Map.get? it should be, because both use the same sort of Hash implementation, right?) made it all go very slowly.

VERY Wishful thinking: something like http://www.sosp.org/2001/papers/welsh.pdf but wrapped in Kotlin Actor/Producer simplicity to create a self-balancing workflow.

val workflow = workflowNode<File, Sequence<File>> (
  outputLowerBound: 10,
  outputUpperLimit: 50,
  it:File->{ // Search a starting folder for CSVs
    File(it)
    .walk()
    .filter { it.isFile && it.toString().endsWith("csv") }
}).workflowNode<File, Sequence<String>>( 
  outputLowerBound: 50_000, // Ask for more resources if there are less than this pending in the queue.  
    // Also a good batching size hint
  outputUpperLimitMem: 2g, // Give back resources as you approach this upper bound.  
    // Also only need to start up again when you are at 2gb - outputLowerBound, to encourage batching.
  it:File-> { // File to Lines
    file.inputStream()
    .bufferedReader()
    .lineSequence()
})...

workflow.add(File("./data"))

#23

Workflow balancing is complicated. You can implement it on top of coroutines using channels, but it requires additional work. I think it is better to use Apache Spark or similar framework for that.
Also, I never tried, but I believe that rxjava allows some margin of control using backpressure.


#24

One possible explanation is the way files are normally read. The underlying mechanism does not actually read one line from the disk at a time, but rather fills a buffer (or buffer set) that is some multiple of the minimum allocation size (data cluster - usually 4-8Kb) with a single disk read and references the “line” with a set of pointers. Disk access is a [hard] bottleneck, you cannot process the file faster than you can read through it sequentially. Reading it out of sequence will slow the process down for the following reasons (not an exhaustive list):

  • Seeking to a part of a file not in the disk cache will cause a disk read.
  • Buffers may be cleared or reallocated for every line if subsequent lines are not in the same buffer.
  • Lines are not addressable without some [additional] indexing scheme, if you want to address the 1000th line you need to process the preceding 999 (or keep an index of where each line starts).
  • when you seek to a position in a file the disk subsystem has to read the whole cluster into memory, even if you only access the last byte, if the line crosses a cluster boundary you need to read [at least] two clusters. The disk subsystem speeds up reads by trying to predict the likely next cluster and may read that into memory before it is requested, out-of-sequence requests will throw this prediction out.
  • out-of-sequence reads may cause the read head (on mechanical drives) to have to align to a new cylinder, and then back to one it was just reading.There are similar (if not as egregious) issues with SSD.
  • locking/synchronization

The recommended way to speed up your processing is to read the file sequentially and queue up each line (or set) for further processing in the background. Of course you may want to think of ways to avoid (or reduce) locks or their equivalent.

One way to think about mitigating the impact .of locks are to reduce their frequency by not queuing up one line, but by building a collection (list/array) and queuing that for processing.