Coroutine Scheduling

#1

I’m newish to the concept of coroutines. I’ve consistently read about them described as lightweight threads that are good for discrete actions one may want to move off the main thread. But when I searched for a coroutine equivalent of something like Java’s Scheduled Executor Service, I was unable to find one. Am I misunderstanding the purpose of coroutines? I understand Jobs have a simple lifecycle, but I couldn’t seem to find some structures for this use case. Have I missed something that makes the task trivial, or is this a bad use case for coroutines? I made a rough draft of what I was thinking of below:

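import java.util.PriorityQueue
import java.util.UUID
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.*

// A negative recurringDelay marks fixed-delay mode; a positive one marks fixed-rate mode; 0 means run once.
class Task(initialDelay: Long, private val recurringDelay: Long, private val action: suspend CoroutineScope.() -> Unit): Comparable<Task> {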
    var time = now() + initialDelay
    var parentJob = SupervisorJob()
    val context = TaskScheduler.cpuExecutorService + parentJob
    val scope = CoroutineScope(context)

    val isPeriodic: Boolean
        get() = recurringDelay != 0L

    val uuid: String = UUID.randomUUID().toString()

    suspend fun run() {

        val job = scope.launch {
            action()
        }

        if(recurringDelay < 0) {
            job.join()
        }


        if (isPeriodic) {
            time = now() + Math.abs(recurringDelay)
        }
    }

    fun cancel() {
        parentJob.cancel()
    }

    override fun compareTo(other: Task): Int = time.compareTo(other.time)
}


class TaskScheduler {

    var parentJob = SupervisorJob()
    val context = cpuExecutorService + parentJob
    private val scope = CoroutineScope(context)

    private var isRunning = AtomicBoolean(false)
    private val taskQueue = PriorityQueue<Task>()
    var jobs = HashMap<String, Task>()

    private fun schedule(initialDelay: Long, recurringDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        val task = Task(initialDelay, recurringDelay, action)
        jobs[task.uuid] = task
        taskQueue.add(task)
        run()
        return task.uuid
    }

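    private fun run() {
        // compareAndSet makes the check-and-claim atomic, so only one polling loop runs at a time.
        if (!isRunning.compareAndSet(false, true)) return

        scope.launch {
            try {
                var next = taskQueue.poll()
                while (next != null) {
                    // Suspend until the task is due instead of spinning on the clock.
                    val wait = next.time - now()
                    if (wait > 0) delay(wait)

                    next.run()
                    if (next.isPeriodic) {
                        taskQueue.add(next)
                    }
                    next = taskQueue.poll()
                }
            } finally {
                isRunning.set(false)
            }
        }
    }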

    fun scheduleOnce(initialDelay: Long, action: suspend CoroutineScope.() -> Unit): String{
        return schedule(initialDelay, 0, action)
    }

    fun scheduleAtFixedRate(initialDelay: Long, recurringDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        return schedule(initialDelay, recurringDelay, action)
    }

    fun scheduleWithFixedDelay(initialDelay: Long, recurringDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        // The negated delay tells Task.run() to join the job before computing the next run time.
        return schedule(initialDelay, -recurringDelay, action)
    }

    fun cancel(uuid: String) {
        jobs[uuid]?.cancel()
        taskQueue.remove(jobs[uuid])
        jobs.remove(uuid)
    }

    fun cancel() {
        jobs.values.forEach { it.cancel() }
        jobs.clear()
        // Also drop pending tasks so cancelled periodic tasks are not polled again.
        taskQueue.clear()
    }


    companion object {
        private val NUM_CPU_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors() - 1)
        val cpuExecutorService: CoroutineDispatcher by lazy { Executors.newFixedThreadPool(NUM_CPU_THREADS).asCoroutineDispatcher() }
    }

}

internal fun now() = System.currentTimeMillis()
#2

Have you seen this page? https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/coroutine-context-and-dispatchers.md

Coroutine context includes a coroutine dispatcher (see CoroutineDispatcher) that determines what thread or threads the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.

I couldn’t seem to find some structures for this use case

I missed it, what is your use case?

#3

I guess the core of my question is about converting long-running processes that need to run at a fixed interval into coroutines. The example I used was Java’s Scheduled Executor Service. It allows you to pass in a runnable along with interval information to have a task repeated at the given interval until cancelled.
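
For reference, here is a minimal sketch of the Java API described above, called from Kotlin. The helper name `runPeriodically` and its parameters are made up for illustration; only `Executors.newSingleThreadScheduledExecutor`, `scheduleAtFixedRate`, and `ScheduledFuture.cancel` come from the JDK.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs `action` every `periodMillis` for roughly
// `runForMillis`, then cancels it and returns how many runs completed.
fun runPeriodically(periodMillis: Long, runForMillis: Long, action: () -> Unit): Int {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val runs = AtomicInteger(0)
    try {
        val future = executor.scheduleAtFixedRate(
            Runnable {
                action()
                runs.incrementAndGet()
            },
            0L,             // initial delay
            periodMillis,   // time between the starts of consecutive runs
            TimeUnit.MILLISECONDS
        )
        Thread.sleep(runForMillis)
        future.cancel(false) // stop further runs; do not interrupt a run in progress
    } finally {
        executor.shutdown()
    }
    return runs.get()
}

fun main() {
    val count = runPeriodically(periodMillis = 100, runForMillis = 350) { println("tick") }
    println("ran $count times")
}
```

The piece I want to reproduce with coroutines is the returned handle: the task keeps repeating until that handle is cancelled.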

#4

@robert.gross Does this example fit your use case?

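import java.time.Duration
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.delay // delay overload that accepts java.time.Duration

// Repeats until the returned Job is cancelled.
GlobalScope.launch {
    while (isActive) {
        delay(Duration.ofSeconds(1))
        println("One more second")
    }
}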
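
The same delay-in-a-loop pattern can be wrapped to mimic the two repeating modes of Scheduled Executor Service. This is only a rough sketch: `launchWithFixedDelay` and `launchAtFixedRate` are hypothetical names, not part of kotlinx.coroutines, and the fixed-rate version assumes wall-clock time from `System.currentTimeMillis()` is good enough.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: waits `delayMillis` AFTER each run finishes (fixed delay).
fun CoroutineScope.launchWithFixedDelay(
    initialDelayMillis: Long,
    delayMillis: Long,
    action: suspend () -> Unit
): Job = launch {
    delay(initialDelayMillis)
    while (isActive) {
        action()
        delay(delayMillis)
    }
}

// Hypothetical helper: aims to START each run `periodMillis` apart (fixed rate).
fun CoroutineScope.launchAtFixedRate(
    initialDelayMillis: Long,
    periodMillis: Long,
    action: suspend () -> Unit
): Job = launch {
    delay(initialDelayMillis)
    var nextStart = System.currentTimeMillis()
    while (isActive) {
        action()
        nextStart += periodMillis
        // delay() returns immediately for non-positive values, so a run that
        // overshoots its slot is followed by the next run without waiting.
        delay(nextStart - System.currentTimeMillis())
    }
}

fun main() = runBlocking {
    var ticks = 0
    val job = launchAtFixedRate(initialDelayMillis = 0, periodMillis = 100) { ticks++ }
    delay(350)
    job.cancel() // plays the role of ScheduledFuture.cancel()
    println("ticks = $ticks")
}
```

Both helpers return the `Job`, so cancelling that job (or the scope it was launched in) stops the repetition.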
#5

Coroutines aren’t really the equivalent of Scheduled Executor Service. They generally run immediately, and only once. I could imagine someone creating a Java library that runs coroutines at scheduled times (the way that Android’s WorkManager does), but I have no idea if that exists yet.

#6
#7

@fvasco
This is pretty similar to what I’m looking for. Thanks for the info.