Hi all,
I’m attempting to integrate a RabbitMQ consumer into a Kotlin project that uses coroutines heavily, but I’m having some issues adapting the RabbitMQ calls into coroutine calls. Has anyone done anything similar to this?
I’m running into a couple of different roadblocks. If I try to turn the below code into
val deliverCallback = DeliverCallback { _: String, delivery: Delivery ->
val message = delivery.body.toString(Charset.forName("UTF-8"))
// process stuff
channel.basicAck(delivery.envelope.deliveryTag, false)
}
val cancelCallback = CancelCallback {
//
}
try {
channel.basicConsume(queueName, false, deliverCallback, cancelCallback)
} catch (e: IOException) {
//
}
into a suspending function using produce<Foo>()
suspend fun test() = withContext(Dispatchers.IO) {
return@withContext produce<Delivery> {
val deliverCallback = DeliverCallback { _: String, delivery: Delivery ->
// send(delivery) // cannot call
}
val cancelCallback = CancelCallback {
println("cancel")
}
rabbitChannel.basicConsume(queueName, false, deliverCallback, cancelCallback)
}
}
I get an error from the compiler. I’ve also tried this as a flow
, but the model didn’t fit well because I this consumer needs to run for the entirety of the application, and it also needs to react to a message as soon as its received.
Secondly, it seems that I may be able to use Channel.asyncCompletableRpc
by putting it inside of a suspendCoroutine
and calling resume in the handleAsync
, but if I used it to send a AMQP.Basic.Consume
, the default consumer would be called and I’d be back to a listener in which I can’t call send
from since it’s not suspending.
Additionally, the asyncCompletableRpc
approach causes Intellij to warn about improper blocking (side-note, would the withContext(Dispatchers.IO) {} help in practice?).
@Suppress("BlockingMethodInNonBlockingContext")
suspend fun ack(tag: Long) = withContext(Dispatchers.IO) {
return@withContext suspendCoroutine<Unit> { cont ->
rabbitChannel.asyncCompletableRpc(AMQP.Basic.Ack.Builder().deliveryTag(tag).build())
.handleAsync { _: Command?, err: Throwable? ->
if (err == null) {
cont.resumeWith(Result.success(Unit))
} else {
cont.resumeWithException(err)
}
}
}
}
Are there any other ways to wrap the RabbitMQ client into a coroutines framework? Is a Producer
the right approach here? Or is there a better approach in reactive streams / reactor? Thanks in advance.