Kotlin's idiomatic way to achieve request hedging?

Spring’s reactor has an interesting feature : Hedging . It means spawning many requests and get the first returned result , and automatically clean other contexts. Josh Long recently has been actively promoting this feature. Googling Spring reactor hedging shows relative results. If anybody is curious , here is the sample code . In short , Flux.first() simplifies all the underlaying hassles , which is very impressive.

I wonder how this can be achieved with Kotlin’s coroutine and multithread , (and maybe with Flow or Channel ) . I thought of a simple scenario : One service accepts longUrl and spawns the longUrl to many URL shorten service ( such as IsGd , TinyUrl …) , and returns the first returned URL … (and terminates / cleans other thread / coroutine resources)

There is an interface UrlShorter that defines this work :

interface UrlShorter {
  suspend fun getShortUrl(longUrl: String): String?
}

And there are 4 implementations , one for is.gd , another for tinyUrl , and the third is a Dumb implementation that blocks 10 seconds and return null , the forth is a Null implementation that immediate returns null …

The http library is org.apache.http.client.fluent.Request , a blocking http library.


class IsgdImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))

    return withContext(Dispatchers.IO) {
      logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
      try {
        Request.Get(url).execute().returnContent().asString().also {
          logger.info("returning {}", it)
        }
      } catch (e: Throwable) {
        null
      }
    }
  }
}
class TinyImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "http://tinyurl.com/api-create.php?url=$longUrl"

    return withContext(Dispatchers.IO) {
      logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
      try {
        Request.Get(url).execute().returnContent().asString().also {
          logger.info("returning {}", it)
        }
      } catch (e: Throwable) {
        null
      }
    }
  }
}
/**
 * delays 10 seconds and returns null
 */
class DumbImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    delay(10 * 1000)
    return null
  }
}
/**
 * returns null immediately
 */
class NullImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    return null
  }
}

And one Service with all implementations :

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {

  suspend fun getShortUrl(longUrl: String): String {
    // how to implement it ?
  }
}

The point is hedging implementation , I’ve thought this solution :

  suspend fun getShortUrl(longUrl: String): String {
    return methodFlowMerge1(longUrl)
  }

  private suspend fun methodFlowMerge1(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }
    }.first()
  }

This is the testing code :


@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

  @Test
  fun testHedging() {
    val impls = listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl())
    val service = UrlShorterService(impls)
    runBlocking {

      service.getShortUrl("https://www.google.com").also {
        logger.info("result = {}", it)
      }
    }
  }
}

I purposely add NullImpl and DumbImpl in the front of the list , hoping it can be spawn earlier. The result seems OK :

   * 13:40:21,358 INFO  NullImpl - running : main @coroutine#3
   * 13:40:21,380 INFO  DumbImpl - running : main @coroutine#4
   * 13:40:21,386 INFO  IsgdImpl - running : main @coroutine#5
   * 13:40:21,402 INFO  IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
   * 13:40:21,416 INFO  TinyImpl - running : main @coroutine#6
   * 13:40:21,419 INFO  TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#6
   * 13:40:23,029 INFO  TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
   * 13:40:23,031 INFO  IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
   * 13:40:23,126 INFO  UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo

The TinyImpl returns first and returns the result. It seems good , but I wonder if it safe ? Are the other coroutine resources really cancelled / cleaned ? Is it better to rewrite it to this :

  private suspend fun methodFlowMerge2(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }.flowOn(Dispatchers.IO)
    }.first()
      .also { Dispatchers.IO.cancelChildren() }
  }

It flowOn(Dispatchers.IO) and finally cancelChildren , is it safer ? The underlaying http client (apache http client) is a blocking library , will there be any leaky ?

For a suspend function executing a http request , is a coroutine-supported http client library a must ? Such as retrofit or kittinunf/fuel ?

SO kotlin expert Marko Topolnik uses channel to solve it multithreading - Kotlin to achieve multithread request hedging? - Stack Overflow . But I wonder if it really necessary to use channel to solve the problem ?

The whole working code is available here : Hedging with blocking http client · GitHub

What’s your ideal / idiomatic solution regarding to this problem ?

Thanks.

You should probably use a Job and just make sure that you are running using that Job’s scope, then on the first one that actually returns a result, just do jobName.cancel()

If I was going to do this a lot, I wouldn’t mind having to write a helper function to implement it. I would want some extra features, too, like going to option 2 without delay if option 1 fails. Given that, it doesn’t seem important that it’s a little tricky to build out of primitives.

You can trust flatMapMerge to cancel the children coroutines. Calling cancelChildren is not necessary and likely incorrect (it’ll cancel anything else using Dispatchers.IO).

Cancelling coroutines, however, does not cancel blocking calls so yes, your code as-is will leak the no longer needed requests.

In Marko’s response, it’s not really the Channel that is the important part but rather the cancelling of the Futures once we have a result.

You should be able to do it just as easily with your existing approach and a slight tweak using a helper method:


class TinyImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "http://tinyurl.com/api-create.php?url=$longUrl"

    return withInterrupt {
      logger.info("running withInterrupt : {}", Thread.currentThread().name)
      try {
        Request.Get(url).execute().returnContent().asString().also {
          logger.info("returning {}", it)
        }
      } catch (e: Throwable) {
        null
      }
    }
  }
}
val blockingIOExecutor = Executors.newCachedThreadPool()
suspend fun <T> withInterrupt(block: () -> T): T {
    return suspendCancellableCoroutine { cont ->
        val future = blockingIOExecutor.submit {
            try {
                val result = block()
                cont.resumeWith(Result.success(result))
            } catch (e: InterruptedException) {
                cont.resumeWithException(CancellationException())
            } catch (e: Throwable) {
                cont.resumeWithException(e);
            }
        }
        cont.invokeOnCancellation {
            future.cancel(true)
        }
    }
}

Helper method adapted from here

Thanks , it’s very helpful.

I found ktor natively supports coroutines. So I re-implement my TinyImpl and IsgdImpl with io.ktor.client.HttpClient .
Here is the code :

val ktorClient = io.ktor.client.HttpClient()

and TinyURL with ktor implementation

class TinyKtorImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "http://tinyurl.com/api-create.php?url=$longUrl"

    return try {
      ktorClient.get<String>(url).also {
        logger.info("returning {}", it)
      }
    } catch (e: Throwable) {
      logger.warn("{}", e.message)
      null
    }
  }
}

And Isgd with ktor implementation :

class IsgdKtorImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))

    return try {
      ktorClient.get<String>(url).also {
        logger.info("returning {}", it)
      }
    } catch (e: Throwable) {
      logger.warn("{}", e.message)
      null
    }
  }
}

The algorithm is the same :

private suspend fun methodFlowMerge1(longUrl: String): String {

    return impls.asFlow().flatMapMerge(impls.size) { impl ->
      flow<String?> {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }
    }.first() ?: longUrl
  }

The ktor’s client is more clean and readable (compare to okhttp and apache http) .

The full source code is here : kotlinPlay/HedgeTest.kt at master · smallufo/kotlinPlay · GitHub

My question is , since ktor supports co-routine , is this more safe / robust / leaky-free ?

And more , ktor can replace the underlaying Engine , with apache , okHttp … defined here https://ktor.io/clients/http-client/engines.html

Since apache engine is essentially blocking , does it means ktor can handle call cancellation correctly (meaning the hedging algorithm doesn’t need to take care of it ) ?

Thanks.

Yes (relative to maintaining coroutine interop code yourself)

Yes, though in the case of apache engine, there’s no blocking even internally since it uses Apache’s asynchronous API.