I want to grok Flows. Maybe 1 more example?

I’ve read the Medium articles on Flows, but they assume a background with Rx and “cold vs hot” that I don’t have. (I’m very glad the experts are in charge!)

I think I’m very close to understanding flows, and maybe need just one now example of a scenario that works with Flows but wouldn’t work as smoothly with other ways. I think they might be useful in what I’m trying to do, but I’m not sure yet.

I’ve tried various Actors/consumers and Channels and Sequences and the like, trying to differentiate which is good for what type of problem, which work well with coroutines, and which are LTS.

“Kotlin Flows and Coroutines” by Roman Elizarov Kotlin Flows and Coroutines. Kotlin Flows are sequential while… | by Roman Elizarov | Medium

Good question. What are Flows for? What’s the difference between Flows and Sequences?

Sequence is synchronous. It uses Iterator and blocks while waiting for next item. Flow is asynchronous. It uses a suspending collect method so there’s no blocking while waiting for the next item.

Flow is a coroutine based implementation of Rx. Simplest description I can think of: it is for representing stream of values that arrive over time.

Like… could you give an ELI5 example?

I see a bunch of potential misunderstandings referenced in the original question (added some extremely simplistic answers):

  • Cold vs Hot - cold means lazy, hot means eager
  • Flow vs Actor - a Flow produces events, an Actor consumes events
  • Flow vs Channel - Flow is a general way to represent multiple values over time (eager or lazy, multiple coroutine contexts or just one). Channel is a primitive specifically for sending multiple values between coroutine contexts (always eager, always synchronization costs).
  • Flow vs Sequence - Flow suspends to wait for next value, Sequence blocks to get next value

It’s not clear what it is about Flows that you are unsure about so figuring out an appropriate example is difficult. Could you be more specific?

Perhaps you could describe what you are trying to do?

1 Like

Foundational language to make sure I’ve got it right (I think I’m missing some of the background)

  • Blocking while waiting for the next item: Stops processing. No chance of continuing until next item arrives.
  • Flow - no blocking (just suspending) - so it … could keep going before something else arrives?
  • Flow is a coroutine based implementation of Rx - I haven’t used Rx, so I’m unsure what this means.
  • Cold/Lazy Hot/Eager - not sure what this means.
  • You said an Actor consumes events and Flow produces events, but this doc says a producer produces events. Flows are the new producers?

BUT - To your request for examples: Yes! Examples are my favorite for making the obscure more clear.

  1. I’ve got a lot of BufferedImage frames produced by a mp4 video decoder and I want to do stuff with them. (High memory, no skipping frames allowed). A Sequence seems to work best, because the lazy processing allows me to filter them, chunk them, and process chunks, all without running out of memory. In general, Sequences seem to be better than Iterables (except for some edge speed cases). I’m pretty clear with what a sequence does for me.
  2. A robot distance sensor is continually producing distance measurements, at an uneven rate. I think a Flow would be best, because there might not be a consumer initially, you want to skip to catch up to the current/latest distance, and you want to sample or average it independent of who is consuming it. But this also sounds like a single element that gets you the “latest sensor read” every time. So I’m not
  3. I wanted to use more CPUs to “do stuff” to the aforementioned BufferedImage video frames in parallel, but if I apply a map() to a async{} to a Sequence, it explodes with OOM. Piping it through a size-limited Channel seems like one option to restrict “number of frames being processed in parallel”

Some language clarification:

  • Blocking - The thread is stuck (imagine a no-op while loop)
  • Suspending - The rest of the coroutine is packaged up into a callback and suspend methods all return immediately allowing the thread to be used for other work. Later the callback will be invoked when some internal result is ready causing the rest of the coroutine run.
  • Rx - Basically just a framework for having something (like Flow) that you can add a listener to (like collect) to optionally cause something to happen and get zero or more values (calls to the lambda passed to collect) followed by a complete signal (like collect finishing) or error signal (like collect throwing exception). It has some rules that make code easier to reason about like callbacks never happening concurrently. It also has a bunch of methods for transforming and combining these values that are sent over time.
  • Cold - Nothing happens till you start listening, listening triggers code to run.
  • Hot - Code runs and does stuff whether you listen for the events or not.
  • Producer/Consumer - A general pattern in programming where some code sends items (producer) and there’s other code that does something with those items (consumer). produce is just one method in one library for one language the helps with writing code that sends out items.

Example responses:

  1. Sequence works ok (I’m guessing loading/decoding the images is a blocking call) but I’d use Flow and ensure that any IO happens with the IO dispatcher using withContext. While I expect getting the next item from a Sequence to run some code, I could be surprised by it doing something really slow like IO. Note this is a “cold flow” scenario, images are only decoded when you listen for them.
  2. This is a “hot flow” scenario (code is running whether anyone listens or not). Flow seems like it’d work fine for this. Depending on the details, I might expose the events with a Flow in the public API but use a Channel internally.
  3. Channel is perfect for consuming events with multiple coroutines. Note that the Channel size is not about parallel processing, just buffering. Parallel processing requires reading from the Channel with multiple coroutines. See: https://kotlinlang.org/docs/reference/coroutines/channels.html#fan-out

Note: Flow and Channel can work together. Using one does not block you from using the other also. See channelFlow, Flow.broadcastIn, Flow.produceIn or BroadcastChannel.asFlow.

I think I grok flows: