Hello, recently I wrote a code for a task scheduler, but I’m not entirely sure if it’s written correctly. Here’s the code:
typealias SchedulerBlock = suspend () -> Unit
interface Scheduler {
suspend fun scheduleAsync(block: SchedulerBlock)
suspend fun scheduleSync(block: SchedulerBlock)
suspend fun scheduleAsyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock)
suspend fun scheduleAsyncTask(delay: Duration, block: SchedulerBlock) = scheduleAsyncTask(delay, null, block)
suspend fun scheduleSyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock)
suspend fun scheduleSyncTask(delay: Duration, block: SchedulerBlock) = scheduleSyncTask(delay, null, block)
suspend fun processTask()
suspend fun shutdown()
}
enum class SchedulerTaskType {
ASYNC, SYNC
}
data class SchedulerTask(
val id: Int,
val type: SchedulerTaskType,
val delay: Duration,
val repeat: Duration?,
val block: SchedulerBlock
)
@OptIn(DelicateCoroutinesApi::class)
class CoroutineScheduler : Scheduler {
private val taskScope = CoroutineScope(Dispatchers.IO)
private val taskChannel = Channel<SchedulerTask>()
private val taskExecutor = newSingleThreadContext("Scheduler Thread")
private val taskIdCounter = atomic(0)
override suspend fun scheduleAsync(block: SchedulerBlock) {
withContext(taskScope.coroutineContext) { block() }
}
override suspend fun scheduleSync(block: SchedulerBlock) {
withContext(taskExecutor) { block() }
}
override suspend fun scheduleAsyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock) {
taskChannel.send(SchedulerTask(taskIdCounter.incrementAndGet(), SchedulerTaskType.ASYNC, delay, repeat, block))
}
override suspend fun scheduleSyncTask(delay: Duration, repeat: Duration?, block: SchedulerBlock) {
taskChannel.send(SchedulerTask(taskIdCounter.incrementAndGet(), SchedulerTaskType.SYNC, delay, repeat, block))
}
override suspend fun processTask() {
taskScope.launch {
while (isActive) {
select {
taskChannel.onReceive {
launch {
delay(it.delay)
when (it.type) {
SchedulerTaskType.ASYNC -> it.block()
SchedulerTaskType.SYNC -> withContext(taskExecutor) { it.block() }
}
if (it.repeat != null) taskChannel.send(it.copy(delay = it.repeat))
}
}
}
}
}
}
override suspend fun shutdown() {
taskExecutor.close()
taskChannel.close()
taskScope.cancel()
}
}
Thank you in advance for improving it