I’m newish to the concept of coroutines. I’ve consistently read about them described as lighter-weight threads that are good for discrete actions one may want to move off the main thread. But when I searched for a coroutine equivalent of something like Java’s ScheduledExecutorService, I was unable to find one. Am I misunderstanding the purpose of coroutines? I understand that Jobs have a simple lifecycle, but I couldn’t find any structures for this use case. Have I missed something that makes the task somewhat trivial, or is this a bad use case for coroutines? I made a rough draft of what I was thinking of below:
/**
 * A unit of scheduled work, ordered by the time at which it is next due.
 *
 * The sign of [recurringDelay] selects the repetition mode:
 *  - `0`  : one-shot, the action is launched once;
 *  - `> 0`: fixed rate, the action is launched every `recurringDelay` ms without
 *           waiting for the previous launch to finish;
 *  - `< 0`: fixed delay, the next run is due `-recurringDelay` ms after the
 *           previous run has completed.
 *
 * @param initialDelay milliseconds from construction until the first run is due.
 * @param recurringDelay signed repetition interval in milliseconds (see above).
 * @param action the suspending work to launch on every run.
 */
class Task(
    initialDelay: Long,
    private val recurringDelay: Long,
    private val action: suspend CoroutineScope.() -> Unit
) : Comparable<Task> {

    /**
     * Timestamp (same clock as `now()`) at which this task is next due.
     *
     * Volatile because it is rewritten by whichever pool thread executes [run]
     * and read by other threads when tasks are compared.
     */
    @Volatile
    var time: Long = now() + initialDelay

    // NOTE(review): kept as `var` for interface compatibility, but reassigning it has no
    // effect on [context] / [scope], which capture the job created here.
    var parentJob = SupervisorJob()
    val context = TaskScheduler.cpuExecutorService + parentJob
    val scope = CoroutineScope(context)

    /** `true` when the task repeats, i.e. [recurringDelay] is non-zero. */
    val isPeriodic: Boolean
        get() = recurringDelay != 0L

    /** Random identifier assigned at construction. */
    val uuid: String = UUID.randomUUID().toString()

    /**
     * Launches [action] in this task's [scope] and, for periodic tasks, re-arms [time].
     *
     * Fixed-delay tasks suspend here until the launched action has finished;
     * one-shot and fixed-rate tasks return as soon as the action is launched.
     */
    suspend fun run() {
        val job = scope.launch { action() }
        when {
            recurringDelay < 0 -> {
                // Fixed delay: the interval is measured from the end of this run.
                job.join()
                time = now() - recurringDelay
            }
            recurringDelay > 0 -> {
                // Fixed rate: anchor on the previous *scheduled* time so that scheduler
                // lateness does not accumulate as drift. If one or more whole ticks were
                // missed, they are coalesced into a single immediate run rather than replayed.
                time = maxOf(time + recurringDelay, now())
            }
        }
    }

    /** Cancels [parentJob], and with it every action launched by [run]. */
    fun cancel() {
        parentJob.cancel()
    }

    /** Orders tasks by ascending due [time]. */
    override fun compareTo(other: Task): Int = time.compareTo(other.time)
}
/**
 * Coroutine-based counterpart of a scheduled executor: runs [Task]s once, at a fixed
 * rate, or with a fixed delay between runs.
 *
 * Every scheduled task is driven by its own coroutine, which suspends with `delay`
 * until the task is due. No thread is occupied while waiting, and a slow fixed-delay
 * task cannot hold up other tasks.
 *
 * The scheduling and cancellation functions may be called from any thread.
 */
class TaskScheduler {
    var parentJob = SupervisorJob()
    val context = cpuExecutorService + parentJob
    private val scope = CoroutineScope(context)

    /** Guards [jobs] and [drivers]. Never held across a suspension point. */
    private val lock = Any()

    /**
     * Tasks that are pending or still running, keyed by [Task.uuid].
     *
     * The scheduler only touches this map while holding its internal lock; code that
     * reads it directly from outside gets no such protection.
     */
    var jobs = HashMap<String, Task>()

