I believe I'm trying to do something common when working with coroutines: passing an error channel inside an object, then waiting for confirmation on errC that the action executed successfully. But I don’t understand why newResourceCreator.onReceive() is never triggered. I have the complete project here and I'm running the test from here
Here's the gist of things (maybe the issue lies outside of this, I'm not sure):
data class ResourceAction (
id: Int,
errC: Channel<Throwable?>,
)
class MyClient {
private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.IO + job + CoroutineName("ClientSupervisor"))
val newResourceCreator = Channel<ResourceAction>()
val oldResourceDeletor = Channel<ResourceAction>()
init {
CoroutineScope(Dispatchers.Default + job).launch(CoroutineName("ClientRunLoop")) { run() }
}
fun addResource(id: Int) {
val errC = Channel<Throwable?>()
val resourceAction = ResourceAction(id,errC)
newResourceCreator.send(resourceAction)
val errCRes = errC.receive()
if(errCRes == null) return resource else throw Exception("Error creating resource $id with capacity $capacity: $errCRes")
}
suspend fun run () {
while(true) {
select {
newResourceCreator.onReceive() {
resourceAction ->
try {
resource = Resource(resourceAction.id)
// some stuff
resourceAction.errC.send(null)
} catch (e: Throwable) {
resourceAction.errC.send(e)
}
}
// oldResourceDeletor.onReceive() {}
}
}
}
}
I call addResource() from a JUnit test, but I end up with the while loop inside run() looping infinitely.
On deeper inspection I can see something odd happening: the coroutine that was created during init { run() } is not the same one that performs the select {}, and I'm confused about why that is the case. It seems like a new coroutine gets spawned just for picking the select branch?
I am waaaaaaaaay out of my depth with this one, but looking at what the select function actually does, it seems like there’s some black magic involved in registering your select clause. I would go into the SelectImplementation class and put breakpoints in all the invoke methods, to see whether the select function is actually registering the onReceive as a select clause. If no clause is being registered, maybe the select finishes immediately and the while loop just loops? Out of curiosity, do you know whether it’s actually the while loop that keeps looping, meaning the select function finishes and is then called again? Or is it that the select function keeps suspending and resuming while it waits for that clause?
Could you please provide a minimal reproducible example? Your code above seems a little different from your real code: it doesn’t compile for multiple reasons, and after fixing all these problems it runs just fine in the Kotlin Playground. It is hard to guess what the problem is in your real code.
I went back and rewrote only the part I needed, and it looks something like this. I figured out that removing the { and } fixes the blocked state (see the comments in the code and remove the marked braces), but I have no idea why. It flew completely under my radar, as those braces seemed harmless! Anyway, thank you so much @broot, @Skater901! This still remains a mystery to me, though!
package org.example
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.selects.select
data class Action(
val id : Int,
val errC: Channel<Throwable?>
)
class Client {
private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.IO + job + CoroutineName("ClientSupervisor"))
private val actionChannel = Channel<Action>()
init {
CoroutineScope(Dispatchers.Default + job).launch(CoroutineName("ClientRunLoop")) { run() }
}
private fun applyAction(action: Action): Throwable? {
println("Applying action ${action.id}")
// does some stuff
return null
}
suspend fun addAction(id: Int): Action {
val errC = Channel<Throwable?>()
val action = Action(id, errC)
actionChannel.send(action)
val err = errC.receive()
if (err != null) {
throw err
} else {
return action
}
}
private suspend fun run() {
val wakeUp = ticker(1000)
while(true) {
select {
actionChannel.onReceive { action ->
{ // REMOVE THIS BRACE TO MAKE IT WORK!
try {
println("Action ${action.id} started")
val result = applyAction(action)
scope.launch {action.errC.send(result) }
println("Action ${action.id} finished")
} catch (e: Throwable) {
scope.launch { action.errC.send(e) }
}
} // REMOVE THIS BRACE TO MAKE IT WORK!
}
wakeUp.onReceive {
println("Client run loop is still alive")
}
}
}
}
}
fun main() {
runBlocking {
val client = Client()
val action1 = client.addAction(1)
println("Action 1 added , result: $action1")
val action2 = client.addAction(2)
println("Action 2 added , result: $action2")
}
}
So you actually debugged your problem yourself. It is unrelated to channels and coroutines. You just used incorrect syntax, which the compiler accidentally interpreted as something else entirely. You created a lambda and threw it away:
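To see the effect in isolation, here is a minimal sketch that uses only the standard library. The `handle` function is a hypothetical stand-in for a select clause such as onReceive (it is not part of kotlinx.coroutines), and the two `run...` functions are illustrative names only. The point is that a bare `{ ... }` inside a lambda body is itself a lambda expression, not a nested block, so its contents never execute unless something invokes it.

```kotlin
// Hypothetical helper standing in for a select clause such as onReceive:
// it simply invokes the handler with the given value.
fun <T> handle(value: T, handler: (T) -> Unit) = handler(value)

fun runWithExtraBraces(log: MutableList<String>) {
    handle(1) { id ->
        {   // This brace starts a NEW lambda expression, not a block.
            log.add("processed $id")
        }   // The lambda is created and discarded here; it is never invoked.
    }
}

fun runWithoutExtraBraces(log: MutableList<String>) {
    handle(1) { id ->
        log.add("processed $id")   // Runs as part of the handler body.
    }
}

fun main() {
    val broken = mutableListOf<String>()
    runWithExtraBraces(broken)
    println(broken)   // prints []

    val fixed = mutableListOf<String>()
    runWithoutExtraBraces(fixed)
    println(fixed)    // prints [processed 1]
}
```

In the posted code the same thing happens inside actionChannel.onReceive: the try/catch sits in a lambda that is never called, so nothing is ever sent on errC and addAction stays suspended at errC.receive(), while the ticker clause keeps the loop running. If a nested scope is really wanted, writing `run { ... }` instead of bare braces would invoke the block immediately.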