What is the right way to launch an actor inside a coroutine scope?
One approach is to pass the scope:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.actor

    fun echo(msg: String) = println("[${Thread.currentThread().name}] $msg")

    class ScopedActor(scope: CoroutineScope) {
        // With a parentless Job() the scope would not wait for the actor:
        // private val job = Job()
        private val job = Job(parent = scope.coroutineContext[Job])
        private val channel = scope.actor<Unit>(job) {
            try {
                echo("Actor started")
                for (msg in this) echo("Message received")
                echo("Actor ended")
            } catch (exc: CancellationException) {
                echo("Actor cancelled")
                throw exc
            }
        }

        suspend fun send(msg: Unit) = channel.send(msg)
        fun cancel() = job.cancel()
    }

    fun main(args: Array<String>) {
        runBlocking {
            val actor = ScopedActor(this)
            actor.send(Unit)
            echo("Message sent")
            actor.cancel()
        }
        echo("runBlocking finished")
    }
But with this approach I cannot think of how to run the actor on a thread pool.
Another approach is to pass a CoroutineContext:
    class ContextActor(private val context: CoroutineContext) : CoroutineScope {
        // private val job = Job()
        private val job = Job(parent = context[Job])
        override val coroutineContext = context + job
        private val channel = actor<Unit> { ... }
        ...
    }

    val actor = ContextActor(coroutineContext + Dispatchers.Default)
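To make the second approach concrete, here is a minimal runnable sketch of it. It is not a recommended design, only an illustration of what the snippet above would do once filled in. The actor body, the `threadName()` helper and the use of `CompletableDeferred` as the message type are my own assumptions, added so that the actor can report which thread it runs on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlin.coroutines.CoroutineContext

// Hypothetical filled-in variant of ContextActor; the message type and
// threadName() are illustrative, not part of the original snippet.
class ContextActor(context: CoroutineContext) : CoroutineScope {
    // Child of the caller's Job, so the caller's scope waits for this actor.
    private val job = Job(parent = context[Job])
    override val coroutineContext: CoroutineContext = context + job

    // Each message carries a reply slot; the actor fills it with the name
    // of the thread that processed the message.
    private val channel = actor<CompletableDeferred<String>> {
        for (reply in this) reply.complete(Thread.currentThread().name)
    }

    suspend fun threadName(): String {
        val reply = CompletableDeferred<String>()
        channel.send(reply)
        return reply.await()
    }

    fun cancel() = job.cancel()
}

fun main(): Unit = runBlocking {
    // Dispatchers.Default replaces the runBlocking dispatcher for the actor only.
    val actor = ContextActor(coroutineContext + Dispatchers.Default)
    println("Actor ran on: ${actor.threadName()}")
    actor.cancel() // without this, runBlocking keeps waiting for the child job
}
```

The point of the sketch is that the dispatcher and the parent Job travel in the same context, so the actor runs on the pool while still being a child of runBlocking.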
There is also an example in the documentation that uses Job() for cancelling actors. But that example violates structured concurrency: if you comment out the actor.cancel() line, runBlocking won't wait for the actor.
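The difference between the two ways of creating the job can be checked directly. This is a small sketch using plain jobs rather than actors (my simplification); the `childStatus()` helper is a hypothetical name introduced only for this illustration.

```kotlin
import kotlinx.coroutines.*

// Returns a pair: (is a parentless Job() a child of runBlocking?,
//                  is Job(parent = ...) a child of runBlocking?)
fun childStatus(): Pair<Boolean, Boolean> = runBlocking {
    val scopeJob = coroutineContext[Job]!!
    val detached = Job()                  // no parent: runBlocking does not track it
    val attached = Job(parent = scopeJob) // registered as a child of runBlocking

    val result = (detached in scopeJob.children) to (attached in scopeJob.children)

    attached.complete() // otherwise runBlocking would wait for this child forever
    detached.cancel()
    result
}

fun main() {
    val (detachedIsChild, attachedIsChild) = childStatus()
    println("Job() is a child of runBlocking: $detachedIsChild")
    println("Job(parent) is a child of runBlocking: $attachedIsChild")
}
```

As far as I understand, this should report false for the parentless job and true for the one created with a parent, which is why only the latter makes runBlocking wait.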
So what is the most idiomatic way to implement complex stateful actors that can be dispatched to a thread pool?
P.S. Sorry for such a long post.