Kotlin + gRPCでio.grpc.Contextをつかってログ出力を横断処理してみた

アスペクト指向プログラミング(AOP)をgRPC Serverではどう扱うか考えていきたい。 横断的な関心事といえば認証やログ出力である。

  • gRPCクライアントのメタデータに認証トークンを添えてリクエストを行いgRPC Serverのinterceptorで横断的に認証を完了させる。そしてメインのServer処理で認証したユーザ情報をコンテキストから取り出す。
  • gRPC Serverで起きたことをログに詰め込みコンテキストに乗せてinterceptorで横断的にログ出力を行う。

これらを実現するのがio.grpc.Contextである。

Context (grpc-all 1.5.0 API)

今回のエントリではio.grpc.ContextをつかいgRPC Serverのログ出力を横断的にする方法をまとめていく。

ContextHandlerをつくる

Contextからログをセットしたり取り出したりするHandlerを用意する。

object GRpcLogContextHandler {

    private val GRPC_LOG: Context.Key<GRpcLogBuilder> = Context.key("GRPC_LOG")

    @JvmStatic
    fun setLog(ctx: Context, log: GRpcLogBuilder) = ctx.withValue(GRPC_LOG, log)

    @JvmStatic
    fun getLog() =
            try {
                GRPC_LOG.get()
            } catch (e: Exception) {
                GRpcLogBuilder({
                    name { "UnknownName" }
                })
            }
}

GRpcLogContextHandlerが提供するsetLoggetLogをgRPC Serverから操作してログオブジェクトを取り出しデータを詰め込む。

ServerInterceptorをつくる

GRpcLogContextHandlerをContextに含めたServerInterceptorを用意する。

@Component
class GRpcLogInterceptor : ServerInterceptor {

    private val logger = KotlinLogging.logger {}

    override fun <ReqT : Any?, RespT : Any?> interceptCall(call: ServerCall<ReqT, RespT>?, headers: Metadata?, next: ServerCallHandler<ReqT, RespT>?): ServerCall.Listener<ReqT> {

        val serverName = call?.methodDescriptor?.fullMethodName!!.replace("/", ".")

        val serverCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

            override fun close(status: Status?, trailers: Metadata?) {

                val log = GRpcLogContextHandler.getLog()

                if (status!!.isOk())
                    log.success { true }

                try {
                    logger.info { log.build().toString() }
                } catch (e: Exception) {
                    logger.warn { "GRpcLogger is not set with %s".format(serverName) }
                }

                super.close(status, trailers)
            }
        }

        val log = GRpcLogBuilder({
            name { serverName }
            remoteAddr { call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString() }
        })
        val ctx = GRpcLogContextHandler.setLog(Context.current(), log)

        return Contexts.interceptCall(ctx, serverCall, headers, next)
    }
}
  • gRPC Serverのcloseメソッドをオーバライドしたval serverCallを用意する。このServerCallはclose処理でContextHandlerからログオブジェクトを取り出しログを出力をしてくれる。
  • ContextHandlerにログオブジェクトをセットしたval ctxを用意する。
  • 最後にContexts.interceptCall(ctx, serverCall, headers, next)とすることで初期化されたログオブジェクトをコンテキストに乗せることができる。
  • このGRpcLogInterceptorはログオブジェクトを初期化してコンテキストに載せること、そしてcloseするときにログ出力をする2つの横断的な処理を行ってくれる。

gRPC Serverでログを詰め込む

gRPC Serverを一部抜粋したコードが次のとおりである。

override fun updateTaskService(request: UpdateTaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
    try {
        val (taskId, title) = GRpcInboundValidator.validUpdateTaskInbound(request)

        val log = GRpcLogContextHandler.getLog()
        log.elem { "taskId" to taskId }
        log.elem { "title" to title }

        val task = updateTaskService(UpdateTaskCommand(taskId.toLong(), title))
        val msg = getOutbound(task)
        responseObserver?.onNext(msg)
        responseObserver?.onCompleted()
    } catch (e: WebAppException.NotFoundException) {
        logger.error { "gRPC server error, task not found." }
        responseObserver?.onError(
                Status.NOT_FOUND.withDescription("task not found.").asRuntimeException())
    } catch (e: WebAppException.BadRequestException) {
        logger.error { "gRPC server error, invalid request." }
        responseObserver?.onError(
                Status.INVALID_ARGUMENT.withDescription("invalid request.").asRuntimeException())
    }
}

