I’m wondering if anyone has a good strategy for coupling dependent flows; I’m running into this situation quite frequently now. The first idea, of course, would be to combine them into a single flow, but I’m not sure how. Here is a concrete example:
Using Flow alongside the Project Reactor Kafka client and Spring Data R2DBC, this is what we want to achieve:
- Download CSV file
- Insert audit entry into DB
- Deserialize downloaded file (fanning out here, one domain instance per line)
- Publish each entry to a Kafka queue
- When all entries have been published to the queue, update the audit entry in the DB
```kotlin
val reportProcessId = UUID.randomUUID()

SFTP.downloadFile(file)                       // fun downloadFile(file: String): Flow<String>
    .flatMapConcat { filename ->
        auditRepository.save(ReportProcess(reportProcessId, filename))  // fun save(r: ReportProcess): Mono<ReportProcess>
            .asFlow()
            .map { filename }                 // pass the filename on to the next stage
    }
    .flatMapConcat { deserializeCSVFile(it) } // fun deserializeCSVFile(filename: String): Flow<ReportEntry>
    .onEach { reportSink.get().next(toKafkaProducerRecord(topic, it.id, it)) }
    .onCompletion { cause ->
        if (cause == null) {
            auditRepository.updateState(reportProcessId, ReportProcessState.SUBMITTED)
                .asFlow()
                .collect {}
        } else {
            logger.error("failed to complete stream", cause)
        }
    }
```
This is a simple example, with the second flow (updateState) only being a single line; I have other examples where the second flow is much more elaborate. This looks like a smell, though: what happens to context preservation? Is the second flow guaranteed to finish running completely even if the outer/first Flow finishes first?
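As far as I can tell, `onCompletion` does run in the collector's coroutine and its suspending action completes before `collect` returns, so the follow-up work is not cut off when the upstream finishes. A minimal sketch that records the ordering (the event names standing in for the Kafka publishes and the DB update are made up for illustration):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun completionOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf("a", "b", "c")
        .onEach { events += "publish $it" }   // stand-in for the Kafka sink
        .onCompletion { cause ->
            if (cause == null) {
                delay(50)                     // simulate the suspending DB update
                events += "audit updated"
            }
        }
        .collect()
    // collect() only returns after onCompletion's action has finished
    events += "collect returned"
    events
}
```

Running this, "audit updated" is recorded before "collect returned", i.e. the completion work is sequenced inside the collection, not raced against it.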
Also, in this case I do not need the materialized value of the stream, but in other cases I would, which makes the onCompletion handler unsuitable.
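One workaround when the materialized value is needed: since `collect` is just a suspend call, the pipeline can be wrapped in a suspend function that accumulates the value and runs the dependent step afterwards, with plain sequential code instead of `onCompletion`. A sketch under that assumption (all names here are illustrative, not from the real pipeline):

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Publish every entry, then return the count as the "materialized value".
// In the real pipeline the dependent step after collect() would be
// auditRepository.updateState(...).
suspend fun publishAndCount(
    entries: Flow<String>,
    publish: suspend (String) -> Unit
): Int {
    var published = 0
    entries.collect {
        publish(it)
        published++
    }
    // Anything here is guaranteed to run after the whole flow completed,
    // and it can use the accumulated value.
    return published
}
```

This trades the operator-chain style for ordinary sequential suspension, which also sidesteps the context-preservation worry.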
Another possibility is using something like a `reduce` or `fold` terminal operator, but what if I need more operators after that (e.g. log every entry)?
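One way I can imagine keeping operators after a terminal `fold` is to run the fold inside a `flow { }` builder and emit its result, so the outer chain stays a Flow. A sketch, with `countAsFlow` being a hypothetical helper name:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Terminal fold wrapped back into a single-element Flow, so further
// operators (logging, further flatMaps, ...) can still be chained.
fun <T> Flow<T>.countAsFlow(): Flow<Int> = flow {
    emit(this@countAsFlow.fold(0) { acc, _ -> acc + 1 })
}

fun countAndLog(source: Flow<String>): Flow<Int> =
    source.countAsFlow()
        .onEach { println("processed $it entries") } // more operators after the fold
```

The fold result becomes an ordinary single-element flow, so the "what comes after the terminal operator" question turns back into normal operator chaining.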
P.S. I really, really hope there are plans to expand Exposed to support reactive operation; I would love to get away from Spring!