    /** Driver coroutine of each registered task, keyed by [Task.uuid]. */
    private val drivers = HashMap<String, kotlinx.coroutines.Job>()

    /**
     * Creates a [Task], registers it and starts the coroutine that drives it.
     *
     * @return the uuid under which the task can later be cancelled.
     */
    private fun schedule(initialDelay: Long, recurringDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        val task = Task(initialDelay, recurringDelay, action)
        synchronized(lock) {
            jobs[task.uuid] = task
            val driver = scope.launch { drive(task) }
            drivers[task.uuid] = driver
            // Fires on normal completion, on cancellation, and also when the scheduler's
            // scope was already cancelled and the driver never started. The driver cannot
            // deregister the task before registration is finished, because the lock is
            // still held here.
            driver.invokeOnCompletion { forget(task) }
        }
        return task.uuid
    }

    /**
     * Waits until [task] is due and runs it, repeating for as long as it is periodic
     * and has not been cancelled.
     */
    private suspend fun drive(task: Task) {
        do {
            val wait = task.time - now()
            if (wait > 0) {
                // Suspends instead of spinning; also the point where cancellation is observed.
                kotlinx.coroutines.delay(wait)
            }
            task.run()
        } while (task.isPeriodic && task.parentJob.isActive)

        // Keep the task registered until its launched work has finished, so that
        // cancel(uuid) can still reach a one-shot action that is in flight.
        task.parentJob.children.forEach { it.join() }
    }

    /** Drops the bookkeeping for a task whose driver has terminated. */
    private fun forget(task: Task) {
        synchronized(lock) {
            if (jobs[task.uuid] === task) {
                jobs.remove(task.uuid)
            }
            drivers.remove(task.uuid)
        }
    }

    /** Runs [action] once, [initialDelay] milliseconds from now. */
    fun scheduleOnce(initialDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        return schedule(initialDelay, 0, action)
    }

    /**
     * Runs [action] after [initialDelay] ms and then every [recurringDelay] ms,
     * without waiting for the previous run to finish.
     */
    fun scheduleAtFixedRate(initialDelay: Long, recurringDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        return schedule(initialDelay, recurringDelay, action)
    }

    /**
     * Runs [action] after [initialDelay] ms and then again [recurringDelay] ms after
     * each run has completed.
     */
    fun scheduleWithFixedDelay(initialDelay: Long, recurringDelay: Long, action: suspend CoroutineScope.() -> Unit): String {
        // Task encodes "fixed delay" as a negative interval.
        return schedule(initialDelay, -recurringDelay, action)
    }

    /** Cancels the task registered under [uuid]; does nothing if there is none. */
    fun cancel(uuid: String) {
        val (task, driver) = synchronized(lock) { jobs.remove(uuid) to drivers.remove(uuid) }
        // Stop the driver first so that a periodic task cannot be re-armed.
        driver?.cancel()
        task?.cancel()
    }

    /** Cancels every registered task. The scheduler itself stays usable afterwards. */
    fun cancel() {
        val (tasks, sleepers) = synchronized(lock) {
            val snapshot = jobs.values.toList() to drivers.values.toList()
            jobs.clear()
            drivers.clear()
            snapshot
        }
        // Cancel outside the lock: completion handlers re-enter forget().
        sleepers.forEach { it.cancel() }
        tasks.forEach { it.cancel() }
    }

    companion object {
        /** Pool size: one thread fewer than the available processors, but at least one. */
        private val NUM_CPU_THREADS = maxOf(1, Runtime.getRuntime().availableProcessors() - 1)

        /** Shared dispatcher on which schedulers and tasks run; created on first use. */
        val cpuExecutorService: CoroutineDispatcher by lazy {
            Executors.newFixedThreadPool(NUM_CPU_THREADS).asCoroutineDispatcher()
        }
    }
}
/**
 * Returns the current time in milliseconds, as reported by [System.currentTimeMillis].
 */
internal fun now(): Long {
    return System.currentTimeMillis()
}