Coroutines with time scheduling


#1

I’ve started working with coroutines in Kotlin and would appreciate help.
I’m quite familiar with Python’s asyncio, so the whole subject is not new to me, but finding it hard how to better design to implement my use case.

I have a potential large number of coroutines, which I need to “schedule” around an external clock, i.e. each coroutine will suspend itself at particular points of its code until a logical clock time. The scheduler should resume coroutines by the order of this logical time a coroutine wants to wakeup.

As an example

suspend fun A() {
    println("A1")
    waitUntil(1000)
    println("A2")
    waitUntil(1500)
    println("A3")
}

suspend fun B() {
    waitUntil(500)
    println("B1")
    waitUntil(200)
    println("B2")
    waitUntil(1800)
    println("B3")
}

should print

A1
B1
B2
A2
A3
B3

I believe wait will be a suspend function too, that needs to reschedule “its own resume” in some sorted list of resumes.
Assume that the logical clock always starts at 0, and clock is logical, i.e. no duration of wait is required : it only is required that resumes are made on incremental order.

How would you implement this coroutine model?

Thanks


#2

Eh, I was trying to come up with a solution involving the low-level APIs, e.g. suspendCoroutine(), but I think it’s simpler just to implement this in terms of delay. Whaddya think:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking

private val programStart = System.currentTimeMillis()
suspend fun waitUntil(timeSinceStart: Long) {
    val duration = programStart + timeSinceStart - System.currentTimeMillis()
    if (duration > 0) {
        delay(duration)
    }
}

fun A() = launch(CommonPool) {
    println("A1")
    waitUntil(1000)
    println("A2")
    waitUntil(1500)
    println("A3")
}

fun B() = launch(CommonPool) {
    waitUntil(500)
    println("B1")
    waitUntil(200)
    println("B2")
    waitUntil(1800)
    println("B3")
}

fun main(args: Array<String>) {
    runBlocking {
        val a = A()
        val b = B()
        a.join()
        b.join()
    }
}

prints

A1
B1
B2
A2
A3
B3

#3

Thanks. Not exactly what I’m after. The purpose of the code is to run cooperative multi-agent simulation, and therefore I don’t want delay() to respect real-time clocks, but simply use it to order cooperation. That means that if I do waitUntil(1week), I don’t want the program to wait for a week, but rather switch to the co-routine which had a “waitUntil()” waking up the earliest. It is almost like I would write my own scheduler, sorting the coroutines by the order of their waitUnti().

I re-read the explanation of suspendCoroutine() and I see how it can play a role, by only resuming if the “engine clock” time is already equal or greater than the waitUntil parameter. I’d see a “low priority co-routine” - whatever that might be - just incrementing the engine clock one by one. Instead I think that I’d need to tap into the scheduling logic to decide myself what should be the next suspended function to “wake-up”, after making sure I advanced the engine clock to the earliest parameter of the waitUntil…

Where is the implementation of “such scheduler” currently ?

Thanks


#4

Alright, here’s another attempt. Because there’s no real-time clock component, it’s more prone to race conditions, so the output’s not exactly what you asked for, but I think that’s probably unavoidable:

(Also there are surely better ways to do this; this is more of a prototype than anything)

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.future.await
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.time.Duration
import java.time.Instant
import java.util.concurrent.*

// library //

private data class PriorityJob(val order: Long, internal val future: CompletableFuture<Void>) : Runnable {
    override fun run() {
        future.complete(null)
    }
}

private data class RunnablePriorityJob<T>(private val job: PriorityJob) : RunnableFuture<T>, Comparable<RunnablePriorityJob<T>> {
    override fun compareTo(other: RunnablePriorityJob<T>): Int = job.order.compareTo(other.job.order)

    override fun run() = job.run()

    override fun get(): T? = null
    override fun get(timeout: Long, unit: TimeUnit): T? = null
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean = job.future.cancel(mayInterruptIfRunning)
    override fun isDone(): Boolean = job.future.isDone
    override fun isCancelled(): Boolean = job.future.isCancelled
}

