val jobs = arrayListOf<Job>()
for(n in 1..120) {Preformatted text
jobs += launch(newSingleThreadContext("NEWTHREAD")) {
ToProtobuf(grpcServer, complexObject.build(), n)
}
}
jobs.forEach { it.join() }
println("JOB DONE")
fun ToProtobuf(grpcServer: GRPCServer, complexObject:ComplexObject,n:Int){
val grpcObserver=gRPCObserver(n.toString())
grpcServer.addLessonDBNonBlocking(complexObject,grpcObserver) <--- gRPC Async Call
grpcObserver.latch.await()
}
class gRPCObserver(name:String) : StreamObserver<NewId>{
val sname=name
var latch=CountDownLatch(1)
override fun onError(t: Throwable?) { logger.error("$sname reponseObserver onError: ${t.toString()}") }
override fun onCompleted() { latch.countDown() }
override fun onNext(value: NewId?) { }
}
The issue is that when you deal with async grpc calls you have to rely on a callback executed on an observer, and well for waiting asynchronously on that observer object I could not figure out better than a latch …
But I wonder if there is a better solution, CountDownLatch seems to have bad reputation.
Besides the present solution works very well and its damn simple … But I too want to get rid of latches!
I
Nice to know it, but unfortunately I’m forced to send and object which implements three call backs: the
gRPCObserver not just a call back , but I guess it’s doable
So now I’m trying to convert
fun addSubjectDBNonBlocking(subject: Subject,obs: StreamObserver<NewId>)
in to
fun someLongComputation(params: Params, callback: (Result) -> Unit)
with the hope to attain the suspended kingdom of:
suspend fun someLongComputation(params: Params): Result = suspendCoroutine { cont ->
someLongComputation(params) { cont.resume(it) }
}
fun addS(pair: Pair<Subject,StreamObserver<NewId>>,callback:(NewId)->Unit){
try {
callback=pair.second::onNext <-- I need this
stub.addSubject(pair.first,pair.second)
}catch (e:Exception){
logger.error("addSubjectDBNonBlocking",e)
throw e
}finally {
}
}
suspend fun addS(pair: Pair<Subject,StreamObserver<NewId>>):NewId = suspendCoroutine { cont->
addS(pair){ cont.resume(it) }
}
Now it works I used the Pair to be able to mimic the signature of the suspending paradise function and went to nirvana, alas callback is never called and Job never ends … so back to cruel earth .
Nevertheless if this was C++ it will be just a matter of a casting between two function pointers, so I guess there is also a solution in kotlin
This can be solved neatly if the gRPC Service/Client stubs natively used coroutine primitives such as Deferred and Channel. That would do away with all the manual StreamObserver state management.