Mutable state defined inside or outside coroutines - does it matter?

These might be two stupid questions, but I need to be sure I understood correctly.

In Kotlin documentation there is this example:

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

This works because even if the coroutine gets resumed in different threads (by setting a Dispatcher that uses multiple threads), there is a happens-before relationship, so visibility of counter is guaranteed.

But does it have anything to do with the actor state being defined inside the actor? Suppose I have the following example:

var counter = 0
// some other stuff

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

Assume for the moment that we make sure the counter variable is only accessed from the coroutine. Then the happens-before relationship is still guaranteed, right? I am asking because I have a normal Kotlin object with some mutable but private fields. These fields are not accessed by anything other than a single coroutine that loops forever, receiving from multiple channels via select. I cannot define the state inside the coroutine due to inheritance, but I make sure the coroutine is only started once all these fields are initialized.
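A minimal sketch of the situation described above, assuming kotlinx.coroutines is on the classpath. The names Counter, start and runCounter are made up for illustration. The field lives in the object rather than in the coroutine body, but only one coroutine ever touches it, and that coroutine may resume on different threads of Dispatchers.Default. As far as I understand the kotlinx.coroutines guarantees, code within one coroutine runs sequentially with a happens-before edge across each suspension, so where the variable is declared should not matter as long as that single-accessor rule holds.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical class: state is a plain private field, not a local in the coroutine.
class Counter {
    private var count = 0                      // no @Volatile, no lock
    val increments = Channel<Unit>()           // rendezvous channel, so every send suspends
    val result = CompletableDeferred<Int>()

    // The only coroutine that ever reads or writes `count`.
    fun start(scope: CoroutineScope): Job = scope.launch {
        for (msg in increments) count++        // may resume on a different thread each time
        result.complete(count)                 // runs after the channel is closed
    }
}

// Sends n increments from another coroutine on a multi-threaded dispatcher
// and returns the final count reported by the counting coroutine.
fun runCounter(n: Int): Int = runBlocking(Dispatchers.Default) {
    val counter = Counter()
    counter.start(this)
    repeat(n) { counter.increments.send(Unit) }
    counter.increments.close()
    counter.result.await()
}
```

Calling runCounter(1000) should return 1000. The sketch stops being safe as soon as a second coroutine or thread reads or writes count directly.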
 

My second question is regarding the @Volatile annotation. I also have the following scenario:

// channels that other parts of the program use for communication
val featureChannel = Channel<Feature>()
val dataChannel = Channel<Data>()
	
// state object that has lots of mutable state, but ONLY accessed from coroutine below
private val state = State()
	
// other parts of the program might read this. do they see the latest info?
@Volatile var latestInfo = state.getImmutableInfo()
	
init {
	GlobalScope.launch (Dispatchers.Unconfined) {
		while (isActive) {
			select<Unit> {
				featureChannel.onReceive {
					state.handleFeature(it)
					latestInfo = state.getImmutableInfo()
				}
				dataChannel.onReceive {
					state.handleData(it)
					latestInfo = state.getImmutableInfo()
				}
			}
		}
	}
}

The field latestInfo is only written by this coroutine. But from time to time, other coroutines (running simultaneously somewhere else) might want to read it. Do they see the latest object? As far as I understand, the @Volatile annotation should ensure that a write to the variable is visible to other threads, so other coroutines should always get the newest object when they access latestInfo.

Is @Volatile even necessary here?

Not really. It’s no different from defining a variable outside a regular lambda that’s executed once asynchronously.

It’s not so much needed for coroutines as it is for threading. If only a single thread is involved (like Dispatchers.Main), then no, you don’t need it, no matter how many coroutines there are. If different threads are involved (like the Dispatchers.Default thread pool), then yes, it sounds like you want it to be volatile.
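To make the threading point concrete, here is a small sketch that uses plain threads from the Kotlin standard library instead of coroutines. Info, Publisher and runDemo are hypothetical names, not taken from the code above. One thread publishes immutable snapshots through a @Volatile property, and another thread spins until it observes the last one.

```kotlin
import kotlin.concurrent.thread

// Hypothetical immutable snapshot type (only vals).
data class Info(val version: Int)

class Publisher {
    // Written by exactly one thread, read by any number of others.
    // @Volatile makes each write visible to later reads on other threads.
    @Volatile
    var latestInfo: Info = Info(0)
        private set

    // Assumption: only the single writer thread calls this.
    fun publish(version: Int) {
        latestInfo = Info(version)
    }
}

// Starts a writer thread and spins on the calling thread until the final
// snapshot becomes visible, then returns the version that was observed.
fun runDemo(): Int {
    val publisher = Publisher()
    val writer = thread { for (v in 1..1000) publisher.publish(v) }
    var seen = publisher.latestInfo
    // Without @Volatile, this loop would not be guaranteed to terminate.
    while (seen.version < 1000) seen = publisher.latestInfo
    writer.join()
    return seen.version
}
```

This only works because Info is immutable and there is a single writer. @Volatile gives visibility of the reference, not atomicity of compound updates such as read-modify-write.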

If you want to pass updated data to other parts of your app, consider using a Flow or ConflatedBroadcastChannel (I prefer Flow over channels, it’s a higher level tool).

Thank you for the helpful response.

In my case Flow would not be a good fit I think, because my data sources are intrinsically hot, i.e. originating from outside my application.

Flow works just fine for hot or cold data sources. Notice that all the Channel helper methods like filter or map have been deprecated. That’s because using Flow instead is being encouraged. Channel is being positioned as a low level coroutine primitive for building higher level abstractions (like Flow). Flow seems slated to get further improvements (sharing flows, throttling, representing state) while Channel’s API has been getting trimmed.

While Channel must be hot and always uses cross-coroutine synchronization primitives, Flow can be hot or cold and will only incur synchronization costs if they are needed.

Among others, some of my hot sources are Jetty Websockets, for which I have to use certain method signatures. For example:

	@OnWebSocketMessage
	fun onMessage(message: String) {
		channel.quickSend(message)
	}

Not sure how I could skip channel primitives here (quickSend here is just some kind of offer).

 

But most importantly, I do not see a way to do the same thing with Flows as you can do with select over multiple channels. So for example, I have sources send different types to channels A-F. There is a receiving coroutine that selects over these six channels in a biased way.

So

  • consuming over channels is never done concurrently
  • if channel A has an element in it, I will consume it first, even if channel F has 20 elements in it (because data from channel A is more important)
  • there is no type checking because the element types of the channels are known at compile time
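A minimal sketch of the biased receive described in the list above, assuming kotlinx.coroutines. The channel names and the drainBiased function are hypothetical. select prefers the first clause that is ready, so listing the important channel first is what produces the priority.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Buffers three low-priority items and one high-priority item, then receives
// all four with a biased select and returns them in the order received.
fun drainBiased(): List<String> = runBlocking {
    val high = Channel<String>(Channel.UNLIMITED) // stands in for "channel A"
    val low = Channel<Int>(Channel.UNLIMITED)     // stands in for "channel F"
    repeat(3) { low.trySend(it) }
    high.trySend("urgent")                        // sent last, but received first

    val received = mutableListOf<String>()
    repeat(4) {
        received += select<String> {
            high.onReceive { "high:$it" }         // checked first on every iteration
            low.onReceive { "low:$it" }           // only chosen when `high` is empty
        }
    }
    received
}
```

Each clause keeps its own element type (String and Int here), so no cast is needed inside the handlers.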

I agree with you, Channel is the right tool for filling and reading a bunch of queues (Channel is really just a queue).

Note that you can turn any Channel into a “hot” Flow (see consumeAsFlow or asFlow) and you can turn any Flow into a Channel (see produceIn) to use with select. May be useful in some other situation where there’s more going on than just “queue this up”.
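A rough sketch of that round trip, assuming a reasonably recent kotlinx.coroutines. The function name channelToFlowAndBack is made up, and depending on the library version produceIn may still carry a preview annotation that produces a compiler warning.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Wraps a channel as a Flow, applies an operator, then converts the result
// back into a channel and drains it into a list.
fun channelToFlowAndBack(): List<Int> = runBlocking {
    val source = Channel<Int>(Channel.UNLIMITED)
    listOf(1, 2, 3).forEach { source.trySend(it) }
    source.close()

    // Channel -> Flow. The resulting flow can be collected only once.
    val doubled = source.consumeAsFlow().map { it * 2 }

    // Flow -> Channel, for example to use it as a clause inside select.
    val out = doubled.produceIn(this)
    val items = mutableListOf<Int>()
    for (x in out) items += x   // the loop ends when the upstream flow completes
    items
}
```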

Well the Websocket snippet is basically the only point in my program where I am just queuing stuff up.

Most of what I’m doing involves actors that have multiple channels, which are selected by importance; after changing their state and modifying the received items, they pass them on. So yes, my Websocket actor in this case only has a single channel of type String, but it creates six different data types, DataType1-DataType6, and sends these objects to an actor that has six channels, one per data type. Each of these objects mutates the state of the actor, so I cannot iterate over the six channels concurrently. Keeping a priority among the channels is also important.

So I am “splitting up” my data, but unlike broadcasting (which sends the same type to multiple channels), I send different types to different channels. Often these multi-channel actors then reunite the data: the actor receiving from the Websocket transforms DataType1-3 into DataType7 and sends it to the next actor, but it sends DataType4-5 (after making changes to the objects) to another actor as the same types. This last receiving actor then again makes transformations, passes the data on, and so on.

I just don’t see a way to do this complex workflow that involves changing state, splitting up data and selecting/prioritizing with Flows, except (as you suggested) dropping down to Channel primitives all the time, in which case it doesn’t make much sense to use Flows in the first place.

But I just read your post again and you said

Channel is being positioned as a low level coroutine primitive for building higher level abstractions (like Flow)

So what I am doing is most likely a higher level abstraction. And I was wrong in the sense that it is not the hot/cold type of source that makes Flows not feasible for my use case, but the abstraction.

This is actually exactly what I do and it’s worked out great (though I’ve not dealt with quite the same scenario). Instead of actors, I just have a chain of Flow operators. You could have something like originChannel.consumeAsFlow().doInitialProcessing().prioritize().processThatChangesPriorities().prioritize().someOtherProcessing().doOtherStuff().collect { ... }

Some nice parts about Flow (besides just being faster in many cases):

  • End of stream is propagated automatically by all the operators
  • The entire chain inherits the context of the consumer which works really nicely with structured concurrency
  • It has many operators to easily (and often cheaply) transform your data
  • Creating custom operators is usually pretty easy.

Consider this generic prioritization operator I just threw together (no guarantees):

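import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select

// Re-emits upstream items, preferring lower priority indices.
// This relies on select being biased toward the clauses registered first.
fun <T> Flow<T>.prioritize(numPriorities: Int, prioritize: (T) -> Int): Flow<T> = flow {
    coroutineScope {
        val channels = List(numPriorities) { Channel<T>(Channel.UNLIMITED) }
        launch {
            collect {
                // trySend replaces the deprecated offer; it cannot fail on an open UNLIMITED channel
                channels[prioritize(it)].trySend(it)
            }
            channels.forEach { it.close() } // signal end of stream to the loop below
        }
        while (true) {
            val openChannels = channels.filter { !it.isClosedForReceive }
            if (openChannels.isEmpty()) break
            // onReceiveCatching returns a result instead of throwing if a channel closes while we wait
            val result = select<ChannelResult<T>> {
                for (channel in openChannels) {
                    channel.onReceiveCatching { it }
                }
            }
            if (result.isSuccess) emit(result.getOrThrow())
        }
    }
}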

I still struggle to understand how this could apply to my situation. Correct me if I’m wrong, but as far as I understand, a Flow takes a generic parameter, so it has to emit objects of that type. Also, we have to collect objects of a single type (in your code, we operate on a Flow<T>). But what I want to do is emit objects of multiple types and prioritize receiving based on the type.

Of course I could do that by chaining Flow operators of type Flow<Any>, but this would mean I would have to type-check every collected object again at every single step, which defeats the purpose. Also, I fail to see how the solution you provided could be faster, as we are launching an additional coroutine that forwards the items to the respective channels. So we have an additional suspension point (compared to sending to the channels directly). I still appreciate the time you take, so thank you.

To send a bunch of different types through the same flow, you’d create a sealed class with a derivative type for each type you are sending.
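A small sketch of that idea using only the standard library. The Message hierarchy and the describe and priorityOf functions are hypothetical names for illustration. The is checks still happen at runtime, but the compiler verifies that every subtype is handled, and each branch is smart-cast to the concrete type.

```kotlin
// Hypothetical umbrella type with one subtype per kind of item sent downstream.
sealed class Message {
    data class Feature(val name: String) : Message()
    data class Data(val payload: Int) : Message()
}

// `when` over a sealed class used as an expression must be exhaustive,
// so adding a new subtype becomes a compile error here until it is handled.
fun describe(msg: Message): String = when (msg) {
    is Message.Feature -> "feature:${msg.name}"   // msg is smart-cast to Feature
    is Message.Data -> "data:${msg.payload}"      // msg is smart-cast to Data
}

// Lower number means more important; a function like this could be passed
// as the selector of a prioritizing operator.
fun priorityOf(msg: Message): Int = when (msg) {
    is Message.Feature -> 0
    is Message.Data -> 1
}
```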

It sounded like you had a big chain of actors, so I was guessing you had channels between each transformation of data. That’s where flows would be faster, since operators like map are as cheap as a function call.

Ultimately, I use Flow because of its abstraction, not its performance. I find it very easy to use (relative to RxJava). A chain of actors for data processing just reminds me of a chain of Flow operators.

It sounded like you had a big chain of actors, so I was guessing you had channels between each transformation of data.

I do have a chain of actors. But usually actors have just one channel from which they receive. Mine have multiple channels, which lets them prioritize some types over others (via select).

To send a bunch of different types through the same flow, you’d create a sealed class with a derivative type for each type you are sending.

Yes, but this still means redundant type checking at every transformation, no? After all, I create the object, so I know the type - to send it as an umbrella type just to type check everything later at the receiving end does not feel right.