Is there such a thing as a dispatcher which “picks one worker thread from the default pool and does not jump between threads”?
The question is inspired by:
@Test
fun testDispatchers() = runBlocking {
    val t = Thread.currentThread()
    launch {
        check(Thread.currentThread() == t)
    }
    launch(Dispatchers.Default) {
        val tLaunch = Thread.currentThread()
        launch {
            // Note the thread is different, which I don't want
            check(Thread.currentThread() != tLaunch)
        }
    }
    Unit
}
The first launch will execute on the parent thread, so that’s all good. However, as soon as a coroutine is launched with Dispatchers.Default, a subsequent launch without parameters will run on an arbitrary worker thread.
How do I make it keep running on the same thread?
I suppose I need a dispatcher which would pick one worker thread from the default pool and keep subsequent launches on that thread.
launch(newSingleThreadContext(/* pick one from default pool */)) {
    val tLaunch = Thread.currentThread()
    launch {
        // This should hold
        check(Thread.currentThread() == tLaunch)
    }
}
I would obviously like to avoid creating a new thread if possible.
It is possible, but it somewhat violates the contract of a work-stealing pool. The general way to obtain a single-thread dispatcher is Executors.newSingleThreadExecutor().asCoroutineDispatcher().
It depends on the factory, but in general it creates a new thread.
You still have not explained why you need to reserve a thread inside an existing pool. It seems pointless to create a shared pool and then force it to always give you the same thread.
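A minimal sketch of that approach (assuming kotlinx-coroutines-core on the classpath; note that the dispatcher owns a real thread, so it should be closed when you are done with it):
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() = runBlocking {
    // A dedicated single-thread dispatcher, separate from the Default pool.
    val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    try {
        launch(singleDispatcher) {
            val tLaunch = Thread.currentThread()
            launch {
                // Child coroutines inherit the dispatcher, so this stays on the same thread.
                check(Thread.currentThread() == tLaunch)
            }
        }.join()
    } finally {
        singleDispatcher.close() // shuts down the backing executor thread
    }
}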
@Test
fun testDispatchers() = runBlocking {
    val t = Thread.currentThread()
    launch {
        check(Thread.currentThread() == t)
    }
    launch(Dispatchers.Default) {
        val tLaunch = Thread.currentThread()
        launch {
            // Note the thread is different, which I don't want
            check(Thread.currentThread() != tLaunch)
        }
        runBlocking {
            launch {
                // Keeps same thread
                check(Thread.currentThread() == tLaunch)
            }
            future {
                check(Thread.currentThread() == tLaunch)
            }
        }
    }
    Unit
}
What are the downsides to this approach?
The use case is pretty complex:
- Our web service is migrating from servlets to Vert.x.
- We can’t use the event loop for all requests because MySQL is not async right now.
- So some requests have to run on a “worker” thread; however, they still have to be coroutines to achieve parallelism within a single request.
- We use GraphQL, and the GraphQL library for Java relies on futures (CompletableFuture) for async execution.
- We absolutely must keep a request on the same thread throughout execution because our code is not thread-safe.
This would be pretty easy to achieve if we didn’t have to satisfy all of the above requirements at once. The naive approach was to use launch(Dispatchers.Default) to process a request, and then coroutineScope.future to start a coroutine inside a GraphQL data fetcher (because GraphQL expects a future). However, the latter starts the coroutine on an arbitrary thread, so different reducers within the same request end up running on different threads in parallel, which we must avoid.
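For concreteness, the naive approach looks roughly like this (a sketch only; the scope, the future builder from kotlinx-coroutines-jdk8, and the shape of the handler are simplified stand-ins for our real code):
import kotlinx.coroutines.*
import kotlinx.coroutines.future.*
import java.util.concurrent.CompletableFuture

val scope = CoroutineScope(Dispatchers.Default)

// Roughly what our request handling looks like today.
fun handleRequest(): Job = scope.launch {
    val requestThread = Thread.currentThread()
    // GraphQL expects a future from the data fetcher, so we bridge with future { }.
    val fetched: CompletableFuture<String> = scope.future {
        // This block may land on a *different* Default worker than requestThread,
        // which is exactly what we need to avoid.
        "reducer ran on ${Thread.currentThread().name}"
    }
    println("request on ${requestThread.name}; ${fetched.await()}")
}

fun main() = runBlocking { handleRequest().join() }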
You must never use runBlocking inside a coroutine, especially on the Default dispatcher, because it will eventually block the whole pool. The feature you are looking for is called a context switch, and it is achieved with the withContext function or an additional CoroutineContext argument to launch. Like this:
val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
// ...
launch(Dispatchers.Default) {
    // do something multithreaded...
    val res = withContext(singleDispatcher) {
        // do something on a single thread...
    } // return to Default
}
I also fail to understand how runBlocking would hurt. At worst, we won’t be able to process more than N requests in parallel, but that’s not too bad. But I’m not even sure that’s the case, since runBlocking is interruptible.
You can have a single additional context for all of your requests so some operations are executed strictly sequentially. It strongly depends on the problem you need to solve.
You are not allowed to use runBlocking inside a coroutine because it blocks the thread. Eventually you will block all the threads in Dispatchers.Default and break it.
You can also create a regular thread pool and execute runBlocking in your thread pool tasks.
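A rough sketch of that idea, assuming a fixed pool owned by the application (the pool size and names here are placeholders):
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// A pool owned by the application, separate from Dispatchers.Default.
// The pool size caps how many requests run in parallel.
val requestPool = Executors.newFixedThreadPool(8)

fun submitRequest(work: suspend CoroutineScope.() -> Unit) {
    requestPool.execute {
        // runBlocking here parks one of *our* pool threads, not a Default worker.
        // Every coroutine launched inside it (without specifying another dispatcher)
        // stays on this same thread.
        runBlocking { work() }
    }
}
Parallelism is then capped by the pool size, but each request keeps its thread for its whole lifetime.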
If you use the solution from @mtimmerm, don’t forget to move the blocking calls out to a more appropriate dispatcher like Dispatchers.IO using withContext.
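Something along these lines (a sketch; queryMySql and loadRows are placeholders, not code from this thread):
import kotlinx.coroutines.*

// Placeholder for a blocking JDBC call.
fun queryMySql(sql: String): List<String> = listOf("row")

suspend fun loadRows(sql: String): List<String> =
    // Hop to the IO dispatcher for the blocking work, then resume in the caller's context.
    withContext(Dispatchers.IO) {
        queryMySql(sql)
    }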
Thank you for your answer @mtimmerm. The question that you asked on the forum hits the nail on the head.
I checked your SO answer, but at first glance I think the solution with your withSerialContext is the opposite of what I need. If I understand correctly (I haven’t tested it), withSerialContext will start a task on a worker thread, and if additional coroutines are launched inside it, they will run serially (but the thread on which they run is not guaranteed). Is my understanding correct?
Note that even though it’s one thread at a time, there is no guarantee that it will be the same thread for the whole operation.
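To illustrate that behaviour (this is not SerialContextDispatcher itself, just a stand-in: recent kotlinx.coroutines versions expose limitedParallelism, which gives the same “one at a time, thread may vary” guarantee):
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // At most one of these coroutines runs at any moment,
    // but successive tasks may land on different Default workers.
    val serial = Dispatchers.Default.limitedParallelism(1)
    repeat(3) { i ->
        launch(serial) {
            println("task $i on ${Thread.currentThread().name}")
        }
    }
}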
EDIT: I think I misunderstood the wording a bit. So it looks like SerialContextDispatcher is exactly what I need. Even though it’s not the same thread, you’re right that my principal requirement is to prevent multi-threading, and the exact thread doesn’t matter.
How well has it been tested? Is it being used in production?
If I understand correctly, the first suggestion follows my approach with runBlocking, but the difference is that it won’t use the thread pool on which Dispatchers.Default operates, only my own. It would still limit parallelism to the number of threads in that pool, though. Am I correct?
For the second suggestion, I haven’t considered that but we should definitely test and see if it improves the performance of our service. Thanks!
The version I posted on SO has not been tested and is not used in production. There is a better production version that is very well tested, but my employer owns it. There is an Apache-licensed version, which is supposed to be essentially the same, in an upcoming open-source project of mine. I wrote it without reference to the original, though, so it’s not really tested either.
If you want to use it, then here it is. If you’d like to test it, I’d be much obliged, and if you find any issues, I’d be happy to fix them: