Non-suspending ring buffer

I’m looking for an alternative to flow.emit(). I need a data structure (a fixed-size circular/ring buffer) that is “coroutine-safe” and whose “put/add” operation is non-suspending. The producer should not depend on the consumer, and it should be able to emit data even if there are no consumers listening.
I was thinking of Channels, but they offer non-suspending send() only when declared Channel.UNLIMITED.

Does the coroutines library offer something like this, or do I need to implement it myself? Do you have any suggestions?

Thanks

Have you considered Channel’s sendBlocking?

Hi @fvasco,

Channels suspend the producer when the buffer is full. I need a producer that keeps producing into a circular buffer (old values get overwritten with fresh ones), and it must be able to produce even if there aren’t any consumers reading the data.

Consumers can start or stop consuming at any time, reading whatever data is available in the buffer.

Take a look here: https://github.com/Kotlin/kotlinx.coroutines/issues/2034

Channel has non-suspending and non-blocking methods too.

A simple solution: try offer, and if it fails (returns false), poll to throw away the oldest value and then offer again.

Note: If your producer emits results concurrently, you should wrap such a code block in a synchronization primitive.
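For illustration, here is a minimal sketch of the offer/poll idea above. It assumes a recent kotlinx.coroutines release, where offer and poll are deprecated in favour of trySend and tryReceive. The class name OverwritingBuffer and its methods are hypothetical, and a plain synchronized block stands in for the synchronization primitive so that add stays non-suspending.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical wrapper; the names are illustrative and not part of kotlinx.coroutines.
class OverwritingBuffer<T>(capacity: Int) {
    init {
        // A rendezvous channel (capacity 0) could never accept trySend without a receiver.
        require(capacity > 0) { "capacity must be positive" }
    }

    private val channel = Channel<T>(capacity)
    private val lock = Any()

    /**
     * Non-suspending add. When the buffer is full, the oldest element is
     * discarded to make room. Returns false only if the channel is closed.
     */
    fun add(value: T): Boolean = synchronized(lock) {
        var result = channel.trySend(value)
        while (!result.isSuccess && !result.isClosed) {
            channel.tryReceive()            // throw away the oldest value
            result = channel.trySend(value) // and try again
        }
        result.isSuccess
    }

    /** Non-suspending read: the oldest buffered element, or null if the buffer is empty. */
    fun pollOrNull(): T? = channel.tryReceive().getOrNull()

    /** Suspending read for consumers that prefer to wait for data. */
    suspend fun receive(): T = channel.receive()

    fun close() {
        channel.close()
    }
}
```

Only producers take the lock; a consumer that receives concurrently just frees a slot, so the retry loop still terminates. Newer kotlinx.coroutines versions also accept an onBufferOverflow argument in the Channel factory (for example BufferOverflow.DROP_OLDEST), which may make a wrapper like this unnecessary.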

Thanks @nickallendev.

This is what I have implemented right now with a BUFFERED Channel. The “add” operation is synchronised with a Mutex.

The new SharedFlow suggested by @fvasco could work too (probably with the KEEP_LATEST strategy), but it is still in the PR/draft phase. It will take a while before it lands in a stable release.
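As a rough sketch of the SharedFlow option, assuming a kotlinx.coroutines release in which SharedFlow has shipped: in the released API the overflow option is named BufferOverflow.DROP_OLDEST rather than KEEP_LATEST. The helper name ringBufferFlow is hypothetical, and using the replay cache as the ring buffer is one possible design, not necessarily what the linked issue intends.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical helper: the replay cache doubles as a ring buffer of the given size.
fun <T> ringBufferFlow(size: Int): MutableSharedFlow<T> =
    MutableSharedFlow(
        replay = size,                               // how many recent values are kept
        onBufferOverflow = BufferOverflow.DROP_OLDEST // overwrite old values instead of suspending
    )

fun main() {
    val flow = ringBufferFlow<Int>(3)

    // tryEmit is non-suspending and works even though nobody is collecting yet.
    for (i in 1..5) flow.tryEmit(i)

    // Snapshot of the values a new collector would receive first.
    println(flow.replayCache)
}
```

Note that a SharedFlow broadcasts: every new collector first receives the replay cache, and collecting does not remove values the way receiving from a Channel does. Whether that matches the “consumer can start/stop anytime” requirement depends on the use case.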

Thank you both!