Turn a synchronous multi-shot call-back function into a Sequence

The following function shall represent a replacement for some library funtion, so there is no control over it:

fun loop(callback: (Int) -> Unit) {
    var i = 0
    while (true) {
        Thread.sleep(100)
        callback(i++)
    }
}

I want to wrap this and work with a Sequence instead. I know callbackFlow from kotlinx.coroutines which can be used to turn this into a Flow, roughly like this:

fun main() {
    val flow = callbackFlow {
        loop {
            trySendBlocking(it)
        }
    }
    runBlocking {
        flow.buffer(0).collect {
            println(it)
        }
    }
}

But I get a Flow here and not a Sequence. My application is fully synchronous, strictly single-threaded, so I don’t need and don’t want to go into the suspending world. Also I want to avoid to pull the kotlinx.coroutines library. Is this somehow possible with the standard library?

Usually, sequences generate items on demand, but in your case you actually push items by the producer. For this reason we have to do this in two separate threads and we need some kind of a queue. If you don’t mind targeting the JVM, we can use its LinkedBlockingQueue:

val queue = LinkedBlockingQueue<Int>()
val seq = generateSequence { queue.take() }
thread {
    loop {
        queue += it
    }
}
seq.forEach {
    println(it)
}

Thanks for the answer, but the application must stay single-threaded. It seems like the solution with the callbackFlow does not use any extra threads. So I guess, it should be possible what I want to achieve. Maybe I have to reimplement callbackFlow as a variant producing a Sequence. The question is, how complicated is that, or the other way round, what’s the simplest way to do it? But I’m not sure about this…

No, this is not possible to do with a single thread and without coroutines. You have two different call stacks here: one on the producer and one on the consumer side. Without coroutines you need two separate threads for this.

1 Like

The solution might very well use coroutines under the hood. The sequence builder function also does this. I just do not want to have a Flow as a result, but a Sequence which I can call in a non-suspending context.
Here is an idea: I take my first solution producing the Flow and turn it into Sequence by wrapping the Flow and implement iterator with a runBlocking. I wonder whether this would be a “good” solution, or whether there is one with acceptable length without using kotlinx.coroutines.

No, this is still not possible. Look, thread can’t be in two places at the same time. Well, with coroutines it kind of can, but to do this both the consumer and producer must be coroutines and the whole code must be coroutine-aware and coroutine-friendly. Your code clearly isn’t.

loop() function is not at all aware of coroutines. It hijacks the thread for itself and doesn’t allow to use it for anything else. If this is the only thread you have, that means your application pretty much can’t do anything else than running this loop() function. You can’t even run your above code twice.

2 Likes

broot is correct. loop will take up all of work in your thread safe for whatever happens in the callback.

Sequence itself is built on top of Iterator, implementing its next function makes little sense since either your program is consuming a value or it is working on creating a value, it can’t block or suspend while there is no value yet because then you will never acquire your next value.

Once your code calls loop, it cannot proceed and everything before it will be blocked indefinitely:

fun main() {
    //setup
    loop /*block the thread forever*/ { value ->
        //consume value
    }
    //unreachable code
}

Ok you might be right, but your explanations suggest that the example with the callbackFlow can’t work on a single thread, but it seems to me that it does. The main thread switches from the trySendBlocking to the println back and forth. So maybe I don’t really understand how it works. Could you explain?

Your example with callbackFlow() works by accident, it really shouldn’t :stuck_out_tongue:

The only reason it can work is because everything here is based on callbacks. If we register a callback, we don’t occupy the thread - the event is pushed to us within some third party thread. But if we do e.g. seq.first() then we “pull” the item, we block the thread in place.

Second reason why it works is because in this specific case coroutines machinery decided it can run the producer and consumer not as siblings, but to invoke the consumer directly inside the producer. What happens here is that trySendBlocking() internally invokes the lambda passed to collect(). But this is a very specific case, most probably related to trySendBlocking(). Change anything in your code, for example replace trySendBlocking() with trySend() and you will see sending says it is successful, but the collector never receives the item. It can’t collect, because the only thread is occupied by the producer all the time.

Also, try to launch another coroutine inside runBlocking() and you will notice it doesn’t even run. The reason is the same, the thread is entirely blocked by your loop(). Now, add a condition to your loop, so it finishes after a few items and you will notice at this point everything unblocks - your collector starts consuming and this above new coroutine starts running.

edit:
We can also look at this from another perspective. Let’s forget about your specific case and coroutines. Speaking in general, producer-consumer pattern using a single thread is possible only if one side is active and another is passive. Usually, when using sequences, the consumer is active, it runs some kind of a loop and asks for next items, then internally the producer is invoked on demand. Your example works in opposite way: producer runs a loop and invokes the consumer for each item. But if both the producer and the consumer needs to be active and run their own loops, we can’t do this on a single thread, we need at least two. Well… coroutines allow to do this by running two coroutines on a single thread, but there are multiple restrictions to make this possible and your loop() doesn’t really allow this.

1 Like

Thanks, that clarifies my confusion with the example.