Correct layering of coroutine scopes to prevent leak

I had some memory issues when implementing a fan-out channel consumer that is used elsewhere in the system to store objects in S3. It uses the AWS SDK v2 async HTTP client built on Netty. Here is a simplified version:

@Singleton
class Sample(private val s3Client: S3AsyncClient) {
    private val job = SupervisorJob()
    private val scope = CoroutineScope(job + Dispatchers.IO)
    private val channel = Channel<Pair<String, String>>(4096)

    init {
        repeat(16) {
            scope.processChannel()
        }
    }

    fun storeMessage(key: String, message: String) = channel.offer(key to message)

    private fun CoroutineScope.processChannel() = launch {
        for ((key, message) in channel) {
            try {
                // returns CompletableFuture<PutObjectResponse>
                s3Client.putObject(
                    PutObjectRequest.builder()
                        .bucket("myBucket")
                        .key(key)
                        .build(),
                    AsyncRequestBody.fromString(message)
                ).await()  // coroutines-jdk8 extension
            } catch (t: Throwable) {
                // handle it
            }
        }
    }
}

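What we found was that memory usage climbed slowly but steadily until the container RAM limit was hit, whereupon performance degraded significantly. Profiling with VisualVM revealed this:

[Screenshot: VisualVM heap profile showing large numbers of ChildContinuation and PutObjectResponse instances]
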
Sad times! Those objects should definitely not be hanging around for long.

Eventually I had the idea to wrap the operation in a child scope:

private fun CoroutineScope.processChannel() = launch {
    for ((key, message) in channel) {
        try {
            coroutineScope {
                s3Client.putObject(
                    // ...
                ).await()
            }
        } catch (t: Throwable) { }
    }
}

Thankfully, this resolved the problem (as far as I can tell!). A coworker also suggested using asDeferred as an alternative (which seems to function exactly the same):

private fun CoroutineScope.processChannel() = launch {
    for ((key, message) in channel) {
        try {
            s3Client.putObject(
                // ...
            ).asDeferred().join()
        } catch (t: Throwable) { }
    }
}

Question 1: Was the memory problem a coroutines bug, or was I simply not using the library correctly?

Question 2: Is there any practical difference between the coroutineScope and asDeferred versions of the function listed above? Which would you consider better?

Looks like a bug.

[Edit: removed incorrect reasoning for bug. I mixed up invokeOnCancellation with invokeOnCompletion. Still seems like a bug, but I’m not sure if it’s in the jdk8 extension or core coroutines library]

asDeferred creates a Job that is not associated with the current coroutine, which apparently allows it to bypass the issue.

I’d definitely go with structured concurrency friendly solutions like coroutineScope whenever it’s an option. asDeferred().join() isn’t leveraging structured concurrency.
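
To make the comparison concrete, here is a minimal sketch of one difference between the two that could matter in practice. Job.join() only waits for the job to finish and does not rethrow its failure, whereas await() does, so with asDeferred().join() the catch block around putObject would presumably never see an S3 error. The failed future and the failureVisibility helper are made up for illustration; no AWS client is involved.

```kotlin
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical helper: reports whether each waiting style surfaces a failed future.
// Returns (joinThrew, awaitThrew).
suspend fun failureVisibility(): Pair<Boolean, Boolean> {
    // Stand-in for a putObject call that failed.
    val failed = CompletableFuture<String>().apply {
        completeExceptionally(IllegalStateException("simulated upload failure"))
    }
    // join() waits for completion only; it does not rethrow the failure.
    val joinThrew = try {
        failed.asDeferred().join()
        false
    } catch (e: IllegalStateException) {
        true
    }
    // await() rethrows the exception the future completed with.
    val awaitThrew = try {
        failed.await()
        false
    } catch (e: IllegalStateException) {
        true
    }
    return joinThrew to awaitThrew
}

fun main() = runBlocking {
    val (joinThrew, awaitThrew) = failureVisibility()
    println("join threw: $joinThrew, await threw: $awaitThrew")
}
```

If error handling matters, asDeferred().await() would be the closer equivalent of the original code, though it still sits outside structured concurrency.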

Be careful with any method that returns a Job but is not an extension method of the CoroutineScope interface, since it likely works outside of structured concurrency.
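
As a rough illustration of that last point (a sketch, not taken from the original code): a Deferred created with async, which is a CoroutineScope extension, shows up among the children of the current Job, while one created with asDeferred does not. childStatus is a hypothetical helper name, and the future is completed by hand instead of by a real client.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical helper: returns (asyncIsChild, asDeferredIsChild)
// as observed from inside the enclosing scope.
suspend fun childStatus(): Pair<Boolean, Boolean> = coroutineScope {
    val self = coroutineContext[Job]!!
    val future = CompletableFuture<String>()

    // CoroutineScope extension: the new coroutine is attached to this scope's Job.
    // It waits on the future, so it is still running when we inspect the children.
    val viaAsync = async { future.await() }

    // Plain extension on CompletionStage: the result has no parent Job.
    val viaAsDeferred = future.asDeferred()

    val status = (viaAsync in self.children) to (viaAsDeferred in self.children)

    // Complete the future so the async child can finish and the scope can return.
    future.complete("done")
    status
}

fun main() = runBlocking {
    val (asyncIsChild, asDeferredIsChild) = childStatus()
    println("async is a child: $asyncIsChild, asDeferred is a child: $asDeferredIsChild")
}
```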


Hi @mxkaras,
your code looks good to me.

Maybe you can use the Default dispatcher, and you should keep in mind that storeMessage drops messages if the channel is full.
These suggestions are minor, though.
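
To illustrate the dropped-message point, here is a small sketch with a tiny buffer and no consumer. It uses trySend, which replaced offer in newer kotlinx.coroutines releases; like offer, it returns immediately and reports failure when the buffer is full. enqueueWithoutSuspending is a name made up for this example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to enqueue `count` items into a buffer of size
// `capacity` while nothing is consuming, and reports which attempts succeeded.
fun enqueueWithoutSuspending(capacity: Int, count: Int): List<Boolean> {
    val channel = Channel<Pair<String, String>>(capacity)
    return (1..count).map { i ->
        // Never suspends: fails straight away once the buffer is full.
        channel.trySend("key$i" to "message$i").isSuccess
    }
}

fun main() {
    // The third message does not fit in a buffer of two and is dropped.
    println(enqueueWithoutSuspending(capacity = 2, count = 3)) // [true, true, false]
}
```

If dropping is not acceptable, making storeMessage a suspend function that calls channel.send would apply backpressure instead, at the cost of suspending callers.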

Regarding your issue, you should take a look at the (in)famous KT-16222.
I think that a standalone reproducer, ideally without any AWS service dependencies, would be very interesting.
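
One possible shape for such a reproducer is sketched below. It keeps the fan-out structure from the question but swaps the S3 call for a hypothetical fakePutObject that completes a CompletableFuture on another thread. Whether it actually shows the leak is untested; it is only a starting point to run for longer under a profiler. runFanOut and its parameters are likewise made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.await
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for s3Client.putObject(...): completes on a pool thread.
fun fakePutObject(key: String, message: String): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "$key:${message.length}" }

// Same fan-out shape as the question; returns how many messages were processed.
suspend fun runFanOut(messages: Int, workers: Int = 16): Int {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    val channel = Channel<Pair<String, String>>(4096)
    val processed = AtomicInteger()

    val jobs = List(workers) {
        scope.launch {
            for ((key, message) in channel) {
                try {
                    fakePutObject(key, message).await()
                    processed.incrementAndGet()
                } catch (e: CancellationException) {
                    throw e // let cancellation propagate
                } catch (e: Exception) {
                    // handle it
                }
            }
        }
    }

    // send suspends when the buffer is full, so nothing is dropped here.
    repeat(messages) { channel.send("key$it" to "message$it") }
    channel.close() // lets the worker loops end once the buffer drains
    jobs.joinAll()
    scope.cancel()
    return processed.get()
}

fun main() = runBlocking {
    println("processed " + runFanOut(messages = 100_000))
}
```

For a real investigation, the message count would need to be much larger (or the producer made endless) so that heap snapshots can be compared over time.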

Regarding the OOM error, your analysis may be incomplete.
Please consider the JVM options -XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError and inspect the heap dump.
1 MB of ChildContinuations is not enough to explain an OOM.

Thank you both for your comments. @fvasco I need to make one clarification to my original post: we never allowed the running service to get to the point of OOM. Once the container heap limit was hit and performance degraded, it was rolled back. The profiling was done by me locally, where admittedly I didn’t push it to an OOM either (will try to do that). The large numbers of ChildContinuations and PutObjectResponses stood out to me because they were not present in such high numbers in a profile taken with the inner coroutineScope in place.

I will endeavour to get a reproducible version.

I’m having a similar issue and opened an issue on github: https://github.com/Kotlin/kotlinx.coroutines/issues/1855
