Choosing a data structure to hold Coroutine Channels


#1

I’m following a rough interpretation of the Actor model with Kotlinx’s coroutine BroadcastChannels. These channels are listened to by several actors, so my design right now is for the containing class of each actor to take in a Channels object, like so:

class Channels {
  val myChannel = BroadcastChannel<Message>(1)
  val myOtherChannel = BroadcastChannel<Message>(1)
  ...
}

class MyNode(val channels: Channels) {
  // some internal state
  fun myActor() = actor<Message> {
    val subscription = channels.myChannel.openSubscription()
    for (msg in subscription) { ... }
  }
  fun myOtherActor() = actor<Message> {
    val subscription = channels.myOtherChannel.openSubscription()
    for (msg in subscription) { ... }
  }
}
val channels = Channels()
val myNode = MyNode(channels)

This is convenient enough, but now I want a “debugging” node which listens to every channel in my Channels object. I tried reflection, but this doesn’t work for properties with type parameters, like BroadcastChannel<T>.
I feel like I’m missing something very obvious here. Is there a better (iterable) data structure to hold all of my channels in? HashMaps aren’t deterministic, so I’d have to check for nulls for channels I know exist.


#2

If I understand you correct you want a debug actor which listens to all channels at the same time and executes some code for the message like

// pseudo code
for (msg in (myChannel and myOtherChannel)) { ... }

You should be able to use a select clause here

select {
  myChannel.onReceive { debugLogMsg(it) }
  myOtherChannel.onReceive { debugLogMsg(it) }
}

This will wait for the first message on any channel and handle it. You would need to wrap it into a loop and probably also check whether the channels are closed to break out of the loop.


#3

Apologies, my original post was unclear. In my code there are more than just 2 channels, and I’m looking for an approach where I don’t have to explicitly subscribe to each channel by name. As in, I’d like some kind of iterable to do the DRY work for me, so if I create more channels later on I don’t have to remember to copy-paste the names into my debugger.


#4

Can’t you just iterate over the properties of the Channels class in that case?

for (property in Channels::class.declaredMemberProperties){
  property.returnType // this is the type of the property in your case BroadcastChannel<Message>
}

#5

I appreciate the help thus far. As I said, I tried reflection, but the compiler gives a Cannot check for instance of erased type error because the BroadcastChannel type has a parameter.
I can only use a property channel’s methods if I check if it is BroadcastChannel<Message>, which the compiler cannot do.


#6

How do you check? You could use this code, it works for me