上記のうち次の箇所でContextHandlerからログオブジェクトを取り出しログを詰め込んでいることが分かる。

 val log = GRpcLogContextHandler.getLog()
 log.elem { "taskId" to taskId }
 log.elem { "title" to title }

このgRPC Serverは前述したGRpcLogInterceptorがリクエストを前段でインターセプトしているためServerとGRpcLogInterceptorで共通のログオブジェクトを取り出し操作することができている。

まとめ

  • io.grpc.ContextをつかったgRPC Serverの横断的な関心事の処理方法をまとめた
  • 今回はログ出力を取り上げたがユーザ認証も同様にインタセプターで認証をしてContextHandlerに認証ユーザを詰め込みgRPC Serverで取り出すことになる。

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。

github.com

Kotlin + gRPCでdropwizard/metricsをつかってメトリクスを取得してみた

今回のエントリはKotlin + gRPC(FWはSpringBoot 2.0.0.M1)のアプリケーションでgRPCのリクエストタイムやエラー回数などのメトリクスを計測する方法をまとめていく。

dropwizard/metrics

メトリクス計測のライブラリには dropwizard/metricsをつかってみた。

github.com

Getting Started | Metrics

gRPCリクエストのメトリクスを計測していきたいのでMetersTimersを使い、gRPCリクエストのエラー回数とgRPCリクエストのレスポンスタイムをそれぞれ計測していく。

ログの出力形式にはSlf4jReporterを使って、メトリクスをログファイルへ出力していく。

gRPC Severにインタセプターを追加する

メトリクスの計測ポイントをつくるためにgRPC Serverのリクエストをインターセプトしたい。
io.grpc.ServerInterceptorを実装した MetricsInterceptorを準備してServerBuilderにgRPC Serverに追加するときにServerInterceptorも添えてあげるとよい。
コードとしては次のようになる。

MetricsInterceptor

override fun <ReqT : Any?, RespT : Any?> interceptCall(call: ServerCall<ReqT, RespT>?, headers: Metadata?, next: ServerCallHandler<ReqT, RespT>?): ServerCall.Listener<ReqT> {

    val timer = metricRegistry.timer(metricName(REQUEST_TIME, call?.methodDescriptor?.fullMethodName!!.replace("/", "."))).time()

    val serverCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

        override fun close(status: Status?, trailers: Metadata?) {
            val errorMeter = metricRegistry.meter(metricName(ERROR_METRIC, methodDescriptor.fullMethodName.replace("/", ".")))
            if (!status!!.isOk()) {
                errorMeter.mark()
                logger.error { "An error occured with %s".format(call.methodDescriptor) }
            }
            timer.stop()
            super.close(status, trailers)
        }
    }

    return next?.startCall(serverCall, headers)!!
}

SimpleForwardingServerCallクラスを実装し、 closeをoverrideしている。
ここでMetersTimersのメトリクス計測ポイントを追加している。

ServerBuilderにgRPC Serverに追加するときにServerInterceptorも添えてあげる

getBeanNamesByTypeWithAnnotation(GRpcService::class).subscribe {
    name ->
    val server = applicationContext.beanFactory.getBean(name, BindableService::class) as BindableService
    val service = server.bindService()
    serverBuilder.addService(ServerInterceptors.intercept(service, metricsInterceptor))
    logger.info { "$name service has been registered." }
}

余談だが、1つのjarにHTTP ServerとgRPC Serverを載せられる便利なLogNet/grpc-spring-boot-starterというライブラリがある。
このライブラリはjava版でありSpringBoot 2.0.0.M1のバージョンでは動作保証されていないので、今回のエントリを期に参考にしながらKotlinで書き換えてみた。

