Context:
Currently in my web application each request handled by each thread so to store logging context in whole application i use threadLocal which is initialised at start of request and flush logging data & reset at the end of the in request in a filter. But now we have to make a bulk api which simply calls underline service ( this underline service makes network calls) concurrently and combines results.
Problem
For calling underline service concurrently i’m using coroutines since we already use spring webflux so at controller layer i can define suspend fun and inside this suspend function i can dispatch concurrent tasks with IO Dispatcher. So far so good but the problem is with how to handle existing threadLocal and logging context. By reading docs i understand that i can pass threadLocal as contextElement in coroutines but since we defined suspend function at controller instead of using runBlocking coroutine builder it means our main thread won’t be blocked hence request can be handle by another thread while returning response so thread local doesn’t make sense to me. How can i have Context for whole request like threadLocal or how can i use threadLocal in this situation ?
NOTE: To make situation worse in current setup we use Flow and webClient to make async calls ( even though we block until we get all result ) but passing threadLocal in reactor was easy with enableAutomaticContextPropagationHooks (reactor-core 3.7.6)
I have problems understanding this part, could you elaborate? You should be able to add thread local at the beginning of handling the request and this should do the trick. Also, if you don’t have too much code based on thread locals, you can migrate it to store the data in the coroutine context instead of thread locals.
Thank you so much broot for help. I will try to explain again
So in my controller layer i’m defining method like
@GetMapping("/bulk/operation")
suspend fun processBulkRequestConcurrennlty( bulkOperationRequest ): List<individualResponse> {
return withContext(Dispatchers.IO + requestContextSpringBean.getMyThreadLocal().asContextElement()) {
withTimeout(bulkTimeoutMs) {
coroutineScope {
bulkOperationRequest
.indvidualRequestsList
.map { individualRequest ->
async(Dispatchers.IO) {
withTimeout(individualTimeoutMs) {
// Actual service that i want to call concurrently
individualService.handle(individualRequest)
}
}
}.awaitAll()
}
}
}
}
So till the time request comes to controller in filter myThreadLocal will be initialised to request handling thread and i can pass that threadLocal to coroutines using withContext as you said in another thread. But this creates some doubt to me
Since underline request thread can be released and can pick up another request (that’s the reason i’m not using runBlocking ) hence removing existing threadLocal data by initialising again. This way i will lose the context in previous request.
After all concurrent tasks are done this returning response work can be picked up by another thread so losing original threadLocal/Context again and flushing value of threadLocal in filter won’t be correct data.
Using runBlocking i can block request thread and block thread will solve these problem but i want to know if there’s a better solution.
That’s exactly why we use asContextElement(), don’t we? To solve the problem of switching from one thread to another.
My understanding is that inside the withContext block we are guaranteed to get the correct value of the thread local - no matter if there is a single thread or 10 of them. After leaving the block the value is probably undefined, so I wouldn’t read it.
If for any reasons you have to remove the value in the filter, you can safely do it and you won’t break other requests, because of the above. But if you need to do some kind of a clean up exactly once for every generated value (e.g. free allocated resources), then I don’t know if this can be done with filters.
Yes you’re right but as i mentioned in first point if request thread release back to pool and picks up another request then it will reinitialise the threadLocal ( Which basically has a mutable map inside) hence previous request would loose its context that’s the problem i’m facing.
Do you observe this in practice? My understanding is that the problem won’t occur. myThreadLocal.asContextElement() basically means that whenever inside the block the coroutine resumes within a new thread, the value of myThreadLocal is set again to the value it was at the time we entered the block. So if you reinitialize it to another value, it doesn’t break anything, because if the thread will go back to handling the previous request, the thread local will be set again to the value associated with that request (more specifically: with the previous coroutine).
Thank you so much!! that’s what i was looking for.
To answer your question i don’t observe in practice still in implementation phase of api will update thread if i will face this.
I just have one more doubt inside withContext block i have async coroutines builder do i need to pass context in these builder explicitly or will it be propagated automatically ?
Thanks again @broot and @kyay10 for helping.
To conclude thread i also created example code Kotlin Playground: Edit, Run, Share Kotlin Code Online where i’m putting values in a map (Wrapped by threadLocal) inside coroutines builders and then finally once all coroutines builders are done i can see whole map is populated with all values.
Though with my existing solution i won’t be able to use filter to flushing all data of threadLocal i need to this inside withContext block only.
But reinitialising threadLocal again by request thread won’t clear my map because context in withContext is copied as value not reference of map is this my understanding correct ?
Essentially this. As far as I understand, Kotlin coroutine context is basically a giant fancy immutable Map. So if you were to store a mutable value in coroutine context, like a mutable Map or a mutable List, and then mutate the data inside that container, that would mess up the coroutine context. But as long as a copy of the Map or List or whatever is provided to the coroutine context, then everything’s fine.
I’m a l little bit confused with your example. I thought you want to separate data per each request, but here you actually share it for both requests. For this we don’t need thread locals or coroutine context.
But if your question is if you can safely set the thread local to a new mutable map for each request, so each has a separate map, then yes, this is fine. This is not really specific to thread locals or coroutines. In general, if we replace a variable with another map, the old one shouldn’t be affected.
I don’t really see why mutating the data stored in the context would mess it up. It should be perfectly fine to store a mutable map in the context and modify it throughout the request handling.
Also, as said earlier: if your solution is entirely custom, I suggest to not use thread locals in the first place. This asContextElement() is a kind of workaround to bridge coroutines with existing code utilizing thread locals. Coroutine-native solution is to store data in the coroutine context:
fun main(): Unit = runBlocking {
launch(MyRequestData()) {
coroutineContext[MyRequestData]!!.username = "John"
delay(500)
doSomething()
}
launch(MyRequestData()) {
coroutineContext[MyRequestData]!!.username = "James"
delay(500)
doSomething()
}
}
suspend fun doSomething() {
println("Username: ${coroutineContext[MyRequestData]!!.username}")
}
class MyRequestData : CoroutineContext.Element {
var username: String? = null
override val key get() = Key
companion object Key : CoroutineContext.Key<MyRequestData>
}
I added doSomething just to show we can access the data everywhere in the suspending context. If you need to access the data in a function which is not suspending, then I guess thread local is the way to go.
Also, while you use Spring, did you consider request-scoped beans? Such global variables using either thread locals or coroutine context aren’t generally an ideal solution. However, they make sense for logging, etc.
Also, while you use Spring, did you consider request-scoped beans? Such global variables using either thread locals or coroutine context aren’t generally an ideal solution. However, they make sense for logging, etc.
Yes i used request-scoped bean only but in spring request scope == thread scope ( I have seen this in practice) and i use ThreadLocal for logging and request tracing only. As i mentioned in my first code block under the hood in new api i want to existing service concurrently which use existing spring bean like RequestMetrics in below code for updating threadLocal data and than at the end of request in filter flush all threadLocal data
@Component
class RequestMetrics(
private val someOtherBean: SomeOtherBean,
) {
private val data =
ThreadLocal.withInitial {
RequestData(
Collections.synchronizedList(mutableListOf()),
ConcurrentHashMap(),
AtomicReference(null),
AtomicReference(someOtherBean.currentNanoTime()),
)
}
data class RequestData(
val timerExecutions: MutableList<TimerBlock>,
val featureFlags: MutableMap<String, String>,
val requestStartNanos: AtomicReference<Long>,
)
fun <T> time(
timerKey: String,
action: () -> T,
): T {
// Add a timerBlock to the timerExecutions list for given block of code
val timerExecutions = data.get().timerExecutions
val requestStartNanos = data.get().requestStartNanos
.......
}
}
Now i want to make sure that this existing setup will work as it as and with my new api as well by changing threadLocal data type from RequestData to map<String, RequestData> so that in new api all concurrent tasks have their own RequestData
Could you please elaborate this more ? What do you mean by both request ? I want to have have separate data per each request. Earlier it was easy because each request had only one thread hence one threadLocal
I mean in your example both “request A” and “request B” share exactly the same map - which is clearly visible in logs. If you need A to use one map, and B to use another map, you need to set the thread local to a new map for every request.
Ooh my bad i got confused between names. In my example “request A” and “request B” are simple to concurrent tasks when i say request i mean whole request which will trigger multiple concurrent tasks in example code like “request A” and “reqeust B” ( i should have given diff name “task A” and “task B” my bad) so yes you’re right i want to share data in all tasks of a single request but not between whole requests.
// ThreadLocal data
val myData = mutableMapOf("username" to "Test User", "auth" to "my token")
// Here we put the ThreadLocal data into the coroutine context
val coroutineContext = mapOf("requestContext" to myData)
// Here the ThreadLocal data gets cleared and populated for a different request
myData.clear()
myData["username"] = "Other User"
myData["auth"] = "other token"
// Creating a second coroutine with new request context
val otherCoroutineContext = mapOf("requestContext" to myData)
println(coroutineContext["requestContext"]!!.get("username")) // prints "Other User" because the first coroutine context was given a reference to a mutable map which was modified after being provided to the coroutine context
This is what I meant; assuming this is how ThreadLocal works (which I don’t know if it does work this way), you would have a problem. However, if ThreadLocal creates a new Map each time, or coroutine context does a deep copy of the Map when it’s created, it’s all good.
(I know the ThreadLocal probably doesn’t just use a plan Map, and that coroutine context doesn’t work exactly like that. This is pseudo code to demonstrate what I meant about mutability and immutability and references.)
I don’t get the example. The only reason why it worked this way is because the second coroutine somehow accessed the context of the first one, which never happens in reality, because each coroutine has its own context. I doubt there is a requirement that coroutine context elements are immutable - I fail to see why they would impose such a requirement and I don’t see anything in docs.
BTW, neither thread locals nor coroutine context do anything magic with the data we provide to them: they don’t create copies, they don’t instantiate new maps, etc. They know nothing about the data we provide, they only store it for us.
edit:
Ahh, you meant to use otherCoroutineContext in the last line. But then again: why do we use the same myData for both contexts? We would get exactly the same behavior with immutable data. The whole point of using thread locals / coroutine contexts is to have separate data per thread / coroutine. If we need to share something globally, then we use global variables.
Even i had same doubt if coroutine context creates a deep copy of Map or not. Ig if it doesn’t create a deep copy than this would be a problem why ?
Consider a request comest to server and handled by some I/O worker thread.
I/O worker creates a threadLocal and clear map entry. ( Most probably threadLocal would be already initialised by previous request so clear map for fresh request)
Request comes to controller layer and threadLocal map is passed to coroutines context and request is handles by coroutines I/O Pool thread.
I/O worker thread released back to Pool.
Same I/O worker thread is picked by another request. Again with step-1 it clear the map to fresh start the request. So if previous request coroutines context has the reference of the map the all data would be cleared.
So can someone confirm if coroutines context creates a deep copy of object inside threadLocal or store the same reference ?