Coroutines: ConflatedChannel: how to detect event conflation?

Is it possible to receive an event if an actor using a ConflatedChannel receives events close enough to be conflated? My purpose is to try and cancel the work being done to process the event, as the new one comes in.

Here’s what my actor looks like, roughly:

    private val updateDataRequestAsync = actor<DataObject>(UI, capacity = Channel.CONFLATED) {
        for (newData in channel) {           
            // This process is expensive, may take ~3 seconds 
            // I want to cancel it if it's not being used
            val processedData = myHeavyComputation(newData).await()  // uses CommonPool
            renderData(processedData)
        }
    }

It is possible for the new raw data to come in more frequently than the time it takes to process any single event. In that situation, is it possible to get some kind of a notification so that I can cancel the child myHeavyComputation (assuming that coroutine is cancellation friendly)?

Am I thinking about this the right way? Is there an alternative design that would avoid this problem and allow me to process new data coming in frequently?

This similar to switchMap in RxJava. I am not sure such operator is available out of the box in current couroutine version.

I’m not very familiar with RxJava so I don’t quite understand the switchMap analogy.

Does this imply that event processing by actors should be very lightweight? Or is there a better way of structuring this so I can avoid the problem I’m facing?

Use a regular channel and in the actor’s loop select on channel.onReceive and processedData.onAwait to continuously drain the channel

2 Likes

Thank you – that sounds a bit tedious but does achieve what I want.