Actor examples and Sequences - which is 1.3 "correct"?

Short question: When looking at the Channel and Actor examples, they seem to be written with a diferent style than the “use sequences for unbounded lists of stuff” philosophy.

Specifically the CoroutineScope.filter example - it has a for (x in myReceiveChannel)...

Should/Could these use Sequences? Are sequences for a different class of problem, or are channels and sequences frenemies - two competing ways to deal with creating, processing, and consuming stuff?

Longer Context:
I’m consuming frames from a saved video, merging large groups of them, and writing them back out. I’d like it to

  1. Not die with an OOM (so no unbounded memory consumption with too many frames read before processing them, which is a problem I have with coroutines, and am hoping to solve with bounded capacity producers. )
  2. Be fast (run as much in parallel as possible, while preserving frame order) which I’m holing to get for free with the new coroutine context best-practices.
  3. Be Kotlin 1.3 correct: y’all concerning me that the consumerEach example is @ObsoleteCoroutinesApi

I also found which doesn’t work with 1.3, which heightens the impression of “these are two different strategies.”


I’m not the best person to answer but here’s my understanding of it:

Channels either send or receive a stream of data. They do this asynchronously in their own coroutine.
Sequences on the other hand, are themselves a stream of data.

So when would you want to pick on over the other?

Sequences, like lists, iterators and streams, don’t suspend. So if your problem involves a sequence of numbers that require a long time to compute, then using Sequences would do the computation using the same thread and at the same time for when you retrieve the next value.

Channels support suspension and would allow asynchronous sending/receiving of data without blocking the thread. This would be useful for creating pipelines of processing by chaining channels together. Right now Channels are also “hot”, meaning start running before anyone has asked to retreive data–there’s work on adding “cold” streams which would mean Channels behaving similarly to sequence’s lazy evaluation.

From that discussion, Channels are like "an asynchronous analogue of a standard Kotlin’s Sequence" (kind of… They’re different right now in that Channels are “hot”).

I suspect you will choose to use Channels. Have you seen the Coroutines in Practice talk from KotlinConf2018?

Maybe someone can refine my understanding–I still have a lot to learn when it comes to coroutines. Luckily the material Roman and the team have put out is very good. The Slack coroutines channel is a great resource as well.

It gets even stranger! I moved from a sequence to a channel:

