What is the best way to wrap a handler in a coroutine Channel?

Hi all!

What is the best way to wrap a callback handler in a Kotlin channel?

For example, suppose we have this handler interface:

interface ExternalMessageHandler {
    fun onMessage(message: Message)

    fun onClose()
}

I'd like to write a function that wraps such a handler like this:

// This is the function I'd like to write.
// It returns a pair:
// 1. A channel with events
// 2. The inner connection, which can be used for closing
suspend fun wrapChannel(connectionInfo: ConnectionInfo): Connection

data class Connection(val channel: ReceiveChannel<RawEvent>, val externalConnection: ExternalConnection)

sealed class RawEvent {
    class MessageEvent(val message: Message) : RawEvent()

    class CloseEvent(): RawEvent()
}

// this function can connect to the external system:
fun connect(handler: ExternalMessageHandler, connectionInfo: ConnectionInfo): ExternalConnection

Below is a short PoC showing how this handler can be wrapped. In my opinion the code can be simplified, so my question is: how?


suspend fun wrapChannel(connectionInfo: ConnectionInfo): Connection {

    val rawConnection = CompletableDeferred<ExternalConnection>()

    val channel = produce<RawEvent> {
        val handler = object : ExternalMessageHandler {
            override fun onMessage(message: Message) = runBlocking { send(RawEvent.MessageEvent(message)) }

            override fun onClose() = runBlocking { send(RawEvent.CloseEvent()) }
        }

        val externalConnection = connect(handler, connectionInfo)

        rawConnection.complete(externalConnection)
    }

    return Connection(channel, rawConnection.await())
}

And the same code with comments:


suspend fun wrapChannel(connectionInfo: ConnectionInfo): Connection {

    // This is a way to return the inner connection from the produce lambda below.
    val rawConnection = CompletableDeferred<ExternalConnection>()

    // Use the produce lambda so that an additional coroutine context, etc. can be passed here in the future.
    // We could possibly use the Channel<...>() constructor here instead.
    val channel = produce<RawEvent> {

        // Of course, in real code we would not create the object in place; this is only to keep the example simple.
        val handler = object : ExternalMessageHandler {

            // This simply forwards the data. In real code we should log the call and add try/catch.
            // It would also be better to have an async queue, so that items can be added synchronously and read asynchronously.
            override fun onMessage(message: Message) = runBlocking { send(RawEvent.MessageEvent(message)) }

            // In addition to onClose there is often an "onError" method. It is omitted to simplify the example.
            override fun onClose() = runBlocking { send(RawEvent.CloseEvent()); complete() }
        }

        val externalConnection = connect(handler, connectionInfo)

        rawConnection.complete(externalConnection)        
    }

    // Wait for the connection to be established and return it.
    return Connection(channel, rawConnection.await())
}

To repeat the question from the start of the topic: what is the best way to wrap a handler in Kotlin channels?

Can you avoid the sealed class?

val handler = object : ExternalMessageHandler {
    override fun onMessage(message: Message) { sendBlocking(message) }

    override fun onClose() { close() }

    // assumes the handler interface also declares onError
    override fun onError(cause: Throwable) { close(cause) }
}

@fvasco, yes, I can. However, most handlers have extra methods. See the OkHttp WebSocketListener as an example: https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/WebSocketListener.java

Therefore, I need a base sealed class to test my algorithms.
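
To illustrate the point about handlers with several callbacks, here is a minimal sketch of a sealed hierarchy with one event per callback. It is only loosely modeled on a WebSocket-style listener; the names SocketEvent and describe, and the individual event classes, are hypothetical, and String stands in for a real message type.

```kotlin
// Hypothetical event hierarchy: one subclass per callback of a multi-method listener.
sealed class SocketEvent {
    data class Open(val protocol: String) : SocketEvent()
    data class Text(val text: String) : SocketEvent()
    data class Closing(val code: Int, val reason: String) : SocketEvent()
    data class Failure(val cause: Throwable) : SocketEvent()
    object Closed : SocketEvent()
}

// Because the class is sealed, the compiler checks that this `when` covers
// every subclass, so consuming logic can be tested with hand-built events.
fun describe(event: SocketEvent): String = when (event) {
    is SocketEvent.Open -> "open(${event.protocol})"
    is SocketEvent.Text -> "text(${event.text})"
    is SocketEvent.Closing -> "closing(${event.code}, ${event.reason})"
    is SocketEvent.Failure -> "failure(${event.cause.message})"
    SocketEvent.Closed -> "closed"
}
```

With this shape, a single channel of SocketEvent can carry every callback, and adding a new callback forces every `when` over the events to be updated.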

Some considerations:

  • use Channel directly instead of produce
  • use sendBlocking instead of runBlocking { send(...) }
  • use object CloseEvent : RawEvent() instead of class CloseEvent(): RawEvent()
  • consider class ConnectionChannel(val externalConnection: ExternalConnection, channel: ReceiveChannel<RawEvent>) : ReceiveChannel<RawEvent> by channel instead of Connection
  • maybe avoid val rawConnection = CompletableDeferred<ExternalConnection>()
  • close the channel when the connection is closed
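
Most of the considerations above can be combined roughly as follows. This is a sketch under several assumptions, not a definitive implementation: Message, ConnectionInfo and ExternalConnection are stand-in stubs, connect is passed in as a parameter so the example is self-contained, and fakeConnect and demo are hypothetical helpers. It keeps the Connection data class rather than the delegating ConnectionChannel, and it uses an unlimited buffer with trySend, which never blocks; with a bounded buffer, trySendBlocking (the successor to the now-deprecated sendBlocking) would be the closer analogue. It assumes kotlinx.coroutines 1.5 or later.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Stand-in stubs for the types from the question; the real ones will differ.
class Message(val text: String)
class ConnectionInfo(val url: String)
interface ExternalConnection { fun disconnect() }

interface ExternalMessageHandler {
    fun onMessage(message: Message)
    fun onClose()
}

sealed class RawEvent {
    class MessageEvent(val message: Message) : RawEvent()
    object CloseEvent : RawEvent()
}

data class Connection(
    val channel: ReceiveChannel<RawEvent>,
    val externalConnection: ExternalConnection
)

// `connect` is a parameter here only so the sketch can run without the real external system.
fun wrapChannel(
    connectionInfo: ConnectionInfo,
    connect: (ExternalMessageHandler, ConnectionInfo) -> ExternalConnection
): Connection {
    // Unlimited buffer: callbacks never have to wait for a receiver (a choice made for this sketch).
    val channel = Channel<RawEvent>(Channel.UNLIMITED)

    val handler = object : ExternalMessageHandler {
        override fun onMessage(message: Message) {
            channel.trySend(RawEvent.MessageEvent(message))
        }

        override fun onClose() {
            channel.trySend(RawEvent.CloseEvent)
            channel.close() // lets a consumer's for-loop terminate
        }
    }

    // No CompletableDeferred: connect() is called directly and its result returned.
    return Connection(channel, connect(handler, connectionInfo))
}

// Hypothetical fake of the external system: delivers one message, then closes.
fun fakeConnect(handler: ExternalMessageHandler, info: ConnectionInfo): ExternalConnection {
    handler.onMessage(Message("hello from ${info.url}"))
    handler.onClose()
    return object : ExternalConnection {
        override fun disconnect() {}
    }
}

// Collects everything the wrapped channel delivers until it is closed.
fun demo(): List<String> = runBlocking {
    val connection = wrapChannel(ConnectionInfo("test://host"), ::fakeConnect)
    val seen = mutableListOf<String>()
    for (event in connection.channel) {
        seen += when (event) {
            is RawEvent.MessageEvent -> "message: ${event.message.text}"
            RawEvent.CloseEvent -> "close"
        }
    }
    seen
}
```

Since the channel is created outside any coroutine builder, the function no longer needs to be suspending, and the external connection is available as soon as connect returns.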

@fvasco, thank you!

I tried to use Channel directly, but my coroutines hung. Possibly I misconfigured something. produce/publish works without these issues.

sendBlocking is more convenient, thank you!

About val rawConnection = CompletableDeferred<ExternalConnection>(): I agree with you, but I need access to the objects that are created during channel creation.

About closing the channel: you are absolutely right, I missed this.

Thank you!
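
One possible explanation for the hanging, offered as a guess since the code that hung is not shown: Channel() with no arguments creates a rendezvous channel, where a send completes only when a receiver is ready, so a blocking send from a callback can stall if nothing is receiving yet. The small sketch below shows the difference; canSendWithoutReceiver is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.channels.Channel

// Reports whether a non-suspending send succeeds when no receiver is present.
fun canSendWithoutReceiver(capacity: Int): Boolean {
    val channel = Channel<Int>(capacity)
    val result = channel.trySend(1)
    channel.close()
    return result.isSuccess
}
```

Here canSendWithoutReceiver(Channel.RENDEZVOUS) is false, while canSendWithoutReceiver(Channel.UNLIMITED) is true, so choosing a buffered capacity is one thing to check when a directly constructed Channel appears to hang.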