Coupling dependent Flows?

I’m wondering if anyone has a good strategy for coupling dependent flows; I’m running into this situation quite frequently now. The obvious first idea would be to combine them into a single flow, but I’m not sure how. Here is a concrete example:

Using Flow alongside the Project Reactor Kafka client and Spring Data R2DBC, this is what we want to achieve:

  • Download CSV file
  • Insert audit entry into DB
  • Deserialize downloaded file (fanning out here, one domain instance per line)
  • Publish each entry to a Kafka queue
  • When all entries have been published to the queue, update the audit entry in the DB
val reportProcessId = UUID.randomUUID()
SFTP.downloadFile(file) // fun downloadFile(file: String): Flow<String>
    .flatMapConcat { filename ->
        auditRepository.save(ReportProcess(reportProcessId, filename)) // fun save(r: ReportProcess): Mono<ReportProcess>
            .asFlow()
            .map { filename } // pass the filename downstream rather than the saved entity
    }
    .flatMapConcat { deserializeCSVFile(it) } // fun deserializeCSVFile(filename: String): Flow<ReportEntry>
    .onEach { report ->
        reportSink.get().next(toKafkaProducerRecord(topic, report.id, report))
    }
    .onCompletion { cause ->
        if (cause == null) {
            auditRepository.updateState(reportProcessId, ReportProcessState.SUBMITTED).asFlow().collect {}
        } else {
            logger.error("failed to complete stream", cause)
        }
    }

This is a simple example where the second flow (updateState) is only a single line; I have other examples where the second flow is much more elaborate. It looks like a smell though: what happens to context preservation? Is the onCompletion block guaranteed to run to completion even after the outer/first Flow has finished?
Also, in this case I do not need the materialized value of the stream, but in other cases I would, which makes the onCompletion handler unsuitable.

Another possibility is using something like a reduce or fold terminal operator, but what if I need more operators afterwards (e.g. log every entry)?
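Roughly what I have in mind with fold, as a sketch only (reusing the hypothetical names from above and assuming this runs inside a suspending function that already has the filename in hand):

// fold is terminal, so the pipeline stops being a Flow here and any further
// per-entry operators (e.g. logging) have to be squeezed in before it
val published = deserializeCSVFile(filename)
    .onEach { report -> reportSink.get().next(toKafkaProducerRecord(topic, report.id, report)) }
    .fold(0L) { count, _ -> count + 1 }
auditRepository.updateState(reportProcessId, ReportProcessState.SUBMITTED).asFlow().collect {}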

P.S. I really hope there are plans to expand Exposed to support reactive operation; I would love to get away from Spring!

It’s not super clear what you are asking, but it seems like you just want to know how to process one stream, and then another.

If you want to do async things in order, just write them in order in a suspend method or lambda.

try {
    val content = SFTP.downloadFile(file).first() // assuming one item, based on the name
    auditRepository.save(ReportProcess(reportProcessId, content)).asFlow().collect()
    deserializeCSVFile(content).collect { report ->
        reportSink.get().next(toKafkaProducerRecord(topic, report.id, report))
    }
    auditRepository.updateState(reportProcessId, ReportProcessState.SUBMITTED).asFlow().collect()
} catch (ex: Exception) {
    logger.error("failed to complete stream", ex)
}

If you want a Flow that does this, stick the whole thing inside a call to flow { ... }. Consider changing methods that return a Flow/Mono of zero or one items into suspend methods or suspend function types (like suspend () -> String) so you never need to worry about forgetting to tack on that collect() call.
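For example, something along these lines (a rough sketch only, reusing the names from your snippet and assuming save/updateState have been converted to suspend functions):

fun processReport(file: String, reportProcessId: UUID): Flow<ReportEntry> = flow {
    val content = SFTP.downloadFile(file).first()
    auditRepository.save(ReportProcess(reportProcessId, content))      // suspend fun save(...): ReportProcess
    deserializeCSVFile(content).collect { report ->
        reportSink.get().next(toKafkaProducerRecord(topic, report.id, report))
        emit(report)                                                    // downstream operators still see every entry
    }
    auditRepository.updateState(reportProcessId, ReportProcessState.SUBMITTED)
}

The returned Flow is cold, so nothing runs until someone collects it, and all four steps execute sequentially within that single collection.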

onCompletion is fine for emitting items after another flow has finished. That lambda’s receiver is a FlowCollector, so you can just call emit and the item will be sent downstream.
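A minimal example:

flowOf(1, 2, 3)
    .onCompletion { cause ->
        if (cause == null) emit(-1) // appended as the final item of the flow
    }
    .collect { println(it) }        // prints 1, 2, 3, -1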


Thank you; that was probably not the best description of my problem, so maybe I’ll take another stab at it in the future. Either way, it’s a good reminder that suspending calls inside a coroutine run sequentially, a property that may well prove useful here.