grpc-javaのClient/ServerのテストをKotlinで書く - Client編

前回のエントリに続いて今回のエントリではgRPC Clientのテストの書き方をまとめていく。

naruto-io.hatenablog.com

テスト対象のproto

テスト対象のprotoは次のとおりSimple-RPCとする。

service TaskService {
  rpc GetTaskService (TaskInbound) returns (TaskOutbound) {
    option (google.api.http) = {
      get: "/v1/task"
    };
  }
}

message TaskInbound {
  uint32 task_id = 1;
}

message TaskOutbound {
  uint32 task_id = 1;
  string title = 2;
  string finishedAt = 3;
  string createdAt = 4;
  string updatedAt = 5;
}

finishedAtなどは本来であればgoogle/protobuf/timestamp.protoを使いたいところが今回はstringで定義している。

テストするgRPC Cleintとテスト内容

テスト対象のClientのコードは次のとおりである。

suspend fun getTask(taskId: Long): TaskOutbound =
        async(CommonPool) {
            try {
                val outbound = ShutdownLoan.using(getChannel(), { channel ->
                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
                })
                Result.Success<TaskOutbound, GrpcException>(outbound)
            } catch (e: Exception) {
                val status = Status.fromThrowable(e)
                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
                Result.Failure<TaskOutbound, GrpcException>(status with status.description)
            }
        }.await().fold({ it }, { throw it })

private fun getChannel() = NettyChannelBuilder.forAddress(appProperties.grpc.backend.host, appProperties.grpc.backend.port)
        // for testing
        .usePlaintext(true)
        .build()

コルーチンをつかっているが、通常のgRPC Serverへリクエストするクライアントコードである。

  • val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()でリクエスト変数を定義している。
  • TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)でgRPC serverへリクエストをしている。
  • gRPC Serverから受け取ったレスポンスを Result型に格納し返却する。
  • getChannel()はgRPCのチャネルを返すメソッドである。

テスト内容

テストする内容を次のようにまとめる。

  • getTask(taskId: Long)を呼び出すとタスクが1件取得できるか。
  • TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)ではgRPC Serverへリクエストが渡るがリクエスト結果がエラーだった場合に例外エラー(GrpcException)を受け取ることができるか。

テストコード

次にテストコードである。

前回のエントリでまとめたとおりテストコードのなかでServerを起動させる。Clientのテストではテスト内容に応じて 成功を返すgRPC Server Serviceエラーを返すgRPC Server Serviceを用意する。そして起動しているServerにテスト対象のgRPC Sereverをアサインする。コードとしては次のようになる。

@Before
fun setup() {

    serviceRegistry = MutableHandlerRegistry()
    inProcessServer = InProcessServerBuilder
            .forName(UNIQUE_SERVER_NAME).fallbackHandlerRegistry(serviceRegistry).directExecutor().build()
    inProcessChannel = InProcessChannelBuilder.forName(UNIQUE_SERVER_NAME).directExecutor().build()

    val appProperties = AppProperties()
    target = TaskBackendClient(appProperties)

    inProcessServer.start()
}

@After
fun shutdown() {
    inProcessChannel.shutdownNow()
    inProcessServer.shutdownNow()
}
  • @BeforeAfterはServerのテストのエントリでまとめたようにサーバーの起動と停止を行っている。

次のコードが重要である。

private class GetTaskServerOk: TaskServiceGrpc.TaskServiceImplBase() {

    override fun getTaskService(request: TaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
        responseObserver?.onNext(TaskOutbound.newBuilder()
                .setTaskId(1)
                .setTitle("mocked Task")
                .setFinishedAt("2017-01-01T23:59:59Z")
                .setCreatedAt("2017-01-02T23:59:59Z")
                .setUpdatedAt("2017-01-02T23:59:59Z")
                .build()
        )
        responseObserver?.onCompleted()
    }
}

このGetTaskServerOkはテストするgRPC Serverが正常なレスポンスを返すServiceクラスである。
このServiceクラスをテストで起動したgRPC Serverにアサインすることでレスポンスをモックできる。

正常系のテスト

次にメインとなるテストコードをまとめる。こちらは正常系のテストである