val checkAgainst = Channel::class.createType(listOf(KTypeProjection.covariant(Message)))
if(property.returnType.isSubtypeOf(checkAgainst) {
	...
}

#7

My linter says Classifier 'Message' does not have a companion object, and thus must be initialized here for that solution.
My Message class is an interface, though it seems it doesn’t matter if it’s a class or not, I’m not familiar with Kotlin enough to know why it needs a companion object.

The way I was checking was:

for (property in Channels::class.declaredMemberProperties) {
    if (property.returnType is BroadcastChannel<Message>) {
        val subscription = property.get(channels).openSubscription()
    }
}

Which gives the erased type error.


#8

Interesting error message, I don’t know why it appears. The error I get is “Cannot check for instance of erased type”.
But your check is wrong anyways. is checks that the type of the left hand operand is the same as the right hand operand and returnType is of type KType so this would never evaluate to true.
Instead you can either check that returnType == BroadcastChannel::class.crateType(...) or as I did is a subtype of Channel whatever you need.


#9

Sorry, I meant that I get the error message

for any use of KTypeProjection.covariant(Message). I’m not sure why a companion object is necessary for an interface.


#10

Woops your right. Missed some part when I posted here. It should be Message::class.createType()


#11

Assuming the full expression should be:

 if (property.returnType == BroadcastChannel::class.createType(listOf(KTypeProjection.covariant(Message::class.createType())))) {
    property.get(channels).openSubscription()
 }

The compiler isn’t able to infer the type of property.get(channels) being BroadcastChannel<Message>.


#12

I guess the reason is that BroadcastChannel is a interface and obviously the type of the property is the implementing class. Maybe try property.returnType.isSubtypeOf(...) instead of just equals.


#13

Does this do what you want?

interface Message {
    val foo: Int
}

data class MessageImpl (
        override val foo: Int
) : Message

class Channels {
    val allProps = mutableMapOf<String, BroadcastChannel<Message>>()
    var myChannel by allProps
    var myOtherChannel by allProps

    init {
        myChannel = BroadcastChannel(1)
        myOtherChannel = BroadcastChannel(1)
    }
}

class MyNode(val channels: Channels, val coroutineContext: CoroutineContext) {
    fun myConsumer() = launch(coroutineContext) {
        val subscription = channels.myChannel.openSubscription()
        for (msg in subscription) {
            println("myConsumer received: ${msg.foo}")
        }
    }

    fun myOtherConsumer() = launch(coroutineContext) {
        val subscription = channels.myOtherChannel.openSubscription()
        for (msg in subscription) {
            println("myOtherConsumer received: ${msg.foo}")
        }
    }

    fun debugConsumer() {
        for ((cname, channel) in channels.allProps) {
                val subscription = channel.openSubscription()
                launch(coroutineContext) {
                    subscription.consumeEach {
                        println("debugConsumer received from `$cname`: ${it.foo}")
                    }
                }
        }
    }
}

fun main(args: Array<String>) {
    runBlocking {
        val channels = Channels()
        val myNode = MyNode(channels, coroutineContext)

        myNode.myConsumer()
        myNode.myOtherConsumer()
        myNode.debugConsumer()

        launch(coroutineContext) {
            for (i in 0 until 10) {
                channels.myChannel.send(MessageImpl(i))
            }
        }

        launch(coroutineContext) {
            for (i in 0 until 10) {
                channels.myOtherChannel.send(MessageImpl(i))
            }
        }


        delay(500)
        channels.myChannel.close()
        channels.myOtherChannel.close()
    }
}

#14

That is essentially perfect, thank you.


#15

I wish there was a way to initilize the properties on the spot, though, instead of in a separate init {} block, like this:

class Channels {
    val allProps = mutableMapOf<String, BroadcastChannel<Message>>()
    var myChannel by allProps = BroadcastChannel(1)
    var myOtherChannel by allProps = BroadcastChannel(1)
}

But this is not supported.


#16

My thoughts exactly. Still, the overhead seems to be better than the alternatives.


#17

Unfortunately finding the subtype didn’t work either. I appreciate all the help, though.


#18

Interesting. I only tested it with lists of integers and strings but I don’t see where the difference is.

import kotlin.reflect.*
import kotlin.reflect.full.*

class Test{

	val a = listOf(1, 2, 3)
	val b = listOf(1, 2, 3)
	val c = listOf(1, 2, 3)
	val d = listOf(1, 2, 3)
	val e = ArrayList<Int>(5)
	val f = listOf("", "", "")
}

fun main(vararg args: String){
	for(property in Test::class.declaredMemberProperties){

		println(property.name)
		println(property.returnType)


		if(property.returnType.isSubtypeOf(
						List::class.createType(
								listOf(KTypeProjection.covariant(Int::class.createType()))))){
			println("List<Int>")
		}
	}
}


#19

I wasn’t able to test your code in my environment, but I believe the issue will appear if you try performing methods on the properties. Like instead of println("List<Int>") write println(property.get(Test()).map { it + 2 })


#20

Nope. Using the result of property.get works fine for me. You just have to cast it to the correct type and you have to use unchecked cast because you can not test for the generic parameter.

val any: Any = property.get(Test())!!
@Suppress("UNCHECKED_CAST")
val list = any as List<Int>
// do whatever you want here

You can test that the result is of type List<*> but as far as I am aware there is no way to autocast it to List<Int> that’s why the suppress is needed or you get a warning. But you checked the type before anyways so this should never fail.