private val executorService = object : ThreadPoolExecutor(1, 1, 1, TimeUnit.DAYS, PriorityBlockingQueue<Runnable>()) {
    override fun <T> newTaskFor(runnable: Runnable, value: T): RunnableFuture<T>
            = RunnablePriorityJob(runnable as PriorityJob)

    override fun <T> newTaskFor(callable: Callable<T>)
            = throw NotImplementedError("Only Runnables are currently supported")
}

suspend fun waitUntil(order: Long) {
    val fut = CompletableFuture<Void>()
    executorService.submit(PriorityJob(order, fut))
    fut.await()
}

// application //

fun A() = launch(CommonPool) {
    println("A1")
    waitUntil(1000)
    println("A2")
    waitUntil(1500)
    println("A3")
}

fun B() = launch(CommonPool) {
    waitUntil(500)
    println("B1")
    waitUntil(200)
    println("B2")
    waitUntil(1800)
    println("B3")
}

fun main(args: Array<String>) {
    val start = Instant.now()
    runBlocking {
        val a = A()
        val b = B()
        a.join()
        b.join()
    }
    val end = Instant.now()
    println("Executed in ${Duration.between(start, end).toMillis()}ms")
    executorService.shutdown()
}

prints

A1
A2
B1
B2
A3
B3
Executed in 46ms

Notice that A2 is printed before B1. This is probably just because the “A” thread got a bit of a head start and isn’t actually a bug. Only when there are multiple items in the queue will there actually be anything interesting happening.


dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version" // 1.1.2-2
    compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.15"
    compile "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:0.15"
}

#5

Many thanks. I think I got it and it is indeed an elegant solution to schedule the completion of the future, together with prioritizing the “waits”.
Will check the head start, but I guess that can be solved by tighter control of the executor start. Will check.
Many thanks for the help.

Ricardo


#6

Do you still need help?


#7

I think my solution ended working fine. If there is interest for others what I ended doing is something like example below. Like I said this is for a specific case of autonomous agents simulation, where you want executions to take place in a manually controlled clock.

The support of coroutines and DSL syntax worked quite nicely I must say.

Thanks for checking.

typealias Routine = suspend VirtualMachine.() -> Unit

private data class State(val time: Double, internal val cont: Continuation<Unit>) : Comparable<State> {    
    override fun compareTo(other: State): Int = time.compareTo(other.time)
}

data class VirtualMachine() : Continuation<Unit> {

    private val states = PriorityQueue<State>()

    // The current time of the VM    var time: Double = 0.0
    // Those members implement Continuation, to receive the termination of all the co-routines. Not used. 
    override val context: CoroutineContext = EmptyCoroutineContext 
    override fun resume(value: Unit) {} 
    override fun resumeWithException(exception: Throwable) { throw exception }

    // Schedules a routine after certain time (needs to be after VMtime)    

    fun schedule(after:Double= 0.0, block: Routine) { 
        states.add(State(time+after, block.createCoroutine(receiver=this, completion = this)))    
    }
    
   // Runs all the routines that have been scheduled so far    
    fun run() {
        var next = states.poll()        
        while (next!=null) {
            if (next.time>time) {                
                // some logic here to process time "advancing"            
            }            

            // We set the time for the time the routine wanted to be woken up            
            time = next.time

            // Re-run the routine            
            next.cont.resume(Unit)
    
            // Pull the next            
            next = states.poll()       
       }
   }

    suspend fun wait(seconds: Double) =  suspendCoroutine {
        cont: Continuation<Unit> -> states.add(State(seconds+time, cont)) 
    }

}

and then use:

val vm = VirtualMachine()
vm.schedule(1.0) {
    dosomething()       
    wait(1.0)
    dosomethingelse()
    wait(2)
}

vm.schedule(1.5) {
   dosomething2()       
   wait(3.0)
   dosomethingelse2()
   wait(5)
}

vm.run()

#8

Looks good to me. This is how it is designed to be used. The only additional advise that I can give is this. If all kinds suspensions in your simulation are supposed to end up invoking VirtualMachine.wait, then you can mark VirtualMachine with @RestrictsSuspension annotation to make sure that your code does not accidentally invoke some unauthorised suspending function like delay from kotlinx.coroutines.