Creating a channel from an exhaustible source

Hi,

Suppose I have some exhaustible source (such as an SQS queue) from which I want to create a channel. What would be the correct strategy to implement such a thing?

My draft implementation looks like this:

fun sqsChannel(queueUrl: String, interval: Long, sqs: AmazonSQSAsyncClient): ReceiveChannel<String> =
    produce {
        while (true) {
            val result = sqs.receiveMessage(queueUrl) // assume receiveMessage() is a suspend function
            if (result.messages.isEmpty()) {
                delay(interval)
            } else {
                result.messages.forEach { send(it.body) }
            }
        }
    }

Is the implementation correct? (i.e. won’t cause deadlocks, will propagate backpressure, etc.)

Thanks

I don’t see any reason why this would cause a deadlock in your current code.

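The channel handles backpressure: send suspends the producing coroutine until the channel can accept the element. Your channel has no buffer here (produce defaults to a capacity of 0, i.e. a rendezvous channel), so maybe you should pass a capacity to produce.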
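To illustrate the capacity point, here is a minimal sketch of a buffered produce with a slow consumer. The bufferedSource function and its "message-N" payloads are made up for the example and stand in for the SQS source. Depending on the kotlinx.coroutines version, produce may emit an experimental-API warning.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for the SQS source: emits five strings and then closes.
fun CoroutineScope.bufferedSource(capacity: Int): ReceiveChannel<String> =
    produce(capacity = capacity) {
        for (i in 1..5) {
            // Suspends once `capacity` elements are buffered and not yet consumed.
            send("message-$i")
        }
    }

fun main() = runBlocking {
    val received = mutableListOf<String>()
    for (msg in bufferedSource(capacity = 2)) {
        delay(10) // simulate a slow consumer; the producer waits rather than piling up messages
        received += msg
    }
    println(received) // [message-1, message-2, message-3, message-4, message-5]
}
```

With capacity = 0 the producer would hand over one element at a time. A small buffer lets it run slightly ahead of the consumer while still being throttled by it.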

But one thing looks off to me: delay(interval). You are effectively polling an async client.
I don't know anything about AmazonSQSAsyncClient, but maybe you should use one of its async methods combined with suspendCoroutine and let AmazonSQSAsyncClient handle the async work.
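A rough sketch of that idea, using suspendCancellableCoroutine (the cancellable variant of suspendCoroutine). FakeAsyncClient, Callback and receiveAsync are hypothetical stand-ins for a callback-style client, not the real AmazonSQSAsyncClient API, so the names and signatures would need adapting to the actual SDK.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback interface, standing in for an SDK's async handler type.
interface Callback<T> {
    fun onSuccess(result: T)
    fun onError(error: Exception)
}

// Hypothetical async client: invokes the callback on another thread.
class FakeAsyncClient {
    fun receiveAsync(queueUrl: String, callback: Callback<List<String>>): CompletableFuture<Void> =
        CompletableFuture.runAsync {
            callback.onSuccess(listOf("hello from $queueUrl"))
        }
}

// Suspends until the callback fires, without blocking the calling thread.
suspend fun FakeAsyncClient.receive(queueUrl: String): List<String> =
    suspendCancellableCoroutine { cont ->
        val future = receiveAsync(queueUrl, object : Callback<List<String>> {
            override fun onSuccess(result: List<String>) = cont.resume(result)
            override fun onError(error: Exception) = cont.resumeWithException(error)
        })
        // If the coroutine is cancelled, also cancel the in-flight request.
        cont.invokeOnCancellation { future.cancel(true) }
    }

fun main() = runBlocking {
    println(FakeAsyncClient().receive("my-queue")) // [hello from my-queue]
}
```

Inside the produce block, the loop could then call such a suspend function directly instead of a blocking call.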

To go further: internally, AmazonSQSAsyncClient uses an ExecutorService of blocking threads, which is not ideal. So if you feel adventurous, you can implement a truly asynchronous client yourself.

Thanks for the helpful response.

The code I provided was deliberately simplified. receiveMessage() is actually wrapped in a method that runs the call on a separate worker thread pool and returns a coroutine that doesn’t block the current thread pool.
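For readers following along, a minimal sketch of that kind of wrapper (not the actual code) could look like this. blockingReceive is a hypothetical placeholder for the blocking SQS call, and the pool size of 4 is an arbitrary choice.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical blocking call standing in for the real SQS receive.
fun blockingReceive(queueUrl: String): List<String> {
    Thread.sleep(50) // simulates blocking network I/O
    return listOf("message from $queueUrl")
}

// Runs the blocking call on the given dispatcher; the caller only suspends.
suspend fun receiveMessage(queueUrl: String, pool: CoroutineDispatcher): List<String> =
    withContext(pool) { blockingReceive(queueUrl) }

fun main() {
    // Dedicated worker pool so blocking calls do not occupy the caller's threads.
    val workerPool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    try {
        runBlocking {
            println(receiveMessage("my-queue", workerPool)) // [message from my-queue]
        }
    } finally {
        workerPool.close() // shuts down the underlying executor
    }
}
```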

As for implementing an async client, it’s already a work in progress.