Spring's Reactor has an interesting feature: hedging. It means sending the same request to several services, taking the first result that comes back, and automatically cleaning up the other contexts. Josh Long has been actively promoting this feature recently. Googling "Spring Reactor hedging" shows relevant results. If anybody is curious, here is the sample code. In short, Flux.first() hides all the underlying hassle, which is very impressive.
I wonder how this can be achieved with Kotlin's coroutines and multithreading (and maybe with Flow or Channel). I thought of a simple scenario: a service accepts a longUrl, sends it to several URL-shortening services (such as is.gd, TinyURL, …), and returns the first short URL that comes back (and terminates / cleans up the other thread / coroutine resources).
There is an interface UrlShorter that defines this work:
interface UrlShorter {
    suspend fun getShortUrl(longUrl: String): String?
}
And there are 4 implementations: one for is.gd, another for TinyURL, the third is a Dumb implementation that delays for 10 seconds and returns null, and the fourth is a Null implementation that returns null immediately.
The HTTP library is org.apache.http.client.fluent.Request, a blocking HTTP library.
class IsgdImpl : UrlShorter {
    override suspend fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        val url = "https://is.gd/create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
        return withContext(Dispatchers.IO) {
            logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
            try {
                Request.Get(url).execute().returnContent().asString().also {
                    logger.info("returning {}", it)
                }
            } catch (e: Throwable) {
                null
            }
        }
    }
}
class TinyImpl : UrlShorter {
    override suspend fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        // URL-encode the parameter, as IsgdImpl does, so that '&' or '?' in longUrl cannot break the query
        val url = "http://tinyurl.com/api-create.php?url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
        return withContext(Dispatchers.IO) {
            logger.info("running Dispatchers.IO : {}", Thread.currentThread().name)
            try {
                Request.Get(url).execute().returnContent().asString().also {
                    logger.info("returning {}", it)
                }
            } catch (e: Throwable) {
                null
            }
        }
    }
}
/**
 * delays 10 seconds and returns null
 */
class DumbImpl : UrlShorter {
    override suspend fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        delay(10 * 1000)
        return null
    }
}

/**
 * returns null immediately
 */
class NullImpl : UrlShorter {
    override suspend fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        return null
    }
}
And one service that holds all the implementations:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {
    suspend fun getShortUrl(longUrl: String): String {
        // how to implement it?
        TODO()
    }
}
The point is the hedging implementation. I came up with this solution:
suspend fun getShortUrl(longUrl: String): String {
    return methodFlowMerge1(longUrl)
}

private suspend fun methodFlowMerge1(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
        flow {
            impl.getShortUrl(longUrl)?.also {
                emit(it)
            }
        }
    }.first()
}
This is the test code:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {
    @Test
    fun testHedging() {
        val impls = listOf(NullImpl(), DumbImpl(), IsgdImpl(), TinyImpl())
        val service = UrlShorterService(impls)
        runBlocking {
            service.getShortUrl("https://www.google.com").also {
                logger.info("result = {}", it)
            }
        }
    }
}
I purposely put NullImpl and DumbImpl at the front of the list, hoping they would be launched earlier. The result seems OK:
* 13:40:21,358 INFO NullImpl - running : main @coroutine#3
* 13:40:21,380 INFO DumbImpl - running : main @coroutine#4
* 13:40:21,386 INFO IsgdImpl - running : main @coroutine#5
* 13:40:21,402 INFO IsgdImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-1 @coroutine#5
* 13:40:21,416 INFO TinyImpl - running : main @coroutine#6
* 13:40:21,419 INFO TinyImpl$getShortUrl$2 - running Dispatchers.IO : DefaultDispatcher-worker-2 @coroutine#6
* 13:40:23,029 INFO TinyImpl$getShortUrl$2 - returning http://tinyurl.com/389lo
* 13:40:23,031 INFO IsgdImpl$getShortUrl$2 - returning https://is.gd/EuvYes
* 13:40:23,126 INFO UrlShorterServiceTest$testHedging$1 - result = http://tinyurl.com/389lo
TinyImpl returns first, and its result is the one returned. It seems good, but I wonder whether it is safe. Are the other coroutine resources really cancelled / cleaned up? Would it be better to rewrite it like this:
private suspend fun methodFlowMerge2(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
        flow {
            impl.getShortUrl(longUrl)?.also {
                emit(it)
            }
        }.flowOn(Dispatchers.IO)
    }.first()
        .also { Dispatchers.IO.cancelChildren() }
}
It uses flowOn(Dispatchers.IO) and finally calls cancelChildren. Is it safer? The underlying HTTP client (Apache HttpClient) is a blocking library; will anything leak?
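One way to observe the cancellation behaviour is a minimal sketch like the one below. It replaces the real shorteners with a hypothetical FakeShortener that only calls delay, and records every branch that receives a CancellationException. The names FakeShortener, events and hedge are made up for this illustration and are not part of the gist.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for a UrlShorter: waits `millis`, then yields `value` (null = no result).
class FakeShortener(val name: String, val millis: Long, val value: String?)

// Names of the branches that were cancelled before they could finish.
val events = CopyOnWriteArrayList<String>()

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun hedge(impls: List<FakeShortener>): String =
    impls.asFlow().flatMapMerge(impls.size) { impl ->
        flow {
            try {
                delay(impl.millis)               // cancellable suspension point
                impl.value?.let { emit(it) }     // only non-null results are emitted
            } catch (e: CancellationException) {
                events += "${impl.name} cancelled"
                throw e                          // always rethrow cancellation
            }
        }
    }.first()                                    // aborts the merged flow after the first value

fun main() = runBlocking {
    val result = hedge(
        listOf(
            FakeShortener("null", 0, null),
            FakeShortener("dumb", 10_000, null),
            FakeShortener("slow", 300, "slow-url"),
            FakeShortener("fast", 50, "fast-url"),
        )
    )
    println("result = $result, events = $events")
}
```

In this sketch the call is expected to return well before the 10 seconds of the "dumb" branch, and the slower branches should show up in events as cancelled, which suggests that first() already cancels the sibling coroutines while they are suspended. Note also that Dispatchers.IO carries no Job, so as far as I can tell Dispatchers.IO.cancelChildren() has nothing to cancel.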
For a suspend function that executes an HTTP request, is a coroutine-aware HTTP client library, such as retrofit or kittinunf/fuel, a must?
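To make the concern concrete: a coroutine that is stuck inside a blocking call cannot react to cancellation until the call returns. A possible middle ground, sketched below, is runInterruptible from kotlinx.coroutines, which interrupts the blocked thread when the coroutine is cancelled. Here blockingFetch is a hypothetical stand-in that uses Thread.sleep instead of a real request. I have not verified how Apache HttpClient reacts to a thread interrupt, and blocking socket reads often ignore it, so this is an assumption to test rather than a fix.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a blocking call such as Request.Get(url).execute()...
fun blockingFetch(millis: Long, value: String): String {
    Thread.sleep(millis)   // blocks the thread; throws InterruptedException when interrupted
    return value
}

// Runs the blocking call on Dispatchers.IO and interrupts its thread on cancellation.
suspend fun fetchInterruptibly(millis: Long, value: String): String =
    runInterruptible(Dispatchers.IO) { blockingFetch(millis, value) }

fun main() = runBlocking {
    val started = System.nanoTime()
    // Give up after 100 ms although the "request" would take 5 seconds.
    val result = withTimeoutOrNull(100) { fetchInterruptibly(5_000, "never") }
    val elapsedMs = (System.nanoTime() - started) / 1_000_000
    println("result = $result after ~$elapsedMs ms")
}
```

With a plain withContext(Dispatchers.IO) around the same Thread.sleep, the caller would have to wait for the full 5 seconds, because withContext only returns once its block has finished.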
SO Kotlin expert Marko Topolnik uses a channel to solve it: multithreading - Kotlin to achieve multithread request hedging? - Stack Overflow. But I wonder whether it is really necessary to use a channel to solve the problem.
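For comparison, here is a minimal sketch of a variant that uses neither a Flow nor an explicit channel: it starts every task with async and uses select to wait for whichever Deferred completes first, skipping null results. The function firstNonNull and the lambda-based task list are hypothetical and only stand in for the UrlShorter implementations; this is not the approach from the Stack Overflow answer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Returns the first non-null result of the given tasks, or null if every task returns null.
suspend fun <T : Any> firstNonNull(tasks: List<suspend () -> T?>): T? = coroutineScope {
    val pending = tasks.map { task -> async { task() } }.toMutableList()
    var result: T? = null
    while (result == null && pending.isNotEmpty()) {
        // Suspend until any of the still-pending tasks completes.
        val (finished, value) = select<Pair<Deferred<T?>, T?>> {
            pending.forEach { deferred -> deferred.onAwait { v -> deferred to v } }
        }
        pending.remove(finished)
        result = value                 // stays null if that task had no result
    }
    pending.forEach { it.cancel() }    // cancel the tasks that lost the race
    result
}

fun main() = runBlocking {
    val winner = firstNonNull(
        listOf<suspend () -> String?>(
            { null },                      // like NullImpl
            { delay(10_000); null },       // like DumbImpl
            { delay(300); "slow-url" },
            { delay(50); "fast-url" },
        )
    )
    println("winner = $winner")
}
```

Because everything runs inside coroutineScope, the function only returns after the cancelled tasks have actually finished, and an exception thrown by any task fails the whole call. Whether that is preferable to swallowing errors, as the implementations above do, is a design choice.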
The whole working code is available here : Hedging with blocking http client · GitHub
What's your ideal / idiomatic solution to this problem?
Thanks.