How to send to/receive from channel without blocking?

1. jetty-http@497a49a8-268 @coroutine#1 - Main suspending function
    1a.  request-dispatcher-1 @request-consumer#2 - Non-blocking channel 2 consumer
    1b.  jetty-http@497a49a8-268 @response-consumer#1 - Blocking channel 1 consumer

    2.   blocking callback method call
      2a.    grpc-default-executor-1 - Blocking channel 1 producer
      2b.	 request-dispatcher-1 @request-producer#5 - Non-blocking channel 2 producer

Code for 1a.

runBlocking { // doesn't work, blocks, and stops 1b from executing until complete
    launch(requestDispatcher + CoroutineName("request-producer")) {

GlobalScope.launch(requestDispatcher + CoroutineName(“request-consumer”)) { // works fine

Channel 2 capacity is unlimited. Clearly, I don’t want any operations on channel 2 to block the execution thread, but I couldn’t find a way to do it other than using GlobalScope.launch. Why using my custom dispatcher with launch still blocks?

From the runBlocking docs:

This function should not be used from a coroutine.

The dispatcher is just a thread pool of loops running the tasks posted to it (the started or resumed coroutines). runBlocking blocks the thread, so that for loop can’t continue until runBlocking finishes.

If you are in a suspending context, you can call coroutineScope to suspend until a launched coroutines have finished. If you are in a non-suspending context (like a callback API), then you probably want to just launch. If you do need to block the callback functions for some suspending code, then runBlocking is appropriate but those callbacks should not be running in a CoroutineDispatcher.

1 Like

I don’t want to block the current thread from either coroutine scope or blocking callback when it’s sending to or receiving from channel 2.
Here’s the code in question, with references to the pointers in my original question; it does weird things that I’m trying to debug, like blocking the second time it’s executed.

override suspend fun exchange(request: Request): Response {
	// blocking callback, #2 in original post
    val responseHandler = ResponseHandler(request, requestDispatcher, requestChannel, responseChannel)
    val requestObserver = stub.service(responseHandler)

    if (request.url == null || request.url!!.isBlank()) {
        requestChannel.send(...)
    } else {
        requestChannel.send(...)
    }

    // don't block, EVER, #1a in original post
    val job = coroutineScope {
        launch (requestDispatcher) {
//        val job = GlobalScope.launch(requestDispatcher + CoroutineName("request-consumer")) {
            LOG.debug("Entering request-consumer")
            withTimeout(...) {
                for (req in requestChannel) {
                    requestObserver.onNext(req)
                }
            }
            LOG.debug("Done request-consumer")
        }
    }

    // block until timeout, #1b in original post
    return withContext(CoroutineName("response-consumer")) {
        LOG.debug("Entering response-consumer")
        withTimeout(...) {
            responseChannel.receive()
        }
            .also {
                LOG.debug("Done response-consumer")
            }
    }
}

coroutinScope waits for all its children to finish in addition to the lambda so your code really just is all running sequentially.

Usually producers and consumers are in separate coroutines. I’m guessing you want the initial send and “request-consumer” and “response-consumer” to run in parallel in different coroutines. If so, put them all inside coroutineScope, each in there own launch. This way they will all run in parallel, and then coroutineScope will wait for all of them.

1 Like

The idea is for all three to launch at the same time, but then wait for response-consumer until success or timeout. I don’t care about request-consumer, it can do whatever it can within the given time, then get cancelled by the timeout. Is that possible?

1 Like

Sounds like you want something like this:

coroutineScope {
    launch { <send> }
    val requestJob =  launch { <request-consumer>}
    launch {
        <responce-consumer>
        requestJob.cancel()
    } 

I ended up with the following:

val ioCoroutineScope = CoroutineScope(Dispatchers.IO)
...
ioCoroutineScope.launch(CoroutineName("request-producer")) {
    LOG.trace("{} is started", this.coroutineContext[CoroutineName]!!.name)
    responseHandler.requestChannel.send(initialRequest)
    LOG.trace("{} is done", this.coroutineContext[CoroutineName]!!.name)
}

ioCoroutineScope.launch(CoroutineName("request-consumer")) {
    LOG.trace("{} is started", this.coroutineContext[CoroutineName]!!.name)
    try {
        for (req in responseHandler.requestChannel) {
            requestObserver.onNext(req)
        }
    } finally {
        LOG.trace("{} is done", this.coroutineContext[CoroutineName]!!.name)
    }
}

return withContext(CoroutineName("response-consumer")) {
    LOG.trace("{} is started", this.coroutineContext[CoroutineName]!!.name)
    val deferred = async { responseHandler.responseChannel.receive() }

    try {
        withTimeout(timeout) { deferred.await() }
    } catch (e: TimeoutCancellationException) {
        LOG.error("Didn't receive a response in {}S", timeout)
        throw e
    } finally {
        LOG.trace(
            "{} is done",
            this.coroutineContext[CoroutineName]!!.name
        )
        (requestObserver as ClientCallStreamObserver<Request>)
            .cancel("Cancelled", null)
        ioCoroutineScope.cancel()
        responseHandler.closeChannels()
    }
}

Your method does not wait for all its launched work to finish cleaning up. Using the coroutineScope method (with lowercase “c”) which is designed for this use case, will fix that. You can wrap the whole thing in a withTimeout.

What is the purpose of requestChannel? How is this loop any different than just calling requestObserver.onNext(initialRequest). Seems like there’s only ever the one request.

What is the purpose of responseChannel? It seems like it only ever has one response (I didn’t notice it’s not in a for loop before). Can’t you just return the response directly from onNext?

If you are suspending for the received item, why call async? Why not just:

try {
    withTimeout(timeout) { responseHandler.responseChannel.receive() }
}

Really, I don’t understand why you are using parallel coroutines or Channel at all. Seems like this code is just sending in one request and getting back one response. I think a regular suspend method would be all you need.

No, what’s happening here is that the gRPC handler (responseHandler in the code shown) may put more requests in the channel, that then need to be parsed and forwarded further.

Same as the requestChannel, but the idea is to stop waiting as soon as we get the first response, cancel all pending requests in requestChannel, and return.

Basically, the channels are a means of communication between this class, and gRPC handler code running on a different thread pool.

The trouble is that this code all looks sequential:

  • “request-consumer” won’t run until “request-producer” finishes.
  • “request-consumer” is just a for-loop so it appears to processes all it’s requests sequentially
  • “response-consumer” won’t do anything until the “request-consumer” has produced a result.

Because of this, there is no need for Channel (a regular queue would work fine or simply use recursion), and no need for launch/async.

Perhaps onNext is launching/posting work? If so, then the Channel still doesn’t really help, since the code adding to requestChannel could simply launch/post right there. A CompletableJob may make more sense than a Channel for the response, since there’s only one.

Sorry, I did my best to explain multiple times that’s not the intent, I don’t know how else I can elaborate. Please refer to this comment.