Have you tried using your single-threaded dispatcher from sendPacket?
withContext(mySingleThreadedDispatcher) { doBlockingSend() }
That should fix your thread-safety issue.
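To make that concrete, here is a minimal sketch of what I mean. RecordingSender and sendFromManyCoroutines are hypothetical names I made up for the demo, and Thread.sleep stands in for your real blocking send; the only real API pieces are withContext and asCoroutineDispatcher.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical stand-in for the non-thread-safe sender: it records which
// thread each call ran on and how many calls overlapped.
class RecordingSender {
    val threads = mutableSetOf<String>()
    var active = 0
    var maxActive = 0

    fun doBlockingSend() {
        active++
        maxActive = maxOf(maxActive, active)
        threads += Thread.currentThread().name
        Thread.sleep(5) // pretend to block on I/O
        active--
    }
}

// Callers may be on any thread; the blocking call always hops to the
// single sender thread, so calls are serialized there.
suspend fun sendPacket(sender: RecordingSender, dispatcher: CoroutineDispatcher) =
    withContext(dispatcher) { sender.doBlockingSend() }

fun sendFromManyCoroutines(count: Int): RecordingSender {
    val sender = RecordingSender()
    // use { } closes the dispatcher (and its thread) when we are done.
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { single ->
        runBlocking {
            repeat(count) {
                launch(Dispatchers.Default) { sendPacket(sender, single) }
            }
        }
    }
    return sender
}

fun main() {
    val sender = sendFromManyCoroutines(20)
    println("threads used: ${sender.threads.size}, max overlap: ${sender.maxActive}")
}
```

Even though the callers are launched on Dispatchers.Default, this should report one thread and a max overlap of 1, because the blocking call never suspends while it holds the single thread.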
I’m not sure how to address your “special acknowledgement packets” without more explanation of how your current code tries to handle them. A single worker can’t immediately respond to anything while it’s blocked on a previous request, so I don’t see how the worker coroutine is helping you at all.
Also, if you are using a single coroutine for your worker, you probably don’t need it to run on a single threaded dispatcher so long as you aren’t calling launch/async. While a single coroutine may switch between threads, it’s running sequentially and there will be no concurrent calls.
As it stands, your single-threaded dispatcher is likely doing nothing for your worker: Dispatchers.IO uses a thread pool, so I’m guessing your calls are not happening on a single thread anyway.
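Here is a small sketch of the "one coroutine is already sequential" point. runSingleWorker is a hypothetical helper, and delay(1) stands in for the real work; it is only there to create a suspension point so the coroutine can hop between pool threads.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Runs one worker coroutine on a multi-threaded dispatcher and returns the
// highest number of overlapping "sends" seen, plus the thread names used.
fun runSingleWorker(packets: Int): Pair<Int, Set<String>> = runBlocking {
    val channel = Channel<Int>()
    var active = 0
    var maxActive = 0
    val threads = mutableSetOf<String>()

    val worker = launch(Dispatchers.Default) {
        for (packet in channel) {
            active++
            maxActive = maxOf(maxActive, active)
            threads += Thread.currentThread().name
            delay(1) // suspension point: may resume on a different pool thread
            active--
        }
    }

    repeat(packets) { channel.send(it) }
    channel.close() // lets the for loop finish
    worker.join()
    maxActive to threads
}

fun main() {
    val (maxActive, threads) = runSingleWorker(50)
    println("max overlap: $maxActive, threads seen: ${threads.size}")
}
```

The number of threads seen can vary from run to run, but the overlap stays at 1 because nothing inside the worker calls launch or async.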
See the kotlinx.coroutines docs for runInterruptible. It uses thread interruption to cancel blocking calls.
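A rough sketch of how that looks in practice. Thread.sleep is only a stand-in for an interruptible blocking call, and cancelBlockingSend is a hypothetical helper for the demo; whether your real send reacts to interruption depends on the API you are calling.

```kotlin
import kotlinx.coroutines.*

// Hypothetical blocking call; Thread.sleep stands in for a blocking write
// that responds to Thread.interrupt().
fun doBlockingSend() {
    Thread.sleep(10_000)
}

// Returns true if the blocking call was cut short by coroutine cancellation.
fun cancelBlockingSend(): Boolean = runBlocking {
    var cancelled = false
    val job = launch(Dispatchers.IO) {
        try {
            // On cancellation the thread is interrupted, and the resulting
            // InterruptedException surfaces as a CancellationException.
            runInterruptible { doBlockingSend() }
        } catch (e: CancellationException) {
            cancelled = true
            throw e // always rethrow cancellation
        }
    }
    delay(100)          // give the blocking call time to start
    job.cancelAndJoin() // returns long before the 10 s sleep would finish
    cancelled
}

fun main() {
    println("cancelled early: ${cancelBlockingSend()}")
}
```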
If you have some other mechanism for cancelling your blocking work, then suspendCancellableCoroutine (detect current coroutine cancellation) or Job.invokeOnCompletion (detect Job finished) may be of some use.
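For the suspendCancellableCoroutine route, something along these lines might work. AbortableSender, sendCancellable and cancelViaAbort are hypothetical; the latch is my stand-in for whatever your real cancellation mechanism is (closing a socket, for example).

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
import kotlin.coroutines.resume

// Hypothetical sender with its own way to abort a blocking send.
class AbortableSender {
    private val aborted = CountDownLatch(1)
    val isAborted: Boolean get() = aborted.count == 0L

    // Blocks for up to 10 s; returns false if abort() was called first.
    fun blockingSend(): Boolean = !aborted.await(10, TimeUnit.SECONDS)

    fun abort() = aborted.countDown()
}

// Bridges the blocking call into a cancellable suspend function.
suspend fun AbortableSender.sendCancellable(): Boolean =
    suspendCancellableCoroutine { cont ->
        // Runs when the calling coroutine is cancelled while suspended here.
        cont.invokeOnCancellation { abort() }
        thread {
            val sent = blockingSend()
            cont.resume(sent) // ignored if the continuation was already cancelled
        }
    }

// Returns true if cancelling the coroutine triggered the sender's abort().
fun cancelViaAbort(): Boolean = runBlocking {
    val sender = AbortableSender()
    val job = launch { sender.sendCancellable() }
    delay(100)
    job.cancelAndJoin()
    sender.isAborted
}

fun main() {
    println("aborted on cancel: ${cancelViaAbort()}")
}
```

I used a plain thread for the blocking call just to keep the sketch short; in your code it would presumably run on whatever dispatcher or thread you already use for sending.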
If you stick with the worker, I’d suggest switching around your task’s Job ancestry to leverage Structured Concurrency.
Consider this example that demonstrates running code with a provided CoroutineScope instead of just using the current CoroutineContext.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

val channel = Channel<PacketRequest>()

fun main() {
    runBlocking {
        launch { worker() }
        sendPacket(Packet)
    }
    println("All done")
}

object Packet

fun doBlockingSend(packet: Packet) { println("doBlockingSend") }

suspend fun sendPacket(packet: Packet) {
    coroutineScope { // creates a CoroutineScope for launching the actual work
        val completionSignal = Job()
        val sendTask = PacketRequest(packet, this, completionSignal::complete)
        channel.send(sendTask)
        completionSignal.join() // Keep CoroutineScope alive so it can be used by worker.
    }
}

suspend fun worker() {
    // while (true) {
    val (packet, requesterScope, onComplete) = channel.receive()
    val job = requesterScope.launch {
        withContext(Dispatchers.IO) {
            doBlockingSend(packet)
        }
        onComplete()
    }
    job.join()
    // }
}

data class PacketRequest(
    val packet: Packet,
    val requesterScope: CoroutineScope,
    val onComplete: () -> Unit
)
A worker can ensure that only one packet is processed at a time, but the actual processing can still happen under the Job hierarchy of the request’s sender. Cancellation will propagate from sender parent to child task and exceptions will propagate from child task to sender parent.
Note that by using a worker, you also have the problem of what happens if the worker is cancelled before all packets have been sent/processed.