@Test
fun getTask() {
    serviceRegistry.addService(GetTaskServerOk())

    // mock
    val instance = PowerMockito.spy(target)
    PowerMockito.doReturn(inProcessChannel).`when`(instance, "getChannel")

    runBlocking {

        // assertion
        val actual = instance.getTask(1L)

        actual.taskId shouldBe 1
        actual.title shouldBe "mocked Task"
        actual.finishedAt shouldBe "2017-01-01T23:59:59Z"
        actual.createdAt shouldBe "2017-01-02T23:59:59Z"
        actual.updatedAt shouldBe "2017-01-02T23:59:59Z"
    }
}
  • serviceRegistry.addService(GetTaskServerOk())でモック化したServiceクラスをgRPC Serverに追加している。
  • PowerMockito.doReturn(inProcessChannel).when(instance, "getChannel")では、ClientコードにあったgetChannel()をモック化している。getChannel()はgRPCのチャネルを返す関数であったが、テストコード内でモック化することでテストでビルドアップしたチャネルを適応している。

異常系のテスト

次に異常系のテストである。gRPC ServerからNot Foundのエラーを返すことを期待したテストコードである。

private class GetTaskServerNotFound : TaskServiceGrpc.TaskServiceImplBase() {

    override fun getTaskService(request: TaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
        responseObserver?.onError(Status.NOT_FOUND.withDescription("task not found.").asRuntimeException())
        responseObserver?.onCompleted()
    }
}

@Test(expected = GrpcException::class)
fun getTask_then_NotFound() {
    serviceRegistry.addService(GetTaskServerNotFound())

    // mock
    val instance = PowerMockito.spy(target)
    PowerMockito.doReturn(inProcessChannel).`when`(instance, "getChannel")

    try {
        runBlocking {
            instance.getTask(1L)
        }
    } catch (e: GrpcException) {
        e.message shouldBe "task not found."
        e.status shouldBe HttpStatus.NOT_FOUND
        throw e
    }
}
  • エラーを返すGetTaskServerNotFoundクラスを定義してgRPC Serverにアサインしている。
  • テストコードではエラーが発生しているか、エラーメッセージ、コードが期待どおりかテストをしている。

まとめ

  • Server編とClient編に分けてgRPC ServerとClientのテストコードをまとめた。
  • 今回はSimple-RPCをまとめたが次の機会にはServerSideStreaming-RPCClientSideStreaming-RPCBidirectionalStreaming-RPCのStreamingのテストをまとめていきたい。

コード

エントリで紹介したコードは一部分のためコード全体はgithubを参照してください。

github.com

テストコードはこちらです。

grpc-javaのClient/ServerのテストをKotlinで書く - Server編

grpc-javaで実装されたgRPC ClientとgRPC Serverのテストコードについてまとめていきたい。

ClientとServerのどちらも大枠は同じである。テストコードのなかでgRPC Serverを起動させる。そしてリクエスト内のトランザクションを必要に応じてモック化しながら期待値が取得できているか、期待される関数が呼び出せれているかを検証する。

今回のエントリではServer側のテストをJUnitとKotlinを用いてまとめていく。

テスト対象のproto

テスト対象のprotoは次のとおりSimple-RPCとする。

service TaskService {
  rpc GetTaskService (TaskInbound) returns (TaskOutbound) {
    option (google.api.http) = {
      get: "/v1/task"
    };
  }
}

message TaskInbound {
  uint32 task_id = 1;
}

message TaskOutbound {
  uint32 task_id = 1;
  string title = 2;
  string finishedAt = 3;
  string createdAt = 4;
  string updatedAt = 5;
}

テストするgRPC Serverとテスト内容

テスト対象のServerのコードは次のとおりである

override fun getTaskService(request: TaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
    try {
        val (taskId) = GRpcInboundValidator.validTaskInbound(request)

        val log = GRpcLogContextHandler.getLog()
        log.elem { "taskId" to taskId }

        val task = getTaskService(GetTaskCommand(taskId.toLong()))
        val msg = getOutbound(task)
        responseObserver?.onNext(msg)
        responseObserver?.onCompleted()
    } catch (e: WebAppException.NotFoundException) {
        logger.error { "gRPC server error, task not found." }
        responseObserver?.onError(
                Status.NOT_FOUND.withDescription("task not found.").asRuntimeException())
    } catch (e: WebAppException.BadRequestException) {
        logger.error { "gRPC server error, invalid request." }
        responseObserver?.onError(
                Status.INVALID_ARGUMENT.withDescription("invalid request.").asRuntimeException())
    }
}
  • taskId: IntをパラメータにとるTaskBackendServerで TaskBackendServer#getTaskServiceはタスクを1件返す。
  • GRpcLogContextHandlerはリクエストパラメータをログ出力するためにio.grpc.Contextにログ情報を詰めこむ。
  • TaskBackendServer#getTaskServiceの中でエラーが発生した場合は、エラーに応じたgRPC Serverレスポンスを返す。

