Need to send twice in a rendezvous channel

Hey, I’ve got a puzzling issue involving channels in Kotlin. I’m currently trying to send a signal from the part of the code that gets executed upon the pre-defined console command “stop” to the part that actually shuts down the program.

The problem is that the stop command has to be sent twice before the signal is sent successfully.

Here’s the first part:

    private val commandRegister: Map<CLICommands, CLICommand> = mapOf(
        CLICommands.STOP to CLICommand(
            "stop",
            true,  // always enabled!
        ) {
            logger.debug { "Sending shutdown signal!" }
            ChannelManager.getChannel<Unit>("shutdown_signal")?.send(Unit)
            return@CLICommand null
        }
    )
    
    suspend fun listenAndRun() = coroutineScope {
        logger.trace { "Started listening for console commands." }
        while (isActive) {
            val input = readlnOrNull()
            input?.let { readLine ->
                logger.debug { "Read: $readLine" }
    
                commandRegister[CLICommands.getFromString(readLine.split(" ").first())]?.let { cliCommand ->
                    if (cliCommand.enabled) {
                        val response = cliCommand.action(input.split(" ").drop(0))
                        response?.let { println(it) }
                    }
                }
            }
        }
    }

Note this part in particular: ChannelManager.getChannel<Unit>("shutdown_signal")?.send(Unit)

This is the part that actually sends the signal. It reliably gets executed when the “stop” command is sent to the console.

The next part is the one that receives the signal and subsequently shuts down the program. It currently resides in the main function:

// Listen for a shutdown signal:
launch {
    logger.trace { "Listening for shutdown..." }
    ChannelManager.initializeChannel<Unit>("shutdown_signal").receive()
    ChannelManager.closeChannel("shutdown_signal")
}.invokeOnCompletion {
    launch {
        logger.info { "Shutting down..." }
        mainLoop.cancelAndJoin()
        logger.info { "Goodbye o/" }
    }
}

After receiving the signal, the shutdown process works as expected.

The channel is created and retrieved from the ChannelManager object. Here is a link to the code for that one, if needed.

Another peculiarity I’ve noticed is that using `Channel.trySend` always fails.

I hope this is all the information you need. Any help would be greatly appreciated, thanks.

My guess would be it is blocked in readlnOrNull(). How is this function implemented? If it performs blocking I/O underneath, then it is not cancellable. If this is correct, then it doesn’t wait for two “stop” commands, but for “stop” and then for anything, even an empty line.

Thanks for the reply ^^

listenAndRun is launched directly from the main function:

fun main(args: Array<String>) {
    runBlocking {
        startUp(args)

        val mainLoop = launch {
            logger.debug { "Launched main loop." }
            launch { CLIManager.listenAndRun() }
        }

        // Listen for a shutdown signal:
        launch {
            logger.trace { "Listening for shutdown..." }
            ChannelManager.initializeChannel<Unit>("shutdown_signal").receive()
            ChannelManager.closeChannel("shutdown_signal")
        }.invokeOnCompletion {
            launch {
                logger.info { "Shutting down..." }
                mainLoop.cancelAndJoin()
                logger.info { "Goodbye o/" }
            }
        }
    }
}

The lambda passed to the CLICommand instance is the one that gets executed (inside listenAndRun) when the “stop” command is triggered.
I couldn’t confirm your theory that readlnOrNull is blocking the execution somehow. The code that sends the signal is reached every time the “stop” command is sent. (And it has to be this exact command, even the second time.)

Do you have another idea what the issue may be? Would it help if I sent you the entire project, or some details I can dig up with the debugger? Lmk

Again, what is the code inside readlnOrNull()?

readlnOrNull is part of the standard library.

Ahh, ok. Then as I said, it is a blocking operation. After you send “stop” command, it gets to the readlnOrNull() line again and stays there until you send another line of input. I can’t exactly explain why it requires another “stop” command specifically, because as I said, I would expect it to unblock after sending anything to it. Maybe I don’t see something. Anyway, I would definitely expect this code to not stop after the first “stop” command.

Ok, I think I know why it requires specifically another “stop”. It won’t be that easy to explain, but the root cause is that you run blocking IO (readlnOrNull()) inside a coroutine, which is not allowed. You will get different kinds of problems while doing this.

Okay, thanks for the answer.
I don’t entirely get the details, but it’s also my first time extensively using coroutines.
What would you suggest doing instead? Should I run it in another thread entirely?

I can’t easily experiment with your code, so I can only make educated guesses, but I think the easiest you could do is:

  • Replace readlnOrNull() with withContext(Dispatchers.IO) { readlnOrNull() }.
  • Do System.in.close() somewhere in the shutdown hook.

A full answer would be much longer. Blocking I/O is generally a pain, and not only when using coroutines - I think you might have similar problems when using threads as well, because a read from stdin is almost impossible to stop. Usually, we handle these problems by either:

  1. Using non-blocking I/O.
  2. Running blocking I/O in a background thread. But that doesn’t solve the problem entirely as we still can’t cancel the I/O.
  3. Closing the resource from another thread.

I believe we can’t do 1. for stdin in Java (I’m not sure about this). But in your case a combination of 2. (withContext(Dispatchers.IO)) and 3. (System.in.close()) should probably do.
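To make the limitation of point 2 concrete, here is a minimal sketch. It assumes Thread.sleep(300) as a stand-in for a blocking readlnOrNull(); the function name ioDemo and the timings are made up for the illustration and are not from the original project.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections

// Hypothetical demo: Thread.sleep(300) plays the role of a blocking readlnOrNull().
fun ioDemo(): List<String> {
    val events = Collections.synchronizedList(mutableListOf<String>())
    runBlocking {
        val reader = launch {
            withContext(Dispatchers.IO) {
                Thread.sleep(300)                 // blocks an IO thread, not the runBlocking thread
                events.add("blocking call returned")
            }
            events.add("reader resumed")          // skipped: the coroutine was cancelled meanwhile
        }
        launch {
            delay(50)
            events.add("other coroutine ran")     // possible because the main thread stays free
            reader.cancelAndJoin()                // still has to wait for the blocking call
            events.add("join finished")
        }
    }
    return events
}

fun main() {
    println(ioDemo())
}
```

The other coroutine gets to run while the "read" is in progress, but cancelAndJoin() only returns once the blocking call has come back on its own. That is the part that still needs something like point 3.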

I think the problem is that the channel isn’t being created before you first try to use it. I might be wrong here, so apologies if I’m misreading your code, but this is how I understand what’s happening.

  1. Your program starts, and runs the startUp() function.
  2. Your program launches the CLIManager.listenAndRun()
  3. listenAndRun() starts executing, until it hits the readlnOrNull() line and waits for user input
  4. You enter the “STOP” command on the terminal (I’m assuming)
  5. It takes your input, finds the associated command in your commandRegister, and runs its action.
  6. The STOP command has an action which tries to get the “shutdown_signal” channel and send a Unit on it.
  7. Here’s the kicker: the code to create the “shutdown_signal” channel hasn’t run yet, so the STOP action doesn’t receive a channel, and doesn’t send anything.
  8. I actually don’t know how it continues at this point; I think it should be looping forever inside listenAndRun.

The trick with coroutines is that (as far as I understand) they are effectively an event loop; if you have a single thread, only one coroutine is running at a time. So when you fire off a launch, until the code inside that launch hits a suspend point and actually suspends, the thread never leaves that launch to execute the rest of your code. So I think the problem is that your mainLoop job is running and listening for input before the rest of your main function gets to run, i.e. your channels haven’t been set up.

I could be wrong on my theory, though, because if no Channel has been setup, I don’t see why listenAndRun would ever suspend, so I think you should be stuck inside that while loop forever. An easy way to test if my theory is correct is to modify your STOP command’s action a bit; store the Channel you receive in a variable, then debug log if the variable is null. That way, you can see if on the first time you send your stop command, the Channel to receive the stop command hasn’t been created yet.

Ahh, good catch. I didn’t notice the sending side could potentially quietly decide to not do anything. I still believe this is not what’s going on here, partially for the reason you mentioned - it should block forever.

I believe in most cases the body of launch() won’t be started before the caller fully finishes its block. We don’t have any guarantees about the execution order here, but from my experience the dispatcher created by runBlocking() executes coroutines in FIFO order. First it gets through the whole runBlocking block, scheduling two launch blocks. Then it executes the mainLoop block, but this block only schedules another one. Then it executes the receiver side, creating the channel. Only then it executes listenAndRun(). This is only my educated guess, because again - we don’t have guarantees on ordering.
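As a rough illustration of that ordering, here is a small sketch that mirrors the nesting of the original main() (a launch inside a launch on the sending side). The function name launchOrder and the labels are invented for the example. The order is what I see with the event loop that runBlocking sets up; it is not something the library promises.

```kotlin
import kotlinx.coroutines.*

// Same nesting as the original main(): the sender sits inside a launch inside a launch.
fun launchOrder(): List<String> {
    val order = mutableListOf<String>()
    runBlocking {
        launch {                                   // plays the role of mainLoop
            order.add("mainLoop")
            launch { order.add("listenAndRun") }   // only scheduled here, runs later
        }
        launch { order.add("receiver") }           // plays the role of the shutdown listener
        order.add("end of runBlocking block")
    }
    return order
}

fun main() {
    // [end of runBlocking block, mainLoop, receiver, listenAndRun]
    println(launchOrder())
}
```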

But you are right about your main point: if we start executing listenAndRun(), it entirely hijacks our only thread, so no other coroutine can run until we unblock the thread. And in the current implementation the listenAndRun() unblocks the thread only in one case: when we write “stop”. After the first stop it starts executing the shutdown procedure, but then it gets hijacked by listenAndRun() again. After entering “stop” again, it is able to finish the procedure.

Well you’re right; I wrote a very simple program that does a launch with a print, and a print after the launch, and the print inside the launch happened second. I’m sure I’ve seen the opposite behaviour before… weird.

I tried to replicate what I think the issue is with the following code:

fun main() {
    runBlocking {
        var shouldRun = true
        val testSuspendingFunctionThatDoesntSuspend = suspend {
            val channel: Channel<Unit>? = null
            channel?.send(Unit)
            println("sent nothing to null channel")
        }
        launch {
            println("Hello from launch")
            while (shouldRun) {
                testSuspendingFunctionThatDoesntSuspend()
            }
        }

        println("Hello")

        launch {
            println("should not run anymore")
            shouldRun = false
        }
    }
}

This code does loop forever though… so I guess it must be something different going on in eingruenesbeb’s code.

I don’t understand how this is the case… but I think listenAndRun is running first, before the Channels are set up, then somehow it suspends (this is the part I don’t understand - why does it suspend?) which gives the other launch in the main function an opportunity to run and set up the Channels, so from then on, the shutdown channel exists and the shutdown command can be properly processed.

That depends on the used dispatcher. I believe if using multi-threaded dispatchers, we often see the last launch() to be executed first, then the rest in more or less FIFO order. I guess this is an optimization to skip some dispatching. Thread that was used to launch new coroutines is free to do something, so it immediately picks up the last scheduled coroutine, without pushing it through the dispatching process. This is irrelevant for a single-threaded dispatcher - it can use simple FIFO. But again, this is only my guessing, we don’t have guarantees on such behavior.

I’m not sure if I understand your example correctly, but please note in the original code there was launch inside launch on the sending side. That makes it run after the receiving side.

True… launch inside launch. So execution order (roughly) would be:

  1. mainLoop
  2. setting up channels
  3. listenAndRun()

I dunno why it’s being weird, then. I guess I would just debug the code to try and see why the first stop command doesn’t actually stop things. (Or use lots of print statements if debugging won’t work)

I see 3 possible explanations, all of them caused by the same root cause.

1. It gets to receive() first, then we enter “stop” and it gets to send(). send() unblocks the receiving side, so both coroutines are now active, but it continues with the sending side, it gets to another loop of readlnOrNull() and blocks the thread. Receiving side still didn’t get a chance to resume from receive(). After we enter “stop” again, we get to send(), then it can switch to the receiving side and shutdown everything. I think this is what is happening here.

2. Similarly to above, but it decides to resume from receive() first, before continuing from send(). Then it gets to launch() in invokeOnCompletion(), it resumes from send() and the rest is the same as above.

3. It somehow got to a situation where everything is cancelled already, but it stays blocked in readlnOrNull(), so it ignores the cancellation. Only after entering “stop” it gets to send(), so it could cancel. But this scenario doesn’t seem possible to me.
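Scenario 1 can be reproduced in isolation. The sketch below is only an approximation of the original program: a pre-filled LinkedBlockingQueue stands in for the console, take() stands in for readlnOrNull(), and the name stopsConsumed is made up. It counts how many “stop” lines are read before the shutdown actually goes through.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical reproduction: a pre-filled queue stands in for the console.
fun stopsConsumed(): Int {
    val console = LinkedBlockingQueue(listOf("stop", "stop", "stop"))
    val shutdown = Channel<Unit>()   // rendezvous channel, as in the question
    var consumed = 0
    runBlocking {
        val mainLoop = launch {
            launch {
                while (isActive) {
                    val line = console.take()   // blocking call, like readlnOrNull()
                    consumed++
                    // 1st send: a receiver is waiting, so send() returns without suspending.
                    // 2nd send: nobody is waiting any more, so send() suspends and frees the thread.
                    if (line == "stop") shutdown.send(Unit)
                }
            }
        }
        launch {
            shutdown.receive()          // resumed by the 1st send, but only runs after the 2nd
            mainLoop.cancelAndJoin()
        }
    }
    return consumed
}

fun main() {
    println(stopsConsumed())   // 2
}
```

It also shows why the second line has to be “stop” specifically: send() is the only point in the loop where the coroutine can suspend, so any other input just leads straight back to the blocking read.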

Thank you for all the suggestions on what exactly could be going wrong.

Regarding the theory that the channel has simply not been created yet:
I’ve never had any issue with that. But you’re right, @Skater901: I should probably ensure the channel exists before trying to send anything on it. Something along the lines of a getOrInit function.
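A getOrInit along those lines might look roughly like this. The ChannelManager code isn’t shown in this thread, so ChannelRegistry below is a hypothetical stand-in with an invented API, not the real object.

```kotlin
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry, not the real ChannelManager from the project.
object ChannelRegistry {
    private val channels = ConcurrentHashMap<String, Channel<*>>()

    // Returns the channel registered under [id], creating a rendezvous channel if none exists.
    // Assumption: callers always use the same element type T for a given id.
    @Suppress("UNCHECKED_CAST")
    fun <T> getOrInit(id: String): Channel<T> =
        channels.computeIfAbsent(id) { Channel<T>() } as Channel<T>
}
```

With something like this, the sender and the receiver get the same channel no matter who asks first, so the STOP action would suspend in send() until the listener is ready instead of silently doing nothing.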

Now to the possible explanations:
It seems to be best described by @broot in scenario 1, where readlnOrNull is reached before the corresponding coroutine is cancelled.
I should note that the readlnOrNull() call is now inside a withContext(Dispatchers.IO) {} block. Here’s the new listenAndRun():

suspend fun listenAndRun() = coroutineScope {
    logger.debug { "Started listening for console commands." }
    while (isActive) {
        val input: String? = withContext(Dispatchers.IO) {
            logger.trace { "Awaiting console input..." }
            readlnOrNull()
        }
        input?.let {
            runCommand(it)
        }
    }
}

From some further testing, I’ve concluded that the following seems to happen when sending the “stop” command:

  1. Inside CLIManager.listenAndRun() the blocking IO readlnOrNull returns with the input from the console.
  2. runCommand is called with the input as an argument, selecting the “stop” command from the commandRegister and calling the action function from it.
  3. In the action function a Unit is sent on the channel with the “shutdown_signal” id, if the channel exists.
  4. The execution returns to listenAndRun, which loops back to listening to the console input.
  5. Another coroutine launched in main receives the object from the channel, discards it and completes, promptly starting another coroutine that cancels and joins the mainJob.
  6. Every cancelable coroutine in mainJob is, well… canceled, except for the one running CLIManager.listenAndRun(), which in turn blocks until another input is received.

At least, that is what I’ve inferred from the log:

17:58:04.914 [main @coroutine#1] INFO  i.g.e.secommandchangelogbot.Main - Loading up "SE Command Changelog Bot" version 0.0.1...
17:58:04.919 [main @coroutine#1] TRACE i.g.e.secommandchangelogbot.Main - Trace level is enabled.
17:58:04.932 [main @coroutine#1] INFO  i.g.e.secommandchangelogbot.Main - 
+-----------------------------------------+
| Welcome to the SE-Command-Changelog Bot |
|             Version: 0.0.1              |
+-----------------------------------------+
17:58:04.940 [DefaultDispatcher-worker-2 @coroutine#4] TRACE i.g.e.secommandchangelogbot.Main - Listening for shutdown...
17:58:04.940 [main @coroutine#3] DEBUG i.g.e.secommandchangelogbot.Main - Launched main loop.
17:58:04.946 [main @coroutine#5] DEBUG i.g.e.s.managers.CLIManager - Started listening for console commands.
17:58:04.948 [main @coroutine#6] DEBUG i.g.e.secommandchangelogbot.Main - I can run!
17:58:04.954 [DefaultDispatcher-worker-2 @coroutine#5] TRACE i.g.e.s.managers.CLIManager - Awaiting console input...
17:58:09.950 [main @coroutine#6] DEBUG i.g.e.secommandchangelogbot.Main - I can run!
stop
17:58:13.534 [main @coroutine#5] DEBUG i.g.e.s.managers.CLIManager - Read: stop
17:58:13.545 [main @coroutine#5] DEBUG i.g.e.s.managers.CLIManager - Sending shutdown signal!
17:58:13.548 [DefaultDispatcher-worker-1 @coroutine#5] TRACE i.g.e.s.managers.CLIManager - Awaiting console input...
17:58:13.551 [main @coroutine#7] INFO  i.g.e.secommandchangelogbot.Main - Shutting down...
17:58:13.573 [main @coroutine#6] DEBUG i.g.e.secommandchangelogbot.Main - I've been canceled!

18:05:43.633 [main @coroutine#7] INFO  i.g.e.secommandchangelogbot.Main - Goodbye o/

Does that mean that a band-aid fix would be to simply ensure that the coroutine running listenAndRun is suspended before and whilst canceling mainJob?
I guess an alternative solution would be to write a coroutine-friendly version of the read method of the underlying BufferedInputStream that suspends instead of blocking when no further input is available. But that strikes me as… complicated.

What do you suggest that I should try?

Again, there are 2 separate problems related to readlnOrNull() blocking the thread. One problem is that it blocks other coroutines from running, and this problem is solved by using Dispatchers.IO. The second problem is that the blocking operation can’t be cancelled/aborted. I’m not 100% sure about this, but I think there is no way in Java (this is really unrelated to Kotlin and coroutines) to attempt to read something from stdin and then cancel the read operation. In your specific case, assuming you need to shut down the whole application, I think the easiest is to close System.in - that should unblock readlnOrNull() by making it throw. Alternatively, we can read using a daemon thread and detach the reading from the structured concurrency. It won’t cancel reading, it will simply allow the application to finish while it still tries to read.
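A rough sketch of the daemon-thread variant, in case it helps. The helpers linesFrom and readUntilStop are invented for this example. The idea is that only the daemon thread ever blocks on the stream, while the coroutine side suspends on a channel, which is cancellable.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import java.io.BufferedReader
import java.io.IOException
import kotlin.concurrent.thread

// Hypothetical helper: forwards lines from a blocking reader into a channel.
fun linesFrom(reader: BufferedReader): ReceiveChannel<String> {
    val lines = Channel<String>(Channel.UNLIMITED)
    thread(isDaemon = true, name = "console-reader") {
        try {
            while (true) {
                val line = reader.readLine() ?: break      // null means end of input
                if (lines.trySend(line).isFailure) break   // channel closed or cancelled
            }
        } catch (e: IOException) {
            // The stream was closed underneath us; there is nothing more to read.
        } finally {
            lines.close()
        }
    }
    return lines
}

// Collects lines until "stop" is seen or the input ends.
suspend fun readUntilStop(lines: ReceiveChannel<String>): List<String> {
    val seen = mutableListOf<String>()
    for (line in lines) {          // suspends (cancellably) instead of blocking
        if (line.trim() == "stop") break
        seen.add(line)
    }
    return seen
}

fun main() = runBlocking {
    val seen = readUntilStop(linesFrom(System.`in`.bufferedReader()))
    println("Commands before stop: $seen")
}
```

After “stop”, the daemon thread is most likely still sitting in readLine(), but because it is a daemon it doesn’t keep the JVM alive once main returns.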

I think it’s time to take a step back and ask “How do you want your program to operate?” Is the intention that executing commands is asynchronous, and commands can be queued up? If not, maybe you don’t need to use a Channel, and can just wait for a command to finish executing before accepting the next one.

If it does need to be asynchronous with queuing commands… the only real answer I can see is a special case in your input logic that says “If the user enters the stop command, exit this loop”.
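Something like the following is what I mean by a special case. It is only a sketch: runUntilStop and its handle parameter are made-up names, and the real loop would call your command register instead.

```kotlin
// Hypothetical helper: processes lines until "stop" is entered, then returns.
fun runUntilStop(lines: Sequence<String>, handle: (String) -> Unit) {
    for (line in lines) {
        if (line.trim() == "stop") return   // leave the loop before the next blocking read
        handle(line)
    }
}

fun main() {
    // generateSequence ends when readlnOrNull() returns null (end of input).
    runUntilStop(generateSequence { readlnOrNull() }) { line -> println("Running: $line") }
}
```

Because the sequence is lazy, the next readlnOrNull() is never started once “stop” has been seen, so there is no pending read left to cancel.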

I’ve found a solution that works (at least for the “stop” command).
listenAndRun doesn’t loop back until this command (at least its defined action) is done. Therefore, I can simply clear a flag, safeToListen, inside the action and be sure that it is updated before the next time readlnOrNull is reached.
With that, my code looks something like this:

    private val commandRegister: Map<CLICommands, CLICommand> = mapOf(
        CLICommands.STOP to CLICommand(
            "stop",
            true  // always enabled!
        ) {
            logger.debug { "Sending shutdown signal!" }
            safeToListen = false
            ChannelManager.getChannel<Unit>("shutdown_signal")?.send(Unit)
            return@CLICommand null
        }
    )

    /**
     * As the name implies, this method listens for console-commands on [System.in] and executes the action specified
     * for them.
     *
     * The listening process is **NOT cancelable**, unless [safeToListen] is set to `false` **before** the method gets
     * to this stage.
     */
    suspend fun listenAndRun() = coroutineScope {
        logger.debug { "Started listening for console commands." }
        val maxReadTries = 5
        var readTries = maxReadTries

        while (isActive) {
            while (!safeToListen) {
                // Trap execution here, until it's safe.
                // This is a safeguard against accidentally preventing this coroutine from being canceled, because once
                // the execution reaches `readlnOrNull`, there's no turning back.
                logger.debug {
                    "Execution of `listenAndRun` delayed, because it's not safe to enter the listening phase. This is" +
                    " typically done to be able to cancel this."
                }
                delay(100)
            }

            val input: String?
            try {
                input = withContext(Dispatchers.IO) {
                    logger.trace { "Awaiting console input..." }
                    readlnOrNull()
                }
                readTries = maxReadTries
            } catch (e: Exception) {
                if (readTries-- == 0) {
                    // Out of retries: log the failure and stop listening.
                    logger.error(e) {
                        "There was an issue, trying to read the console. (Has the `System.in` stream been closed?)"
                    }
                    return@coroutineScope
                }
                delay(10)  // The issue may solve itself, but that could take some time.
                continue
            }

            input?.let {
                runCommand(it)
            }
        }
    }

I think that this solution is sufficient, as the “stop” command is special: it’s the only one upon which all processes (including the one running listenAndRun) are to be canceled. I’ll probably make this into some kind of global “application state” later, if I need to monitor this elsewhere.
Regarding the need for asynchronous execution of commands: I plan on having commands that will have to perform various network-related tasks, which should run in the background. So yeah, I believe that approach is justified.

As it’s all now working as intended, I’ll consider this issue as solved.
Thank you so much guys for all the help and patience. You’re awesome! :heart: