I ran into memory problems with a fan-out channel consumer that other parts of the system use to store objects in S3. It uses the AWS SDK v2 async HTTP client, which is built on Netty. Here is a simplified version:
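```kotlin
import javax.inject.Singleton // assumed JSR-330 annotation; the DI framework isn't stated
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.PutObjectRequest

@Singleton
class Sample(private val s3Client: S3AsyncClient) {
    private val job = SupervisorJob()
    private val scope = CoroutineScope(job + Dispatchers.IO)
    // Buffered channel shared by all workers
    private val channel = Channel<Pair<String, String>>(4096)

    init {
        // Fan-out: 16 long-lived workers consume from the same channel
        repeat(16) {
            scope.processChannel()
        }
    }

    // Returns false instead of suspending when the buffer is full
    // (offer is deprecated since kotlinx.coroutines 1.5 in favor of trySend)
    fun storeMessage(key: String, message: String) = channel.offer(key to message)

    private fun CoroutineScope.processChannel() = launch {
        for ((key, message) in channel) {
            try {
                // returns CompletableFuture<PutObjectResponse>
                s3Client.putObject(
                    PutObjectRequest.builder()
                        .bucket("myBucket")
                        .key(key)
                        .build(),
                    AsyncRequestBody.fromString(message)
                ).await() // coroutines-jdk8 extension
            } catch (t: Throwable) {
                // handle it
            }
        }
    }
}
```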
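For experimenting outside the service, the same fan-out shape can be reproduced without AWS. This is a minimal sketch, not the original code: `FakeStore` is a hypothetical stand-in for `S3AsyncClient` whose `put` completes a `CompletableFuture` on another thread, the worker count is a constructor parameter, `trySend` replaces the deprecated `offer`, and `closeAndJoin` is a hypothetical shutdown helper with no equivalent in the class above. It assumes a recent kotlinx-coroutines-core on the JVM (older versions need the separate kotlinx-coroutines-jdk8 artifact for `kotlinx.coroutines.future`).

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.await

// Hypothetical stand-in for S3AsyncClient: completes the returned future
// on another thread, as an async HTTP client would.
class FakeStore {
    val stored = ConcurrentHashMap<String, String>()

    fun put(key: String, value: String): CompletableFuture<Void> =
        CompletableFuture.runAsync { stored[key] = value }
}

class FanOutConsumer(private val store: FakeStore, workerCount: Int = 16) {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    private val channel = Channel<Pair<String, String>>(4096)

    // Fan-out: every worker iterates the same channel, and each element
    // is received by exactly one of them.
    private val workers = List(workerCount) { scope.launch { consume() } }

    // Non-suspending: returns false when the buffer is full or the channel is closed.
    fun storeMessage(key: String, message: String): Boolean =
        channel.trySend(key to message).isSuccess

    private suspend fun consume() {
        for ((key, message) in channel) {
            try {
                store.put(key, message).await()
            } catch (e: CancellationException) {
                throw e // let cancellation propagate
            } catch (e: Exception) {
                // log and continue with the next message
            }
        }
    }

    // Hypothetical shutdown helper: stop accepting messages, then wait
    // until the workers have drained the buffer.
    suspend fun closeAndJoin() {
        channel.close()
        workers.joinAll()
    }
}

fun main() {
    runBlocking {
        val store = FakeStore()
        val consumer = FanOutConsumer(store)
        repeat(100) { i -> consumer.storeMessage("key-$i", "message-$i") }
        consumer.closeAndJoin()
        println(store.stored.size) // 100
    }
}
```

Because the workers share one channel, each message is stored once regardless of how many workers there are; the 4096-element buffer is what keeps `storeMessage` non-suspending.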
What we found was that memory usage climbed slowly but steadily until it hit the container's RAM limit, at which point performance degraded significantly. Profiling with VisualVM revealed this:
Sad times! Those objects should definitely not be hanging around for long.
Eventually I had the idea to wrap the operation in a child scope:
```kotlin
private fun CoroutineScope.processChannel() = launch {
    for ((key, message) in channel) {
        try {
            coroutineScope {
                s3Client.putObject(
                    // ...
                ).await()
            }
        } catch (t: Throwable) { }
    }
}
```
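To see what the child scope adds structurally, the wrapping can be pulled into a helper and inspected. In this sketch `awaitInChildScope` and `childJobIsAttached` are hypothetical names: the first is just `coroutineScope { future.await() }`, and the second checks that, while the block runs, `coroutineScope` has created a new `Job` registered as a child of the caller's `Job`. It assumes a recent kotlinx.coroutines (for `currentCoroutineContext` and the `job` extension) and only illustrates the structure; it does not by itself explain why the original version retained memory.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.*
import kotlinx.coroutines.future.await

// Hypothetical helper: the same wrapping as the loop body above.
suspend fun <T> awaitInChildScope(future: CompletableFuture<T>): T =
    coroutineScope { future.await() }

// True if, while the block runs, coroutineScope's Job is a different Job
// registered as a child of the caller's Job.
suspend fun childJobIsAttached(): Boolean {
    val outer = currentCoroutineContext().job
    return coroutineScope {
        val inner = coroutineContext.job
        inner !== outer && outer.children.any { it === inner }
    }
}

fun main() {
    runBlocking {
        // Stands in for one of the long-lived workers started by processChannel
        launch {
            println(childJobIsAttached()) // true
            println(awaitInChildScope(CompletableFuture.supplyAsync { 21 * 2 })) // 42
        }
    }
}
```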
Thankfully, this resolved the problem (as far as I can tell!). A coworker also suggested `asDeferred` as an alternative, which seems to behave exactly the same:
```kotlin
private fun CoroutineScope.processChannel() = launch {
    for ((key, message) in channel) {
        try {
            s3Client.putObject(
                // ...
            ).asDeferred().join()
        } catch (t: Throwable) { }
    }
}
```
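One difference between the two versions is easy to check: `Job.join()` waits for completion but returns normally whatever the outcome, whereas `await()` rethrows the failure. With `asDeferred().join()`, a failed `putObject` future therefore never reaches the surrounding `catch`. The sketch below demonstrates this with an already-failed future; `UploadException`, `failedUpload` and `thrownBy` are hypothetical helpers, not part of the AWS SDK or kotlinx.coroutines.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.*
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await

// Hypothetical exception standing in for an S3 error.
class UploadException(message: String) : RuntimeException(message)

// Hypothetical helper: an already-failed future, as a rejected putObject might return.
fun failedUpload(): CompletableFuture<String> =
    CompletableFuture<String>().apply { completeExceptionally(UploadException("upload failed")) }

// Runs one waiting style against a failed future; returns the caught message,
// or null if the wait returned normally.
suspend fun thrownBy(waitFor: suspend (CompletableFuture<String>) -> Unit): String? =
    try {
        waitFor(failedUpload())
        null
    } catch (e: UploadException) {
        e.message
    }

fun main() {
    runBlocking {
        // await() rethrows the failure, so a surrounding catch sees it
        println(thrownBy { it.await() })              // upload failed
        // join() only waits for completion and returns normally on failure
        println(thrownBy { it.asDeferred().join() })  // null
        // Deferred.await() rethrows, like CompletableFuture.await()
        println(thrownBy { it.asDeferred().await() }) // upload failed
    }
}
```

If the `asDeferred` route is kept, calling `await()` on the `Deferred` instead of `join()` preserves the error handling of the original loop.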
Question 1: was the memory problem a coroutines bug, or was I simply using the library incorrectly?
Question 2: is there any practical difference between the `coroutineScope` and `asDeferred` versions above? Which would you consider better?