メトリクスを見てみよう

ログの出力形式には[Slf4jReporter]をつかったので指定したログファイルにメトリクスが出力されていることが確認できた。

[2017-07-12 03:06:00.881] type=METER, name=server.error.messages.TaskService.GetTaskListService, count=0, mean_rate=0.0, m1=0.0, m5=0.0, m15=0.0, rate_unit=events/second
[2017-07-12 03:06:00.887] type=TIMER, name=server.request.time.messages.TaskService.GetTaskListService, count=5, min=9.169273, max=226.930993, mean=53.95158430282904, stddev=84.29036619882183, median=12.679804, p75=17.87868, p95=226.930993, p98=226.930993, p99=226.930993, p999=226.930993, mean_rate=0.008607979785884044, m1=5.629057078531829E-5, m5=0.11817160428031309, m15=0.4228910681867675, rate_unit=events/second, duration_unit=milliseconds

ログ出力は10分ごとに繰り返し行われるように設定した。

まとめ

  • dropwizard/metricsを使いgRPC Serverに計測ポイントをついかしてメトリクスを計測することができた。
  • io.grpc.ServerInterceptorを実装したので全てのgRPC Serverに導入すればメトリクスを横断して計測することができる。
  • ログ出力形式にはCSV形式も選択できる。fluentdでCSV形式のログを収集してelasticsearchへ流しkibanaでモニタリングやその他のグラフツールへの連携も容易に行えるので試してみたい。

コード

動くコードをgithubに置いているので参照してください。

github.com

今回の導入の差分もあります。他の修正内容も入っていて分かりづらいですが、よければどうぞ。

ADD grpc-metrics by nsoushi · Pull Request #7 · nsoushi/spring5-kotlin-application · GitHub

Kotlin + gradleでgRPCプロトコル定義ファイル(.proto)のvendoringにprotodepをつかってみた

gRPCのプロトコル定義ファイル(.proto)の管理を考えていきたい。gRPCを導入するプロジェクトであれば.protoファイルの運用方法は課題である。

プロトコル定義ファイル(.proto)の運用課題

gRPCはプロトコル定義ファイル(.proto)から生成したプログラムをもとにサーバとクライアントに分かれ実装していくことになる。Microservicesではサーバとクライアントが別レポジトリに分かれ開発を行うことが多い。それぞれのレポジトリではマスタの.protoファイルを何処に置いて参照すれば良いだろうか。この課題は言語が違うクライアントとサーバ間でも同じことが言えるだろう。

そしてもう1つの課題は.protoファイルのバージョン管理だ。開発中のプロトコル定義に最新のプロトコル定義を干渉させたくない。開発中はクライアントとサーバは同じ世代のプロトコル定義を参照していることを再現しなくてはならない。

このような課題を解決してくれるのが protodepだ。

github.com

protodepはプロトコル定義ファイル(.proto)のvendoringツールである。
今回のエントリでは protodepをGradleビルドシステムに導入した内容をまとめていく。

※protodepの使い方はREADMEにあるので合わせて参照してください。

protodepをGradleタスクに追加する

※ 開発するプロジェクトの言語はKotlinを想定

開発中はプロジェクト内の.protoファイルからProtocolBufferを生成する必要がある。protobuf-gradle-pluginを有効にすればProtocolBufferを生成するタスクが追加される。導入方法は grpc/grpc-javaを参照していただきたい。

GitHub - grpc/grpc-java: The Java gRPC implementation. HTTP/2 based RPC

protobuf-gradle-pluginを有効にすると generateProtoのタスクが追加される。このタスクを実行することでプロジェクト内の.protoファイルからProtocolBufferが生成できる。

そしてgenerateProtoのタスクの前にprotodepを実行すればリモートにある.protoファイルのプロトコル定義のバージョンの固定を担保したうえでProtocolBufferが生成できる。 settings.gradleに次のようなタスクを追加した。

