gRPC Kotlin - Coroutine based gRPC for Kotlin
gRPC Kotlin – Coroutine based gRPC for Kotlin
gRPC Kotlin is a protoc plugin for generating native Kotlin bindings using coroutine primitives for gRPC services.
Why?
The asynchronous nature of bidirectional streaming rpc calls in gRPC makes them a bit hard to implement
and read. Getting your head around the StreamObserver<T>
‘s can be a bit tricky at times. Specially
with the method argument being the response observer and the return value being the request observer, it all
feels a bit backwards to what a plain old synchronous version of the handler would look like.
In situations where you’d want to coordinate several request and response messages in one call, you’ll and up having to manage some tricky state and synchronization between the observers. There are reactive bindings for gRPC which make this easier. But I think we can do better!
Enter Kotlin Coroutines! By generating native Kotlin stubs that allows us to use suspend
functions and
Channel
, we can write our handler and client code in an idiomatic and easy to read Kotlin style.
Quick start
Note: This has been tested with gRPC 1.25.0
, protobuf 3.10.0
, kotlin 1.3.61
and coroutines 1.3.3
.
Add a gRPC service definition to your project
greeter.proto
syntax = "proto3";
package org.example.greeter;
option java_package = "org.example.greeter";
option java_multiple_files = true;
message GreetRequest {
string greeting = 1;
}
message GreetReply {
string reply = 1;
}
service Greeter {
rpc Greet (GreetRequest) returns (GreetReply);
rpc GreetServerStream (GreetRequest) returns (stream GreetReply);
rpc GreetClientStream (stream GreetRequest) returns (GreetReply);
rpc GreetBidirectional (stream GreetRequest) returns (stream GreetReply);
}
Run the protoc plugin to get the generated code, see build tool configuration
Server
After compilation, you’ll find the generated Kotlin code in the same package as the generated Java
code. A service base class named GreeterImplBase
and a file with extension functions for the
client stub named GreeterStubExt.kt
. Both the service base class and client stub extensions will
use suspend
and Channel<T>
instead of the typical StreamObserver<T>
interfaces.
All functions have the suspend
modifier so they can call into any suspending code, including the
core coroutine primitives like delay
and async
.
All the server streaming calls return a ReceiveChannel<TReply>
and can easily be implemented using
produce<TReply>
.
All client streaming calls receive an argument of ReceiveChannel<TRequest>
where they can receive()
messages from the caller.
Here’s an example server that demonstrates how each type of endpoint is implemented.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import java.util.concurrent.Executors.newFixedThreadPool
class GreeterImpl : GreeterImplBase(
coroutineContext = newFixedThreadPool(4).asCoroutineDispatcher()
) {
// unary rpc
override suspend fun greet(request: GreetRequest): GreetReply {
return GreetReply.newBuilder()
.setReply("Hello " + request.greeting)
.build()
}
// server streaming rpc
override fun greetServerStream(request: GreetRequest) = produce<GreetReply> {
send(GreetReply.newBuilder()
.setReply("Hello ${request.greeting}!")
.build())
send(GreetReply.newBuilder()
.setReply("Greetings ${request.greeting}!")
.build())
}
// client streaming rpc
override suspend fun greetClientStream(requestChannel: ReceiveChannel<GreetRequest>): GreetReply {
val greetings = mutableListOf<String>()
for (request in requestChannel) {
greetings.add(request.greeting)
}
return GreetReply.newBuilder()
.setReply("Hi to all of $greetings!")
.build()
}
// bidirectional rpc
override fun greetBidirectional(requestChannel: ReceiveChannel<GreetRequest>) = produce<GreetReply> {
var count = 0
for (request in requestChannel) {
val n = count++
launch {
delay(1000)
send(GreetReply.newBuilder()
.setReply("Yo #$n ${request.greeting}")
.build())
}
}
}
}
Client
Extensions functions for the original Java stubs are generated that use suspend
functions, Deferred<TReply>
and SendChannel<TRequest>
.
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main(args: Array<String>) {
val localhost = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build()
val greeter = GreeterGrpc.newStub(localhost)
runBlocking {
// === Unary call =============================================================================
val unaryResponse = greeter.greet(req("Alice"))
println("unary reply = ${unaryResponse.reply}")
// === Server streaming call ==================================================================
val serverResponses = greeter.greetServerStream(req("Bob"))
for (serverResponse in serverResponses) {
println("server response = ${serverResponse.reply}")
}
// === Client streaming call ==================================================================
val manyToOneCall = greeter.greetClientStream()
manyToOneCall.send(req("Caroline"))
manyToOneCall.send(req("David"))
manyToOneCall.close()
val oneReply = manyToOneCall.await()
println("single reply = ${oneReply.reply}")
// === Bidirectional call =====================================================================
val bidiCall = greeter.greetBidirectional()
launch {
var n = 0
for (greetReply in bidiCall) {
println("r$n = ${greetReply.reply}")
n++
}
println("no more replies")
}
delay(200)
bidiCall.send(req("Eve"))
delay(200)
bidiCall.send(req("Fred"))
delay(200)
bidiCall.send(req("Gina"))
bidiCall.close()
}
}
gRPC Context propagation
gRPC has a thread-local Context
which is used to carry scoped values across API boundaries. With Kotlin coroutines
possibly being dispatched on multiple threads, the thread-local nature of Context
needs some special care. This is
solved by two details in the generated Kotlin code.
First, all the generated service *ImplBase
classes implement CoroutineScope
. This allows you to use any of
the top level coroutine primitives such as launch
, async
and produce
in your service implementation while still
keeping them within the context of your service code. The actual CoroutineContext
that is used can be set through the
base class constructor, but defaults to Dispatchers.default
.
abstract class MyServiceImplBase(
coroutineContext: CoroutineContext = Dispatchers.Default
)
Second, in the getter for CoroutineScope.coroutineContext
, an additional context key is added to the
CoroutineContext
that manages the gRPC Context attach()
and detach()
calls when dispatching coroutine
continuations. This will ensure that the the gRPC context is always propagated across different coroutine boundaries,
and eliminates the need to manually carry it across in user code.
Here’s a simple example that makes calls to other services concurrently and expects an authenticated user to be present
in the gRPC Context. The two accesses to the context key may execute on different threads in the CoroutineContext
but
the accesses work as expected.
val authenticatedUser = Context.key<User>("authenticatedUser")
override suspend fun greet(request: GreetRequest): GreetReply {
val motd = async { messageOfTheDay.getMessage() }
val weatherReport = async { weather.getWeatherReport(authenticatedUser.get().location) }
val reply = buildString {
append("Hello ${authenticatedUser.get().fullName}")
append("---")
append("Today's weather report: ${weatherReport.await()}")
append("---")
append(motd.await())
}
return GreetReply.newBuilder()
.setReply(reply)
.build()
}
For another example of gRPC Context usage, see the code in ContextBasedGreeterTest
Thanks to wfhartford
for contributing!
Exception handling
The generated server code follows the standard exception propagation for Kotlin coroutines as described
in the Exception handling documentation. This means that it’s safe to throw exceptions from within
the server implementation code. These will propagate up the coroutine scope and be translated to
responseObserver.onError(Throwable)
calls. The preferred way to respond with a status code is to
throw a StatusException
.
Note that you should not call close(Throwable)
or close()
from within the ProducerScope<T>
blocks you get from produce
as the producer will automatically be closed when all sub-contexts are
closed (or if an exception is thrown).
Maven configuration
Add the grpc-kotlin-gen
plugin to your protobuf-maven-plugin
configuration (see compile-custom goal)
<properties>
<kotlin.version>1.3.61</kotlin.version>
<kotlinx-coroutines.version>1.3.3</kotlinx-coroutines.version>
<grpc.version>1.25.0</grpc.version>
<protobuf.version>3.10.0</protobuf.version>
<grpc-kotlin.version>0.1.4</grpc-kotlin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>${kotlinx-coroutines.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals><goal>compile</goal></goals>
</execution>
<execution>
<id>grpc-java</id>
<goals><goal>compile-custom</goal></goals>
<configuration>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
</execution>
<execution>
<id>grpc-kotlin</id>
<goals><goal>compile-custom</goal></goals>
<configuration>
<pluginId>grpc-kotlin</pluginId>
<pluginArtifact>io.rouz:grpc-kotlin-gen:${grpc-kotlin.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
</execution>
</executions>
</plugin>
<!-- make sure to add the generated source directories to the kotlin-maven-plugin -->
<plugin>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<goals><goal>compile</goal></goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
<sourceDir>${project.basedir}/target/generated-sources/protobuf/grpc-kotlin</sourceDir>
<sourceDir>${project.basedir}/target/generated-sources/protobuf/grpc-java</sourceDir>
<sourceDir>${project.basedir}/target/generated-sources/protobuf/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Gradle configuration
Add the grpc-kotlin-gen
plugin to the plugins section of protobuf-gradle-plugin
def protobufVersion = '3.10.0'
def grpcVersion = '1.25.0'
def grpcKotlinVersion = '0.1.4'
protobuf {
protoc {
// The artifact spec for the Protobuf Compiler
artifact = "com.google.protobuf:protoc:${protobufVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
grpckotlin {
artifact = "io.rouz:grpc-kotlin-gen:${grpcKotlinVersion}"
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
grpckotlin {}
}
}
}
Add the kotlin dependencies
def kotlinVersion = '1.3.61'
def kotlinCoroutinesVersion = '1.3.3'
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion"
}
Examples
This is a list of example gRPC services and clients written using this project
RPC method type reference
Unary call
rpc Greet (GreetRequest) returns (GreetReply);
Service
A suspendable function which returns a single message.
override suspend fun greet(request: GreetRequest): GreetReply {
// return GreetReply message
}
Client
Suspendable call returning a single message.
val response: GreetReply = stub.greet( /* GreetRequest */ )
Client streaming call
rpc GreetClientStream (stream GreetRequest) returns (GreetReply);
Service
A suspendable function which returns a single message, and receives messages from a ReceiveChannel<T>
.
override suspend fun greetClientStream(requestChannel: ReceiveChannel<GreetRequest>): GreetReply {
// receive request messages
val firstRequest = requestChannel.receive()
// or iterate all request messages
for (request in requestChannel) {
// ...
}
// return GreetReply message
}
Client
Using send()
and close()
on SendChannel<T>
.
val call: ManyToOneCall<GreetRequest, GreetReply> = stub.greetClientStream()
call.send( /* GreetRequest */ )
call.send( /* GreetRequest */ )
call.close() // don't forget to close the send channel
val responseMessage = call.await()
Server streaming call
rpc GreetServerStream (GreetRequest) returns (stream GreetReply);
Service
Using produce
and send()
to send a stream of messages.
override fun greetServerStream(request: GreetRequest) = produce<GreetReply> {
send( /* GreetReply message */ )
send( /* GreetReply message */ )
// ...
}
Note that close()
or close(Throwable)
should not be used, see Exception handling.
In kotlinx-coroutines-core:1.0.0
produce
is marked with @ExperimentalCoroutinesApi
. In order
to use it, mark your server class with @UseExperimental(ExperimentalCoroutinesApi::class)
and
add the -Xuse-experimental=kotlin.Experimental
compiler flag.
Client
Using receive()
on ReceiveChannel<T>
or iterating with a for
loop.
val responses: ReceiveChannel<GreetReply> = stub.greetServerStream( /* GreetRequest */ )
// await individual responses
val responseMessage = serverResponses.receive()
// or iterate all responses
for (responseMessage in responses) {
// ...
}
Full bidirectional streaming call
rpc GreetBidirectional (stream GreetRequest) returns (stream GreetReply);
Service
Using produce
and send()
to send a stream of messages. Receiving messages from a ReceiveChannel<T>
.
override fun greetBidirectional(requestChannel: ReceiveChannel<GreetRequest>) = produce<GreetReply> {
// receive request messages
val firstRequest = requestChannel.receive()
send( /* GreetReply message */ )
val more = requestChannel.receive()
send( /* GreetReply message */ )
// ...
}
Note that close()
or close(Throwable)
should not be used, see Exception handling.
In kotlinx-coroutines-core:1.0.0
produce
is marked with @ExperimentalCoroutinesApi
. In order
to use it, mark your server class with @UseExperimental(ExperimentalCoroutinesApi::class)
and
add the -Xuse-experimental=kotlin.Experimental
compiler flag.
Client
Using both a SendChannel<T>
and a ReceiveChannel<T>
to interact with the call.
val call: ManyToManyCall<GreetRequest, GreetReply> = stub.greetBidirectional()
launch {
for (responseMessage in call) {
log.info(responseMessage)
}
log.info("no more replies")
}
call.send( /* GreetRequest */ )
call.send( /* GreetRequest */ )
call.close() // don't forget to close the send channel