I’ve started working with coroutines in Kotlin and would appreciate help.
I’m quite familiar with Python’s asyncio, so the whole subject is not new to me, but I’m finding it hard to work out the best design for my use case.
I have a potentially large number of coroutines, which I need to “schedule” around an external clock, i.e. each coroutine will suspend itself at particular points in its code until a given logical clock time. The scheduler should resume coroutines in order of the logical time at which each one wants to wake up.
As an example:
suspend fun A() {
    println("A1")
    waitUntil(1000)
    println("A2")
    waitUntil(1500)
    println("A3")
}
suspend fun B() {
    waitUntil(500)
    println("B1")
    waitUntil(200)
    println("B2")
    waitUntil(1800)
    println("B3")
}
should print
A1
B1
B2
A2
A3
B3
I believe waitUntil will be a suspend function too, one that needs to schedule “its own resume” in some sorted list of resumes.
Assume that the logical clock always starts at 0 and that the clock is purely logical, i.e. no actual waiting is required: the only requirement is that resumes happen in increasing order of logical time.
How would you implement this coroutine model?
Thanks
Eh, I was trying to come up with a solution involving the low-level APIs, e.g. suspendCoroutine(), but I think it’s simpler just to implement this in terms of delay. Whaddya think:
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking

private val programStart = System.currentTimeMillis()

suspend fun waitUntil(timeSinceStart: Long) {
    val duration = programStart + timeSinceStart - System.currentTimeMillis()
    if (duration > 0) {
        delay(duration)
    }
}

fun A() = launch(CommonPool) {
    println("A1")
    waitUntil(1000)
    println("A2")
    waitUntil(1500)
    println("A3")
}

fun B() = launch(CommonPool) {
    waitUntil(500)
    println("B1")
    waitUntil(200)
    println("B2")
    waitUntil(1800)
    println("B3")
}

fun main(args: Array<String>) {
    runBlocking {
        val a = A()
        val b = B()
        a.join()
        b.join()
    }
}
prints
A1
B1
B2
A2
A3
B3
Thanks. Not exactly what I’m after. The purpose of the code is to run a cooperative multi-agent simulation, so I don’t want delay() to respect the real-time clock, but simply to use it to order cooperation. That means that if I do waitUntil(1 week), I don’t want the program to wait for a week, but rather to switch to the coroutine whose waitUntil() wakes up the earliest. It is almost as if I were writing my own scheduler, sorting the coroutines in order of their waitUntil().
I re-read the explanation of suspendCoroutine() and I see how it can play a role, by only resuming once the “engine clock” time is equal to or greater than the waitUntil parameter. I could imagine a “low-priority coroutine”, whatever that might be, just incrementing the engine clock one tick at a time. Instead, I think I’d need to tap into the scheduling logic to decide for myself which suspended function should “wake up” next, after advancing the engine clock to the earliest waitUntil parameter…
Where is such a scheduler currently implemented?
Thanks
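For what it’s worth, the idea described above (jump the engine clock straight to the earliest pending waitUntil and resume that coroutine) can be sketched with nothing but the standard library. This is a minimal single-threaded sketch, assuming the stable kotlin.coroutines API of Kotlin 1.3 or later rather than the experimental packages used elsewhere in this thread; LogicalClock, Wakeup, launch, run and demo are names made up for illustration.

```kotlin
import java.util.PriorityQueue
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical scheduler: single-threaded, driven purely by a logical clock.
class LogicalClock {
    // A parked coroutine plus the logical time it asked to be woken at.
    // `seq` breaks ties so that equal times resume in insertion order.
    private class Wakeup(val time: Long, val seq: Long, val cont: Continuation<Unit>)

    private val queue = PriorityQueue<Wakeup>(compareBy<Wakeup>({ it.time }, { it.seq }))
    private var nextSeq = 0L

    // Current logical time; only run() advances it.
    var now = 0L
        private set

    // Starts the block right away; it runs until its first real suspension.
    fun launch(block: suspend LogicalClock.() -> Unit) {
        block.startCoroutine(this, Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    }

    // Returns immediately if the requested time has already passed,
    // otherwise parks the caller in the queue.
    suspend fun waitUntil(time: Long) {
        if (time <= now) return
        suspendCoroutine<Unit> { cont -> queue.add(Wakeup(time, nextSeq++, cont)) }
    }

    // Repeatedly jumps the clock to the earliest wake-up and resumes it.
    fun run() {
        while (true) {
            val next = queue.poll() ?: break
            now = next.time
            next.cont.resume(Unit)
        }
    }
}

// The A/B example from the question, logging to a list instead of printing.
fun demo(): List<String> {
    val log = mutableListOf<String>()
    val clock = LogicalClock()
    clock.launch {
        log += "A1"
        waitUntil(1000)
        log += "A2"
        waitUntil(1500)
        log += "A3"
    }
    clock.launch {
        waitUntil(500)
        log += "B1"
        waitUntil(200)
        log += "B2"
        waitUntil(1800)
        log += "B3"
    }
    clock.run()
    return log
}
```

Under these assumptions demo() returns [A1, B1, B2, A2, A3, B3] without any real waiting, because nothing resumes until run() picks the earliest entry from the queue.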
Alright, here’s another attempt. Because there’s no real-time clock component, it’s more prone to race conditions, so the output’s not exactly what you asked for, but I think that’s probably unavoidable:
(Also there are surely better ways to do this; this is more of a prototype than anything)
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.future.await
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.time.Duration
import java.time.Instant
import java.util.concurrent.*

// library //

private data class PriorityJob(val order: Long, internal val future: CompletableFuture<Void>) : Runnable {
    override fun run() {
        future.complete(null)
    }
}

private data class RunnablePriorityJob<T>(private val job: PriorityJob) : RunnableFuture<T>, Comparable<RunnablePriorityJob<T>> {
    override fun compareTo(other: RunnablePriorityJob<T>): Int = job.order.compareTo(other.job.order)
    override fun run() = job.run()
    override fun get(): T? = null
    override fun get(timeout: Long, unit: TimeUnit): T? = null
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean = job.future.cancel(mayInterruptIfRunning)
    override fun isDone(): Boolean = job.future.isDone
    override fun isCancelled(): Boolean = job.future.isCancelled
}

private val executorService = object : ThreadPoolExecutor(1, 1, 1, TimeUnit.DAYS, PriorityBlockingQueue<Runnable>()) {
    override fun <T> newTaskFor(runnable: Runnable, value: T): RunnableFuture<T>
            = RunnablePriorityJob(runnable as PriorityJob)
    override fun <T> newTaskFor(callable: Callable<T>)
            = throw NotImplementedError("Only Runnables are currently supported")
}

suspend fun waitUntil(order: Long) {
    val fut = CompletableFuture<Void>()
    executorService.submit(PriorityJob(order, fut))
    fut.await()
}

// application //

fun A() = launch(CommonPool) {
    println("A1")
    waitUntil(1000)
    println("A2")
    waitUntil(1500)
    println("A3")
}

fun B() = launch(CommonPool) {
    waitUntil(500)
    println("B1")
    waitUntil(200)
    println("B2")
    waitUntil(1800)
    println("B3")
}

fun main(args: Array<String>) {
    val start = Instant.now()
    runBlocking {
        val a = A()
        val b = B()
        a.join()
        b.join()
    }
    val end = Instant.now()
    println("Executed in ${Duration.between(start, end).toMillis()}ms")
    executorService.shutdown()
}
prints
A1
A2
B1
B2
A3
B3
Executed in 46ms
Notice that A2 is printed before B1. This is probably just because the “A” thread got a bit of a head start and isn’t actually a bug. Only when there are multiple items in the queue will there actually be anything interesting happening.
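To illustrate that last point: a priority queue can only order entries that are sitting in it at the same time. The sketch below is deliberately single-threaded and uses bare Long order values instead of the PriorityJob wrapper, so it shows the queueing effect only, not the executor itself; drain, servedOnArrival and servedAfterBothQueued are hypothetical names.

```kotlin
import java.util.concurrent.PriorityBlockingQueue

// Removes everything currently in the queue, lowest order first.
fun drain(queue: PriorityBlockingQueue<Long>): List<Long> {
    val out = mutableListOf<Long>()
    while (true) {
        val next = queue.poll() ?: break
        out += next
    }
    return out
}

// Each entry is consumed before the next one arrives, so priority never
// gets a chance to reorder anything (the "head start" case).
fun servedOnArrival(): List<Long> {
    val queue = PriorityBlockingQueue<Long>()
    val served = mutableListOf<Long>()
    queue.add(1000L)
    served += drain(queue)
    queue.add(500L)
    served += drain(queue)
    return served
}

// Both entries are queued before anything is consumed, so the lower order wins.
fun servedAfterBothQueued(): List<Long> {
    val queue = PriorityBlockingQueue<Long>()
    queue.add(1000L)
    queue.add(500L)
    return drain(queue)
}
```

Here servedOnArrival() returns [1000, 500], while servedAfterBothQueued() returns [500, 1000].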
dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version" // 1.1.2-2
    compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.15"
    compile "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:0.15"
}
Many thanks. I think I got it and it is indeed an elegant solution to schedule the completion of the future, together with prioritizing the “waits”.
Will check the head start, but I guess that can be solved by tighter control of the executor start. Will check.
Many thanks for the help.
Ricardo
I think my solution ended up working fine. In case it is of interest to others, what I ended up doing is something like the example below. As I said, this is for the specific case of an autonomous-agent simulation, where you want execution to follow a manually controlled clock.
The support of coroutines and DSL syntax worked quite nicely I must say.
Thanks for checking.
// Uses the experimental coroutine API of Kotlin 1.1/1.2
import java.util.PriorityQueue
import kotlin.coroutines.experimental.*

typealias Routine = suspend VirtualMachine.() -> Unit

// A suspended routine together with the VM time at which it should resume
private data class State(val time: Double, internal val cont: Continuation<Unit>) : Comparable<State> {
    override fun compareTo(other: State): Int = time.compareTo(other.time)
}

class VirtualMachine : Continuation<Unit> {
    private val states = PriorityQueue<State>()

    // The current time of the VM
    var time: Double = 0.0

    // These members implement Continuation, to receive the termination of all the coroutines. Not used.
    override val context: CoroutineContext = EmptyCoroutineContext
    override fun resume(value: Unit) {}
    override fun resumeWithException(exception: Throwable) { throw exception }

    // Schedules a routine to start `after` time units from the current VM time
    fun schedule(after: Double = 0.0, block: Routine) {
        states.add(State(time + after, block.createCoroutine(receiver = this, completion = this)))
    }

    // Runs all the routines that have been scheduled so far
    fun run() {
        var next = states.poll()
        while (next != null) {
            if (next.time > time) {
                // some logic here to process time "advancing"
            }
            // Set the VM time to the time at which the routine wanted to be woken up
            time = next.time
            // Resume the routine
            next.cont.resume(Unit)
            // Pull the next one
            next = states.poll()
        }
    }

    // Suspends the calling routine until `seconds` after the current VM time
    suspend fun wait(seconds: Double) {
        suspendCoroutine<Unit> { cont -> states.add(State(time + seconds, cont)) }
    }
}
and then use:
val vm = VirtualMachine()
vm.schedule(1.0) {
    dosomething()
    wait(1.0)
    dosomethingelse()
    wait(2.0)
}
vm.schedule(1.5) {
    dosomething2()
    wait(3.0)
    dosomethingelse2()
    wait(5.0)
}
vm.run()
Looks good to me. This is how it is designed to be used. The only additional advice I can give is this: if all kinds of suspension in your simulation are supposed to end up invoking VirtualMachine.wait, then you can mark VirtualMachine with the @RestrictsSuspension annotation to make sure that your code does not accidentally invoke some unauthorised suspending function like delay from kotlinx.coroutines.
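As a rough illustration of what the annotation buys you, here is a minimal sketch, assuming the stable kotlin.coroutines API (Kotlin 1.3+) rather than the experimental one used above. SimulationScope, pause, unrelatedSuspend and runRestricted are hypothetical names standing in for VirtualMachine and its wait function.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.RestrictsSuspension
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for VirtualMachine: suspend lambdas that take this
// class as their receiver may only call its own suspending functions.
@RestrictsSuspension
class SimulationScope {
    // The continuation parked by the most recent pause(), if any.
    internal var parked: Continuation<Unit>? = null

    // The only suspension point the simulation is allowed to use.
    suspend fun pause() {
        suspendCoroutine<Unit> { cont -> parked = cont }
    }
}

// An ordinary suspending function that is not part of the scope.
suspend fun unrelatedSuspend() {}

// Runs the block to completion and returns how many times it had to be resumed.
fun runRestricted(block: suspend SimulationScope.() -> Unit): Int {
    val scope = SimulationScope()
    block.startCoroutine(scope, Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    var resumes = 0
    while (true) {
        val cont = scope.parked ?: break
        scope.parked = null
        resumes++
        cont.resume(Unit)
    }
    return resumes
}

fun restrictedDemo(): Int = runRestricted {
    pause()
    // unrelatedSuspend() // rejected by the compiler: not a member or extension of SimulationScope
    pause()
}
```

With this in place restrictedDemo() returns 2, and uncommenting the unrelatedSuspend() call turns the accidental suspension into a compile-time error instead of a scheduling bug at run time.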