Thread-safe ReceiveChannel.poll() on channel with nullable elements


#1

Due to some issues, I cannot use select+onReceive for my multi-channel-select use case (namely, the lambdas are not inlined, so I can’t return from inside them). Therefore, I am using poll coupled with a when clause. But that channel might accept null elements. When using ReceiveChannel.poll, how do I know in a thread-safe fashion whether the result is null because the channel was empty, null because the channel was closed, or null because it was sent a null? Using isEmpty or isClosedForReceive before or after the fact is not thread-safe, and I don’t want to carry around a mutex with the channel everywhere (I can’t use synchronized because this is common library code).

There is an unexposed pollInternal that does exactly what I want. Should I open an issue requesting a fun <T> ReceiveChannel<T>.pollOrDefault(default: () -> T): T? Or even better, ReceiveChannel<T>.pollWithState(): Pair<T?, State>? I just need atomic poll-or-isEmpty-or-isClosedForReceive.


#2

Have you considered using Optional?


#3

This is part of a code generator and I am hoping not to have to convert every use of a nullable object to Optional just because this functionality is hidden. Also, that doesn’t tell you whether the channel is just empty or empty+closed, both of which would return a null value. There is ambiguity in the state when reusing null for this purpose (and it’s why Go has a version that returns ok, so you know whether the result was due to an empty+closed channel).


#4

OK @cretz, I understand your concern and I agree with it.
I prefer the onEmpty and onClosedForReceive clauses (https://github.com/Kotlin/kotlinx.coroutines/issues/330#issuecomment-381406403); these should be enough to resolve your issue and to build any other operator.

As for your proposal, pollOrDefault does not look generic enough to me; instead, pollWithState should return a sealed class Item|Empty|ClosedForReceive.
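
To make the suggested Item|Empty|ClosedForReceive shape concrete, here is a minimal sketch in plain Kotlin. PollResult, ToyChannel, and describe are hypothetical names used only for illustration; none of them exist in kotlinx.coroutines. The toy queue is single-threaded and is only there to exercise the result type, so it says nothing about how a real channel would make the call atomic.

```kotlin
// Hypothetical result type; these names are not part of kotlinx.coroutines.
sealed class PollResult<out T> {
    data class Item<T>(val value: T) : PollResult<T>()
    object Empty : PollResult<Nothing>()
    object ClosedForReceive : PollResult<Nothing>()
}

// Toy single-threaded stand-in for a channel, used only to exercise the type.
// It is NOT thread-safe and is not how a real channel is implemented.
class ToyChannel<T> {
    private val buffer = ArrayDeque<T>()
    private var closed = false

    fun send(element: T) {
        check(!closed) { "channel is closed" }
        buffer.addLast(element)
    }

    fun close() {
        closed = true
    }

    // A single call reports an element, emptiness, or closure.
    // Buffered elements are still delivered after close().
    fun pollWithState(): PollResult<T> = when {
        buffer.isNotEmpty() -> PollResult.Item(buffer.removeFirst())
        closed -> PollResult.ClosedForReceive
        else -> PollResult.Empty
    }
}

// A null element stays distinguishable from "empty" and "closed".
fun describe(result: PollResult<String?>): String = when (result) {
    is PollResult.Item -> "item: ${result.value}"
    PollResult.Empty -> "empty"
    PollResult.ClosedForReceive -> "closed"
}

fun main() {
    val ch = ToyChannel<String?>()
    println(describe(ch.pollWithState())) // nothing sent yet
    ch.send(null)
    println(describe(ch.pollWithState())) // a null element was sent
    ch.close()
    println(describe(ch.pollWithState())) // drained and closed
}
```

Because the result is a sealed class, a when over it is exhaustive without an else branch, and a nullable element type no longer collides with the "nothing received" cases.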


#5

@fvasco - yeah, I’ll let this sit a bit longer, then open up an issue (and maybe a PR). I can’t use select for my use case because I need inline functions with inline-return support and I need to support default fallback (see my other thread Coroutine select clause with default).


#6

I don’t understand this part: having the three onXxx selectors should cover all use cases, so why do you need the else block?


#7

As mentioned in the other thread, the select documentation at https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html specifically states:

There is no default clause for select expression. Instead, each selectable suspending function has the corresponding non-suspending version that can be used with a regular when expression to select one of the alternatives or to perform default (else) action if none of them can be immediately selected.

I need something to run when no others are satisfied. This is a common need in Go and elsewhere. Also, those onXXX selectors aren’t inlined, so I can’t return from them (which may seem trivial to rework the program, but I am writing a code generator).
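
The non-suspending "try each channel, else run a default" pattern described here can be sketched as follows. This assumes a kotlinx.coroutines version that provides tryReceive(), which returns a ChannelResult (1.5 or later, so it postdates the 1.0.1 API discussed in this thread, where only poll() is available). pollEither and its parameter names are hypothetical and exist only for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical helper, not a library function: checks two channels in order
// without suspending and falls through to a default result, much like a Go
// select with a default case. Plain early returns work because nothing here
// runs inside a non-inlined lambda.
fun pollEither(names: ReceiveChannel<String?>, counts: ReceiveChannel<Int?>): String {
    val name = names.tryReceive()
    // isSuccess is true even when the received element itself is null.
    if (name.isSuccess) return "name=${name.getOrNull()}"

    val count = counts.tryReceive()
    if (count.isSuccess) return "count=${count.getOrNull()}"

    // isClosed separates "closed and drained" from "open but currently empty".
    return if (name.isClosed && count.isClosed) "both closed" else "default"
}

fun main() {
    val names = Channel<String?>(Channel.UNLIMITED)
    val counts = Channel<Int?>(Channel.UNLIMITED)

    println(pollEither(names, counts)) // neither channel has an element

    names.trySend(null)
    println(pollEither(names, counts)) // a null element waits in names

    counts.trySend(7)
    println(pollEither(names, counts)) // only counts has an element

    names.close()
    counts.close()
    println(pollEither(names, counts)) // both closed and drained
}
```

Each tryReceive() call yields its state in one invocation, so there is no separate isEmpty or isClosedForReceive check that could race with another receiver.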


#8

Something like this may become possible:


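// Sketch only: onEmpty and onClosedForReceive are the clauses proposed in
// issue #330, and Ok, Empty, and ClosedForReceive stand for the result states.
suspend fun <T> ReceiveChannel<T>.pollWithState() =
    select {
        onReceive { Ok(it) }
        onEmpty { Empty }
        onClosedForReceive { ClosedForReceive }
        // Stands in for the missing else branch
        onTimeout(0) { error("No else branch") }
    }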

#9

Well, onTimeout would never be called in that scenario in theory, because a receive channel either has a value, is open+empty, or is closed+empty.

But yes, sure. In fact, if you read the source at https://github.com/Kotlin/kotlinx.coroutines/blob/1.0.1/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt#L523 you’ll see they already return a union of the object, poll failed (i.e. empty), or closed. So you’d just leverage that for pollWithState. Then you can write onReceiveOrClosedForReceive if you want the select version (you don’t want it to handle just onEmpty because you want that to fall through to the next select clause). Separating them into individual clauses implies you want to write different code for each state, which may not be the case. They just chose not to expose the state in a single (presumably) atomic invocation, for whatever reason.