Js coroutines experiements


#1

I was looking at something like this. A jqFetch method which I can use in a synchronous fashion.
This is the best I could come with. My objective is to get to the point where I can just call
jqFetch and get the result back without having to worry about promises, futures or callbacks.
Just:

result = jqFetch("url/to/something")

Is there a way to hide “launch”? maybe extending jquery click method and name it clickSynch with the
launch within it already, so that it’s sort of hidden. Anybody has an idea?

import jQuery as jq

fun main(args:Array<String> = arrayOf()) {
    jq("#confButton").click {launch{
        jq("#configData").text(jqFetch("application.conf"))
        jq("#confModal").asDynamic().modal("show") as Any
    }}
}

suspend fun jqFetch(url:String, type:String? = "text"):String = suspendCoroutine {cont ->
    jq.get(url, null, {data, _, _ ->
        cont.resume(data as String)
    }, type)
}

I want to construct something like this also for websockets “receive()” so without the onmessage callback burden. Something like, i create the websocket and it returns only when it’s open, then i call “receive()” on it and it waits for next message, close or error:

val ws = openWebSocket(url) // suspends until websocket is open
val data = ws.receive() // suspends until data is received

All done with a coroutine aware wrapper on top of websockets… that’d make much easier to do networking in the webpages.

And by the way, also websocket sends should suspend until bufferedAmount is 0, just to be safe and not fill the buffer causing maybe the page to crash. If the network cannot handle the data fast enough, what then?

   if (socket.bufferedAmount == 0)
       socket.send(getUpdateData());

#2

Finally came out with this solution.
It turns the whole websocket event based into blocking/suspending style which I find much better.

Inside Javascript:

  launch {
        val socket = Socket()
        while(true) {
            socket.connect("wss://echo.websocket.org")
            console.log("connected!")
            try {
                while(true) {
                    socket.send("echo this!")
                    console.log(socket.receive())
                    delay(1000)
                }
            } finally {
                socket.close()
            }
        }
    }

Socket implementation:

class SocketClosedException(val reason:String) : Throwable(reason)

    class Socket {
        var eventQueue:Channel<Event> = Channel(Channel.UNLIMITED)
        lateinit var ws:WebSocket
        val state:Short
            get() = ws.readyState

        private fun onWsEvent(event:Event) {
            launch {eventQueue.send(event)}
        }

        suspend fun connect(url:String, retryDelay:Int = 1000) {
            while(true) {
                val connected = suspendCoroutine<Boolean> {cont ->
                    while(eventQueue.poll() != null){/*drain*/}
                    ws = WebSocket(url)
                    ws.onopen = {
                        console.info("Connected to: $url")
                        ws.onclose = ::onWsEvent
                        ws.onerror = ::onWsEvent
                        cont.resume(true)
                    }
                    ws.onmessage = ::onWsEvent
                    ws.onerror = {
                        logError(it)
                    }
                    ws.onclose = {
                        logClose((it as CloseEvent).code)
                        cont.resume(false)
                    }
                }
                if(connected)
                    break
                else
                    delay(retryDelay)
            }
        }

        suspend inline fun <reified T> receive():T {
            val event = eventQueue.receive()
            return when(event) {
                is MessageEvent -> {
                    if(String is T)
                        event.asDynamic().data as T
                    else
                        JSON.parse(event.asDynamic().data as String)
                }
                is CloseEvent -> {
                    val reason = logClose(event.code)
                    throw SocketClosedException(reason)
                }
                is ErrorEvent -> {
                    logError(event)
                    close()
                    throw SocketClosedException(event.message)
                }
                else -> {
                    val reason = getReason(4001)
                    console.error(reason)
                    close()
                    throw SocketClosedException(reason)
                }
            }
        }

        inline fun <reified T> send(obj:T) {
            when {
                isClosed() -> console.error(getReason(4002))
                String is T -> ws.send(obj as String)
                else -> ws.send(JSON.stringify(obj))
            }
        }

        fun close(code:Short = 1000) {
            when(state) {
                OPEN -> ws.close(code, getReason(1000))
            }
        }

        fun isClosed():Boolean {
            return when(state) {
                CLOSED, CLOSING -> true
                else -> false
            }
        }

        fun logClose(code:Short):String {
            val reason = getReason(code)
            console.info("Socket connection with ${ws.url} was closed, reason: $reason")
            return reason
        }

        fun logError(event:Event) = console.error("An error %o occurred when connecting to ${ws.url}", event)

        fun getReason(code:Short):String {
            return when(code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1
                1000 -> "Normal closure"
                1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page."
                1002 -> "An endpoint is terminating the connection due to a protocol error"
                1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)."
                1004 -> "Reserved. The specific meaning might be defined in the future."
                1005 -> "No status code was actually present."
                1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame"
                1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)."
                1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy."
                1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process."
                1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: "
                1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request."
                1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)."
                4001 -> "Unexpected event"
                4002 -> "You are trying to use closed socket"
                else -> "Unknown reason"
            }
        }
    }