Kotlin コルーチンでasync/awaitをつかってgRPC Serverをリクエストしてみた

Kotlin1.1からの新機能であるコルーチン(Coroutines)を試していきたい。Kotlin コルーチンをつかえば非同期処理を同期的なコードで書けるし、非同期処理をブロッキングすることもシンプルなコードで書ける。
今回はgRPC Serverへのリクエスト部分をコルーチンをつかい非同期化させてみたのでコードをまとめていく。

Serverへのリクエストは非同期化するが処理結果や例外処理をキャッチしたい

メインのリクエストは非同期化するがgRPC Serverのレスポンスや例外処理を呼び出し元でどのようにすれば受け取れるだろうか。この場合にはコルーチンの結果を返すことができるasyncをつかいawaitで処理を中断してコルーチンから値を取り出す。

suspend fun getTask(taskId: Long): TaskOutbound =
        async(CommonPool) {
            try {
                val outbound = ShutdownLoan.using(getChannel(), { channel ->
                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
                })
                Result.Success<TaskOutbound, GrpcException>(outbound)
            } catch (e: Exception) {
                val status = Status.fromThrowable(e)
                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
                Result.Failure<TaskOutbound, GrpcException>(status with status.description)
            }
        }.await().fold({ it }, { throw it })

非同期化する前のコードとの差分は次のようになっている。

-    fun getTask(taskId: Long): TaskOutbound =
-            try {
-                ShutdownLoan.using(getChannel(), { channel ->
-                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
-                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
-                })
-            } catch (e: Exception) {
-                val status = Status.fromThrowable(e)
-                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
-                throw status with status.description
-            }
-
+    suspend fun getTask(taskId: Long): TaskOutbound =
+            async(CommonPool) {
+                try {
+                    val outbound = ShutdownLoan.using(getChannel(), { channel ->
+                        val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
+                        TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
+                    })
+                    Result.Success<TaskOutbound, GrpcException>(outbound)
+                } catch (e: Exception) {
+                    val status = Status.fromThrowable(e)
+                    logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
+                    Result.Failure<TaskOutbound, GrpcException>(status with status.description)
+                }
+            }.await().fold({ it }, { throw it })
  • getTask関数の先頭にsuspendキーワードがついている。この関数内でコルーチンの処理結果を受け取りるため await()をつけた。非同期処理の処理結果を受け取るということは処理を中断することになる。コルーチンの処理を中断することができるSuspending functionsgetTask関数に加えた。
  • asyncブロックは先のとおり値を返すコルーチンにするためである。
  • asyncブロック内にgRPC Serverのリクエスト処理がある。タスクを1件取得する処理が記述されている。修正前のコードは例外が起きた場合、throwしていたが修正後はcom.github.kittinunf.result.Resultを使ってResultオブジェクトを返却するコードにした。こうするとasyncブロックをtry-catchで囲む必要がなくなるので可読性があがることを期待してやってみた。 { throw it }節でコルーチンで起きた例外を受け取り必要な処理を挟むことができる。

コルーチンの呼び出し元でブロッキングする

getTask関数はsuspendキーワードがついているため呼び出し元でrunBlockingブロックを加えブロッキングをしている。

変更コードの差分は次のとおりである。

     fun fetchByTaskId(req: ServerRequest) = ok().json().body(
-            Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong()))))
+            runBlocking {
+                Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong())))
+            })

まとめ

  • コルーチンを試して非同期処理を導入できた。
  • サンプルのコードではawait()を1度しか使っていないが複数の非同期処理をasyncで呼び出し用途にあわせてawaitを使って直列に処理させたり並列に処理させることができる。
  • 一見、非同期処理を複雑な印象を与えコードの可読性を下げる懸念を与えるがKotlinコルーチンが素晴らしいのは同期的なシンプルなコードでこれを実現できることにある。

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。

github.com