Coroutines and protobuf async streams


#1
val jobs = arrayListOf<Job>()
for (n in 1..120) {
    jobs += launch(newSingleThreadContext("NEWTHREAD")) {
        ToProtobuf(grpcServer, complexObject.build(), n)
    }
}
jobs.forEach { it.join() }
println("JOB DONE")

fun ToProtobuf(grpcServer: GRPCServer, complexObject: ComplexObject, n: Int) {
    val grpcObserver = gRPCObserver(n.toString())
    grpcServer.addLessonDBNonBlocking(complexObject, grpcObserver) // <--- gRPC async call
    grpcObserver.latch.await()
}

class gRPCObserver(name: String) : StreamObserver<NewId> {
    val sname = name
    var latch = CountDownLatch(1)
    // Note: onError does not release the latch, so a failed call leaves the waiter blocked.
    override fun onError(t: Throwable?) { logger.error("$sname responseObserver onError: ${t.toString()}") }
    override fun onCompleted() { latch.countDown() }
    override fun onNext(value: NewId?) { }
}

The issue is that with async gRPC calls you have to rely on callbacks invoked on an observer, and to wait for that observer I could not come up with anything better than a latch.
But I wonder if there is a better solution; CountDownLatch seems to have a bad reputation.
That said, the present solution works very well and is damn simple. Still, I too want to get rid of latches!


#2

Please format your code, thanks.

4 spaces before each line :)

#3

Well, you never stop learning! Thank you for the hint.


#4

Thank you.

Creating that many threads just to wait on delays isn’t a good idea.
You should use coroutines and avoid newSingleThreadContext.

To reach this goal you need to rewrite your addLessonDBNonBlocking, take a look here:

Finally, respecting the coding conventions is a great help for the reader, so use a lower-case initial for functions and an upper-case initial for types.
Here are some details:

http://kotlinlang.org/docs/reference/coding-conventions.html#naming-rules


#5

Nice to know, but unfortunately I’m forced to pass an object that implements three callbacks (the
gRPCObserver), not just a single callback. Still, I guess it’s doable.
So now I’m trying to convert

fun addSubjectDBNonBlocking(subject: Subject,obs: StreamObserver<NewId>)

into

  fun someLongComputation(params: Params, callback: (Result) -> Unit)

with the hope to attain the suspended kingdom of:

    suspend fun someLongComputation(params: Params): Result = suspendCoroutine { cont ->
       someLongComputation(params) { cont.resume(it) }
    } 

Thank you again


#6
fun addS(pair: Pair<Subject, StreamObserver<NewId>>, callback: (NewId) -> Unit) {
    try {
        callback = pair.second::onNext // <-- I need this
        stub.addSubject(pair.first, pair.second)
    } catch (e: Exception) {
        logger.error("addSubjectDBNonBlocking", e)
        throw e
    } finally {

    }
}

suspend fun addS(pair: Pair<Subject, StreamObserver<NewId>>): NewId = suspendCoroutine { cont ->
    addS(pair) { cont.resume(it) }
}

Now it runs: I used the Pair to mimic the signature of the suspending paradise function and went to nirvana. Alas, the callback is never called and the Job never ends, so back to cruel earth.

Nevertheless, if this were C++ it would just be a matter of casting between two function pointers, so I guess there is also a solution in Kotlin.

Thank you again for your guidance and patience
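
A note on why the callback above may never fire: Kotlin function parameters are read-only, and in any case assigning to `callback` would not make the observer invoke the lambda the caller passed in. One possible way around it is to go in the other direction and build an observer that forwards `onNext` to the callback. The sketch below is only an illustration under assumptions: `StreamObserver` is a local stand-in for the gRPC interface, `fakeAddSubject` is a hypothetical replacement for `stub.addSubject`, and `String`/`Int` stand in for `Subject`/`NewId`, so that it runs without gRPC on the classpath.

```kotlin
// Local stand-in for io.grpc.stub.StreamObserver (assumption, keeps the sketch self-contained).
interface StreamObserver<T> {
    fun onNext(value: T)
    fun onError(t: Throwable)
    fun onCompleted()
}

// Hypothetical replacement for stub.addSubject: replies with one id, then completes.
fun fakeAddSubject(subject: String, observer: StreamObserver<Int>) {
    observer.onNext(subject.length)
    observer.onCompleted()
}

// Callback-style wrapper: the observer is created here and forwards to the lambdas.
fun addS(subject: String, onFailure: (Throwable) -> Unit = {}, callback: (Int) -> Unit) {
    fakeAddSubject(subject, object : StreamObserver<Int> {
        override fun onNext(value: Int) = callback(value)       // hand the reply to the caller
        override fun onError(t: Throwable) = onFailure(t)       // do not swallow failures
        override fun onCompleted() {}                           // nothing left to do for a unary call
    })
}

fun main() {
    addS("kotlin") { id -> println("new id: $id") }
}
```

With a wrapper of this shape, the `suspendCoroutine { cont -> addS(...) { cont.resume(it) } }` pattern has a callback that is actually invoked.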


#7

Solved, and I got rid of the latch. Thank you!
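
The final code is not shown in the thread. For readers who land here, one possible latch-free shape is sketched below; it is not necessarily what was actually used. Assumptions: `StreamObserver` is a local stand-in for the gRPC interface, `fakeAddSubject` is a hypothetical replacement for the real stub call, `String`/`Int` stand in for `Subject`/`NewId`, and the coroutine is started with the standard library's `startCoroutine` only so that the sketch runs without kotlinx.coroutines.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Local stand-in for io.grpc.stub.StreamObserver (assumption).
interface StreamObserver<T> {
    fun onNext(value: T)
    fun onError(t: Throwable)
    fun onCompleted()
}

// Hypothetical replacement for the async stub call: one reply, then completion.
fun fakeAddSubject(subject: String, observer: StreamObserver<Int>) {
    observer.onNext(subject.length)
    observer.onCompleted()
}

// Suspends until the observer reports a terminal event; no CountDownLatch involved.
suspend fun addSubject(subject: String): Int = suspendCoroutine { cont ->
    fakeAddSubject(subject, object : StreamObserver<Int> {
        private var id: Int? = null
        override fun onNext(value: Int) { id = value }                 // remember the reply
        override fun onError(t: Throwable) = cont.resumeWithException(t)
        override fun onCompleted() {
            val value = id
            if (value != null) cont.resume(value)
            else cont.resumeWithException(IllegalStateException("completed without a value"))
        }
    })
}

fun main() {
    var outcome: Result<Int>? = null
    // Start the suspend function and capture its result when it finishes.
    suspend { addSubject("kotlin") }
        .startCoroutine(Continuation<Int>(EmptyCoroutineContext) { outcome = it })
    println(outcome?.getOrNull())
}
```

The continuation is resumed exactly once, from `onCompleted` or `onError`, which also means a failed call no longer leaves the caller waiting. In real code the suspend function would typically be called from `launch` or `runBlocking` rather than `startCoroutine`.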


#8

This can be solved neatly if the gRPC Service/Client stubs natively used coroutine primitives such as Deferred and Channel. That would do away with all the manual StreamObserver state management.

You might want to give this a try: https://github.com/rouzwawi/grpc-kotlin