Must-go-fast: Sequences, Flows, Bounded Channels?

#1

I have what I thought was a simple problem, but I’m finding out it’s very complex (thank you for your patience!)

I’d like an app to complete as fast as possible, without running out of memory. It takes in a video file, decodes the frames (thank you ffmpeg!), averages the frames, and writes them back out.

fun main() {
    MyImage.videoFileToFrames(sourceFileName = "molt.mp4")
        .chunked(30)
        .map { it.median() }
        .framesToVideoFile("molt_30x.mp4")
}

Yay Kotlin being so fun: Sequences make this VERY easy.

BUT - I’m not doing things very efficiently: the “median” step takes a lot of CPU time and isn’t spread across my desktop’s cores.

So I tried making them use “async { it.median() }” which of course blew up my memory as everything tried to load at once. I needed a pipe that wasn’t infinitely wide…

So I tried sending the async calls through a Channel<Deferred>(NUM_CORES), which stalls out, because I’m not sure whether I should have

  • main() = runBlocking(Dispatchers.IO) {...
  • async(Dispatchers.IO) { ...
  • runBlocking(Dispatchers.IO) { it.await() }

Then Flows came along, which sounded great, but not sure how (or IF) I should make use of them here…

What I really want: Some way to convert that initial code chunk to a way that makes use of all cores (as reasonable), but doesn’t blow up with an out of memory error…

#2

The basic way to do parallel collection evaluation via coroutines is this:

coroutineScope {
    yourData
        .map { async(Dispatchers.IO) { doYourWork(it) } }
        .map { it.await() }
}

This way you first create a number of deferred results on different threads, then collect them.
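For completeness, a self-contained sketch of that pattern, where doYourWork and the input list are placeholders for the real workload; awaitAll() is the idiomatic shortcut for the second map. Note that this version is unbounded: every element is in flight at once.

```kotlin
import kotlinx.coroutines.*

// Placeholder for the real CPU-bound work (e.g. the median step).
fun doYourWork(x: Int): Int = x * x

// Launches one async per element, then collects the results in order.
fun parallelMapAll(data: List<Int>): List<Int> = runBlocking {
    coroutineScope {
        data.map { async(Dispatchers.IO) { doYourWork(it) } }
            .awaitAll() // idiomatic equivalent of .map { it.await() }
    }
}

fun main() {
    println(parallelMapAll(listOf(1, 2, 3))) // prints [1, 4, 9]
}
```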

We are currently discussing parallel processing with flows, but there is no out-of-the-box support yet.

Another thing: if you are working on the JVM (not MPP), then a plain old Java parallel stream could serve you better than coroutines. It was designed for simple parallel processing.

#3

I started with that approach, but it blew out the memory, because I couldn’t figure out an easy way to say “limit to a max of myNumCores async jobs pending at any given time”. That is why I started mucking around with capacity-constrained Channels to get that sort of max-at-once limiter.
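One way to bake that cap into a sequence, assuming it is acceptable to fully await each batch before starting the next, is to chunk the sequence into core-sized windows. This is a sketch; heavyWork and the helper name mapParallelWindowed are made up here. The runBlocking is fine in this position because the sequence is consumed on a plain thread, not inside another coroutine.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the expensive median() step.
fun heavyWork(x: Int): Int = x + 1

// Caps concurrency at `parallelism`: each window of that size is fully
// awaited before the next window is even pulled from the source sequence,
// so at most `parallelism` results are in memory at once.
fun <T, R> Sequence<T>.mapParallelWindowed(
    parallelism: Int,
    transform: (T) -> R
): Sequence<R> =
    chunked(parallelism).flatMap { window ->
        runBlocking {
            window.map { async(Dispatchers.Default) { transform(it) } }
                .awaitAll()
        }.asSequence()
    }

fun main() {
    val out = (1..10).asSequence()
        .mapParallelWindowed(4) { heavyWork(it) }
        .toList()
    println(out) // order is preserved within and across windows
}
```

The trade-off is that a slow item stalls its whole window, so the cores are not kept perfectly busy, but memory stays strictly bounded.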

If there is an easier way to bake that into a sequence - wonderful!

#4

I think that the basic Java 8 stream().parallel() does exactly what you need. Of course, in this case you need to organize your data acquisition as a stream as well. The conversion between streams and sequences is handled by the stdlib-jdk8 functions.
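A sketch of that round trip using the stdlib-jdk8 converters (Sequence.asStream and Stream.asSequence), with a toy squaring step standing in for the real work:

```kotlin
import kotlin.streams.asSequence
import kotlin.streams.asStream

// Convert a Sequence to a parallel Stream, do the heavy step there,
// then come back to a Sequence for the rest of the pipeline.
fun squaresViaParallelStream(): List<Int> =
    (1..8).asSequence()
        .asStream()
        .parallel()      // work is spread over the common ForkJoinPool
        .map { it * it }
        .asSequence()    // iteration preserves the stream’s encounter order
        .toList()

fun main() {
    println(squaresViaParallelStream()) // prints [1, 4, 9, 16, 25, 36, 49, 64]
}
```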

#5

Wouldn’t Dispatchers.Default, rather than Dispatchers.IO, do the trick?

#6

You should not use Default for blocking tasks, and this is obviously such a case.

#7

OK, only it seemed that the OP wanted to maximise core usage for something both CPU and memory intensive. My understanding of the docs was that “CPU intensive” is exactly what the Default dispatcher is for.

I do admit to being relatively new to coroutines, however.

#8

The use of CPU is more or less the same. IO could create additional threads for new tasks, though. Default is not recommended for blocking tasks because you can accidentally block the whole coroutine framework. The correct way is to create a separate thread pool with a number of threads equal to the number of effective cores.
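A sketch of that separate-pool approach; the toy workload and the function name runOnOwnPool are illustrative:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// A dedicated dispatcher backed by exactly `cores` threads, so blocking
// work on it can never starve the coroutine machinery’s own threads.
fun runOnOwnPool(inputs: List<Int>): List<Int> {
    val cores = Runtime.getRuntime().availableProcessors()
    val pool = Executors.newFixedThreadPool(cores).asCoroutineDispatcher()
    return try {
        runBlocking {
            inputs.map { n -> async(pool) { n * 10 } }.awaitAll()
        }
    } finally {
        pool.close() // shuts down the underlying executor
    }
}

fun main() {
    println(runOnOwnPool(listOf(1, 2, 3, 4))) // prints [10, 20, 30, 40]
}
```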

#9

OK, so I create the pool with the number of cores. But maybe I’m misunderstanding Sequences: can’t a sequence still race ahead if not constrained?

I got the following working, but it felt like a hack bridging Sequences and blocking Channels. If I were sure that the final “record” step would always stay ahead of the game, then sure, no worries. But if reading is medium-fast, the intermediate steps range from fast to VERY slow, and writing is kinda slow, then I think I need to be careful and have something like the capacity-bound blocking channel.

But, as you noticed - it feels kludgy.

runBlocking(Dispatchers.IO) {
    // 12 = cores, could be done dynamically.
    val rc: ReceiveChannel<Deferred<UByteArrayImage>> = produce(capacity = 12) { 
        UByteArrayImage.videoFileToFrames(sourceFileName = "molt.mp4")
            .chunked(30)
            .forEach {
                send(async { it.median() })
            }
        LOG.info { "Done with read. " }
    }

    rc.asSequence().map {
        runBlocking { it.await() }
    }.framesToVideoFile("out.mp4")
    LOG.info { "Done with recording." }
}
#10

You should read a bit more about how coroutines work. You should never call runBlocking inside a coroutine.

#11

I thought I had, drat. Without it, the compiler said I was outside of a coroutine body.

@elizarov - I saw your most recent post of Kotlin Flow mentioned backpressure. Is that this problem, or am I barking up the wrong tree thinking that Flow will solve this exact issue?

#12

That is a different problem. What you want is a parallel map with limited concurrency (so it will not run out of memory), and we don’t yet have nice primitives for that available in our libraries. As was pointed out, Java parallel streams can do that.

#13

A. Very clear answer; I’ll try parallel streams. (Tried them, maybe incorrectly.) They did limit the processing, but it was harder to do all the fun sequence things like “chunked”.

B. Drat!! I was really hoping back-pressure was what I was looking for. Oh well. Thank you for checking!

#14

I found https://github.com/Kotlin/kotlinx.coroutines/issues/1147 and think I am re-debating the same need, so I’m moving this conversation there. Thank you for the feedback, and I look forward to whatever Kotlin eng decides the canonical solution should be!

#15

The issue you are referencing is about multi-platform. If you are targeting the JVM, Streams already provide everything you need. It is really hard to do better.

#16

Sounds reasonable. Thank you!

#17

It is not a different problem. To limit concurrency, we need to limit the size of the thread pool’s input queue, and to limit the size of that queue we can employ backpressure.
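That is essentially what the bounded produce in post #9 does. A self-contained sketch under that reading (toy workload, illustrative capacity): send suspends once `capacity` deferreds are buffered, so backpressure caps the number of tasks in flight at roughly capacity + 1.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// The producer launches asyncs but suspends in send() when the buffer is
// full; the consumer awaits results in send order, so output order is
// preserved even though tasks may complete out of order.
fun boundedParallelSquares(n: Int, capacity: Int): List<Int> = runBlocking {
    val channel = produce(capacity = capacity) {
        (1..n).forEach { i ->
            send(async(Dispatchers.Default) { i * i }) // suspends when full
        }
    }
    val out = mutableListOf<Int>()
    for (deferred in channel) out += deferred.await()
    out
}

fun main() {
    println(boundedParallelSquares(10, 4)) // prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}
```

The difference from the code in post #9 is that the channel is consumed directly in the same coroutine, so no inner runBlocking is needed.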