task protoDep {
    def identity_file = "id_rsa"
    if (findProperty("identityFile") != null) {
        identity_file = findProperty("identityFile")
    }
    def forceUpdate = ""
    if (findProperty("forceUpdate") != null || findProperty("forceUpdate") == true) {
        forceUpdate = "-f"
    }
    println forceUpdate
    doLast() {
        exec {
            commandLine = ["protodep", forceUpdate, "--identity-file=${identity_file}", "up"]
        }
    }
}

tasks.whenTaskAdded { task ->
    if (task.name == "generateProto" && !project.hasProperty("IGNORE_PROTO_DEP")) {
        task.dependsOn protoDep
    }
}

複数人で開発することを想定して identityFileのgradle propertyを追加している。個人の認証ファイルをビルド時に指定できる。

実行した結果は次のようになる。

$ ./gradlew clean generateProto build

:cleanProtoGen
:clean
:extractIncludeProto
:extractProto UP-TO-DATE
:protoDep
[INFO] force update = false
[INFO] identity file = id_rsa
[INFO] use SSH protocol
[INFO] Getting github.com/nsoushi/spring5-kotlin-application-proto
:generateProto
:processResources
・・・

generateProtoのタスクの前に protoDepのタスクが実行され.protoファイルをvendoringしてくれる。

.protoファイルをアップデートしたい場合は次のように実行する。

$ ./gradlew clean generateProto build -PforceUpdate=true

protodepの-fオプションを有効にしてくれる。
リモートの.protoファイルをProtocolBufferの生成するときに固定するか更新するか選択することができる。

まとめ

  • プロトコル定義ファイル(.proto)の運用課題をgradleのタスクに追加する方法と運用イメージをまとめた。
  • generateProtoタスクの前に組み込むことにより protoDepを意識することなく.protoファイルのvendoringが行うことができた。

コード

settings.gradlegithubから参照できます。

github.com

ALBのアクセスログからAPIのレスポンスタイムを監視する

サービスのAPIのレスポンスタイムを監視することは運用において大事なことである。APIのレスポンスタイムを計測してモニタリングする方法は多様に考えられるが、このエントリでは次の図のようにALBのアクセスログをfluentdが収集してモニタリングツールと連携させることでAPIのレスポンスタイムを監視を実現していく。

f:id:n_soushi:20170629104620p:plain

  • ALBのアクセスログを有効にすると指定したS3のバケットにログファイルが貯まる
  • FluentdはS3へアクセスしログを収集する
  • 収集したログをElasticSearchへ転送しKibanaでモニタリング
  • 収集したログを解析し収集インターバル中のアクセスのうち最大レスポンスタイムや平均レスポンスタイムを統計しMackerelへメトリックを投稿
  • Mackerelでレスポンスタイムが閾値を超えた場合のアラート体制を整える

上記の構成はログのモニタリングと合わせてレスポンスタイムが閾値を超えた場合の通知を実現している。
また、ALBのアクセスログにはリクエストURLやUserAgentなどLBを通っているため監視に適した情報が揃っている。
モニタリングはkibanaで行うのでデータの集計やグラフを組み合わせてログ監視のダッシュボードを作れる。

そして、fluentdのプラグインを組み合わせれば、簡単にこの構成を実現できる。
利用したプラグインを一覧する。

どれも提示されている標準の使い方で実現できる。
詳細なfluentdの設定はgithubに置いているので参照いただきたい。

GitHub - nsoushi/alb-latency-collector: This repository contains fluentd setting for monitoring ALB latency.

1点、ログ監視の注意点として挙げておきたいことがある。
Fluentdで転送したログをLogstash形式でElasticsearchへインデキシングをしているときにALBのログタイムスタンプ(timeフィールド)を @timestampに置き換える必要がある。障害でログが転送されない状況から再開したときにログ転送の時刻を計測してしまうと実際のALBのログタイムスタンプと差異が生まれてしまう。これを予防するのが次の設定になる。

<match api.access_log>
  type copy

  <store>
    type record_reformer
    enable_ruby true
    tag api.access_log.es
    <record>
    @timestamp ${record['time']}
    </record>
  </store>

record_reformerでALBの timeフィールドをLogstashの @timestampへ置き換えている。

