Coroutine dispatcher confined to a single thread

Is there such a thing as a dispatcher which “picks one worker thread from a default pool and does not jump between threads”?

The question is inspired by:

	@Test
	fun testDispatchers() = runBlocking {
		val t = Thread.currentThread()

		launch {
			check(Thread.currentThread() == t)
		}

		launch(Dispatchers.Default) {
			val tLaunch = Thread.currentThread()

			launch {
				// Note the thread is different, which I don't want
				check(Thread.currentThread() != tLaunch)
			}
		}

		Unit
	}

The first launch will execute on the parent thread, so it’s all good. However, as soon as a coroutine is launched with the default dispatcher, a subsequent launch without parameters will run on an arbitrary worker thread.

How do I make it still run on the same thread?

I suppose I need a dispatcher which would pick one worker thread from the default pool, and stay on that thread for subsequent launches.

		launch(newSingleThreadContext(/* pick one from default pool */)) {
			val tLaunch = Thread.currentThread()

			launch {
				// This should hold
				check(Thread.currentThread() == tLaunch)
			}
		}

I would obviously like to avoid creating a new thread if possible.

I believe coroutines are not designed to “pick a thread and stick with it”. Indeed they have been designed to be adaptable!

May I ask why a new dispatcher is not ok? Also, what stops you from changing thread?

Adaptable as in adaptable for different use cases?

There is newSingleThreadContext which does stick with a thread, however the problem with it is that it creates a new thread, which I want to avoid.

Indeed, even the default is to stick with a single thread. It’s just that once a dispatcher is changed, this behavior cannot be replicated.

It is possible, but it somewhat violates the contract of a work-stealing pool. The general way to obtain a single-thread dispatcher is to do Executors.newSingleThreadExecutor().asCoroutineDispatcher().
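A minimal sketch of the single-thread dispatcher mentioned above, assuming kotlinx.coroutines is on the classpath and that creating one extra thread is acceptable. The function name nestedLaunchStaysOnThread is made up for illustration. It checks that a child coroutine started without an explicit dispatcher inherits the single-thread dispatcher and runs on the same thread as its parent:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical helper: returns true if a nested coroutine ran on the
// same thread as its parent.
fun nestedLaunchStaysOnThread(): Boolean = runBlocking {
    // The dispatcher owns a new thread; `use` closes it when we are done.
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { single ->
        withContext(single) {
            val outer = Thread.currentThread()
            // No dispatcher argument: the child inherits `single`.
            val inner = async { Thread.currentThread() }
            inner.await() === outer
        }
    }
}

fun main() {
    println(nestedLaunchStaysOnThread())
}
```

The dispatcher is closed explicitly because the thread behind it is not a daemon thread and would otherwise keep the process alive.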

Well, describe your case; we can discuss it and find a solution that better fits how coroutines work!

Maybe if it was possible to say minus on the CoroutineContext, then I could remove the default dispatcher. Alas, coroutine contexts are only additive.

But am I wrong that this is strange?

  • If there is no dispatcher, it will be confined to a single thread.
  • As soon as a dispatcher is added to the context, it cannot be removed, so the default behavior cannot be replicated.

Seems illogical to me. Perhaps if there was Dispatchers.ThisThread?

I have found: https://github.com/Kotlin/kotlinx.coroutines/issues/261 which, if implemented, would allow me to do what I want.

Also: Add "virtual single thread context"

Does Executors.newSingleThreadExecutor() get a thread from some common pool?

It depends on the factory, but in general it creates a new thread.

You still have not explained why you need to reserve a thread inside an existing pool. It looks meaningless to create a shared pool and then force it to give you the same thread.

Thank you for your answer.

I found that runBlocking inside launch works:

	@Test
	fun testDispatchers() = runBlocking {
		val t = Thread.currentThread()

		launch {
			check(Thread.currentThread() == t)
		}

		launch(Dispatchers.Default) {
			val tLaunch = Thread.currentThread()

			launch {
				// Note the thread is different, which I don't want
				check(Thread.currentThread() != tLaunch)
			}

			runBlocking {
				launch {
					// Keeps same thread
					check(Thread.currentThread() == tLaunch)
				}

				future {
					check(Thread.currentThread() == tLaunch)
				}
			}
		}

		Unit
	}

What are the downsides to this approach?

The use case is pretty complex:

  • Our web service is migrating from servlets to Vert.x
  • We can’t use the event loop for all requests because MySQL is not async right now
  • So, some requests have to run on a “worker” thread, however they will still have to be a coroutine to achieve parallelism within one request
  • We use GraphQL, and GraphQL library for Java relies on futures (CompletableFuture) for async execution
  • We absolutely must keep a request on the same thread throughout execution because our code is not thread-safe

This would be pretty easy to achieve if we didn’t have to satisfy all of the above requirements. The naive approach was to use launch(Dispatchers.Default) to process a request, and then coroutineScope.future to start a coroutine inside a GraphQL data fetcher (because GraphQL expects a future). However, the latter then starts the coroutine on an arbitrary thread, so different reducers within the same request run on different threads in parallel, which we must avoid.

Let me know if you have any ideas.

You must never use runBlocking inside a coroutine, especially on the default dispatcher, because it will eventually block the whole pool. The feature you are looking for is called a context switch, and it is achieved with the withContext function or an additional CoroutineContext argument to launch. Like this:

val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
// ...
launch(Dispatchers.Default) {
  // do something multithreaded...
  val res = withContext(singleDispatcher) {
    // do something on a single thread...
  } // return to Default
}

Won’t that create a new thread for each request?

I also fail to understand how runBlocking will hurt. At worst, we won’t be able to process more than N requests in parallel, but that’s not too bad. But I’m not even sure this is the case, since runBlocking is interruptible.

Take a look at this unit test:

	@Test
	fun testRunBlocking() = runBlocking {
		coroutineScope {
			val f = async {
				println("f1 ${Thread.currentThread()}")
				// Does not block g
				runBlocking {
					println("fb ${Thread.currentThread()}")
					delay(1000)
				}
				println("f2 ${Thread.currentThread()}")
			}

			val g = async {
				println("g1 ${Thread.currentThread()}")
				delay(100)
				println("g2 ${Thread.currentThread()}")
				delay(100)
				println("g3 ${Thread.currentThread()}")
			}

			awaitAll(f, g)
		}

		Unit
	}

runBlocking inside f does not block g, it can proceed execution. What am I missing?

You can have a single additional context for all of your requests so some operations are executed strictly sequentially. It strongly depends on the problem you need to solve.

You are not allowed to use runBlocking inside the coroutine because it will block the thread. Eventually you will block all threads in Default and break it.

How about you share some code instead of repeating the same premise? I don’t see any proof for such a claim.

OK, you’re right sir. It will “break” at 8 on my machine:

	@Test
	fun testDefaultDispatcher() = runBlocking {
		val start = System.currentTimeMillis()

		coroutineScope {
			repeat(9) {
				launch(Dispatchers.Default) {
					runBlocking {
						delay(10_000)
					}
				}
			}
		}

		println(System.currentTimeMillis() - start)

		Unit
	}

With fewer than 9 coroutines this takes 10s, and with 9 it takes 20s. So it looks like we won’t be able to serve more than 8 requests at a time.

I don’t really need anything to run sequentially. What I need:

  • to be able to start multiple additional coroutines from a single request, which is already a coroutine
  • to prevent them from being started on different threads

That’s it.

This isn’t what you asked for, but I bet it’s what you need: How to prevent unintentional multithreading

You can also create a regular thread pool and execute runBlocking in your thread pool tasks.
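A rough sketch of that suggestion, assuming kotlinx.coroutines is available. The names handleRequest and allRequestsStayedOnThread and the pool size of 4 are made up for illustration. Each pool task calls runBlocking, so the coroutines it starts without a dispatcher run on runBlocking's event loop, which is the pool thread that picked up the task:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical request handler: returns true if every child coroutine
// ran on the thread that called runBlocking.
fun handleRequest(): Boolean = runBlocking {
    val requestThread = Thread.currentThread()
    val a = async { Thread.currentThread() === requestThread }
    val b = async {
        delay(10) // suspends, then resumes on runBlocking's event loop
        Thread.currentThread() === requestThread
    }
    a.await() && b.await()
}

// Runs several requests on a dedicated pool and reports whether each one
// stayed on its own thread.
fun allRequestsStayedOnThread(): Boolean {
    val pool = Executors.newFixedThreadPool(4) // pool size is an arbitrary choice
    try {
        val results = (1..8).map { pool.submit(Callable { handleRequest() }) }
        return results.all { it.get() }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(allRequestsStayedOnThread())
}
```

The trade-off is that the number of requests in flight is capped by the pool size, since each runBlocking call occupies its thread until the request completes.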

If you use the solution from @mtimmerm, don’t forget to move the blocking calls out to a more appropriate dispatcher like Dispatchers.IO using withContext.

Thank you for your answer @mtimmerm. The question that you asked on the forum hits the nail on the head.

I checked your SO answer, but at first glance I think the solution with your withSerialContext is opposite of what I need. If I understand correctly (haven’t tested it), withSerialContext will start a task on a worker thread, and if additional coroutines are launched inside it, they will run serially (but the thread on which they run is not guaranteed). Is my understanding correct?

Note that even though it’s one thread at a time, there is no guarantee that it will be the same thread for the whole operation.

EDIT: I think I misunderstood the wording a bit. So it looks like SerialContextDispatcher is exactly what I need. Even though it’s not the same thread, you’re right that my principal requirement is to prevent multi-threading, and the exact thread doesn’t matter.

How well has it been tested? Is it being used in production?
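For comparison, a sketch of the same "one thread at a time, but not necessarily the same thread" behaviour using limitedParallelism. This assumes a kotlinx.coroutines version that provides CoroutineDispatcher.limitedParallelism, which is marked experimental in some releases, hence the opt-in. It is not the SerialContextDispatcher discussed above, and the function name maxObservedConcurrency is made up. The function counts how many coroutines are ever inside a non-suspending section at once:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns the highest number of coroutines observed
// running at the same time on a parallelism-1 view of Dispatchers.Default.
@OptIn(ExperimentalCoroutinesApi::class)
fun maxObservedConcurrency(): Int = runBlocking {
    val serial = Dispatchers.Default.limitedParallelism(1)
    val active = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)
    withContext(serial) {
        repeat(100) {
            launch { // inherits `serial`
                repeat(10) {
                    // Non-suspending section: enter, record, leave.
                    val now = active.incrementAndGet()
                    maxSeen.updateAndGet { maxOf(it, now) }
                    active.decrementAndGet()
                    yield() // let other coroutines on `serial` run
                }
            }
        }
    }
    maxSeen.get()
}

fun main() {
    println(maxObservedConcurrency())
}
```

No new thread is created here: the view borrows workers from Dispatchers.Default. A coroutine may resume on a different worker after a suspension point, so thread-local state still cannot be relied on.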

Thank you for your answer @nickallendev!

If I understand correctly, the first suggestion follows my approach with runBlocking, but the difference is that it won’t use the thread pool on which Dispatchers.Default operates, but my own. It would still suffer from parallelism being limited by the number of threads. Am I correct?

For the second suggestion, I haven’t considered that but we should definitely test and see if it improves the performance of our service. Thanks!

The version I posted on SO has not been tested and is not used in production. There is a better production version that is very well tested, but my employer owns it. There is an Apache-licensed version, supposed to be essentially the same, that I have in an upcoming open source project. I wrote it without reference to the original, though, so it’s not really tested either.

If you want to use it, then here it is. If you’d like to test it, I’d be much obliged, and if you find any issues, I’d be happy to fix them: