gRPC streamingをつかうとマイクロサービスの責務が整理できるし省コネクションでメリットあるよね、という話
今回はgRPCをマイクロサービス間通信に導入することってメリットあるよね、というエントリです。 定期的に処理を実行してくれるバッチはよくあるものですがバッチの駆動をgRPCを使って次のような構成で動かしました。
Batch Control
とBatch Server
はBidirectional gRPC streaming
でコネクションする。Batch Control
はRedisのPub/Subで特定のチャンネルを監視する。Batch Control
はチャンネルにキューが投げられたらBatch Server
へバッチ処理スタートのリクエストを送る。Batch Service
はリクエストを受け取りバッチを動かし処理結果をBatch Control
に送る(レスポンスを送る)。- チャンネルにキューが投げられる度に上記の流れでバッチを稼働させる。
上記の構成を踏まえ次からはメリットをまとめます。
gRPCをマイクロサービスに導入するとメリットあるよね
キューのRead権限をバッチサーバから剥がせる
キュー駆動でバッチを動かしている場合、例えばAmazon SQSを導入しているとRead権限が必要です。上記の構成であればキューを監視するのはバッチサーバではなくコントロールサーバになります。そのためキューを監視する権限をコントロールサーバに集約できるメリットがあります。
ログ集約サーバへの送信責務もバッチサーバから剥がせる
図のとおりgRPCのBidirectional streaming
を使えば複数のレスポンスを送信することができます。バッチ処理結果や各種ログはコントロールサーバへ送り、ログ集約サーバへの送信はコントロールサーバが行います。gRPCで各サービスをつないでおいてログを送り、受けとったクライアントにログの集約を任せる、といった構成は導入メリットの1つな気がします。(ログの送信漏れ考慮は必要ですが)
そもそものgRPCのメリット
そもそものgRPCのメリットがあります。異なる言語のマイクロサービス間の通信でもProtocol Buffers
を定義することで容易に通信を確立できますし、streamingの方式を用途に合わせて選択することで省コネクションでマイクロサービス間のやり取りが行えます。
GoとJavaでBidirectional gRPC streamingをつかったデモ
上記の図の構成をもとにgRPCのクライアントをGo
でサーバをJava
で通信方式はBidirectional streaming
を採用してデモを作ってみました。
どのようなバッチサービス?
Bidirectional streaming
を採用しているので、リクエストが複数あってレスポンスも複数、または1つのようなサービスを考えました。
結果、数値を受け取り割り算
をして商と余りを返すサービスを実装しました。
Redisからキューを送信してクライアントがリクエストとレスポンスを受け取ったイメージです。
# Redis $ redis-cli 127.0.0.1:6379> PUBLISH my_queue '{"serviceName" : "division", "numbers" : [10, 3]}'
# Client 12:27:50.452 Request : {serviceName:'division', message:'10', time:'time string'} 12:27:50.452 Request : {serviceName:'division', message:'3', time:'time string'} 12:27:50.455 Response: {serviceName:'division', message:'quotient:3', time:'time string'} 12:27:50.456 Response: {serviceName:'division', message:'remainder:1', time:'time string'}
クライアントは10
と3
のリクエストを2つ送り、商が3
と余りが1
の結果を受け取ります。(余りが0であればレスポンスは1つになる)
protoファイル
protoファイルは次のようになりました。
syntax = "proto3"; option go_package = "protobuf"; package proto; service MicroService { rpc MicroService (stream Request) returns (stream Response) {} } message Request { string name = 1; string message = 2; string time = 3; } message Response { string name = 1; string message = 2; string time = 3; }
クライアントのコード(Go)
リクエストを送信してレスポンスを受け取っている通信周りのコードの抜粋です。
※コード全体はgithubにあります。
waitc := make(chan struct{}) go func() { for { in, err := stream.Recv() if err == io.EOF { close(waitc) return } if err != nil { log.Error("Failed to receive a message : %v", err) return } responseLog.Info("{serviceName:'%s', message:'%s', time:'%s'}", in.Name, in.Message, in.Time) } }() for { message, err := pubSub.ReceiveMessage() if err != nil { panic(err) } requests, err := getRequests(message) if err != nil { panic(err) } for _, request := range requests { requestLog.Info("{serviceName:'%s', message:'%s', time:'%s'}", request.Name, request.Message, request.Time) if err := stream.Send(&request); err != nil { log.Error("Failed to send a message: %v", err) } } } stream.CloseSend() <-waitc
サーバのコード(Java)
リクエストを受け取りレスポンスを送信している通信周りのコードの抜粋です。
割り算をする数値が分けられて送られてきます。1回目のリクエストでキーを生成してリクエストを保持しながら2回目のリクエストで割った結果を送信しています。
※コード全体はgithubにあります。
return new StreamObserver<Microservice.Request>() { public void onNext(Microservice.Request req) { Long key = getTime(req); Observable.just(req) .subscribe(new Observer<Microservice.Request>() { @Override public void onSubscribe(Disposable d) { Log.i("Request", getRequestLog(req)); } @Override public void onNext(Microservice.Request request) { if (!routeNumber.containsKey(key)) { routeNumber.put(key, Arrays.asList(req)); } else if (routeNumber.get(key).size() == 1) { Microservice.Request prevRequest = routeNumber.get(key).get(0); Integer leftTerm = Integer.parseInt(prevRequest.getMessage()); Integer rightTerm = Integer.parseInt(req.getMessage()); Integer quotient = leftTerm / rightTerm; Integer remainder = leftTerm % rightTerm; if (remainder == 0) { responses.putIfAbsent(key, Arrays.asList( getResponse(req.getName(), String.format("quotient:%d", quotient)))); } else { responses.putIfAbsent(key, Arrays.asList( getResponse(req.getName(), String.format("quotient:%d", quotient)), getResponse(req.getName(), String.format("remainder:%d", remainder)))); } } else { Log.w(String.format("waring, unknown state. key:{%s}, value:{%s}", key, routeNumber.get(key))); } } @Override public void onError(Throwable e) { Log.e(String.format("onError %s", e.getMessage())); } @Override public void onComplete() { if (responses.containsKey(key)) { Observable.fromIterable(responses.get(key)) .subscribe(res -> { responseObserver.onNext(res); Log.i("Response", getResponseLog(res)); }); routeNumber.remove(key); responses.remove(key); } } }); } public void onError(Throwable t) { logger.log(Level.WARNING, "microService cancelled"); } public void onCompleted() { responseObserver.onCompleted(); } };
デモ
まとめ
- Bidirectional streamingは1回のコネクションでクライアントとサーバ間で複数回のリクエストとレスポンスを送ることができます。リクエスト/レスポンスの度にコネクションを確率しないので省コネクションのメリットがあります。
- クライアントはgRPCのコネクションを確立してからチャンネルのsubscribeを継続して行っています。キューが送られる度にgRCPのコネクションを繋いでいません。1回のgRCPコネクションを確立するだけでバッチサーバのコントロールが行うことができました。
- gRPCで考えてみましたがHTTP/API/JSONの通信形式であっても権限や責務を1つのサーバに集約させるメリットは受けられます。何よりProtocol Buffers定義による複数言語のサポートとstreaming方式の便利さが運用しているマイクロサービスに嵌まれば導入機会を検討するべきです。引き続きgRPCのメリットを受けられるようなユースケースを考えていきます。
コードを公開しています
コード全体はgitbubで確認できます。