How do I enqueue sequential coroutine blocks with the Default Dispatcher?

Hi Everyone,

I am trying to utilize coroutines to create an object which tracks how long various tasks take to execute for data analysis purposes, but I’m running into a wall. I need to support fully concurrent method invocations, but the real-time ordering of those method calls needs to be strongly guaranteed.

The traditional way of doing this would look something like:

enum class TrackedTask {
    Task1,
    Task2,
    // ...
}

object Tracker {
    private val startTimes = mutableMapOf<TrackedTask, LocalDateTime>()

    @Synchronized
    fun startTask(task: TrackedTask) {
        startTimes[task] = LocalDateTime.now()
    }

    @Synchronized
    fun finishTask(task: TrackedTask) {
        val endTime = LocalDateTime.now()
        startTimes[task]?.let { /* Send tracking event to service with start and end times */ }
    }

    @Synchronized
    fun abandonTask(task: TrackedTask) {
        startTimes.remove(task)
    }
}

Synchronized blocks are quickly becoming antiquated with high-level languages which offer structured concurrency, so I’m trying to incorporate coroutines to handle this for me. What I want out of coroutines would be something that looks like:

object Tracker : CoroutineScope by CoroutineScope(Dispatchers.Default) {
    private val startTimes = mutableMapOf<TrackedTask, LocalDateTime>()

    fun startTask(task: trackedTask) {
        launch {
            startTimes[task] = LocalDateTime.now()
        }
    }

    fun finishTask(task: TrackedTask) {
        val endTime = LocalDateTime.now()
        launch {
            startTimes[task]?.let { /* Send tracking event to service with start and end times */ }
        }
    }

    fun abandonTask(task: TrackedTask) {
        launch {
            startTimes.remove(task)
        }
    }
}

The problem with this is that the Default Dispatcher will still submit this blocks to some shared thread pool, and there is the risk of concurrent modification exceptions. Furthermore, when doing a simple test using this implementation in a sandbox environment I see each launch executing in whatever order the underlying dispatchers/schedulers think they should be executed in. They’re usually pretty close to the actual called order, but I require strong ordering guarantees that each launch block is executed sequentially, in the real-time order the function was invoked.


I have seen the following code snippet which solves the problem, but it introduces a brand new one:

object Tracker : CoroutineScope by
        CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) {
  ...
}

The problem with this is it creates an entirely new thread just for the lifetime of this object. For a one-off tracker object, this is probably fine, but I’d like to reuse this kind of pattern across my application to do away with all my synchronized or concurrent logic for objects which need to support concurrency but also require that the caller order is respected.

Looking through the implementation of the Default Dispatcher, it comes really close to what I think of as the ideal scenario: It enqueues suspending blocks to be executed in the JVM’s ForkJoinPool.commonPool() (or a manually-created one if certain conditions are met), but it dispatches all the blocks concurrently to that thread pool. I want a Dispatcher which will sequentially dispatch these suspending blocks to some common background thread pool.

Short of rolling my own CoroutineDispatcher (which I really want to avoid :eyes:), is there any way to create this kind of fire-and-forget behavior with strong execution ordering guarantees using coroutines?

You can guarantee strict sequential ordering by running them with a single coroutine. Send the tasks into a channel and launch a single coroutine to receive and execute all tasks in a loop.

When you launch something, you don’t get any guarantees about when it will run. That’s part of what launch means, so while you certainly could change that with a custom dispatcher, it’s really not a good idea. Changing the meaning of common works like launch would make your code extremely difficult to understand.

I actually think your synchronized tracker is just fine, except that you may want to launch the “send tracking event” part if it would otherwise send a request synchronously.

I could offer you ways to avoid multithreading, and therefore the @Synchronized blocks, but since you need to support “fully concurrent method invocations”, it sounds like multithreading is a requirement. If you’re going to do multithreading, then you’ll need to use concurrency control mechanisms like @Synchronized, or other more complicated ways to ensure sequential execution across threads.

Thank you for your responses! I like the idea of synchronizing the methods and putting a launch in place for the actual sending of the tracking events. Thanks @mtimmerm for that suggestion :smile: I guess I was trying use Kotlin Coroutines to do what Swift GCD allows you to do with serial background queues, but there’s more to the story than that.

@nickallendev I had disregarded Channel since I thought they were still experimental like some parts of the Flow API, but you encouraged me to take another look at the source. I didn’t know Channel was not-experimental, so I may play with that a bit. Thanks!

IMO, I think you anyway will need some single thread pool. Even with channel, enqueued tasks can execute simultaneously if there are multiple threads in dispatcher pool.

Tasks run by a single coroutine will not execute simultaneously, no matter how many threads are available. For loops do not spawn new child coroutines.