Having difficulty adapting the RabbitMQ client to coroutines

Hi all,

I’m attempting to integrate a RabbitMQ consumer into a Kotlin project that uses coroutines heavily, but I’m having some issues adapting the RabbitMQ calls into coroutine calls. Has anyone done anything similar to this?

I’m running into a couple of different roadblocks. If I try to turn the below code into

val deliverCallback = DeliverCallback { _: String, delivery: Delivery ->
    val message = delivery.body.toString(Charset.forName("UTF-8"))
    // process stuff
    channel.basicAck(delivery.envelope.deliveryTag, false)
}

val cancelCallback = CancelCallback {
    //
}

try {
    channel.basicConsume(queueName, false, deliverCallback, cancelCallback)
} catch (e: IOException) {
    //
}

into a suspending function using produce<Foo>()

suspend fun test() = withContext(Dispatchers.IO) {
    return@withContext produce<Delivery> {
        val deliverCallback = DeliverCallback { _: String, delivery: Delivery ->
            // send(delivery) // cannot call
        }

        val cancelCallback = CancelCallback {
            println("cancel")
        }

        rabbitChannel.basicConsume(queueName, false, deliverCallback, cancelCallback)
    }
}

I get an error from the compiler. I’ve also tried this as a flow, but the model didn’t fit well because I this consumer needs to run for the entirety of the application, and it also needs to react to a message as soon as its received.

Secondly, it seems that I may be able to use Channel.asyncCompletableRpc by putting it inside of a suspendCoroutine and calling resume in the handleAsync, but if I used it to send a AMQP.Basic.Consume, the default consumer would be called and I’d be back to a listener in which I can’t call send from since it’s not suspending.

Additionally, the asyncCompletableRpc approach causes Intellij to warn about improper blocking (side-note, would the withContext(Dispatchers.IO) {} help in practice?).

@Suppress("BlockingMethodInNonBlockingContext")
suspend fun ack(tag: Long) = withContext(Dispatchers.IO) {
    return@withContext suspendCoroutine<Unit> { cont ->
        rabbitChannel.asyncCompletableRpc(AMQP.Basic.Ack.Builder().deliveryTag(tag).build())
            .handleAsync { _: Command?, err: Throwable? ->
                if (err == null) {
                    cont.resumeWith(Result.success(Unit))
                } else {
                    cont.resumeWithException(err)
                }
        }
    }
}

Are there any other ways to wrap the RabbitMQ client into a coroutines framework? Is a Producer the right approach here? Or is there a better approach in reactive streams / reactor? Thanks in advance.

I think there are several problems here:

  1. You can not call send in the DeliveryCallback, because send is a suspend function, but the handle method in the definition of the DeliveryCallback interface is not. So the compiler is right to complain, because the context from which your callback gets called might be (and in fact really is in this case) just a regular (non-coroutine) thread.
  2. I never worked with the plain RabbitMq client. But doing a quick google search it is my understanding that the basicConsume call does not block. So pay extra care here, because according to the docs produce will “close the channel when the coroutine completes” produce

Regarding 1.: Take a look at callbackFlow (callbackFlow) maybe this is for you. If you want to stay with produce, it may help to take a look at callbackFlow anyways. The particular problem you have there was solved in callbackFlow by simply using runBlocking here (see method trySendBlocking kotlinx.coroutines/Channels.kt at master · Kotlin/kotlinx.coroutines · GitHub).
So this is likely a valid use case for runBlocking. And in fact since trySendBlocking is just an extenstion on SendChannel you can just use it right there instead of send.
Regarding 2: awaitClose (awaitClose) might do the trick.

Your second problem looks to me like it has a similar cause.

withContext { produce { ... } } - ohh, this is actually a deadlock :smiley: In Kotlin we usually use suspending functions for synchronous execution and if a function needs to execute a task in the background, it is usually a non-suspending function accepting a CoroutineScope. Now look at produce():

fun <E> CoroutineScope.produce(...)

It is the latter, it only creates a producer and immediately returns. Now, your function is suspend which means it doesn’t launch anything in the background, it waits for its children to finish. That means test() won’t return until the producer finishes, but nobody consumes elements from the producer before test() could finish.

Instead, make your function an extension on CoroutineScope, don’t use withContext() and additionally don’t use Dispatchers.IO, because you don’t need it here:

fun CoroutineScope.test() = produce<Delivery> { ... }

As @qwert explained, this is because send() is a suspending function. And it is suspending only because the channel/buffer may be full. So you need to decide what should happen in such a case. If you want to block, waiting for the consumer of the channel - use trySendBlock(). If you want to reject the item - use trySend(). If you want to throw an exception - also trySend().

In any case you may like to provide capacity argument to produce(). By default it doesn’t create any buffer, so it accepts new items only if there is a consumer waiting for items.

I’m not sure what do you mean here. No matter if you use Channel or Flow, there has to be a consumer (or consumers) running all the time. Also, in both cases, events are not processed immediately, but wait for the consumer to be available. If you need to guarantee that events are processed immediately then I think neither channels nor flows will work here. You would need to accept a callback to test().

But if you don’t need it to be immediate, then I also encourage you to use callbackFlow(). Flows are usually easier to use than channels.

OK, after thinking about it a little, I’m not sure regarding channel vs flow. Flows are more about observing for changes or some data. Channels are for distributing of events or units of work between multiple producers and consumers. So actually, channel could be a better fit here.

Regarding the second part.

Kotlin already provides adapter for suspending on CompletableFuture, so you can just do:

suspend fun ack(...) = rabbitChannel.asyncCompletableRpc(...).await()

I guess this is because asyncCompletableRpc() may throw IOException, so IntelliJ assumed it is blocking. Its name and docs clearly say it is asynchronous, so Dispatchers.IO, should not be needed.

But… I must say docs of RabbitMQ are confusing to me here. It says: “Asynchronously send a method over this channel.” and that it throws IOException if there is “Problem transmitting method.”. This is ridiculous, it can’t be both. I would assume it performs only some non-blocking check initially (which may result in IOException) and then it performs sending asynchronously. In this case Dispatchers.IO is not needed, but it is really unclear to me ¯\_(ツ)_/¯

edit: Or it sends the data synchronously, by blocking and then only receives ack asynchronously. Then it would probably require Dispatchers.IO. But this is contrary to what docs say.

I’m not sure what do you mean here. What is the problem?