Sequential or concurrent processing of actions via coroutines based on some conditions

What would be the best way to implement sequential or concurrent processing of actions via coroutines based on some conditions?

The main goal is to execute same type of actions (e.g. having the same common id) in sequential order and parallel other actions if possible.

The result should be returned for further processing. So looks like it should be suspend function returning the result in coroutines terminology.

Using java executors framework this could be achieved easily by constructing either single thread executor (for sequential processing) or thread pool (for concurrent processing).

The solution, which come to my mind would use Channel like a queue, but resulting implementation does not looks as trivial as when implemented via executors.

The goal is to have such code in Kotlin common (multiplatform) module.

Following is my implementation, which is based on executors:

package com.adeya.android.common.actionsqueue.fooo

import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.ListeningExecutorService
import com.google.common.util.concurrent.MoreExecutors
import kotlinx.coroutines.guava.await
import kotlinx.coroutines.rx2.rxSingle
import java.util.*
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random

/**
 * Runs messages with same id sequentially, but with different id concurrently.
 *
 * Note: Rx api are exposed for Java code compatibility.
 */
class ActionsRunner {
    private val executors: List<StateAwareExecutor>

    init {
        val cpuCount = Runtime.getRuntime().availableProcessors()

        executors = (0 until cpuCount)
            .map {
                Executors.newFixedThreadPool(1) as ThreadPoolExecutor
            }. map {
                poolExecutor -> StateAwareExecutor.listeningDecorator(poolExecutor)
            }
            .toList()
    }

    /**
     * Messages which are currently being processed by executor (if any) with pending messages count.
     */
    private val pendingMessageToExecutorMap = HashMap<Long, ExecutorWithMessagesCount>()

    private fun <T> executeInner(
        messageId: Long,
        action: Callable<T>
    ): ListenableFuture<T> {
        val (executor, counter) = synchronized(pendingMessageToExecutorMap) {
            var executorWithCount: ExecutorWithMessagesCount? = pendingMessageToExecutorMap[messageId]
            if (executorWithCount == null) {
                executorWithCount = ExecutorWithMessagesCount(availableExecutor)
                pendingMessageToExecutorMap[messageId] = executorWithCount
            }

            executorWithCount.count.incrementAndGet()

            executorWithCount
        }
        return executor.submit(Callable {
            val actionResult = action.call()

            synchronized(pendingMessageToExecutorMap) {
                val pendingMessagesWithSameId = counter.decrementAndGet()
                if (pendingMessagesWithSameId == 0) {
                    pendingMessageToExecutorMap.remove(messageId)
                }
            }
            actionResult
        })
    }

    /**
     *  Runs message processing with the same id sequentially, but with different id concurrently.
     */
    suspend fun <T> execute(messageId: Long, action: Callable<T> ): T {
        return executeInner(messageId, action).await()
    }

    /**
     *  Runs message processing with the same id sequentially, but with different id concurrently.
     *
     * Note: Rx api are exposed for Java code compatibility.
     * Use [execute] in Kotlin instead.
     *
     * @see ActionsRunner.execute
     */
    fun <T : Any> executeRx(messageId: Long, action: Callable<T> ) = rxSingle {
        execute(messageId, action)
    }

    private val availableExecutor: ListeningExecutorService
        get() = executors.firstOrNull { it.isIdle } ?: executors[Random.nextInt(executors.size)]
}

private data class ExecutorWithMessagesCount constructor(
    val executor: ListeningExecutorService,
    val count: AtomicInteger = AtomicInteger()
)

/**
 * Executor returning [ListenableFuture] and exposing it's active/inactive state.
 */
private class StateAwareExecutor private constructor(
    private val executor: ListeningExecutorService,
    private val isIdleProvider: () -> Boolean
) : ListeningExecutorService by executor {
    val isIdle get() = isIdleProvider()

    companion object {
        fun listeningDecorator(poolExecutor: ThreadPoolExecutor): StateAwareExecutor {
            val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)

            return StateAwareExecutor(listeningExecutor, { poolExecutor.activeCount == 0 })
        }
    }
}


You might want to check out actors. You could have one actor per message ID, held in a map. I wouldn’t worry about removing actors from the map, they’re lightweight (unless you’re worried about the map getting too big).

2 Likes

@ebrowne72 thanks for your reply and suggestion.
Indeed CompletableDeferred (mentioned in the documentation) is something, which I was looking for in order to transfer back the result of execution.

There’s only one thing, which looks odd in this api comparing to executor.submit() - necessity to pass this CompletableDeferred in the argument together with original message. But anyway I can work-around this issue by making it implementation detail of my ActionsRunner class and do not expose it to public api.

Now I have to check if there is a way to expose idle state from the actor in order to see, that it does no message processing and should be prefered actor to do the job.

Alternatively (because actors supposed to be cheap) I can consider always launching new actor is there is no actor currently processing actions with specified id and let them be finished after processing.

P.S. yes, I am worried about the map getting too big