Why is this blocking and not completing?


class WaitAndNotify {
    private var _subscriptionCount = 0
    private val channel = Channel<Unit>(0)

    val subscriptionCount = _subscriptionCount

    /**
     * Calling coroutine will wait/suspend
     * until another coroutine notify
     */
    suspend fun wait() {
        _subscriptionCount++
        channel.receive()
        _subscriptionCount--
    }


    suspend fun notify() {
        channel.send(Unit)
    }

    suspend fun notifyAll() {
        repeat(_subscriptionCount) {
            channel.send(Unit)
        }
    }
}


    @Test
    fun waitAndNotifyTest() {
        val waitAndNotify = WaitAndNotify()

        runBlocking {
            repeat(10) {
                launch {
                    println("coroutine $it waiting!")
                    waitAndNotify.wait()
                    println("coroutine $it resume!")
                }
            }

            while(waitAndNotify.subscriptionCount > 0) {
                println("notifying")
                waitAndNotify.notify()
            }
        }
    }

Condition in while(waitAndNotify.subscriptionCount > 0) { is always true, so it waits there forever. Replace it with repeat(10) { and it will work correctly. Or maybe you planned to do _subscriptionCount-- after channel.receive()?

Updated WaitAndNotify

class WaitAndNotify {
    private var _subscriptionCount = 0
    private val channel = Channel<Unit>(0)

    val subscriptionCount = _subscriptionCount

    /**
     * Calling coroutine will wait/suspend
     * until another coroutine notify
     */
    suspend fun wait() {
        _subscriptionCount++
        channel.receive()
        _subscriptionCount--
    }



    suspend fun notify() {
        if (subscriptionCount > 0) channel.send(Unit)
    }

    suspend fun notifyAll() {
        while(_subscriptionCount > 0) {
            channel.send(Unit)
        }
    }
}

Test code

    @Test
    fun waitAndNotifyTest() {
        val waitAndNotify = WaitAndNotify()
        runBlocking {
            repeat(10) {
                launch {
                    println("coroutine $it waiting")
                    waitAndNotify.wait()
                    println("coroutine $it resume!")
                }.also { it.start() }
            }

            // all the coroutine might be started and waiting
            // notify all
            waitAndNotify.notifyAll()
        }
    }

This code is not really “concurrency-safe” (normally, I would say “thread safe” :wink: ). You tend to assume a specific order in which it executes, but it is concurrent so you shouldn’t make such assumptions without some kind of synchronization. I see at least 3 bugs in it:

  1. waitAndNotify.notifyAll() can be executed before coroutines started waiting (contrary to what the comment says) and in that case it won’t notify anyone.
  2. Similarly, in notifyAll() you can’t assume the counter is already decremented only because you sent something through a channel. notifyAll() may (and probably will) try to notify more than the counter.
  3. Access to _subscriptionCount is not synchronized in any way, so coroutines may overwrite the value of other coroutines.
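
Point 1 is easy to reproduce. A minimal sketch, assuming the single-threaded dispatcher that launch inherits from runBlocking (launchOrder is a made-up name): launch only schedules the child, and the parent keeps running until it suspends, so whatever comes right after launch runs before the child body. On a multi-threaded dispatcher the order is simply undefined.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which parent and child run.
fun launchOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // launch() only enqueues the child on runBlocking's event loop
    val job = launch { events += "child started" }
    // the parent has not suspended yet, so this line runs first
    events += "parent continued"
    job.join() // first suspension point: now the child gets to run
    events
}

fun main() {
    println(launchOrder()) // [parent continued, child started]
}
```

In the test above this means notifyAll() sees a counter of 0 and returns before any coroutine has called wait().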

What exactly are you trying to achieve? Do you want to replicate Java’s Object wait/notify feature, but suspending?

I want Java's thread wait and notify mechanism in coroutines :grin:

Thread Safe!

class WaitAndNotify {
    private var _subscriptionCount = AtomicInteger(0)
    private val channel = Channel<Unit>(0)

    val subscriptionCount: Int get() = _subscriptionCount.get()

    /**
     * Calling coroutine will wait/suspend
     * until another coroutine notify
     */
    suspend fun wait() {
        incCount()
        channel.receive()
    }



    suspend fun notify() {
        if (subscriptionCount > 0) {
            channel.send(Unit)
            decCount()
        }
    }

    suspend fun notifyAll() {
        while(subscriptionCount > 0) {
            channel.send(Unit)
            decCount()
        }
    }

    private fun incCount() {
        _subscriptionCount.incrementAndGet()
    }
    private fun decCount() {
        _subscriptionCount.decrementAndGet()
    }
}

Test code

    @Test
    fun waitAndNotifyTest() {
        val waitAndNotify = WaitAndNotify()
        runBlocking {
            repeat(10) {
                launch {
                    println("coroutine $it waiting")
                    waitAndNotify.wait()
                    println("coroutine $it resume!")
                }.also { it.start() }
            }

            // all the coroutine might be started and waiting
            // notify all
            launch {
                println("notifying")
                waitAndNotify.notifyAll()
            }
        }
    }

val subscriptionCount = _subscriptionCount // will return the same value all the time :joy::joy:
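
To spell out the difference, here is a minimal sketch (Counter, snapshot and live are made-up names for illustration): a property with an initializer copies the value once at construction, while a property with a getter re-reads the backing field on every access.

```kotlin
class Counter {
    private var _count = 0

    // Initializer: evaluated once when the object is created, so it stays 0.
    val snapshot = _count

    // Getter: evaluated on every access, so it follows _count.
    val live: Int get() = _count

    fun increment() {
        _count++
    }
}

fun main() {
    val counter = Counter()
    repeat(3) { counter.increment() }
    println("snapshot=${counter.snapshot}, live=${counter.live}") // snapshot=0, live=3
}
```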

It is actually an interesting problem and I’m interested in different ways to solve it for coroutines.

First of all, we need to be aware that wait/notify is a rather low-level feature; even in Java it is mostly used to build higher-level synchronization utilities on top of it. If you need it in Kotlin/coroutines, then either you are doing something really exotic, or you want it mostly because you are used to it, and you probably have much better alternatives in Kotlin.

Second, for reasons mentioned above (problems in your code) I think it would be hard to implement this without a critical section between waiting and notifying parties. Otherwise, even while we notify coroutines, another one could wait in the meantime and it is really hard to coordinate them all. Please note in Java we can use notify/wait only after acquiring a monitor object - that means only a single thread could use that functionality at a time.

The most trivial implementation would be to use a Mutex to synchronize and, inside the locked section, either wait or notify. Unfortunately, that would mean we wait/suspend while inside the critical section, which would cause a deadlock. Java supports this case: wait releases the monitor even though we are still inside the synchronized block. I don’t see a way to replicate this behavior by using a Mutex.
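
For comparison, this is roughly what the JVM side looks like. A sketch using ReentrantLock and its Condition from java.util.concurrent.locks rather than Object.wait/notify (awaitReleasesLock and the variable names are made up): await() gives the lock up while the thread is parked, which is the only reason the signalling thread can enter its own locked section.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical demo: returns true once the waiting thread has been woken up.
fun awaitReleasesLock(): Boolean {
    val lock = ReentrantLock()
    val condition = lock.newCondition()
    var notified = false // only touched while holding the lock
    var woken = false
    val waiterHoldsLock = CountDownLatch(1)

    val waiter = thread {
        lock.withLock {
            waiterHoldsLock.countDown()
            // await() releases the lock while parked and re-acquires it before returning
            while (!notified) condition.await()
            woken = true
        }
    }

    waiterHoldsLock.await()
    // This blocks until the waiter has released the lock inside await()
    lock.withLock {
        notified = true
        condition.signal()
    }
    waiter.join()
    return woken
}

fun main() {
    println(awaitReleasesLock()) // true
}
```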

However, we can do it in another way, for example by creating completable jobs:

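import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class WaitAndNotify {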
    private val jobs = ArrayDeque<CompletableJob>()
    private val mutex = Mutex()

    suspend fun wait() {
        val job = Job()
        mutex.withLock {
            jobs.addLast(job)
        }
        job.join()
    }

    suspend fun notify() {
        mutex.withLock {
            if (jobs.isNotEmpty()) {
                jobs.removeFirst().complete()
            }
        }
    }

    suspend fun notifyAll() {
        mutex.withLock {
            jobs.forEach { it.complete() }
            jobs.clear()
        }
    }
}

To be honest, I’m not entirely sure this code is correct. Please test it accordingly :slight_smile:
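
One way to exercise it, as a sketch rather than a proper test (resumeAllWaiters is a made-up name, and the class is repeated only so the snippet compiles on its own). It assumes runBlocking's single-threaded event loop: the yield() lets every launched coroutine run until it suspends in wait(), so all waiters are registered before notifyAll() is called. On a multi-threaded dispatcher that ordering is not guaranteed.

```kotlin
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.yield

// Same class as in the post above, repeated to keep the sketch self-contained.
class WaitAndNotify {
    private val jobs = ArrayDeque<CompletableJob>()
    private val mutex = Mutex()

    suspend fun wait() {
        val job = Job()
        mutex.withLock { jobs.addLast(job) }
        job.join()
    }

    suspend fun notifyAll() {
        mutex.withLock {
            jobs.forEach { it.complete() }
            jobs.clear()
        }
    }
}

// Hypothetical helper: starts `waiters` coroutines and counts how many resume.
fun resumeAllWaiters(waiters: Int): Int = runBlocking {
    val waitAndNotify = WaitAndNotify()
    var resumed = 0
    val children = List(waiters) {
        launch {
            waitAndNotify.wait()
            resumed++ // no race here: everything runs on runBlocking's single thread
        }
    }
    yield()                   // let every child run up to job.join()
    waitAndNotify.notifyAll() // completes all registered jobs
    children.joinAll()        // wait until the children have actually resumed
    resumed
}

fun main() {
    println(resumeAllWaiters(10)) // 10
}
```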

Yeah! It is nice and easy :two_hearts: !

I had to implement something similar a while back, and this code seems to do the trick by using a re-entrant lock to prevent deadlocks. Check the comments to see the links I adapted this code from. Again, this is not my original work, but is slightly modified from a couple discussions from kotlinx-coroutines issues.


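import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Adapted from: https://gist.github.com/elizarov/9a48b9709ffd508909d34fab6786acfe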
suspend inline fun <T> Mutex.withReentrantLock(crossinline block: suspend () -> T): T {
    val key = ReentrantMutexContext(this)
    // call block directly when this mutex is already locked in the context
    if (coroutineContext[key] != null) return block()
    // otherwise add it to the context and lock the mutex
    return withContext(key) {
        withLock { block() }
    }
}

data class ReentrantMutexContext(
    val mutex: Mutex
) : CoroutineContext.Key<ReentrantMutexContext>, CoroutineContext.Element {
    override val key: CoroutineContext.Key<*>
        get() = this
}

// Adapted from https://gist.github.com/paulo-raca/ef6a827046a5faec95024ff406d3a692
class Condition(private val mutex: Mutex) {
    private val waiting = LinkedHashSet<Mutex>()

    /**
     * Blocks this coroutine until the predicate is true
     *
     * The associated mutex is unlocked while this coroutine is awaiting
     *
     */
    @OptIn(ExperimentalTime::class)
    suspend inline fun awaitUntil(owner: Any? = null, predicate: () -> Boolean) {
        awaitUntil(Duration.INFINITE, owner, predicate)
    }

    /**
     * Blocks this coroutine until the predicate is true or the specified timeout has elapsed
     *
     * The associated mutex is unlocked while this coroutine is awaiting
     *
     * @return true if the predicate became true, false if the timeout has elapsed
     */
    @ExperimentalTime
    suspend inline fun awaitUntil(timeout: Duration, owner: Any? = null, predicate: () -> Boolean): Boolean {
        val start = System.nanoTime()
        while (!predicate()) {
            val elapsed = (System.nanoTime() - start).nanoseconds
            val remainingTimeout = timeout - elapsed
            if (remainingTimeout < Duration.ZERO) {
                return false  // Timeout elapsed without success
            }
            await(remainingTimeout, owner)
        }
        return true
    }

    /**
     * Blocks this coroutine until unblocked by signal() or signalAll()
     *
     * The associated mutex is unlocked while this coroutine is awaiting
     *
     */
    @OptIn(ExperimentalTime::class)
    suspend fun await(owner: Any? = null) {
        await(Duration.INFINITE, owner)
    }

    /**
     * Blocks this coroutine until unblocked by signal() or signalAll(), or the specified timeout has elapsed
     *
     * The associated mutex is unlocked while this coroutine is awaiting
     *
     * @return true if this coroutine was woken by signal() or signalAll(), false if the timeout has elapsed
     */
    @ExperimentalTime
    suspend fun await(timeout: Duration = Duration.INFINITE, owner: Any? = null): Boolean {
        ensureLocked(owner, "await")
        val waiter = Mutex(true)
        waiting.add(waiter)
        mutex.unlock(owner)
        return try {
            withTimeout(timeout) {
                waiter.lock()
            }
            true
        } catch (e: TimeoutCancellationException) {
            false
        } finally {
            mutex.lock(owner)
            waiting.remove(waiter)
        }
    }

    /**
     * Wakes up one coroutine blocked in await()
     */
    fun signal(owner: Any? = null) {
        ensureLocked(owner, "signal")
        waiting.iterator().apply {
            if (hasNext()) {
                next().also { remove() }.unlock()
            }
        }
    }

    /**
     * Wakes up all coroutines blocked in await()
     */
    fun signalAll(owner: Any? = null) {
        ensureLocked(owner, "signalAll")
        val iterator = waiting.iterator()
        for (waiter in iterator) {
            iterator.remove()
            waiter.unlock()
        }
    }

    private fun ensureLocked(owner: Any?, funcName: String) {
        val isLocked = if (owner == null) mutex.isLocked else mutex.holdsLock(owner)
        if (!isLocked) {
            throw IllegalStateException("$funcName requires a locked mutex")
        }
    }
}

fun Mutex.newCondition(): Condition {
    // Condition needs the mutex it is associated with
    return Condition(this)
}
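
The usual way to use a condition like this is to re-check a predicate in a loop while holding the mutex. The sketch below illustrates that pattern with a trimmed-down stand-in, SimpleCondition, which is not the class above: it drops timeouts and owner tracking so the snippet stays short and self-contained. consumeWhenReady is likewise a made-up name.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, simplified condition: no timeout and no owner support.
class SimpleCondition(private val mutex: Mutex) {
    private val waiting = LinkedHashSet<Mutex>()

    suspend fun await() {
        check(mutex.isLocked) { "await requires a locked mutex" }
        val waiter = Mutex(locked = true)
        waiting.add(waiter)
        mutex.unlock() // give the mutex up while suspended
        try {
            waiter.lock() // suspends until signalAll() unlocks this waiter
        } finally {
            mutex.lock() // take the mutex back before returning to the caller
            waiting.remove(waiter)
        }
    }

    fun signalAll() {
        check(mutex.isLocked) { "signalAll requires a locked mutex" }
        val iterator = waiting.iterator()
        while (iterator.hasNext()) {
            val waiter = iterator.next()
            iterator.remove()
            waiter.unlock()
        }
    }
}

fun consumeWhenReady(): Int = runBlocking {
    val mutex = Mutex()
    val ready = SimpleCondition(mutex)
    var value: Int? = null

    val consumer = async {
        mutex.withLock {
            // Re-check the predicate after every wake-up
            while (value == null) ready.await()
            value!!
        }
    }
    launch {
        mutex.withLock {
            value = 42
            ready.signalAll()
        }
    }
    consumer.await()
}

fun main() {
    println(consumeWhenReady()) // 42
}
```

Because the consumer checks the predicate before waiting, it does not matter whether the producer runs before or after it.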

Could you show an example of code where you need it, and what kind of problem you are trying to solve?