Conditional concurrency

I am reading data objects from a queue. Each data object has a type, for instance a “color” (e.g. yellow, green, orange). I would like to process all data objects of the same color sequentially, but objects of different colors should be processed in parallel. I have been studying coroutines, but so far I haven’t found a solution. Does anybody have an idea?

I guess you need to keep a separate mutex/queue/dispatcher per color. This isn’t much different than with threading.

Yes, that would be an option, but I thought there might be a nicer, cleaner, less resource-hungry way to do this with coroutines, Vert.x or something like that. In my example I have three colors, but in my real code I will have thousands of different “colors” (in reality something else), and I don’t want to create a thread for each of them. I would like the number of threads to be limited to a maximum.

Please remember that while starting 1000 threads is clearly wrong, starting 1000 coroutines is fine and is actually the recommended way. Again, depending on your case you can use different tools, but at the end of the day I think it can’t be done without 1000 separate queues, either defined explicitly (channel) or implicitly (mutex, dispatcher).
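To make the “implicit queue” variant concrete, here is a minimal sketch with one Mutex per color, assuming kotlinx.coroutines on the JVM. Item, PerColorLocks and processAll are hypothetical names invented for this illustration. Note that it only guarantees that two objects of the same color never overlap; it does not guarantee that they run in arrival order, because the launched coroutines may reach the lock in any order.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical data type standing in for the objects read from the queue.
data class Item(val color: String, val payload: Int)

// One mutex per color, created lazily the first time a color is seen.
class PerColorLocks {
    private val locks = ConcurrentHashMap<String, Mutex>()

    suspend fun <T> withColorLock(color: String, block: suspend () -> T): T =
        locks.computeIfAbsent(color) { Mutex() }.withLock { block() }
}

// Launches one coroutine per item on the shared Default dispatcher, so the
// number of threads stays bounded no matter how many colors exist.
// Items of the same color never run at the same time; different colors may.
suspend fun processAll(items: List<Item>, handle: suspend (Item) -> Unit) = coroutineScope {
    val locks = PerColorLocks()
    for (item in items) {
        launch(Dispatchers.Default) {
            locks.withColorLock(item.color) { handle(item) }
        }
    }
}
```

A coroutine waiting on a busy mutex is suspended and does not hold a thread. In this sketch the map of mutexes only grows, so a long-running service with an open-ended set of colors would need some eviction on top.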

But maybe there is a better solution.

A dispatcher is neither a mutex nor a semaphore.

A channel plus a coroutine is an actor (or an agent).
Running many actors is not an issue; they require less memory than threads.

Project Reactor has a groupBy operator that splits a single Flux into independent groups. However, the Javadoc states that it works best with a low number of groups, so I am not sure it is reliable under a heavy workload.

But it is a very easy way to prototype the wanted logic.

I don’t see any “groupBy” operator on kotlinx.coroutines flows, but I think implementing it should be possible. Technically, it would be similar to @broot’s idea: manually dispatch elements to a group (a group would then be a sort of flow), and publish groups as elements of a root flow as they appear.
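A rough sketch of what such an operator could look like, assuming kotlinx.coroutines 1.6 or later. groupByKey, Item and processGrouped are hypothetical names, not part of kotlinx.coroutines, and this is only one possible design: each group is backed by an unbounded channel, and the group is emitted downstream the first time its key appears.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

// Hypothetical data type standing in for the objects read from the queue.
data class Item(val color: String, val payload: Int)

// Hypothetical operator: emits a (key, group flow) pair the first time a key is seen,
// then forwards every later element with that key into the same group.
fun <T, K> Flow<T>.groupByKey(keyOf: (T) -> K): Flow<Pair<K, Flow<T>>> = flow {
    val groups = HashMap<K, Channel<T>>()
    try {
        this@groupByKey.collect { value ->
            val key = keyOf(value)
            val channel = groups[key] ?: Channel<T>(Channel.UNLIMITED).also { created ->
                groups[key] = created
                emit(key to created.consumeAsFlow()) // each group can be collected once
            }
            channel.send(value)
        }
    } finally {
        // Upstream finished or failed: close the groups so their collectors terminate.
        groups.values.forEach { it.close() }
    }
}

// Usage sketch: one coroutine per group, so groups run in parallel
// while the elements of a single group are handled one at a time, in order.
suspend fun processGrouped(source: Flow<Item>, handle: suspend (Item) -> Unit) = coroutineScope {
    source.groupByKey { it.color }.collect { (_, group) ->
        launch(Dispatchers.Default) { group.collect { handle(it) } }
    }
}
```

Each group has to be collected in its own coroutine, as processGrouped does: a group only completes once the upstream flow ends, so collecting it inline would stall the root flow. The unbounded channels also mean there is no backpressure in this sketch.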

Although, if you know all possible groups in advance, you can also “just” subscribe to the input flow once for each group (if it is a cold flow or a multicast flow) and filter the elements for each specific group, something like this (pseudo-code):

// Assumes a surrounding CoroutineScope; belongsTo() and process() are placeholders.
for (group in groups) {
    launch {
        myFlow.filter { it.belongsTo(group) }
              .collect { group.process(it) }
    }
}

I mean that if we have N groups, then whatever utility we use, internally there probably have to be N queues of some sort.

  • If we use N dispatchers, then each of them keeps a queue of scheduled jobs.
  • If we use N mutexes, each of them keeps a queue of waiting coroutines.
  • If we use N channels + N consumers, each channel is a queue itself.

I assume that, technically speaking, the first two store queues of continuations. The last one stores queues of application-level objects.

Of course, each solution is different: dispatchers limit parallelism, but not concurrency. Channels are good for submitting tasks asynchronously, mutexes for waiting for them. But each of them should be fine even for thousands of groups and should not be hard to implement.

If we use N dispatchers, then each of them keeps at least one queue of scheduled jobs.
Moreover, a coroutine is not always scheduled on a dispatcher: for example, calling mutex.lock() on a busy mutex releases the dispatcher’s thread and enqueues the coroutine on the mutex.

If we need “to process all data objects of the same color sequential” then we are looking for a queue (Flow/Channel).

You can have a single producer coroutine that reads from the queue and sends each object to the channel that matches its color. Then set up one consumer per channel.
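A minimal sketch of that producer/consumer layout, assuming kotlinx.coroutines and assuming the incoming queue is already exposed as a ReceiveChannel. Item and routeByColor are hypothetical names chosen for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Hypothetical data type standing in for the objects read from the queue.
data class Item(val color: String, val payload: Int)

// Reads every item from `input` and forwards it to a channel dedicated to its color.
// Each color has exactly one consumer coroutine, so items of one color are handled
// one at a time and in arrival order, while different colors run in parallel.
suspend fun routeByColor(
    input: ReceiveChannel<Item>,
    handle: suspend (Item) -> Unit,
) = coroutineScope {
    // Only the routing coroutine touches this map, so a plain HashMap is enough.
    val channels = HashMap<String, Channel<Item>>()
    for (item in input) {
        val channel = channels.getOrPut(item.color) {
            // First item of this color: create its channel and start its consumer.
            Channel<Item>(Channel.UNLIMITED).also { created ->
                launch(Dispatchers.Default) { for (next in created) handle(next) }
            }
        }
        channel.send(item)
    }
    // The input is exhausted: close the per-color channels so the consumers finish.
    channels.values.forEach { it.close() }
}
```

The unbounded capacity is a deliberate simplification: with a bounded channel, one slow color would suspend the router and hold up every other color, while an unbounded one trades that for unlimited memory growth. All consumers share Dispatchers.Default, so thousands of colors still use a fixed pool of threads.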

If you want to limit the number of parallel queues, you can assign the queue based on a hash-value of the “color”, modulo the number of queues.
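As a sketch of the hashing idea, assuming kotlinx.coroutines on the JVM: a fixed number of worker channels is created up front, and each object is sent to the channel at index hash(color) modulo the worker count. Item, workerIndex and routeByHash are hypothetical names for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Hypothetical data type standing in for the objects read from the queue.
data class Item(val color: String, val payload: Int)

// Maps a color to a worker index in 0 until workerCount.
// Math.floorMod keeps the result non-negative even when hashCode() is negative.
fun workerIndex(color: String, workerCount: Int): Int =
    Math.floorMod(color.hashCode(), workerCount)

// Starts a fixed number of consumers; the same color always goes to the same
// consumer, so per-color order is preserved with at most workerCount queues.
suspend fun routeByHash(
    input: ReceiveChannel<Item>,
    workerCount: Int,
    handle: suspend (Item) -> Unit,
) = coroutineScope {
    val channels = List(workerCount) { Channel<Item>(Channel.UNLIMITED) }
    for (channel in channels) {
        launch(Dispatchers.Default) { for (item in channel) handle(item) }
    }
    for (item in input) {
        channels[workerIndex(item.color, workerCount)].send(item)
    }
    // The input is exhausted: close the worker channels so the consumers finish.
    channels.forEach { it.close() }
}
```

The trade-off is that unrelated colors that hash to the same index are also processed sequentially with respect to each other, so the worker count bounds the achievable parallelism.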