テスト内容

テストする内容を次のようにまとめる。

  • TaskBackendServer#getTaskServiceを呼び出すとタスクが1件取得できるか。
  • GRpcLogContextHandler.getLog()は正常に呼び出されているか。
  • TaskBackendServer#getTaskServiceの中で発生したエラーに応じたgRPC Serverレスポンスが返ってくるか。

テストコード

次にテストコードである。

先述したとおりテストコードのなかでServerを起動させる。そして起動しているServerにテスト対象のgRPC Sereverをアサインする。コードとしては次のようになる。

@Before
fun setUp() {
    getTaskService = mock(GetTaskServiceImpl::class)
    // 一部省略

    target = TaskBackendServer(getTaskService, getTaskListService, createTaskService, updateTaskService,
            deleteTaskService, finishTaskService)
    inProcessServer = InProcessServerBuilder
            .forName(UNIQUE_SERVER_NAME).addService(target).directExecutor().build()
    inProcessChannel = InProcessChannelBuilder.forName(UNIQUE_SERVER_NAME).directExecutor().build()

    inProcessServer.start()
}

@After
fun tearDown() {
    inProcessChannel.shutdownNow()
    inProcessServer.shutdownNow()
}
  • @BeforeInProcessServerBuilderを使いServerをビルドアップしている。ビルドしたServerに TaskBackendServeraddService関数を使いアサインする。(addService(target)
  • InProcessServerBuilderで起動したServerは UNIQUE_SERVER_NAMEという名称をつけている。このServerNameをInProcessChannelBuilderでビルドアップするChannelに関連付ける。
  • ビルドアップしたChannel(inProcessChannel)をテストコードでブロッキングすることでgRPC Serverのレスポンスを受け取ることができる。

正常系のテスト

次のコードは正常系をテストしたコードである。

@Test
fun getProducts_onCompleted() {

    val taskId = 1L
    val request = TaskInbound.newBuilder()
            .setTaskId(taskId.toInt())
            .build()

    val command = GetTaskCommand(taskId)
    val now = LocalDateTime.now()
    val task = Task(taskId.toInt(), "mocked Task", now, now, now)

    val log = GRpcLogBuilder()

    // mock
    mockStatic(GRpcLogContextHandler::class)
    Mockito.`when`(GRpcLogContextHandler.getLog()).thenReturn(log)
    Mockito.`when`(getTaskService(command)).thenReturn(task)

    // request server
    val blockingStub = TaskServiceGrpc.newBlockingStub(inProcessChannel)
    val actual = blockingStub.getTaskService(request) // ブロッキングしてgRPC Serverのレスポンスを受け取る

    // assertion
    actual.taskId shouldBe 1
    actual.title shouldBe "mocked Task"
}
  • このテストコードでは期待したタスクがTaskBackendServerから返ってきているか、GRpcLogContextHandler.getLog()が呼び出せれているかを検証している
  • TaskBackendServer#getTaskService内では GetTaskServiceImpl#invokeを呼び出しタスクを1件取得している。この処理をモック化することでテストコード内のgRPC Serverの挙動をコントロールしている。

異常系のテスト

次のコードは異常系をテストしたコードである。

@Test
fun getProducts_NOT_FOUND() {

    val taskId = 1L
    val request = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()

    val command = GetTaskCommand(taskId)

    // mock
    mockStatic(GRpcLogContextHandler::class)
    Mockito.`when`(GRpcLogContextHandler.getLog()).thenReturn(GRpcLogBuilder())
    Mockito.`when`(getTaskService(command)).thenThrow(WebAppException.NotFoundException("not found"))

    try {
        // request server
        val blockingStub = TaskServiceGrpc.newBlockingStub(inProcessChannel)
        blockingStub.getTaskService(request)
    } catch (e: StatusRuntimeException) {
        // assertion
        e.status.code shouldBe Status.NOT_FOUND.code
        e.message shouldBe "NOT_FOUND: task not found."
    }
}

@Test
fun getProducts_INVALID_ARGUMENT() {

    val taskId = 0L
    val request = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()

    try {
        // request server
        val blockingStub = TaskServiceGrpc.newBlockingStub(inProcessChannel)
        blockingStub.getTaskService(request)
    } catch (e: StatusRuntimeException) {
        // assertion
        e.status.code shouldBe Status.INVALID_ARGUMENT.code
        e.message shouldBe "INVALID_ARGUMENT: invalid request."
    }
}
  • タスクが存在しない(NotFound)、リクエストパラメータが不整合(INVALID_ARGUMENT)のテストコードである。
  • responseObserveronErrorにエラーがセットされるとStatusRuntimeExceptionが発生する。テストコードでそれをキャッチしエラーコードとエラーメッセージを検証している。

まとめ

  • gRPC Serverのテストコードをまとめた。
  • テストコード内でgRPC Serverを起動させる方法とテストコードでChannel をブロッキングしgRPC Serverのレスポンスを受け取る方法を紹介した。
  • 必要に応じてモック化することでgRPC Serverのテストカバレッジを向上させることができる。
  • 次回のエントリではgRPC Clientのテスト方法をまとめていく。

コード

エントリで紹介したコードは一部分のためコード全体はgithubを参照してください。

github.com

テストコードはこちらです。

Kotlin コルーチンでasync/awaitをつかってgRPC Serverをリクエストしてみた

Kotlin1.1からの新機能であるコルーチン(Coroutines)を試していきたい。Kotlin コルーチンをつかえば非同期処理を同期的なコードで書けるし、非同期処理をブロッキングすることもシンプルなコードで書ける。
今回はgRPC Serverへのリクエスト部分をコルーチンをつかい非同期化させてみたのでコードをまとめていく。

Serverへのリクエストは非同期化するが処理結果や例外処理をキャッチしたい

メインのリクエストは非同期化するがgRPC Serverのレスポンスや例外処理を呼び出し元でどのようにすれば受け取れるだろうか。この場合にはコルーチンの結果を返すことができるasyncをつかいawaitで処理を中断してコルーチンから値を取り出す。

suspend fun getTask(taskId: Long): TaskOutbound =
        async(CommonPool) {
            try {
                val outbound = ShutdownLoan.using(getChannel(), { channel ->
                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
                })
                Result.Success<TaskOutbound, GrpcException>(outbound)
            } catch (e: Exception) {
                val status = Status.fromThrowable(e)
                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
                Result.Failure<TaskOutbound, GrpcException>(status with status.description)
            }
        }.await().fold({ it }, { throw it })

非同期化する前のコードとの差分は次のようになっている。

-    fun getTask(taskId: Long): TaskOutbound =
-            try {
-                ShutdownLoan.using(getChannel(), { channel ->
-                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
-                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
-                })
-            } catch (e: Exception) {
-                val status = Status.fromThrowable(e)
-                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
-                throw status with status.description
-            }
-
+    suspend fun getTask(taskId: Long): TaskOutbound =
+            async(CommonPool) {
+                try {
+                    val outbound = ShutdownLoan.using(getChannel(), { channel ->
+                        val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
+                        TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
+                    })
+                    Result.Success<TaskOutbound, GrpcException>(outbound)
+                } catch (e: Exception) {
+                    val status = Status.fromThrowable(e)
+                    logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
+                    Result.Failure<TaskOutbound, GrpcException>(status with status.description)
+                }
+            }.await().fold({ it }, { throw it })
  • getTask関数の先頭にsuspendキーワードがついている。この関数内でコルーチンの処理結果を受け取りるため await()をつけた。非同期処理の処理結果を受け取るということは処理を中断することになる。コルーチンの処理を中断することができるSuspending functionsgetTask関数に加えた。
  • asyncブロックは先のとおり値を返すコルーチンにするためである。
  • asyncブロック内にgRPC Serverのリクエスト処理がある。タスクを1件取得する処理が記述されている。修正前のコードは例外が起きた場合、throwしていたが修正後はcom.github.kittinunf.result.Resultを使ってResultオブジェクトを返却するコードにした。こうするとasyncブロックをtry-catchで囲む必要がなくなるので可読性があがることを期待してやってみた。 { throw it }節でコルーチンで起きた例外を受け取り必要な処理を挟むことができる。

コルーチンの呼び出し元でブロッキングする

getTask関数はsuspendキーワードがついているため呼び出し元でrunBlockingブロックを加えブロッキングをしている。

変更コードの差分は次のとおりである。

     fun fetchByTaskId(req: ServerRequest) = ok().json().body(
-            Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong()))))
+            runBlocking {
+                Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong())))
+            })

まとめ

  • コルーチンを試して非同期処理を導入できた。
  • サンプルのコードではawait()を1度しか使っていないが複数の非同期処理をasyncで呼び出し用途にあわせてawaitを使って直列に処理させたり並列に処理させることができる。
  • 一見、非同期処理を複雑な印象を与えコードの可読性を下げる懸念を与えるがKotlinコルーチンが素晴らしいのは同期的なシンプルなコードでこれを実現できることにある。

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。

github.com

Kotlin + gRPCでio.grpc.Contextをつかってログ出力を横断処理してみた

アスペクト指向プログラミング(AOP)をgRPC Serverではどう扱うか考えていきたい。 横断的な関心事といえば認証やログ出力である。

  • gRPCクライアントのメタデータに認証トークンを添えてリクエストを行いgRPC Serverのinterceptorで横断的に認証を完了させる。そしてメインのServer処理で認証したユーザ情報をコンテキストから取り出す。
  • gRPC Serverで起きたことをログに詰め込みコンテキストに乗せてinterceptorで横断的にログ出力を行う。

これらを実現するのがio.grpc.Contextである。

Context (grpc-all 1.5.0 API)

今回のエントリではio.grpc.ContextをつかいgRPC Serverのログ出力を横断的にする方法をまとめていく。

ContextHandlerをつくる

Contextからログをセットしたり取り出したりするHandlerを用意する。

object GRpcLogContextHandler {

    private val GRPC_LOG: Context.Key<GRpcLogBuilder> = Context.key("GRPC_LOG")

    @JvmStatic
    fun setLog(ctx: Context, log: GRpcLogBuilder) = ctx.withValue(GRPC_LOG, log)

    @JvmStatic
    fun getLog() =
            try {
                GRPC_LOG.get()
            } catch (e: Exception) {
                GRpcLogBuilder({
                    name { "UnknownName" }
                })
            }
}

GRpcLogContextHandlerが提供するsetLoggetLogをgRPC Serverから操作してログオブジェクトを取り出しデータを詰め込む。

ServerInterceptorをつくる

GRpcLogContextHandlerをContextに含めたServerInterceptorを用意する。

@Component
class GRpcLogInterceptor : ServerInterceptor {

    private val logger = KotlinLogging.logger {}

    override fun <ReqT : Any?, RespT : Any?> interceptCall(call: ServerCall<ReqT, RespT>?, headers: Metadata?, next: ServerCallHandler<ReqT, RespT>?): ServerCall.Listener<ReqT> {

        val serverName = call?.methodDescriptor?.fullMethodName!!.replace("/", ".")

        val serverCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

            override fun close(status: Status?, trailers: Metadata?) {

                val log = GRpcLogContextHandler.getLog()

                if (status!!.isOk())
                    log.success { true }

                try {
                    logger.info { log.build().toString() }
                } catch (e: Exception) {
                    logger.warn { "GRpcLogger is not set with %s".format(serverName) }
                }

                super.close(status, trailers)
            }
        }

        val log = GRpcLogBuilder({
            name { serverName }
            remoteAddr { call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString() }
        })
        val ctx = GRpcLogContextHandler.setLog(Context.current(), log)

        return Contexts.interceptCall(ctx, serverCall, headers, next)
    }
}
  • gRPC Serverのcloseメソッドをオーバライドしたval serverCallを用意する。このServerCallはclose処理でContextHandlerからログオブジェクトを取り出しログを出力をしてくれる。
  • ContextHandlerにログオブジェクトをセットしたval ctxを用意する。
  • 最後にContexts.interceptCall(ctx, serverCall, headers, next)とすることで初期化されたログオブジェクトをコンテキストに乗せることができる。
  • このGRpcLogInterceptorはログオブジェクトを初期化してコンテキストに載せること、そしてcloseするときにログ出力をする2つの横断的な処理を行ってくれる。

gRPC Serverでログを詰め込む

gRPC Serverを一部抜粋したコードが次のとおりである。

override fun updateTaskService(request: UpdateTaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
    try {
        val (taskId, title) = GRpcInboundValidator.validUpdateTaskInbound(request)

        val log = GRpcLogContextHandler.getLog()
        log.elem { "taskId" to taskId }
        log.elem { "title" to title }

        val task = updateTaskService(UpdateTaskCommand(taskId.toLong(), title))
        val msg = getOutbound(task)
        responseObserver?.onNext(msg)
        responseObserver?.onCompleted()
    } catch (e: WebAppException.NotFoundException) {
        logger.error { "gRPC server error, task not found." }
        responseObserver?.onError(
                Status.NOT_FOUND.withDescription("task not found.").asRuntimeException())
    } catch (e: WebAppException.BadRequestException) {
        logger.error { "gRPC server error, invalid request." }
        responseObserver?.onError(
                Status.INVALID_ARGUMENT.withDescription("invalid request.").asRuntimeException())
    }
}

上記のうち次の箇所でContextHandlerからログオブジェクトを取り出しログを詰め込んでいることが分かる。

 val log = GRpcLogContextHandler.getLog()
 log.elem { "taskId" to taskId }
 log.elem { "title" to title }

このgRPC Serverは前述したGRpcLogInterceptorがリクエストを前段でインターセプトしているためServerとGRpcLogInterceptorで共通のログオブジェクトを取り出し操作することができている。

まとめ

  • io.grpc.ContextをつかったgRPC Serverの横断的な関心事の処理方法をまとめた
  • 今回はログ出力を取り上げたがユーザ認証も同様にインタセプターで認証をしてContextHandlerに認証ユーザを詰め込みgRPC Serverで取り出すことになる。

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。

github.com

Kotlin + gRPCでdropwizard/metricsをつかってメトリクスを取得してみた

今回のエントリはKotlin + gRPC(FWはSpringBoot 2.0.0.M1)のアプリケーションでgRPCのリクエストタイムやエラー回数などのメトリクスを計測する方法をまとめていく。

dropwizard/metrics

メトリクス計測のライブラリには dropwizard/metricsをつかってみた。

github.com

Getting Started | Metrics

gRPCリクエストのメトリクスを計測していきたいのでMetersTimersを使い、gRPCリクエストのエラー回数とgRPCリクエストのレスポンスタイムをそれぞれ計測していく。

ログの出力形式にはSlf4jReporterを使って、メトリクスをログファイルへ出力していく。

gRPC Severにインタセプターを追加する

メトリクスの計測ポイントをつくるためにgRPC Serverのリクエストをインターセプトしたい。
io.grpc.ServerInterceptorを実装した MetricsInterceptorを準備してServerBuilderにgRPC Serverに追加するときにServerInterceptorも添えてあげるとよい。
コードとしては次のようになる。

MetricsInterceptor

override fun <ReqT : Any?, RespT : Any?> interceptCall(call: ServerCall<ReqT, RespT>?, headers: Metadata?, next: ServerCallHandler<ReqT, RespT>?): ServerCall.Listener<ReqT> {

    val timer = metricRegistry.timer(metricName(REQUEST_TIME, call?.methodDescriptor?.fullMethodName!!.replace("/", "."))).time()

    val serverCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

        override fun close(status: Status?, trailers: Metadata?) {
            val errorMeter = metricRegistry.meter(metricName(ERROR_METRIC, methodDescriptor.fullMethodName.replace("/", ".")))
            if (!status!!.isOk()) {
                errorMeter.mark()
                logger.error { "An error occured with %s".format(call.methodDescriptor) }
            }
            timer.stop()
            super.close(status, trailers)
        }
    }

    return next?.startCall(serverCall, headers)!!
}

SimpleForwardingServerCallクラスを実装し、 closeをoverrideしている。
ここでMetersTimersのメトリクス計測ポイントを追加している。

ServerBuilderにgRPC Serverに追加するときにServerInterceptorも添えてあげる

getBeanNamesByTypeWithAnnotation(GRpcService::class).subscribe {
    name ->
    val server = applicationContext.beanFactory.getBean(name, BindableService::class) as BindableService
    val service = server.bindService()
    serverBuilder.addService(ServerInterceptors.intercept(service, metricsInterceptor))
    logger.info { "$name service has been registered." }
}

余談だが、1つのjarにHTTP ServerとgRPC Serverを載せられる便利なLogNet/grpc-spring-boot-starterというライブラリがある。
このライブラリはjava版でありSpringBoot 2.0.0.M1のバージョンでは動作保証されていないので、今回のエントリを期に参考にしながらKotlinで書き換えてみた。

メトリクスを見てみよう

ログの出力形式には[Slf4jReporter]をつかったので指定したログファイルにメトリクスが出力されていることが確認できた。

[2017-07-12 03:06:00.881] type=METER, name=server.error.messages.TaskService.GetTaskListService, count=0, mean_rate=0.0, m1=0.0, m5=0.0, m15=0.0, rate_unit=events/second
[2017-07-12 03:06:00.887] type=TIMER, name=server.request.time.messages.TaskService.GetTaskListService, count=5, min=9.169273, max=226.930993, mean=53.95158430282904, stddev=84.29036619882183, median=12.679804, p75=17.87868, p95=226.930993, p98=226.930993, p99=226.930993, p999=226.930993, mean_rate=0.008607979785884044, m1=5.629057078531829E-5, m5=0.11817160428031309, m15=0.4228910681867675, rate_unit=events/second, duration_unit=milliseconds

ログ出力は10分ごとに繰り返し行われるように設定した。

まとめ

  • dropwizard/metricsを使いgRPC Serverに計測ポイントをついかしてメトリクスを計測することができた。
  • io.grpc.ServerInterceptorを実装したので全てのgRPC Serverに導入すればメトリクスを横断して計測することができる。
  • ログ出力形式にはCSV形式も選択できる。fluentdでCSV形式のログを収集してelasticsearchへ流しkibanaでモニタリングやその他のグラフツールへの連携も容易に行えるので試してみたい。

コード

動くコードをgithubに置いているので参照してください。

github.com

今回の導入の差分もあります。他の修正内容も入っていて分かりづらいですが、よければどうぞ。

ADD grpc-metrics by nsoushi · Pull Request #7 · nsoushi/spring5-kotlin-application · GitHub

Kotlin + gradleでgRPCプロトコル定義ファイル(.proto)のvendoringにprotodepをつかってみた

gRPCのプロトコル定義ファイル(.proto)の管理を考えていきたい。gRPCを導入するプロジェクトであれば.protoファイルの運用方法は課題である。

プロトコル定義ファイル(.proto)の運用課題

gRPCはプロトコル定義ファイル(.proto)から生成したプログラムをもとにサーバとクライアントに分かれ実装していくことになる。Microservicesではサーバとクライアントが別レポジトリに分かれ開発を行うことが多い。それぞれのレポジトリではマスタの.protoファイルを何処に置いて参照すれば良いだろうか。この課題は言語が違うクライアントとサーバ間でも同じことが言えるだろう。

そしてもう1つの課題は.protoファイルのバージョン管理だ。開発中のプロトコル定義に最新のプロトコル定義を干渉させたくない。開発中はクライアントとサーバは同じ世代のプロトコル定義を参照していることを再現しなくてはならない。

このような課題を解決してくれるのが protodepだ。

github.com

protodepはプロトコル定義ファイル(.proto)のvendoringツールである。
今回のエントリでは protodepをGradleビルドシステムに導入した内容をまとめていく。

※protodepの使い方はREADMEにあるので合わせて参照してください。

protodepをGradleタスクに追加する

※ 開発するプロジェクトの言語はKotlinを想定

開発中はプロジェクト内の.protoファイルからProtocolBufferを生成する必要がある。protobuf-gradle-pluginを有効にすればProtocolBufferを生成するタスクが追加される。導入方法は grpc/grpc-javaを参照していただきたい。

GitHub - grpc/grpc-java: The Java gRPC implementation. HTTP/2 based RPC

protobuf-gradle-pluginを有効にすると generateProtoのタスクが追加される。このタスクを実行することでプロジェクト内の.protoファイルからProtocolBufferが生成できる。

そしてgenerateProtoのタスクの前にprotodepを実行すればリモートにある.protoファイルのプロトコル定義のバージョンの固定を担保したうえでProtocolBufferが生成できる。 settings.gradleに次のようなタスクを追加した。

