Streams and Sequences inside coroutines


#1

Just encountered one possible problem with coroutines. It seems to be impossible to use Stream or Sequence lazy evaluation with suspended functions. The example is following: I have a coroutine running and want to perform some suspendedable operation on each of elements of some Stream. The compiler gives an error, declaring that suspendable function could not be called outside coroutine and indeed, we do not know, when and how will Stream operations be performed. I can surely introduce async block inside mapping function, but I do not know weather it is a good idea to launch one coroutine inside another one. Another solution is to collect the Stream to list and then use regular Kotlin mapping operation on this list (it works fine because mapping operation is not lazy).


#2

It isn’t a bad idea, the ForkJoinPool is specifically designed for this purpose.

Collect a Stream should be avoided if possible.

Can you use a Channel?
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html

However can you provide an example?


#3

Here it is:

call.json { // it is slightly modified ktor call that takes suspendable function as argument
    val loaders = jsonArray();
    //my framework function that provides a stream of objects called loaders
    StorageUtils.loaderStream().map { loader ->
        // a call to suspendable json builder. This is the place where problem arises
        jsonObject {
          //some logic here
        }
    }.forEach {
        loaders.add(it)
    }
    add("loaders", loaders)
}

I do not think that I need channels here, because all of internal logic is straight forward and does not requires communication between processes. The problem is that some iterator calls in my code are replaced by Stream API. I do not need parallel processing of Stream here.


#4
for (item in StorageUtils.loaderStream()) {
    val jo = jsonObject {
                //some logic here
                }
    loaders.add(jo)
}

#5

Good idea, thanks. I did not know that streams could be used instead of iterators in Kotlin (In Java Stream have iterator method but does not implement Iterable).


#6

for iterates through anything that provides an iterator, i.e.
has a member- or extension-function iterator()

http://kotlinlang.org/docs/reference/control-flow.html#for-loops