runBlocking { // doesn't work, blocks, and stops 1b from executing until complete
launch(requestDispatcher + CoroutineName("request-producer")) {
GlobalScope.launch(requestDispatcher + CoroutineName("request-consumer")) { // works fine
Channel 2's capacity is unlimited. Clearly, I don't want any operations on channel 2 to block the executing thread, but I couldn't find a way to do that other than using GlobalScope.launch. Why does using my custom dispatcher with launch still block?
This function should not be used from a coroutine.
A dispatcher is just a thread pool whose threads loop over the tasks posted to it (the started or resumed coroutines). runBlocking blocks the thread, so that loop can't continue until runBlocking finishes.
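A small illustration of the effect (not the original code), using a single-threaded dispatcher so the blocking is obvious:

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    runBlocking {
        launch(dispatcher) {
            // runBlocking here ties up the dispatcher's only thread...
            runBlocking { delay(1000) }
        }
        launch(dispatcher) {
            // ...so this coroutine cannot even start until the call above returns.
            println("runs only after the blocking call finishes")
        }
    }
    dispatcher.close()
}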
If you are in a suspending context, you can call coroutineScope to suspend until the launched coroutines have finished. If you are in a non-suspending context (like a callback API), then you probably want to just launch. If you do need to block the callback functions for some suspending code, then runBlocking is appropriate, but those callbacks should not be running on a CoroutineDispatcher.
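Roughly, the two cases look like this (the function names are made up for illustration):

import kotlinx.coroutines.*

// Suspending context: coroutineScope suspends, without blocking a thread,
// until the coroutines launched inside it have finished.
suspend fun doWork() {
    coroutineScope {
        launch { /* some suspending work */ }
    }
}

// Non-suspending context (e.g. a callback): hand the work to an existing scope
// and return immediately.
fun onCallback(scope: CoroutineScope) {
    scope.launch { /* some suspending work */ }
}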
I don't want to block the current thread, whether from a coroutine scope or from the blocking callback, when sending to or receiving from channel 2.
Here’s the code in question, with references to the pointers in my original question; it does weird things that I’m trying to debug, like blocking the second time it’s executed.
override suspend fun exchange(request: Request): Response {
    // blocking callback, #2 in original post
    val responseHandler = ResponseHandler(request, requestDispatcher, requestChannel, responseChannel)
    val requestObserver = stub.service(responseHandler)
    if (request.url == null || request.url!!.isBlank()) {
        requestChannel.send(...)
    } else {
        requestChannel.send(...)
    }
    // don't block, EVER, #1a in original post
    val job = coroutineScope {
        launch(requestDispatcher) {
        // val job = GlobalScope.launch(requestDispatcher + CoroutineName("request-consumer")) {
            LOG.debug("Entering request-consumer")
            withTimeout(...) {
                for (req in requestChannel) {
                    requestObserver.onNext(req)
                }
            }
            LOG.debug("Done request-consumer")
        }
    }
    // block until timeout, #1b in original post
    return withContext(CoroutineName("response-consumer")) {
        LOG.debug("Entering response-consumer")
        withTimeout(...) {
            responseChannel.receive()
        }
        .also {
            LOG.debug("Done response-consumer")
        }
    }
}
coroutineScope waits for all its children to finish, in addition to the lambda itself, so your code really is all running sequentially.
Usually producers and consumers are in separate coroutines. I'm guessing you want the initial send, "request-consumer", and "response-consumer" to run in parallel in different coroutines. If so, put them all inside coroutineScope, each in its own launch, as in the sketch below. This way they will all run in parallel, and then coroutineScope will wait for all of them.
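A minimal sketch of that structure, reusing the names from the code above (initialRequest is a placeholder for whatever the elided send arguments were):

coroutineScope {
    launch(requestDispatcher + CoroutineName("request-producer")) {
        requestChannel.send(initialRequest)   // the initial send (placeholder value)
    }
    launch(requestDispatcher + CoroutineName("request-consumer")) {
        // runs until requestChannel is closed or the surrounding scope is cancelled
        for (req in requestChannel) {
            requestObserver.onNext(req)
        }
    }
    launch(CoroutineName("response-consumer")) {
        val response = responseChannel.receive()
        // handle the response (placeholder)
    }
}
// coroutineScope suspends here until all three children have finished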
The idea is for all three to launch at the same time, but then wait for response-consumer until success or timeout. I don’t care about request-consumer, it can do whatever it can within the given time, then get cancelled by the timeout. Is that possible?
Your method does not wait for all of its launched work to finish or clean up. Using coroutineScope (with a lowercase "c"), which is designed for this use case, will fix that. You can wrap the whole thing in a withTimeout, roughly as sketched below.
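A hedged sketch of that shape, with timeoutMillis and initialRequest standing in for the values elided in the original code:

// Wrap the whole exchange in withTimeout so everything inside,
// including the request-consumer loop, is cancelled when the timeout expires.
return withTimeout(timeoutMillis) {
    coroutineScope {
        val consumer = launch(requestDispatcher + CoroutineName("request-consumer")) {
            for (req in requestChannel) {
                requestObserver.onNext(req)
            }
        }
        requestChannel.send(initialRequest)        // the initial send
        val response = responseChannel.receive()   // suspend until the first response
        consumer.cancel()                          // don't wait for the consumer loop
        response                                   // coroutineScope returns this value
    }
}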
What is the purpose of requestChannel? How is this loop any different from just calling requestObserver.onNext(initialRequest)? Seems like there's only ever the one request.
What is the purpose of responseChannel? It seems like it only ever has one response (I hadn't noticed before that it's not inside a for loop). Can't you just return the response directly from onNext?
If you are suspending for the received item, why call async? Why not just:
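(The snippet that followed isn't reproduced above; presumably the suggestion was roughly this, suspending directly on the channel, with the timeout value a placeholder as in the original code:)

return withTimeout(timeoutMillis) {   // timeoutMillis: placeholder for the elided value
    responseChannel.receive()
}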
Really, I don’t understand why you are using parallel coroutines or Channel at all. Seems like this code is just sending in one request and getting back one response. I think a regular suspend method would be all you need.
No, what’s happening here is that the gRPC handler (responseHandler in the code shown) may put more requests in the channel, that then need to be parsed and forwarded further.
Same as the requestChannel, but the idea is to stop waiting as soon as we get the first response, cancel all pending requests in requestChannel, and return.
Basically, the channels are a means of communication between this class, and gRPC handler code running on a different thread pool.
The trouble is that this code all looks sequential:
“request-consumer” won’t run until “request-producer” finishes.
"request-consumer" is just a for loop, so it appears to process all its requests sequentially.
“response-consumer” won’t do anything until the “request-consumer” has produced a result.
Because of this, there is no need for Channel (a regular queue would work fine or simply use recursion), and no need for launch/async.
Perhaps onNext is launching/posting work? If so, then the Channel still doesn’t really help, since the code adding to requestChannel could simply launch/post right there. A CompletableJob may make more sense than a Channel for the response, since there’s only one.
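For instance, a sketch using CompletableDeferred (the value-carrying counterpart of CompletableJob, since the single response has to carry data); ResponseSlot is a made-up name, not anything from the original code:

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeout

// Hypothetical sketch of a single-response slot replacing responseChannel.
class ResponseSlot<T> {
    private val deferred = CompletableDeferred<T>()

    // Called from the gRPC handler when the first response arrives.
    fun complete(response: T): Boolean = deferred.complete(response)

    // Called by exchange(): suspends until complete() is called or the timeout expires.
    suspend fun await(timeoutMillis: Long): T =
        withTimeout(timeoutMillis) { deferred.await() }
}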