Kotlin's idiomatic way to achieve request hedging?

Spring's Reactor has an interesting feature: hedging. It means firing off several equivalent requests, taking the first result that comes back, and automatically cleaning up the other contexts. Josh Long has recently been actively promoting this feature, and searching for "Spring reactor hedging" turns up related results. If anybody is curious, here is the sample code. In short, Flux.first() hides all the underlying hassle, which is very impressive.

I wonder how this can be achieved with Kotlin coroutines and multithreading (and maybe with Flow or Channel). I thought of a simple scenario: one service accepts a longUrl, sends it to several URL-shortening services (such as is.gd, TinyURL …), returns the first URL that comes back, and terminates / cleans up the other threads / coroutine resources.

There is an interface, UrlShorter, that defines this work:

interface UrlShorter {
  suspend fun getShortUrl(longUrl: String): String?
}

And there are 4 implementations: one for is.gd, another for TinyURL, a third "dumb" implementation that suspends for 10 seconds and returns null, and a fourth "null" implementation that returns null immediately.

The HTTP library is org.apache.http.client.fluent.Request, a blocking HTTP library.

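import java.net.URLEncoder
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import org.apache.http.client.fluent.Request
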
class IsgdImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))

    return withContext(Dispatchers.IO) {
      logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
      try {
        Request.Get(url).execute().returnContent().asString().also {
          logger.info("returning {}", it)
        }
      } catch (e: Throwable) {
        null
      }
    }
  }
}
class TinyImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
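    // encode the long URL, as IsgdImpl does, so its own query string survives intact
    val url = "http://tinyurl.com/api-create.php?url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))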

    return withContext(Dispatchers.IO) {
      logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
      try {
        Request.Get(url).execute().returnContent().asString().also {
          logger.info("returning {}", it)
        }
      } catch (e: Throwable) {
        null
      }
    }
  }
}
/**
 * delays 10 seconds and returns null
 */
class DumbImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    delay(10 * 1000)
    return null
  }
}
/**
 * returns null immediately
 */
class NullImpl : UrlShorter {
  override suspend fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    return null
  }
}

And one service that holds all the implementations:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {

  suspend fun getShortUrl(longUrl: String): String {
    // how to implement it ?
  }
}

The point is the hedging implementation. I came up with this solution:

  suspend fun getShortUrl(longUrl: String): String {
    return methodFlowMerge1(longUrl)
  }

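  private suspend fun methodFlowMerge1(longUrl: String): String {
    // flatMapMerge(impls.size) starts all implementations concurrently; first() takes the first emitted URL
    // note: first() throws NoSuchElementException if every implementation returns null
    return impls.asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }
    }.first()
  }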

This is the test code:


@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

  @Test
  fun testHedging() {
    val impls = listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl())
    val service = UrlShorterService(impls)
    runBlocking {

      service.getShortUrl("https://www.google.com").also {
        logger.info("result = {}", it)
      }
    }
  }
}

I purposely put NullImpl and DumbImpl at the front of the list, hoping they would be spawned first. The result seems OK:

   * 13:40:21,358 INFO  NullImpl - running : main @coroutine#3
   * 13:40:21,380 INFO  DumbImpl - running : main @coroutine#4
   * 13:40:21,386 INFO  IsgdImpl - running : main @coroutine#5
   * 13:40:21,402 INFO  IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
   * 13:40:21,416 INFO  TinyImpl - running : main @coroutine#6
   * 13:40:21,419 INFO  TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#6
   * 13:40:23,029 INFO  TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
   * 13:40:23,031 INFO  IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
   * 13:40:23,126 INFO  UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo

TinyImpl returns first, and its result is returned. It seems good, but I wonder whether it is safe. Are the other coroutine resources really cancelled / cleaned up? Is it better to rewrite it like this:
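To check the cancellation question without hitting real services, the HTTP implementations can be swapped for fakes built on delay(). The sketch below assumes a hypothetical generic helper, hedgeFirst, that applies the same flatMapMerge + first pattern to a list of suspend lambdas; it uses firstOrNull() so that a run where every implementation returns null yields null instead of NoSuchElementException. The slow fake counts whether it observed a CancellationException.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical generic helper: same flatMapMerge + first pattern as methodFlowMerge1,
// but returns null (instead of throwing) when every task yields null.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun <T : Any> hedgeFirst(tasks: List<suspend () -> T?>): T? {
    if (tasks.isEmpty()) return null // flatMapMerge requires concurrency > 0
    return tasks.asFlow()
        .flatMapMerge(concurrency = tasks.size) { task ->
            flow<T> { task()?.let { emit(it) } } // emit only non-null results
        }
        .firstOrNull() // stops collecting after the first element, cancelling the merge
}

// Races a fast fake against a 10-second fake; returns (result, number of losers that saw cancellation).
fun demoHedgeCancellation(): Pair<String?, Int> = runBlocking {
    val cancelled = AtomicInteger(0)
    val slow: suspend () -> String? = {
        try {
            delay(10_000)
            "slow"
        } catch (e: CancellationException) {
            cancelled.incrementAndGet() // the loser observed cancellation
            throw e
        }
    }
    val result = hedgeFirst(listOf<suspend () -> String?>({ null }, slow, { delay(50); "fast" }))
    result to cancelled.get()
}
```

As far as I can tell, the 10-second loser is already cancelled when hedgeFirst returns, because the merge runs inside a scope that waits for its cancelled children. This only covers cancellable suspension points such as delay(); a thread blocked inside a non-suspending HTTP call is not interrupted by it.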

  private suspend fun methodFlowMerge2(longUrl: String): String {
    return impls.asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }.flowOn(Dispatchers.IO)
    }.first()
      .also { Dispatchers.IO.cancelChildren() }
  }

It adds flowOn(Dispatchers.IO) and finally calls cancelChildren(). Is it safer? The underlying HTTP client (Apache HttpClient) is a blocking library; will anything leak?
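Two facts seem relevant here. First, Dispatchers.IO is only a dispatcher: as a CoroutineContext it carries no Job, and CoroutineContext.cancelChildren() acts on the context's Job, so Dispatchers.IO.cancelChildren() appears to have nothing to cancel. Second, cancelling a coroutine does not interrupt a thread blocked inside withContext(Dispatchers.IO) { ... }; runInterruptible (in recent kotlinx.coroutines versions) does interrupt it. The sketch uses Thread.sleep as a hypothetical stand-in for the blocking HTTP call; whether Apache's fluent Request really aborts on interruption is an assumption to verify, since classic java.net socket reads typically ignore it.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible

// Dispatchers.IO carries no Job element, so cancelChildren() on it has no children to reach.
fun ioDispatcherHasJob(): Boolean = Dispatchers.IO[Job] != null

// Hypothetical stand-in for a blocking HTTP call; Thread.sleep reacts to thread interruption.
fun fakeBlockingRequest(started: CompletableDeferred<Unit>, interrupted: AtomicBoolean): String {
    started.complete(Unit) // signal that the blocking call has begun
    try {
        Thread.sleep(10_000)
        return "late result"
    } catch (e: InterruptedException) {
        interrupted.set(true) // the worker thread was interrupted by cancellation
        throw e
    }
}

// Cancels a runInterruptible call mid-flight and reports whether the blocked thread was interrupted.
fun demoInterruptibleCancellation(): Boolean = runBlocking {
    val started = CompletableDeferred<Unit>()
    val interrupted = AtomicBoolean(false)
    val job = launch {
        runInterruptible(Dispatchers.IO) { fakeBlockingRequest(started, interrupted) }
    }
    started.await()     // wait until the IO thread is inside the blocking call
    job.cancelAndJoin() // cancellation interrupts the IO thread; join waits for the block to exit
    interrupted.get()
}
```

With plain withContext(Dispatchers.IO) in place of runInterruptible, the same cancellation would leave the IO thread sleeping until the 10 seconds elapse.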

For a suspend function that executes an HTTP request, is a coroutine-aware HTTP client library, such as Retrofit or kittinunf/fuel, a must?
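A coroutine-aware client is not strictly required, but it is what lets cancellation reach the request itself. As far as I know, Retrofit's suspend support bridges its callback API with suspendCancellableCoroutine and cancels the call in invokeOnCancellation. A minimal sketch of that bridge, assuming a hypothetical callback-style FakeAsyncClient backed by a ScheduledExecutorService instead of real HTTP:

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical callback-style client standing in for a non-blocking HTTP library.
class FakeAsyncClient(private val executor: ScheduledExecutorService) {
    val issued = CopyOnWriteArrayList<Future<*>>() // every request handle, for inspection

    fun get(url: String, latencyMs: Long, onResult: (String) -> Unit): Future<*> =
        executor.schedule(Runnable { onResult("short url for $url") }, latencyMs, TimeUnit.MILLISECONDS)
            .also { issued += it }
}

// Suspending bridge: cancelling the calling coroutine also cancels the in-flight request.
suspend fun FakeAsyncClient.awaitGet(url: String, latencyMs: Long): String =
    suspendCancellableCoroutine { cont ->
        val request = get(url, latencyMs) { body -> cont.resume(body) }
        cont.invokeOnCancellation { request.cancel(true) }
    }

// One request completes; a second is abandoned by a timeout. Returns (first result, second cancelled?).
fun demoAwaitGet(): Pair<String, Boolean> {
    val executor = Executors.newSingleThreadScheduledExecutor()
    try {
        val client = FakeAsyncClient(executor)
        return runBlocking {
            val fast = client.awaitGet("https://example.com/a", 10)
            withTimeoutOrNull(100) { client.awaitGet("https://example.com/b", 10_000) }
            fast to client.issued.last().isCancelled
        }
    } finally {
        executor.shutdownNow()
    }
}
```

With a blocking client, the best one can do is release the coroutine; with a bridge like this, the underlying request handle is cancelled as well.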

Kotlin expert Marko Topolnik solved it on SO with a channel: https://stackoverflow.com/a/58748645/298430 . But is a channel really necessary to solve this problem?
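For comparison, this is roughly what a channel-based version can look like. It is my own sketch, not the code from the linked answer, and hedgeViaChannel is a hypothetical name. Each task sends a non-null result into a Channel, a helper coroutine closes the channel once all tasks finish (so an all-null run ends with null), and the first received value wins. It relies on receiveCatching(), available in kotlinx.coroutines 1.5 and later.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical channel-based hedge: first non-null result wins, null if every task returns null.
suspend fun <T : Any> hedgeViaChannel(tasks: List<suspend () -> T?>): T? = coroutineScope {
    val results = Channel<T>(Channel.UNLIMITED) // senders never suspend, even after we stop receiving
    val workers = tasks.map { task -> launch { task()?.let { results.send(it) } } }
    launch {
        workers.joinAll()
        results.close() // no more results: receiveCatching() then reports a closed channel
    }
    val winner = results.receiveCatching().getOrNull()
    coroutineContext.cancelChildren() // cancel the losers and the closer coroutine
    winner // coroutineScope returns only after the cancelled children have finished
}

fun demoChannelHedge(): String? = runBlocking {
    hedgeViaChannel(listOf<suspend () -> String?>({ null }, { delay(10_000); "slow" }, { delay(50); "fast" }))
}
```

Compared with the Flow version, this makes the "everyone returned null" case explicit, at the cost of more moving parts.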

The whole working code is available here : https://gist.github.com/smallufo/64e18dc1190a1eaeed01759789f04087

What's your ideal / idiomatic solution to this problem?

Thanks.

You should probably use a Job and make sure every request runs in that Job's scope; then, as soon as the first one actually returns a result, just call jobName.cancel().
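A minimal sketch of that suggestion, assuming tasks that never throw (like the question's implementations, which catch Throwable); hedgeWithJob is a hypothetical name. Every request is launched in a scope backed by one parent Job, a CompletableDeferred receives the first non-null result, and the parent Job is cancelled as soon as there is a winner.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical Job-based hedge; assumes the tasks never throw.
suspend fun <T : Any> hedgeWithJob(tasks: List<suspend () -> T?>): T? {
    val parent = Job() // the "jobName" to cancel
    val scope = CoroutineScope(Dispatchers.Default + parent)
    val winner = CompletableDeferred<T?>()
    val workers = tasks.map { task ->
        scope.launch { task()?.let { winner.complete(it) } } // only the first complete() takes effect
    }
    scope.launch {
        workers.joinAll()
        winner.complete(null) // no-op if a task already won
    }
    try {
        return winner.await()
    } finally {
        parent.cancel() // cancel every remaining request, also if the caller itself is cancelled
    }
}

fun demoJobHedge(): String? = runBlocking {
    hedgeWithJob(listOf<suspend () -> String?>({ null }, { delay(10_000); "slow" }, { delay(50); "fast" }))
}
```

Unlike the coroutineScope-based variants, this returns without waiting for the cancelled requests to wind down, and the parent Job is not tied to the caller's Job, which is why the finally block matters.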