Scheduler Task

Hello, recently I wrote a code for a task scheduler, but I’m not entirely sure if it’s written correctly. Here’s the code:

typealias SchedulerBlock = suspend () -> Unit

interface Scheduler {
    suspend fun scheduleAsync(block: SchedulerBlock)

    suspend fun scheduleSync(block: SchedulerBlock)

    suspend fun scheduleAsyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock)

    suspend fun scheduleAsyncTask(delay: Duration, block: SchedulerBlock) = scheduleAsyncTask(delay, null, block)

    suspend fun scheduleSyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock)

    suspend fun scheduleSyncTask(delay: Duration, block: SchedulerBlock) = scheduleSyncTask(delay, null, block)

    suspend fun processTask()

    suspend fun shutdown()
}

enum class SchedulerTaskType {
    ASYNC, SYNC
}

data class SchedulerTask(
    val id: Int,
    val type: SchedulerTaskType,
    val delay: Duration,
    val repeat: Duration?,
    val block: SchedulerBlock
)

@OptIn(DelicateCoroutinesApi::class)
class CoroutineScheduler : Scheduler {
    private val taskScope = CoroutineScope(Dispatchers.IO)
    private val taskChannel = Channel<SchedulerTask>()
    private val taskExecutor = newSingleThreadContext("Scheduler Thread")
    private val taskIdCounter = atomic(0)

    override suspend fun scheduleAsync(block: SchedulerBlock) {
       withContext(taskScope.coroutineContext) { block() }
    }

    override suspend fun scheduleSync(block: SchedulerBlock) {
        withContext(taskExecutor) { block() }
    }

    override suspend fun scheduleAsyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock) {
        taskChannel.send(SchedulerTask(taskIdCounter.incrementAndGet(), SchedulerTaskType.ASYNC, delay, repeat, block))
    }

    override suspend fun scheduleSyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock) {
        taskChannel.send(SchedulerTask(taskIdCounter.incrementAndGet(), SchedulerTaskType.SYNC, delay, repeat, block))
    }

    override suspend fun processTask() {
        taskScope.launch {
            while (isActive) {
                select {
                    taskChannel.onReceive {
                        launch {
                            delay(it.delay)

                            when (it.type) {
                                SchedulerTaskType.ASYNC -> it.block()
                                SchedulerTaskType.SYNC -> withContext(taskExecutor) { it.block() }
                            }

                            if (it.repeat != null) taskChannel.send(it.copy(delay = it.repeat))
                        }
                    }
                }
            }
        }
    }

    override suspend fun shutdown() {
        taskExecutor.close()
        taskChannel.close()
        taskScope.cancel()
    }
}

Thank you in advance for improving it :smiley:

  1. What is your idea behind sync vs async and task vs non-task? Sync - one at a time, async - multiple at the same time, non-task - wait for it to finish, task - run in the background? That makes sense, but I find the naming counter-intuitive. For me, one at a time vs multiple is sequential vs concurrent and waiting/not waiting is… sync/async.
  2. Why do you use select {} with only a single clause? Isn’t it the same as simply calling receive()?
  3. I don’t think you need channel here. Your consumer only launches and immediately consumes a next item, so I think channel does nothing here. Instead of sending to channel you can launch directly.
  4. Is there any reason processTask() is suspend? It doesn’t suspend in the above code. All *Task() functions probably shouldn’t be suspend as well as they only schedule running the task.
  5. withContext(taskScope.coroutineContext) - I think this is a bad idea. You pass a context which contains Job and that messes with the job structure. Who should be the parent/owner of the task started in scheduleAsync()? Caller of the function? taskScope? It looks like you want both, but this is not possible.
  6. Assuming you wanted sequential execution of tasks for “sync”, please be aware single threaded executor doesn’t provide this. Your solution limits parallelism and you probably wanted to limit concurrency. If one task suspends, another will jump in, even before the first finishes. If you want to start next task only after the first finishes, use mutex or channel with a single consumer as you do in processTask(), but then don’t launch.