まとめ

  • Fluentdのプラグインを集めてALBのアクセスログを監視することができた。
  • その他のALBがあれば同様にFluentdの設定を追加することで柔軟にAPIのレスポンスタイム監視を増やすことができる。

コード

設定値を変更すればdocker-composeで動作確認を行えるコードを用意したので試したい方はgithubを参照してください。

github.com

Spring5.0 + KotlinのRouterFunctionのテストはどうすればよいか? 試してみた

引き続きSpring5.0(Spring Boot 2.0) + KotlinのWebアプリケーションを試している。今回はRouterFunctionをつかってHTTPルーティングを定義したときにテストコードはどう書くのか?気になったのでまとめてみた。

Router Function

Router FunctionはSpring5.0から導入されるものでアノテーションベースのHTTPサービスの定義ではなく RouterFunctionDslクラスを実装していくことになる。エンドポイントのURLとHTTP Handlerをマッピングを行うことでルーティングが定義できる。

@Configuration
class TaskRoutes(private val taskHandler: TaskHandler, private val exceptionFilter: ExceptionFilter) {

    @Bean
    fun taskRouter() = router {
        (accept(APPLICATION_JSON) and "/api").nest {
            "/task".nest {
                POST("/", taskHandler::create)
                GET("/{id}", taskHandler::fetchByTaskId)
                PUT("/{id}", taskHandler::updateByTaskId)
                DELETE("/{id}", taskHandler::deleteByTaskId)
                PUT("/{id}/finish", taskHandler::finishByTaskId)
            }
            "/tasks".nest {
                GET("/", taskHandler::fetchAll)
            }
        }
    }.filter(exceptionFilter())
}

次からはRouter Functionで定義されたHTTPサービスのテストコードを書いていきたい。

MockMvcではなくWebTestClient をつかう

SpringのコントローラテストコードはMockMvcを用いて書くことが多かったがRouter Functionでは WebTestClientをつかう。
上記の taskRouterはタスクエンティティを扱うコントローラでありルーティングも固有のBeanで定義している。また TaskHandlerはDBアクセスやgRPC サーバなどロジックが絡むのでモック化していきたい。そのような場合にはWebTestClientに用意されている bindToRouterFunctionを定義する。

@RunWith(SpringRunner::class)
class TaskRoutesTest {
    lateinit var client : WebTestClient
    lateinit var taskHandler: TaskHandler
    lateinit var exceptionFilter: ExceptionFilter

    val mapper = ObjectMapper().registerModule(KotlinModule())

    @Before
    fun before() {
        taskHandler = mock(TaskHandler::class)
        exceptionFilter = ExceptionFilter()

        val taskRoutes = TaskRoutes(taskHandler, exceptionFilter)

        client = WebTestClient.bindToRouterFunction(taskRoutes.taskRouter()).build()
    }

・・・

}
  • TaskHandlerをモック化する(taskHandler変数を定義)
  • taskHandler変数とexceptionFilter変数から TaskRoutesのコンストラクタを呼び出しRoutesオブジェクトを生成する(taskRoutes変数を定義)
  • taskRoutes変数を bindToRouterFunctionに渡しWebTestClientオブジェクトを生成する

以上をbeforeに定義してテスト実行の準備を整える

@Testを定義する

タスクを1件取得するエンドポイントURLのテストコードは次のようになった。

@Test
fun `GET Task`() {

    // mock
    `when`(taskHandler.fetchByTaskId(any())).thenReturn(ok().json().body(Mono.just(mockModel)))

    client.get().uri("/api/task/1")
            .accept(MediaType.APPLICATION_JSON_UTF8)
            .exchange()
            .expectStatus().isOk
            .expectBody()
            .consumeAsStringWith {
                val actual = mapper.readValue<TaskModel>(it, TaskModel::class)
                actual.id shouldBe 1L
                actual.title shouldBe "task title"
                actual.createdAt shouldBe "2017-06-13T16:22:52Z"
                actual.updatedAt shouldBe "2017-06-13T16:22:52Z"
            }
}
  • taskHandler.fetchByTaskIdをモック化して返すオブジェクトを定義している
  • exchange()ResponseSpecオブジェクトが返ってくるのでHTTPステータスやレスポンスボディを取得しアサーションする

Spring REST Docsは使えるの?

Test DrivenにAPIドキュメントを生成するSpring Rest DocsはMockMVCベースなので今のところは対応していない。

github.com

対応はしていないが対応はする予定である。issueから確認できるがREST Docs 2.0で対応予定のようなので期待したい。

Add support for using WebTestClient to document an API · Issue #384 · spring-projects/spring-restdocs · GitHub

同じくAPIドキュメントをアノテーションベースで生成できる便利なSpring Foxもサポート予定であることをissueから観測した。

Spring 5 support · Issue #1773 · springfox/springfox · GitHub

個人的にはエンドポイント設計時からアノテーションを付与すればAPI仕様書がつくれるSpring Foxのほうが好みである。だがアノテーションベースで定義する手法が、HTTPルーティングをアノテーションベースで定義しないRouter Functionにどのようにマッピングされるのだろうか。サポート結果が楽しみである。

まとめ

  • RouterFunctionをつかった場合でもテストコードはモックも交えることができるし問題無さそうで導入に前向きになれた。
  • APIドキュメントのライブラリのサポート体制状況も観測できたので、Spring5.0のリリースが待ち遠しい。

コード

コードを書きながら調べたのでgithubと合わせて確認いただけます。

github.com

テストコードはこちらです。 spring5-kotlin-application/TaskRoutesTest.kt at master · nsoushi/spring5-kotlin-application · GitHub

参考にしたエントリ

Spring5.0 + KotlinではDoma、Request Interceptorあたりはどうなっているのか調べてみた

Spring5.0のリリースが迫るなか、プロジェクトへ導入に向けて色々と調べている。インタセプターなどのSpring Frameworkにおける作法はどうなっているか、便利に使えていたライブラリとの相性はどうなのか、などをアウトプットしていく。

次の2つのはてな?を調べてみた。

  • DBアクセスにはDomaを使いたい。Spring Transactionalの@Transactionalも合わせて使える?
  • HandlerInterceptorAdapter@ControllerAdviceなどAOPRouterFunctionで使いたいけど、どうすれば?

それでは1つずつ整理していく。

Domaとの相性

Spring Boot 1.5系とDomaの相性は良くSpring Transactionalの@Transactionalも問題なく使えている。
@Transactionalの使い勝手は良くアノテーションを付けたメソッド無いでRuntimeExceptionが発生すればロールバックしてくれる。

@Transactional
override fun invoke(command: CreateTaskCommand): Task {
    return taskRepository.create(command.title).fold({
        task -> task
    }, {
        error -> throw handle(error)
    })
}

com.mysql.jdbc.ReplicationDriverのドライバを使い @Transactional(readOnly = true)をつければスレーブに対してDBアクセスが行われる。

DomaのKotlinサポートの対応方法をベースにSpringBoot 2.0で試してみたところ、問題なく使えた!

Domaとの相性は問題ないと結果が得られたのでDBアクセスには引き続きDomaを使っていく。

動いているコードはgithubにあるので合わせて参照ください。

RouterFunctionでリクエストやExceptionを良しなに処理したい

HTTPリクエストをハンドリングするアプリケーションの関心事として次のようなものがある。

  • HTTPリクエストをインターセプトして共通の処理を入れたい
  • エラーレスポンスの扱いを共通化したい

これらの関心事をSpring5.0から導入されるRouterFunctionでは、どのように解決すれば良いか。
Spring4系であればHTTPリクエストのインターセプトにはHandlerInterceptorAdapterを実装したクラスを用意すれば良かった。サービス層で発生したエクセプションに応じてエラーレスポンスを定義するクラスを@ControllerAdviceアノテーションをつけたクラスを用意すれば良かった。

RouterFunctionでは、FilterFunctionをつかって2つの関心事を解決できる。
コードとしては次のようになる。

@Configuration
class TaskRoutes(private val taskHandler: TaskHandler, private val exceptionFilter: ExceptionFilter) {

    @Bean
    fun taskRouter() = router {
        (accept(APPLICATION_JSON) and "/api").nest {
            "/task".nest {
                POST("/", taskHandler::create)
                GET("/{id}", taskHandler::fetchByTaskId)
                PUT("/{id}", taskHandler::updateByTaskId)
                DELETE("/{id}", taskHandler::deleteByTaskId)
                PUT("/{id}/finish", taskHandler::finishByTaskId)
            }
            "/tasks".nest {
                GET("/", taskHandler::fetchAll)
            }
        }
    }.filter(exceptionFilter())
}

@Component
class ExceptionFilter {

    private val logger = KotlinLogging.logger {}

    operator fun invoke(): (request: ServerRequest, next: HandlerFunction<ServerResponse>) -> Mono<ServerResponse> = { request, next ->
        try {
            next.handle(request)
        } catch (e: Exception) {
            when (e) {
                is SystemException -> status(e.status).json().body(Mono.just(ErrorItem(e.message ?: "web application error", e.status.value().toString(), null)))
                else -> {
                    logger.error(e) { "unknown exception: %s".format(e.message ?: "unknown error") }
                    status(HttpStatus.INTERNAL_SERVER_ERROR).json().body(Mono.just(ErrorItem(e.message ?: "internal server error", null, null)))
                }
            }
        }
    }
}

.filter(exceptionFilter())の箇所でFilter Functionを定義している。
コードの例ではExceptionFilterクラスでは各APIで発生したExceptionをフィルタリングしている。Exceptionが発生した場合はエラーレスポンスを返すようにしている。これで例外処理をハンドリングできるインタセプターをRouterFunctionに追加できる。

Spring4系で HandlerInterceptorAdapter@ControllerAdviceで分かれていたところがFilterFunctionに集約された格好である。

コード

コードを書きながら調べたのでgithubと合わせて確認いただけます。

github.com

まとめ

  • Domaとの相性が問題ないことやHTTPハンドリングの関心事もRouterFunctionでも解決できることが分かった。
  • 並行してSpringFoxとの相性も調べている。APIドキュメントの快適さもSpring4系を使う恩恵があったのでSpring5.0からも問題なく使えることを期待している。

Spring5.0 + Kotlinで1つのjarにHTTPサーバーとgRPCサーバーを相乗りさせてみた

Spring Boot 2.0.0 M1がリリースされました。以前のエントリで試した当時は 2.0.0.BUILD-SNAPSHOTでありHTTPサーバーが起動している状態でgRPCクライアントを動かすとエラーになっていた。

2.0.0 M1のリリースに伴いHTTPサーバーとgRPCサーバーが1つのjarに相乗りできるようになっているか確認するのが今回のモチベーション。

次のようなアプリケーション構成を実現したい。

f:id:n_soushi:20170608125908p:plain

  • API ServerHTTP1.1のリクエストのルーティングgRPCサーバーのエンドポイントを提供する
  • API Serverに届いたリクエストはBackend Server向けgRPCクライアントからBackend Serverにリクエストする
  • API ServerとBackend Server間の通信はgRPCで行う
  • エンドポイントにはHTTP1.1のリクエストのルーティングgRPCサーバーの2つの通信方式を用意する
  • すべてのアプリケーションはSpring Boot 2.0.0 M1の上で動く
  • Introducing Kotlin support in Spring Framework 5.0のエントリにあるようなSpring5.0で提供される新しい機能をつかう

ここからは試した過程での気づきなどをアウトプットしていく。

CommandLineRunnerでgRPCサーバーを起動する

HTTPルーティングが動いている状態で CommandLineRunnerをつかいgRPCサーバーを次のように起動させた。