fun CoroutineScope.readFrames(
    fileName: String, 
    filterStr: String? = null): ReceiveChannel<BufferedImage> = produce(capacity = 512) { ...

But then when I went to consume the frames with zipWithNext() - turns out ReceiveChannel doesn’t have it! So then I get lost in rabbit holes seeing if I should convert from a capacity constrained channel to a sequence, or if I’m barking up the wrong trees…

I figured out what I wanted, but not how to do it:

  1. Little chunks of code that take in (a collection of stuff), and produce a (collection of different stuff) – like a map, but not 1:1.
  2. That all run in coroutines, to make use of all my cores
  3. That have reasonable capacity constrained channels so as not to blow up with an OOM.

I don’t know if the “collection of stuff” should be a sequence, or a channel, or what.

I would say that Sequences and Channels are solving two completely different problems. Sequences primarily are for things which are calculable but not yet calculated. By lazy-calculating them, it prevents making calculations that will ultimately never be used (for example mapping then filtering an entire set when you only need to check if ANY exist, which can be short-circuited if, say, the 3rd out of 1000 matches the filter) and also allows for the use of Mathematical-defined sequences which have no end (say a PRNG or a Geometric Series).

Whereas a Channel is an asynchronous contract between two entities: a sender and a receiver. It solves any potential concurrency issues by suspending a send/receive request until no other thread is using it. It is not really accurate to say that Channels are lazy-loaded because how the channels get loaded is entirely dependent on the sender.

There are some problems that can be solved with either, but it’s unlikely that both really fit the problem space’s needs very well.

See, you have to be careful about things like that. Asynchronous code is not “inherently faster” in fact all that messaging and coordination are inherent slowdowns. Obviously the goal is to use all 8, 16+ cores that you have at the same time, so you can’t just synchronously single-thread everything, but you have to be clever how you set that up; just saying “I have 10,000 jobs to do, so I’m going to make 10,000 workers to do the job and offload all the heavy-lifting to the async engine” is probably just going to cause a paper jam.

I’m seeing your particular problem as something like this:

  • Some sort of Frame Miner is decoding the video and producing a Queue of Jobs (implemented as a Channel), but IT has to be clever enough to not fill the queue with 1 billion jobs faster than the workers can handle because frames will be non-trivial in terms of memory size.
  • You have dedicated background workers in a set number (say 8) each asynchronously consuming from this Channel and doing work.

In this setup, the purpose of the Channel is not anything super fancy or clever, it is simply to coordinate concurrency between these 8 background workers to make sure that no item on the channel is double-got and none are missed. A Sequence simply can’t do that by itself (you would need to do the locking/unlocking logic manually and then it becomes blocking as opposed to suspending).

YES! You get it exactly.
My hope was that a channel with a set capacity would free me from that manual allocation of background workers. If I throw everything-at-everything, the Frame Miner will race ahead… until it fills up the channel capacity. Then it starts blocking some percent of the time when it tries to insert a new job, and the CPU power that would have gone into reading a frame wanders off and finds another coroutine to do, which in my case are the consumers.

Well coroutines are definitely going to help you, but they aren’t going to do ALL the work for you. In particular, the Coroutine structure isn’t inherently going to know how many background workers to allocate. If you just do a greedy-type “Allocate a new Coroutine for every new frame” what’s probably is going to happen is that jobs get created faster than are fulfilled and you’ll eventually balloon into an OOM problem as memory-intensive job objects get offloaded into an ever-growing number of workers. Having a Channel with a set capacity (I’m not sure what that looks like, but I’m sure that can be done) will definitely work, but only assuming you already have a SET (or otherwise heuristically clever) number of background consumers. If you just let the number of background workers grow indefinitely, then their memory requirement will grow indefinitely.

So you probably want

  • N Background workers that consume the channel.
  • A Channel with a maximum capacity of N*2 or thereabouts

Ideally N = the number of cores that someone has. In practical terms, how you figure this number is kind of one of those neverending problem.

So basically a Channel with a set capacity is HALF of the problem, but the consuming cores are the ones that REALLY need to have their capacity set ahead of time (or otherwise cleverly set based on how they perform).

Agreed. But at this point I’d be happy with a single thread for every node, and everything not crashing.

I gave up, ripped out all Channels, Sequences, and Coroutines, and went back to plain Java Thread + Blocking Queue. This makes me sad because Kotlin is usually so much more clear.

Reasons why:

  1. Sequences were a nice way to consume frames, but ran out of memory if I tried to get fancy running in parallel.
  2. So I moved to Actors + Channels, but javacv.FFmpegFrameRecorder started crashing java.
  3. So I tried to do something that limited race-ahead memory usage, but Channels and Sequences are two different teams/philosophies. There aren’t good examples on how to use them together that I have been able to find.
  4. Coroutine scope - I don’t quite get it yet. It appears that not all the examples have moved to best practices yet. (Some use GlobalScope but then other places say don’t use that, and I could enclose something in runBlocking, coroutineScope, suspend, produce, etc)

In my experience, you really only need a channel when you need to communicate information in-sequence between 2 different coroutines. So sequences should be used when performing list transformations while channels should be used when sending information between distinct coroutines.

For limiting race-ahead memory usage, I ran into a similar problem when launching a bunch of coroutines which all attempted to make a connection to a mongo database simultaneously. I ended up building a suspending semaphore based on the Mutex implementation in kotlinx.coroutines for this. It worked really well.

In terms of coroutine scope, basically what happens is if you launch another coroutine from within your current coroutine, the new coroutine becomes a “child” of the launching one. Therefore, if you cancel the launching coroutine, all the child coroutines get cancelled as well and don’t linger.

To clarify a few other things:

  • GlobalScope.launch is used to fire-and-forget a coroutine onto the Default dispatcher’s thread pool when you’re not within another coroutine (i.e. on the main thread or something). It’s good for launching coroutines while not inside another coroutine, but if you are inside a coroutine you shouldn’t use it because the new coroutine won’t become a “child” of the launching one like I mentioned earlier.
  • runBlocking allows you to run suspending functions within your current thread. It does not launch coroutines onto the default thread pool like GlobalScope.launch does, so unless you explicitly launch new coroutines using Dispatchers.Default any coroutines you launch will be kinda-sorta sequential because they’ll all be running on the same thread.
  • The only use I can think of for using coroutineScope is to group a bunch of “child” coroutines into a single job so you can cancel them all at once.
  • Suspend is a modifier on functions which turn them into “suspending functions”. This makes it so when you invoke the function it automatically becomes a child of the parent coroutine and can do things like launch(), async(), etc.
  • Produce is part of the experimental coroutines API and gives you a “send()” function within it so you can just generate a bunch of results on a channel easily. The opposite is actor (also experimental) which provides a channel that you can send stuff into which it will operate upon via the “channel” member exposed inside the scope.

Hope that cleared some things up for you, coroutines are a little confusing at first but they make async stuff very easy in my opinion. Feel free to ask more questions if you need help!

1 Like

I learned today that coroutineScope is needed when nesting coroutine invokations, to correctly clean up when there are errors in the inner coroutines.
Otherwise, the next call to any coroutine in that CoroutineScope implementing class will end up with the wrong coroutine continuation, somehow.
And when your class doesn’t implement CoroutineScope, you can’t use launch { ... } or other constructs that create coroutines unless you get passed a CoroutineScope or CoroutineContext from elsewhere and call launch{} or async{} / await() on that.

Ah, I had assumed suspending functions would just inherit the scope of the calling coroutine. Interesting that we now have to define scopes using this builder now. It’s a little annoying, but I guess I’ll get used to it.

Good to note though: coroutineScope() builder causes ALL child coroutines to be cancelled if one of them throws an exception while supervisorScope() allows the child coroutines to keep going if one throws an exception.

Argh, still confused.
I’ve got a perfectly cromulent fun videoFilesToBufferedImages(vararg files: File): Sequence<BufferedImage> { return sequence { ... } } that decodes a list of mp4s. Great!

Now I want to have it work in it’s own core. So I naively toss in a coroutineScope { ... }
but you can’t do that, because “Error: Kotlin: Restricted suspending functions can only invoke member or extension suspending functions on their restricted coroutine scope”

erk. So maybe I need to wrap my sequence in a producer to make a channel. But the producer sigle thread docs page has a big block of NOTE: This API will be replaced in the future.

So I’m not sure if there is a “right” way to do it right now.

And kinda mixed up in the runBlocking(Dispatchers.Default) { ... launch(Dispatchers.Default) { ... do things ... } }

Update: Flows makes this all much more fun.