Kotlin + gRPCでio.grpc.Contextをつかってログ出力を横断処理してみた

アスペクト指向プログラミング(AOP)をgRPC Serverではどう扱うか考えていきたい。 横断的な関心事といえば認証やログ出力である。

  • gRPCクライアントのメタデータに認証トークンを添えてリクエストを行いgRPC Serverのinterceptorで横断的に認証を完了させる。そしてメインのServer処理で認証したユーザ情報をコンテキストから取り出す。
  • gRPC Serverで起きたことをログに詰め込みコンテキストに乗せてinterceptorで横断的にログ出力を行う。

これらを実現するのがio.grpc.Contextである。

Context (grpc-all 1.5.0 API)

今回のエントリではio.grpc.ContextをつかいgRPC Serverのログ出力を横断的にする方法をまとめていく。

ContextHandlerをつくる

Contextからログをセットしたり取り出したりするHandlerを用意する。

object GRpcLogContextHandler {

    private val GRPC_LOG: Context.Key<GRpcLogBuilder> = Context.key("GRPC_LOG")

    @JvmStatic
    fun setLog(ctx: Context, log: GRpcLogBuilder) = ctx.withValue(GRPC_LOG, log)

    @JvmStatic
    fun getLog() =
            try {
                GRPC_LOG.get()
            } catch (e: Exception) {
                GRpcLogBuilder({
                    name { "UnknownName" }
                })
            }
}

GRpcLogContextHandlerが提供するsetLoggetLogをgRPC Serverから操作してログオブジェクトを取り出しデータを詰め込む。

ServerInterceptorをつくる

GRpcLogContextHandlerをContextに含めたServerInterceptorを用意する。

@Component
class GRpcLogInterceptor : ServerInterceptor {

    private val logger = KotlinLogging.logger {}

    override fun <ReqT : Any?, RespT : Any?> interceptCall(call: ServerCall<ReqT, RespT>?, headers: Metadata?, next: ServerCallHandler<ReqT, RespT>?): ServerCall.Listener<ReqT> {

        val serverName = call?.methodDescriptor?.fullMethodName!!.replace("/", ".")

        val serverCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

            override fun close(status: Status?, trailers: Metadata?) {

                val log = GRpcLogContextHandler.getLog()

                if (status!!.isOk())
                    log.success { true }

                try {
                    logger.info { log.build().toString() }
                } catch (e: Exception) {
                    logger.warn { "GRpcLogger is not set with %s".format(serverName) }
                }

                super.close(status, trailers)
            }
        }

        val log = GRpcLogBuilder({
            name { serverName }
            remoteAddr { call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString() }
        })
        val ctx = GRpcLogContextHandler.setLog(Context.current(), log)

        return Contexts.interceptCall(ctx, serverCall, headers, next)
    }
}
  • gRPC Serverのcloseメソッドをオーバライドしたval serverCallを用意する。このServerCallはclose処理でContextHandlerからログオブジェクトを取り出しログを出力をしてくれる。
  • ContextHandlerにログオブジェクトをセットしたval ctxを用意する。
  • 最後にContexts.interceptCall(ctx, serverCall, headers, next)とすることで初期化されたログオブジェクトをコンテキストに乗せることができる。
  • このGRpcLogInterceptorはログオブジェクトを初期化してコンテキストに載せること、そしてcloseするときにログ出力をする2つの横断的な処理を行ってくれる。

gRPC Serverでログを詰め込む

gRPC Serverを一部抜粋したコードが次のとおりである。

override fun updateTaskService(request: UpdateTaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
    try {
        val (taskId, title) = GRpcInboundValidator.validUpdateTaskInbound(request)

        val log = GRpcLogContextHandler.getLog()
        log.elem { "taskId" to taskId }
        log.elem { "title" to title }

        val task = updateTaskService(UpdateTaskCommand(taskId.toLong(), title))
        val msg = getOutbound(task)
        responseObserver?.onNext(msg)
        responseObserver?.onCompleted()
    } catch (e: WebAppException.NotFoundException) {
        logger.error { "gRPC server error, task not found." }
        responseObserver?.onError(
                Status.NOT_FOUND.withDescription("task not found.").asRuntimeException())
    } catch (e: WebAppException.BadRequestException) {
        logger.error { "gRPC server error, invalid request." }
        responseObserver?.onError(
                Status.INVALID_ARGUMENT.withDescription("invalid request.").asRuntimeException())
    }
}

上記のうち次の箇所でContextHandlerからログオブジェクトを取り出しログを詰め込んでいることが分かる。

 val log = GRpcLogContextHandler.getLog()
 log.elem { "taskId" to taskId }
 log.elem { "title" to title }

このgRPC Serverは前述したGRpcLogInterceptorがリクエストを前段でインターセプトしているためServerとGRpcLogInterceptorで共通のログオブジェクトを取り出し操作することができている。

まとめ

  • io.grpc.ContextをつかったgRPC Serverの横断的な関心事の処理方法をまとめた
  • 今回はログ出力を取り上げたがユーザ認証も同様にインタセプターで認証をしてContextHandlerに認証ユーザを詰め込みgRPC Serverで取り出すことになる。

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。

github.com