Coroutine Actors and thread-safety

I’m reading up on the excellent coroutine guide, but I’m left with a couple questions when it comes to actors. I’m repeating the example here for clarity:

fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

So because this is a coroutine, the actual thread that counter++ executes on can vary, yet the counter can be a primitive and not an AtomicInteger. How does that actually work? (the docs say “confinement of the state to the specific coroutine works as a solution to the problem of shared mutable state”, but I’d like details)

Next question: in my experimentation, it seems safe (in that I get the expected result) to move the mutated variable (counter, in this case) up a line, just outside of the actor. Is this actually safe, or a bad idea? I’m guessing the latter, but I was surprised that it seems to work.

I’m writing some code for a local conference about Kotlin and Vert.x integration.
Maybe it is possible to switch from callback hell to actor style, i wrote some code about this.
This is the reference:

Because the actor must elaborate messages sequentially, I managed backpressure of Vertx ReadStream: when a new message arrives the stream is paused, after message elaboration the stream is resumed.
The concurrency is around the actors, not inside the actors.

Disclaimer: the code isn’t fully working and it is designed only for demostrative purpose. Please share your feedback about it. Thank you.

The counter inside actor is safe because execution execution inside a coroutine is sequential in the precise, concurrency-theoretic definition of this term, i.e. all the operations in the coroutine are totally ordered with “happens before” relation (a book on concurrency theory like “The Art of Mutliprocessor Programming” is a highly recommended reading to understand it better).

So, there is no concurrency in a coroutine. How is that achieved? There are two cases. Execution in between suspension points follows normal JMM rules that state that operations on a single thread establish happens-before relation. Execution around suspension points has happens-before relation, because it is established by the synchronisation primitives that are used by implementations of suspending functions.

With respect to where the counter is defined. As long as you have only one instance of counterActor running, it does not matter whether counter variable is define inside the actor or outside of it. However, moving it outside of the code of counterActor is a bad idea for the same reason as global variables are a bad idea. If you start two instances of counterActor, then they will conflict on the global counter variable.

3 Likes

I think this is a very good point. What I would suggest is some approach to using coroutines in Kotlin similar to channels and goroutines in Go. This is much more readable, is much less prone to deadlocks and race conditions. It is much easier to see where it happened and much easier to fix it.

Problem is the lack of continuations. So you define a solution where a method may never do a take on more than one channel in a method and the take must be the very first statement in the method to make sure no temporary variable has been set. Why that? Let’s say a method was invoked that does a take on a channel and would block, because the channel is empty. So the thread is withdrawn from this method to serve channels that are not empty. Once the channel receveives some data, the method is called again. No state needs to be remembered as the take on the channel must always be the first statement in a method that does a take.

Wouldn’t be as powerful as in Go, but much easier to write, read, debug and fix and still powerful enough. When do you really need to do a take on more than one channel within one method? That is very rare and can be neglected, but you get a much more readable way of doing things concurrently. Only the guy implementing this general solution needs to know about the suspendable keyword. I don’t thing the masses will grok this thing with the suspendable. It is too powerful, requires too many things to be understood. So people will run into trouble and as a conclusion will just revert to CompletableFutures.

I’m not sure I understand the problem that you have outlined in your last message. What exactly do you miss in Kotlin? Could you please find some time and clarify your last message with an example? You can give an example in Go.

The idea is a bit wild and it may not be feasible. But it is some suggestion to put a facade in front of the Kotlin coroutine stuff for the user to be easier to use for the most typical concurrency needs. For other things the user can still revert to declaring Kortlin funs as suspendable.

Given the following sample code in Go:

package main

import (
	"fmt"
	"math/rand"
)

func foo(channel1 chan int, channel2 chan int, done chan bool) {
	temp1 := rand.Intn(100)
	int1 := <-channel1 // X
	temp2 := rand.Intn(100)
	int2 := <-channel2 // Y
	fmt.Println("value: ", (temp1 + int1 + temp2 + int2))
	done <- true

}

func main() {
	done := make(chan bool)
	channel1 := make(chan int)
	channel2 := make(chan int)

	go foo(channel1, channel2, done)

	channel1 <- rand.Intn(100)
	channel2 <- rand.Intn(100)

	<-done
}

In function foo blocking takes are done on channel1 and channel2. Let’s say at the time when foo is invoked, channel1 was empty. So the Go runtime withdraws the OS thread serving the goroutine (go foo(…)) and assigns it to some code that takes from a channel that is not empty. Later, some item is added to channel1 and foo is invoked again. So the thread of execution will now jump to the blocking take of channel2 (line Y). But for this to work the contents of temp1 had to be remembered from the invocation of foo just before. For this to work on the JVM you would need some continuations framework to be put in place to remember the stack context. This is what Quasar does. Now the Kotlin guys don’t want to take this approach, because it requires byte code insertion that might interfere with other libraries that generate byte code (e.g. Hibernate), etc.

So my idea is that some channel type becomes a language built-in type in Kotlin as in Go. To avoid the need of continuations the Kotlin compiler would make sure the very first statement in a method that does a take on a channel is the take on the channel itself. This way there can be no temporary variable like temp1 that needs to be remembered as in the scenario described before. The Kotlin compiler makes sure than be no other take on a channel inside a method. This way there can only be one take on a channel and the problem with saving temporary variables to the context cannot repeat as in the Go sample above that takes from two channels.

Okay, the whole thing is a bit wild. But it is the idea that counts ;-). The whole purpose of this is to explain the usefulness of a simpler interface à la Go. And for the user much much easier to deal with exactly as in Go. The solution does not have the full power as in Go, but is still sufficient for many purposes.

Frankly, I don’t understand what you are looking for. Kotlin coroutines provide all the framework that is needed. Here, I rewrote the Go code that you’ve presented into Kotlin line-by-line (using channels from kotlinx.coroutines library) and it works exactly like it works in Go. You can run it and verify for yourself. What is missing? Why do we need channels in the language when they work just as well as a library?

import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.*
import java.util.concurrent.ThreadLocalRandom as Rnd

suspend fun foo(channel1: Channel<Int>, channel2: Channel<Int>, done: Channel<Boolean>) {
    val temp1 = Rnd.current().nextInt(100)
    val int1 = channel1.receive()
    val temp2 = Rnd.current().nextInt(100)
    val int2 = channel2.receive()
    println("value: ${temp1 + int1 + temp2 + int2}")
    done.send(true)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val done = Channel<Boolean>()
    val channel1 = Channel<Int>()
    val channel2 = Channel<Int>()
    launch(context) { foo(channel1, channel2, done) }
    channel1.send(Rnd.current().nextInt(100))
    channel2.send(Rnd.current().nextInt(100))
    done.receive()
}

UPDATE: You can read more about all the capabilities (including select) in the guide: kotlinx.coroutines/coroutines-guide.md at master · Kotlin/kotlinx.coroutines · GitHub

1 Like

Having dabbled with coroutines a bit now, I do think we will need a couple more higher-level facilities, but I don’t think the use case being discussed here really needs improving. The idea that holding onto a single int or reference while waiting for other coroutines to finish being a problem is a bit ridiculous. (and yes, I realize it could be more state than that, but we’re still talking about some transient temporary state within a single method call).

Hey, this is cool! Woohoo :-). All right, I didn’t knew this were possible. I didn’t spent that much time reading about Kotlin coroutines. But I thought I once read something that it doesn’t make use of continuations contrary to Quasar because of byte code manipulation. So I thought it could not be done. Thanks for the code sample. It should be added on the samples page. Go-style concurrency as a sample would be really attracting people’s attention.

It is there is the guide. Lots of Go-style samples. I really suggest to go forward and read it.

Yes, those Go-style samples look good. Glad I discovered that link at least ;-). What I still don’t understand is whether Kotlin’s coroutines now use continuations or not with or without byte code injection. My sample was set up to make sure it can only be handled correctly with some sort of continuations …

Kotlin coroutines are based on continuations (delimited continuations to be precise). You can learn more about their design and implementation in the corresponding design document: coroutines-examples/kotlin-coroutines-informal.md at master · Kotlin/coroutines-examples · GitHub

One thing that struck my mind is whether Kotlin Coroutines suspendable methods can also be written in Java besides Kotlin. There is no suspend keyword in Java. So the suspendable method has to be written in Kotlin and can then be called from Java, but cannot be written in Java straight away? Or is suspend an annotation that has some representation in Java?

Thanks, Oliver

They have to be written in Kotlin, because they have to have Kotlin-specific metadata attached to them, so that Kotlin compiler recognizes them as having suspend modifier.

Hi Roman,

one more thing that struck my mind… . The Kotlin sample you provided in this thread does not rely on kotlinx.coroutines.experimental.channels.RendezvousChannel being used, right? So I could plug in my own implementation of kotlinx.coroutines.experimental.channels.Channel? I’m thinking of distributed channels using Hazelcast. That’s why.

Thanks, Oliver

I have actorA, actorB, actorC, They will send message each other. Is Actor thread-safety if I send message to actorA in actorB or in actorC. And what about actor.close, actor.isFull