Create Iterator from suspendible function


#1

Hello, I have a need to create an Iterator where the values come from a suspendible function. I seem to be having trouble navigating main and coroutine contexts. Here’s a toy example:

package com.opentempo.specification

import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.LinkedList
import kotlin.coroutines.experimental.buildIterator

fun main(args: Array<String>) {
    val iter = getIterator()
    while (iter.hasNext()) {
        println("Main: ${iter.next()}")
    }
    println("Done!")
}

fun getIterator(): Iterator<String> = buildIterator {
    val channel = Channel<String>(0) // zero doesn't seem to give me what I want
    launch {
        emitter(channel)
    }
    // won't compile without this or something like it?
    // but yield never continues
    runBlocking {
        while (! channel.isClosedForReceive) {
            channel.send("request") // tell coroutine to send next value
            val value = channel.receive()
            println("\treceived ${value}")
            yield(value) // never returns, main never prints values
        }
    }
}

suspend fun emitter(channel: Channel<String>) {
    val queue = LinkedList<String>()
    queue.addAll(listOf("Fred", "Barney", "Wilma", "Betty"))

    while (queue.isNotEmpty()) {
        channel.receive() // wait for main line to request a value
        val value = queue.pop()
        println("sending ${value}")
        channel.send(value)
    }
    channel.close()
}

First, I thought Channel<String>(0) would make it so only one message would be in the channel at a time. This didn’t work but I have addressed that with the request-response pattern.

If I comment out the yield() I get the values in the order I expect, but of course the Iterator doesn’t emit them. But as is, the program hangs on the yield(). I suppose this makes sense if it’s suspending into a blocking context. But without the runBlocking (or some equivalent) it won’t compile; the channel send and receive calls produce compile errors.

The buildIterator / yield pattern seems like the best fit for what I’m doing. But I can’t figure out how to interact with the channel AND yield values to the iterator caller.

(The real-world situation is a bit more complex, of course. I’m building a DSL for JUnit5. The DSL must be perfectly sequential in execution, thus must suspend while yielding DynamicNode to an iterator.)

Thanks for the help.

/Daryl


#2

As far as I understand it, the runBlocking call is the problem. For some reason (I don’t know why) it stops your buildIterator from yielding the next value. You can work around this by moving the yield outside the runBlocking block and using this instead:

val value = runBlocking {
    channel.send("request")
    channel.receive()
}

I also needed to add a short delay in my test after the yield. Otherwise the iterator builder would wait for the next channel receive before the emitter had enough time to close the channel. (Obviously a delay is not the best way to solve this, but hey, not my problem right now :) I guess in this case an atomic int would do the trick, or you could use a mutex.)
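For reference, here is a minimal sketch of the same idea without the delay. It is not the code from the posts above: it assumes a current kotlinx.coroutines release (non-experimental packages, `iterator` in place of `buildIterator`, `receiveCatching` available), and it drops the request/response handshake, relying on the rendezvous channel alone. That means the emitter may run ahead as far as its next `send` before the caller asks for a value. `emitNames` and `namesIterator` are hypothetical names.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the emitter in the original post.
suspend fun emitNames(out: Channel<String>) {
    for (name in listOf("Fred", "Barney", "Wilma", "Betty")) {
        out.send(name) // suspends until the iterator side receives
    }
    out.close() // signals "no more values"
}

@OptIn(DelicateCoroutinesApi::class)
fun namesIterator(): Iterator<String> = iterator {
    val channel = Channel<String>() // rendezvous channel (capacity 0)
    GlobalScope.launch { emitNames(channel) }
    while (true) {
        // Block only for the channel hand-off; null means the channel was closed.
        val value = runBlocking { channel.receiveCatching().getOrNull() } ?: break
        yield(value) // called directly in the iterator builder, outside runBlocking
    }
}

fun main() {
    for (name in namesIterator()) println("Main: $name")
    println("Done!")
}
```

Treating a closed channel as the end-of-data signal removes the window where the builder checks `isClosedForReceive` before the emitter has had a chance to close the channel.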


#3

Awesome, thank you so much. I did not realize runBlocking was returning the last value of the block.

It seems to work without the delay you mentioned. I’ll have to remember to keep an eye on that…

/Daryl


#4

OK, my real-world example needed the delay… hmm…


#5

Coroutines can get executed on multiple threads, so you cannot simply assume linear execution. That’s why sometimes you need some sort of mutex and sometimes you don’t. A delay can work, but if for some reason the execution of one coroutine is really slow it can still lead to a deadlock. That’s why I would suggest you look into a different way of fixing it, such as some kind of mutex.


#6

As it turns out I need this to all run on one thread. Using “Unconfined” appears to address the concern.

Thanks.


#7

Then I guess you were lucky. “Unconfined” does not lock the execution to one thread but allows the thread to change. More information can be found here: https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#unconfined-vs-confined-dispatcher

I think the best way would be to create a single-threaded dispatcher. You can use newSingleThreadContext to create one. Just make sure to call close on it to release the resources it holds.
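A rough sketch of what that could look like, assuming a current kotlinx.coroutines release where the function is called `newSingleThreadContext` and the returned dispatcher is closeable. The function name `threadsUsed` and the thread name "dsl-thread" are made up for illustration.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking

// Launches jobCount coroutines on one dedicated thread and
// returns the ids of the threads they actually ran on.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun threadsUsed(jobCount: Int): Set<Long> {
    val ids = mutableSetOf<Long>()
    // use { } closes the dispatcher and releases its thread when done.
    newSingleThreadContext("dsl-thread").use { dispatcher ->
        runBlocking {
            repeat(jobCount) {
                launch(dispatcher) { ids += Thread.currentThread().id }
            }
        } // runBlocking returns only after its child coroutines finish
    }
    return ids
}

fun main() {
    println(threadsUsed(5).size) // all five coroutines share one thread
}
```

Unlike Unconfined, every coroutine launched with this dispatcher resumes on the same thread, at the cost of having to close it explicitly.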


#8

If you want an easier iterator you can also use buildSequence to create a sequence from a suspend function. A sequence can then be iterated as you wish (it has an iterator function).
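To illustrate with a small sketch: in current Kotlin the builders are named `sequence` and `iterator` (the successors of buildSequence and buildIterator). The names `names` and `drainTwice` below are hypothetical.

```kotlin
// A sequence re-runs its builder block for every new iteration.
val names: Sequence<String> = sequence {
    yield("Fred")
    yield("Barney")
}

// An iterator is single-use: draining it a second time yields nothing.
fun drainTwice(): Pair<List<String>, List<String>> {
    val once = iterator {
        yield("Fred")
        yield("Barney")
    }
    val first = once.asSequence().toList()
    val second = once.asSequence().toList()
    return first to second
}

fun main() {
    println(names.toList())
    println(names.toList()) // same elements again
    println(drainTwice())   // second list is empty
}
```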


#9

Thanks again, I’ll try that (newSingleThreadContext).


#10

How would buildSequence be easier? It looks to be six of one, half a dozen of the other.


#11

Isn’t the only difference between buildSequence and buildIterator that sequence can be re-used?

But I’m still not getting your actual initial question. Your approach seems more complicated than necessary.

Iterator builders ARE suspendable functions. The yield() calls are suspension points. If you want a suspendable iterator, just yield stuff.
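A small sketch of what that means in practice, using only the standard library (`iterator` is the current name of buildIterator; `steps` and `runSteps` are made-up names). The builder body runs only as far as the next yield each time the caller asks for an element, so producer and consumer strictly alternate:

```kotlin
// Records when the builder runs relative to the consumer.
fun steps(log: MutableList<String>): Iterator<Int> = iterator {
    for (i in 1..3) {
        log += "produce $i"
        yield(i) // suspends here until the caller asks for the next element
    }
    log += "builder finished"
}

fun runSteps(): List<String> {
    val log = mutableListOf<String>()
    val iter = steps(log)
    while (iter.hasNext()) {
        log += "consume ${iter.next()}"
    }
    return log
}

fun main() {
    runSteps().forEach(::println)
    // produce 1, consume 1, produce 2, consume 2,
    // produce 3, consume 3, builder finished
}
```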

If your data source lives in a different coroutine context, i.e. you can’t get around the launch { emitter(channel) } part, then the channel can be your iterator. Why not just use it?

fun getIterator(): Iterator<String> {
    val channel = Channel<String>(0) // zero doesn't seem to give me what I want
    launch {
        emitter(channel)
    }
    return channel
}

This has disadvantages: for example, abandoning one side does not clean up the other. But your version shouldn’t be any better in that regard, anyway.


#12

I missed buildIterator in your code sample (it was hidden in the right fold and a lot of code). But yes, if you just want an iterator use buildIterator.


#13

Yes, I understand if you think my example is overly complicated. Yes, the iterator suspends, but the emitter also suspends. In my OP I described that it’s a DSL that emits events and needs to suspend with each event until the external iterator caller requests the next element. It’s possible I could simplify things; the problem was not knowing how to get a value off the channel the right way.

Your example doesn’t compile, even with return channel.iterator() as iterator() produces a ChannelIterator which is not a Java Iterator.

/Daryl


#14

You’re right. Sorry, I missed that. Btw, here is someone else wrapping ReceiveChannel into a Sequence, but you seem to be doing something similar already. It also uses runBlocking in a way that potentially blocks, and I’m not sure your single-thread requirement can be fulfilled that way.


#15

Yes, I saw that post before creating this thread. That’s where I learned "ChannelIterator does not extend Iterator".

Thanks.


#16

You should not call yield from runBlocking. It will be prohibited eventually, see https://youtrack.jetbrains.com/issue/KT-20505.