How can I use co-routines to single-thread asynchronous responses

In my code I wish to ‘single-thread’ actions that are invoked asynchronously from external sources. I do this by maintaining a queue of lambdas serviced by a dedicated thread, which removes lambdas from the queue and executes them. When the queue is empty the thread waits on a lock (with a timeout, so the queue is polled in case a notification gets missed). When a lambda is added I signal the lock to wake up the dedicated thread.

This ensures that all changes (to a complex model) happen on a single thread and so no data-race conditions occur.

This works, but seems a bit cumbersome. Is there a simpler, more elegant solution using co-routines?

Would a buffered channel replace the queue? Would a consumer for that queue execute the lambdas sequentially?
Also, this code is in a library (jar). What is the best way to launch the co-routine scope?

[EDIT] I do not require that all lambdas are executed on the same thread (in fact, that requirement would force a dedicated thread), just that only one lambda is executed at a time. So ‘single-threaded’ may be misleading. I don’t care which thread runs a lambda as long as each one sees the state that the previous one created.[/EDIT]

Thanks for any help

import kotlin.concurrent.thread

public typealias SimpleBlock = () -> Unit
/**
 * Executes the given blocks in order on a single daemon thread.
 */
public class DispatchQueue(name: String) {
    public fun add(block: SimpleBlock) {
        synchronized(lock) {
            eventQueue.addLast(block)
            lock.notify()
        }
    }

    public fun stop() {
        keepProcessing = false
        synchronized(lock) {
            lock.notify()
        }
    }

    // written by stop() on the caller's thread and read by the worker thread
    @Volatile private var keepProcessing = true
    private val lock = Object()
    private val eventQueue = ArrayDeque<SimpleBlock>()
    public val thread: Thread =
        thread(name = name, isDaemon = true) {
            while (keepProcessing) {
                var block: SimpleBlock?
                do {
                    synchronized(lock) {
                        block = eventQueue.removeFirstOrNull()
                    }
                    // need to invoke the block outside the lock!
                    block?.invoke()
                } while (null != block)

                synchronized(lock) {
                    // wait() signals interruption, not timeout; a timed-out wait simply returns
                    try { lock.wait(20) } catch (e: InterruptedException) { }
                }
            }
        }
}

You can run coroutines on a single-threaded dispatcher. It seems like that would solve your problem.

val dispatcher =  Executors.newSingleThreadExecutor().asCoroutineDispatcher()
launch(dispatcher){
  //whatever
}

This will ensure that all coroutines run on the same thread, avoiding concurrent access, while still executing asynchronously.

Even with the threading approach, if you use a Java ArrayBlockingQueue you won’t need a lock at all; just take elements from the queue in order. queue.take() will block for you.
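
As a rough sketch of that suggestion (the class name BlockingDispatchQueue and the capacity of 1024 are assumptions made for illustration, not part of the original code), the wait/notify handling collapses into a single take() call:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Hypothetical rewrite of the original DispatchQueue on top of a blocking queue.
class BlockingDispatchQueue(name: String, capacity: Int = 1024) {
    private val queue = ArrayBlockingQueue<() -> Unit>(capacity)

    // take() parks the worker until a block arrives, so no explicit lock is needed.
    private val worker = thread(name = name, isDaemon = true) {
        try {
            while (true) queue.take().invoke()
        } catch (e: InterruptedException) {
            // stop() interrupts the worker; leaving the loop ends the thread.
        }
    }

    // put() blocks the caller only if the bounded queue is full.
    fun add(block: () -> Unit) { queue.put(block) }

    fun stop() = worker.interrupt()
}

fun main() {
    val dq = BlockingDispatchQueue("dq")
    val done = CountDownLatch(1)
    dq.add { println("first") }
    dq.add { println("second") }
    dq.add { done.countDown() }   // marker block: everything before it has run
    done.await()
    dq.stop()
}
```

Note that this still dedicates one thread to the queue, and a block that throws would end the worker in this sketch.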

Thanks - I missed the blocking Queue in my search. That will help if I do not use co-routines.

Or you can use single-thread executor in Java:

val executor = Executors.newSingleThreadExecutor()
executor.submit {
    ...
}

Does the single thread executor reserve a thread from the pool? That would be as inefficient as declaring my own dedicated thread since the dispatch queue servicing runs for a long time.
Actually, I guess my initial requirement might be stated wrongly. I do not care what thread my tasks are run on, as long as only one is run at a time… I will update my question.

So far all proposed solutions require creating a dedicated thread to handle actions. I thought your main concern was to simplify your code, not to get rid of this thread :)

But if you want to remove it then I don’t see a straightforward solution without using coroutines. It is technically possible, just not that easy. We can’t wait for new tasks by blocking, because then we require some thread to do this blocking. We would need to wait asynchronously and be careful not to run multiple tasks in parallel.

With coroutines this is pretty easy, using exactly the solution you mentioned in the first post. Just create a buffered channel and start a coroutine, e.g. on Dispatchers.Default, that reads items from this channel in a loop. Yes, it will work sequentially as long as you don’t start asynchronous tasks (e.g. with launch()). It will share threads with other parts of your application, but won’t occupy any while waiting for new tasks.

Regarding the scope, it really depends. Probably the most universal way is to design this as some kind of service with a clearly defined lifecycle. Then you create a CoroutineScope(), store it in a property of your service, and invoke cancel() on this scope in your cancel()/close() function. This will stop processing almost immediately. If you need to shut down gracefully and process already queued items, you would need to do it a little differently.
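
One possible shape for such a service, as a sketch only: the names QueueService and drainAndClose are hypothetical, and the unlimited channel with trySend is an assumption chosen so that add() never has to suspend.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical service that owns its scope and exposes an explicit lifecycle.
class QueueService : AutoCloseable {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private val channel = Channel<() -> Unit>(Channel.UNLIMITED)

    // Single consumer: the next task starts only after the previous one returns.
    private val worker: Job = scope.launch {
        for (task in channel) task()
    }

    fun add(task: () -> Unit) { channel.trySend(task).getOrThrow() }

    // Immediate stop: tasks still waiting in the queue are dropped.
    override fun close() = scope.cancel()

    // Graceful stop: refuse new tasks, let the consumer drain the queue, then release the scope.
    suspend fun drainAndClose() {
        channel.close()
        worker.join()
        scope.cancel()
    }
}

fun main() = runBlocking {
    val service = QueueService()
    val out = mutableListOf<String>()
    service.add { out += "first" }
    service.add { out += "second" }
    service.drainAndClose()
    println(out) // [first, second]
}
```

In this sketch close() gives the "stop almost immediately" behaviour, while drainAndClose() is one way to process the items that were already queued.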

OK, thanks for that @broot . My apologies to all if my question was poorly phrased/thought out.

I guess that my main motivation was to explore whether coroutines would allow more elegant code. For me that means both ‘more efficient (get rid of a dedicated thread)’ and ‘syntactically simpler’.

A buffered channel and a default dispatcher in a simple loop sounds ideal. I will play with that idea and see where I get.

Is this a correct solution, please? If so it is a lot simpler than my original solution.

Requirements:

  • Execute lambdas in the order they were added
  • Execute lambdas one at a time
  • Execute each lambda with the state that the previous lambda created (not visible here)

package coroutines

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

typealias SimpleBlock = () -> Unit
/**
 * Executes the given blocks in order, one at a time, in a coroutine rather than on a dedicated thread.
 */
class DispatchQueue(name: String) {
    private val channel = Channel<SimpleBlock>(Channel.BUFFERED)

    fun add(block: SimpleBlock) = runBlocking { channel.send(block) }

    fun stop() = sc.cancel()

    private val sc = GlobalScope.launch {
        while (true) { channel.receive()() }
    }
}

fun main() {
    val dq = DispatchQueue("dq")
    dq.add { println("first") }
    dq.add { println("second") }
    dq.add { println("third") }

    Thread.sleep(2000)

    dq.stop()
}

Yes, this implementation seems fine and it should meet all your requirements. There is one race in the add() function: if multiple threads invoke it concurrently, there is no guarantee which task is added first. That may violate your first requirement, but I guess it shouldn’t be a problem in practice. You can always synchronize this function if you need to.

Note that Channel.BUFFERED does not create a channel with unlimited capacity. It has a fixed size and after filling it up, your add() will block, waiting for free space. I’m not sure if this is what you expect. If not then use Channel.UNLIMITED instead and then I think it would be better to implement add() as:

channel.trySend(block).getOrThrow()

runBlocking() always has some performance cost, and we don’t really need to suspend if the channel is unlimited. Maybe I’m missing something here.
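
To make the difference concrete, here is a small sketch of how trySend behaves on a bounded versus an unlimited channel. The helper enqueueAll and the capacity of 2 are illustrative assumptions only.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: enqueue without suspending and count how many items were accepted.
fun <T> enqueueAll(channel: Channel<T>, items: List<T>): Int =
    items.count { channel.trySend(it).isSuccess }

fun main() {
    val bounded = Channel<Int>(capacity = 2)
    println(enqueueAll(bounded, listOf(1, 2, 3)))   // 2: the third item finds the buffer full

    val unlimited = Channel<Int>(Channel.UNLIMITED)
    println(enqueueAll(unlimited, listOf(1, 2, 3))) // 3: an unlimited buffer never fills up

    unlimited.close()
    println(unlimited.trySend(4).isClosed)          // true: getOrThrow() would throw here
}
```

So with Channel.UNLIMITED, the only case where getOrThrow() should throw is sending after the channel has been closed.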

Also, someone will probably tell you that GlobalScope is evil, but I believe you used it correctly. Your solution is functionally the same as creating the scope with CoroutineScope(). You can simplify your loop to for (item in channel) or channel.consumeEach {}, but this is just a small improvement.

Also, keep in mind that coroutines are always a little cumbersome when you need to use them from outside a coroutine context, especially for a small feature. This is because of scopes and structured concurrency. For this reason it may not always be a good idea to migrate to coroutines just to simplify the code. However, if you go fully-coroutines, many things become simpler. For example, your example could be reimplemented as:

class DispatchQueue {
    val channel = ...
    fun addTask() { ... }
    suspend fun run() {
        channel.consumeEach { ... }
    }
}

class AnotherService {
    suspend fun run() = coroutineScope {
        launch { DispatchQueue().run() }
        
        ...
    }
}

As coroutines support cancellation automatically, we don’t need to bother with creating the scope and cancelling it manually. If we cancel the coroutine that started AnotherService, it will automatically cancel all services, including the loop in DispatchQueue.
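
A runnable variation on that outline might look like the sketch below. The names TaskQueue and ParentService, the unlimited channel and the suspend task type are all assumptions made for illustration; they are not what the elided parts of the outline necessarily contained.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical queue with no scope of its own: whoever calls run() owns its lifetime.
class TaskQueue {
    private val channel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    fun addTask(task: suspend () -> Unit) { channel.trySend(task).getOrThrow() }

    // Processes tasks one at a time until the calling coroutine is cancelled.
    suspend fun run() {
        for (task in channel) task()
    }
}

// Hypothetical parent service that starts the queue as one of its children.
class ParentService(private val queue: TaskQueue) {
    suspend fun run() {
        coroutineScope {
            launch { queue.run() }
            // other child services would be launched here
        }
    }
}

fun main() = runBlocking {
    val queue = TaskQueue()
    val parent = launch(Dispatchers.Default) { ParentService(queue).run() }
    val done = CompletableDeferred<Unit>()
    queue.addTask { println("first") }
    queue.addTask { println("second") }
    queue.addTask { done.complete(Unit) } // marker task: everything before it has run
    done.await()
    parent.cancelAndJoin() // cancelling the parent also stops the queue loop
}
```

Nothing in TaskQueue creates or cancels a scope; cancelling the parent coroutine is enough to end the loop.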

Thank you for taking the time to pen that full reply. Those look like excellent points and I will digest them slowly.

All of them are valid for me in this context. I like the syntactic improvements, and synchronising add() will help to remove a source of entropy. As for how to embed this in the bigger picture: well, that will take the most thought. I already use dependency injection, and the model that makes use of the dispatchQueue can be injected where it is needed, so it may be simpler than feared (inversion of control becomes more than a buzzword). The GUI makes use of JavaFX and will likely need animations, so co-routines may have some use there, and the whole system will need to deal with asynchronous comms, so again co-routines (once I understand them a bit more) may have a part to play there.

So again, many thanks. This has been a ‘focused lesson’ in co-routines at least as much as it has been solution finding.

Phill

And for the record here is my updated version.

package coroutines

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

// with help and guidance from here:
// https://discuss.kotlinlang.org/t/how-can-i-use-co-routines-to-single-thread-asynchronous-responses/23045/11?u=phill
typealias SimpleBlock = () -> Unit
/**
 * Executes the given blocks in order, one at a time, on the shared Dispatchers.Default pool.
 */
class DispatchQueue(name: String) {
    private val channel = Channel<SimpleBlock>(Channel.UNLIMITED)

    @Synchronized fun add(block: SimpleBlock) = channel.trySend(block).getOrThrow()

    fun stop() = sc.cancel()

    private val sc = CoroutineScope(Dispatchers.Default).launch { channel.consumeEach { it() } }
}

fun main() {
    val dq = DispatchQueue("dq")
    dq.add { println("first") }
    dq.add { println("second") }
    dq.add { println("third") }

    Thread.sleep(2000)

    dq.stop()
}

It sounds like you have this requirement: “How can I create single-thread coroutine context under Common pool in Kotlin?” (Stack Overflow)

Ahh, I just realized something very important in this discussion. It is crucial to specify if our goal is to limit concurrency or parallelism.

It was a little simpler in the Java world, because Executors.newSingleThreadExecutor() actually limited both. But this is not the case with coroutines. A single-threaded dispatcher (mentioned by @darksnake and @mtimmerm) limits parallelism, but not concurrency. A queue with a single consumer limits concurrency, but not parallelism. From your description I believe you actually need the latter.

I explain the difference with this code:

import java.util.concurrent.Executors
import kotlinx.coroutines.*

fun main() {
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { dispatcher ->
        var data = 0

        runBlocking(dispatcher) {
            repeat(1000) {
                launch {
                    data = transformData(data)
                }
            }
        }

        println(data)
    }
}

suspend fun transformData(input: Int): Int {
    delay(500) // processing
    return input + 1
}

We increment an integer 1000 times, using a single-thread dispatcher. What is the result of this code? It prints 1. Every coroutine reads data (still 0) before it suspends in delay(), so each one writes back 1. The problem here is that even with a single-thread dispatcher, we still execute all 1000 tasks concurrently. This is coroutines magic. We will get exactly the same result using SerialContextDispatcher from the linked Stack Overflow question.

Using a channel and reading from it with a single coroutine does something quite different. It makes sure that we only start a new task after the previous one is fully completed. I didn’t verify this, but it should return 1000 instead of 1 and take 500s instead of 500ms.
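
A scaled-down sketch of that channel-based variant follows. The names slowIncrement and runSerially, the 20 ms delay and the task count of 50 are assumptions picked to keep the run short; they are not taken from the post.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Same shape as transformData above, with a shorter delay.
suspend fun slowIncrement(input: Int): Int {
    delay(20) // processing
    return input + 1
}

// Queues taskCount increments and lets a single consumer coroutine run them one by one.
suspend fun runSerially(taskCount: Int): Int = coroutineScope {
    var data = 0
    val tasks = Channel<suspend () -> Unit>(Channel.UNLIMITED)
    val consumer = launch(Dispatchers.Default) {
        for (task in tasks) task() // the next task starts only after this one completes
    }
    repeat(taskCount) {
        val task: suspend () -> Unit = { data = slowIncrement(data) }
        tasks.trySend(task).getOrThrow()
    }
    tasks.close()   // no more tasks: the consumer loop ends once the queue is drained
    consumer.join()
    data
}

fun main() = runBlocking {
    println(runSerially(50)) // 50, after roughly 50 * 20 ms
}
```

Because each task suspends inside the single consumer, the total time grows with the number of tasks, which is the cost of limiting concurrency rather than parallelism.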

Actually, a single-thread dispatcher prohibits concurrent (simultaneous) access to data. It won’t guarantee the order of access is linear though.

I think that I am trying to limit both concurrency and parallelism. I want to execute a sequence of tasks in the order that they were presented (for some definition of the word ‘presented’), but execute them one at a time, in such a manner that changes to shared state made by one task are honoured by the subsequent tasks. Calling this ‘single threading’ was a mistake - I actually do not care which thread the code is executed on.
From my understanding, the code I wound up with will do this: the channel enforces an ordering and the single consumer coroutine ensures that only one task is handled at a time. I am unsure whether the shared data is treated correctly but I am assuming so.
I also believe that the code will share its thread with a common pool so thread identity is not maintained. That increases efficiency without side effects (I believe).
What I do like is that (given an understanding of the bits of coroutines I have used) the code is terse and the intent shines through the syntactic noise. This clarity is a part of Kotlin that I find very appealing.

[Edit] In case you had not spotted it, my solution is an attempt to replicate the dispatch queue concept I first encountered in Swift/iOS (aka ‘Grand Central Dispatch’). It provides a simplification for handling complex state models with multiple asynchronous inputs. It might be helpful to have this capability as part of the core coroutines code.[/Edit]

A new API for this use case with coroutines is also planned (for 1.6?):

A coroutine dispatcher you can use to enforce that only one thread can enter any withContext(myLimitedParallelismDispatcher) { ... } block.
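
In kotlinx.coroutines releases that include it, this API is CoroutineDispatcher.limitedParallelism. The sketch below assumes such a release; the function name countWithLimitedParallelism is hypothetical, and the opt-in annotation is only there because the API was marked experimental when it was introduced.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: many coroutines update one counter through a one-at-a-time view of Default.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun countWithLimitedParallelism(workers: Int, perWorker: Int): Int {
    val serial = Dispatchers.Default.limitedParallelism(1)
    var counter = 0
    coroutineScope {
        repeat(workers) {
            launch(Dispatchers.Default) {
                repeat(perWorker) {
                    // At most one thread is inside this block at any moment.
                    withContext(serial) { counter++ }
                }
            }
        }
    } // coroutineScope returns only after every worker has finished
    return counter
}

fun main() = runBlocking {
    println(countWithLimitedParallelism(workers = 10, perWorker = 1_000)) // 10000
}
```

As with the single-thread dispatcher discussed earlier, this limits parallelism only: if the block inside withContext suspends, another coroutine may enter it before the first one resumes.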