task protoDep {
    def identity_file = "id_rsa"
    if (findProperty("identityFile") != null) {
        identity_file = findProperty("identityFile")
    }
    def forceUpdate = ""
    if (findProperty("forceUpdate") != null || findProperty("forceUpdate") == true) {
        forceUpdate = "-f"
    }
    println forceUpdate
    doLast() {
        exec {
            commandLine = ["protodep", forceUpdate, "--identity-file=${identity_file}", "up"]
        }
    }
}

tasks.whenTaskAdded { task ->
    if (task.name == "generateProto" && !project.hasProperty("IGNORE_PROTO_DEP")) {
        task.dependsOn protoDep
    }
}

複数人で開発することを想定して identityFileのgradle propertyを追加している。個人の認証ファイルをビルド時に指定できる。

実行した結果は次のようになる。

$ ./gradlew clean generateProto build

:cleanProtoGen
:clean
:extractIncludeProto
:extractProto UP-TO-DATE
:protoDep
[INFO] force update = false
[INFO] identity file = id_rsa
[INFO] use SSH protocol
[INFO] Getting github.com/nsoushi/spring5-kotlin-application-proto
:generateProto
:processResources
・・・

generateProtoのタスクの前に protoDepのタスクが実行され.protoファイルをvendoringしてくれる。

.protoファイルをアップデートしたい場合は次のように実行する。

$ ./gradlew clean generateProto build -PforceUpdate=true

protodepの-fオプションを有効にしてくれる。
リモートの.protoファイルをProtocolBufferの生成するときに固定するか更新するか選択することができる。

まとめ

  • プロトコル定義ファイル(.proto)の運用課題をgradleのタスクに追加する方法と運用イメージをまとめた。
  • generateProtoタスクの前に組み込むことにより protoDepを意識することなく.protoファイルのvendoringが行うことができた。

コード

settings.gradlegithubから参照できます。

github.com

ALBのアクセスログからAPIのレスポンスタイムを監視する

サービスのAPIのレスポンスタイムを監視することは運用において大事なことである。APIのレスポンスタイムを計測してモニタリングする方法は多様に考えられるが、このエントリでは次の図のようにALBのアクセスログをfluentdが収集してモニタリングツールと連携させることでAPIのレスポンスタイムを監視を実現していく。

f:id:n_soushi:20170629104620p:plain

  • ALBのアクセスログを有効にすると指定したS3のバケットにログファイルが貯まる
  • FluentdはS3へアクセスしログを収集する
  • 収集したログをElasticSearchへ転送しKibanaでモニタリング
  • 収集したログを解析し収集インターバル中のアクセスのうち最大レスポンスタイムや平均レスポンスタイムを統計しMackerelへメトリックを投稿
  • Mackerelでレスポンスタイムが閾値を超えた場合のアラート体制を整える

上記の構成はログのモニタリングと合わせてレスポンスタイムが閾値を超えた場合の通知を実現している。
また、ALBのアクセスログにはリクエストURLやUserAgentなどLBを通っているため監視に適した情報が揃っている。
モニタリングはkibanaで行うのでデータの集計やグラフを組み合わせてログ監視のダッシュボードを作れる。

そして、fluentdのプラグインを組み合わせれば、簡単にこの構成を実現できる。
利用したプラグインを一覧する。

どれも提示されている標準の使い方で実現できる。
詳細なfluentdの設定はgithubに置いているので参照いただきたい。

GitHub - nsoushi/alb-latency-collector: This repository contains fluentd setting for monitoring ALB latency.

1点、ログ監視の注意点として挙げておきたいことがある。
Fluentdで転送したログをLogstash形式でElasticsearchへインデキシングをしているときにALBのログタイムスタンプ(timeフィールド)を @timestampに置き換える必要がある。障害でログが転送されない状況から再開したときにログ転送の時刻を計測してしまうと実際のALBのログタイムスタンプと差異が生まれてしまう。これを予防するのが次の設定になる。

<match api.access_log>
  type copy

  <store>
    type record_reformer
    enable_ruby true
    tag api.access_log.es
    <record>
    @timestamp ${record['time']}
    </record>
  </store>

record_reformerでALBの timeフィールドをLogstashの @timestampへ置き換えている。

まとめ

  • Fluentdのプラグインを集めてALBのアクセスログを監視することができた。
  • その他のALBがあれば同様にFluentdの設定を追加することで柔軟にAPIのレスポンスタイム監視を増やすことができる。

コード

設定値を変更すればdocker-composeで動作確認を行えるコードを用意したので試したい方はgithubを参照してください。

github.com