@Configuration
class GrpcServerRunner(private val appProperties: AppProperties,
                       private val echoServer: EchoServer,
                       private val taskServer: TaskServer) : CommandLineRunner, DisposableBean {

    private val logger = KotlinLogging.logger {}

    lateinit var server: Server

    override fun run(args: Array<String>) {

        val port = appProperties.grpc.server.port

        logger.info { "Starting gRPC Server ..." }
        val serverBuilder = NettyServerBuilder.forPort(port)
        serverBuilder.addService(echoServer)
        serverBuilder.addService(taskServer)
        server = serverBuilder.build().start()
        logger.info {"gRPC Server started, listening on port $port."}

        startDaemonAwaitThread()
    }
}

Spring Boot 2.0.0 M1で試したところ問題なく起動した!2.0.0.BUILD-SNAPSHOTではエラーになっていたところ)
2.0.0 M1では300以上のissueやプルリクエストがマージされたので、その中のどれかで解消されたということだろう。

grpc-java1.3.0はエラーになる

Spring Bootには直接関係はないところであるが最新のgrpc-javaの1.3.0を使うとエラーになった。

java.lang.NoClassDefFoundError: io/netty/handler/codec/http2/internal/hpack/Decoder
        at io.grpc.netty.GrpcHttp2HeadersDecoder.<init>(GrpcHttp2HeadersDecoder.java:85) ~[grpc-netty-1.2.0.jar:1.2.0]
        at io.grpc.netty.GrpcHttp2HeadersDecoder$GrpcHttp2ServerHeadersDecoder.<init>(GrpcHttp2HeadersDecoder.java:135) ~[grpc-netty-1.2.0.jar:1.2.0]
        at io.grpc.netty.NettyServerHandler.newHandler(NettyServerHandler.java:109) ~[grpc-netty-1.2.0.jar:1.2.0]
        at io.grpc.netty.NettyServerTransport.createHandler(NettyServerTransport.java:132) ~[grpc-netty-1.2.0.jar:1.2.0]
        at io.grpc.netty.NettyServerTransport.start(NettyServerTransport.java:77) ~[grpc-netty-1.2.0.jar:1.2.0]
        at io.grpc.netty.NettyServer$1.initChannel(NettyServer.java:141) ~[grpc-netty-1.2.0.jar:1.2.0]
・・・

Netty 4.1.9ではio.netty.handler.codec.http2.internal.hpack.Decoderが削除されているのが原因。

github.com

gradleは次のようにgrpc-nettyのみ1.4.0-SNAPSHOTのバージョンを指定することで解消できる。

buildscript {
    ext.grpc_version = "1.3.0"
    ext.grpc_version_snapshot = "1.4.0-SNAPSHOT"

    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/milestone" }
        maven { url 'http://repo.spring.io/plugins-release' }
        maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
    }
}

・・・

dependencies {
・・・
    // grpc
    compile "io.grpc:grpc-netty:${grpc_version_snapshot}"
    compile "io.grpc:grpc-protobuf:${grpc_version}"
    compile "io.grpc:grpc-stub:${grpc_version}"
    compile "io.grpc:grpc-okhttp:${grpc_version}"
    compile "com.google.api.grpc:googleapis-common-protos:0.0.3"
・・・
}

BackendサーバーにはHTTPサーバーがいらない

BackendサーバーにはHTTPサーバーがいらないので次のように@SpringBootApplicationがついたメインクラスの起動でWebアプリケーションを動かさないようにした。

@SpringBootApplication
@EnableConfigurationProperties(AppProperties::class)
class Application {

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            SpringApplicationBuilder(Application::class).web(WebApplicationType.NONE).run(*args)
        }
    }
}

コード

今回試したコードはgithubに置いてあるので参考になれば嬉しい。

github.com

まとめ

  • 理想の構成どおりにアプリケーションが組めた
  • Spring5.0のkotlinサポートは魅力的であり実戦投入を検討したかったがgRPCとの相性が良くない印象であったが検証を経て問題ないことが確認できた
  • いよいよSpring5.0の正式リリースに向けて整ってきている印象を受けた
  • その他で利用しているSpringエコシステムの各種ライブラリとのSpring5.0/Spring Boot 2.0.0の相性が問題ないことを引き続き確認していき導入を検討していきたい