Spring Web Flux でServer Sent EventsのPush型APIをつくってみた
Spring Boot 2.0からWebFlux frameworkが導入された。Reactorが使えてReactive Programmingが可能になったのでPush型のAPIをつくってみた。
どんなAPIをつくるか
- コンテンツが更新されたら件数を教えてくるAPI
- Server Sent EventsのContent-typeをサポートする
- コンテンツの件数が変更されたらAPIをSubscribeしているクライアントに件数を通知する
RedisのPub/Subを使う
RedisのPub/Subを使ってリアルタイムのコンテンツ更新を通知する。アプリはServerSide+Kotlinで実装したのでJedisのライブラリを使う。
JedisはJedisPubSub
のクラスを実装すればチャンネルに通知されたメッセージをSubscribeできる。onMessage
でReactorクラスのFluxにデータを流すようにした。
@Service class TaskService(private val jedisPool: JedisPool, private val taskBackendClient: TaskBackendClient) { companion object { private val CHANNEL = "task" } private val flux = Flux.create<String> { emitter -> TaskSubscriber(emitter).let { sub -> jedisPool.resource.use { it.subscribe(sub, CHANNEL) } } } fun publishUpdateTask() { jedisPool.resource.use { it.publish(CHANNEL, "updateTask") } } fun subscribeTaskCount() = flux.map { taskBackendClient.getTaskCount().count }.run { share() } } class TaskSubscriber(private val fluxSink: FluxSink<String>) : JedisPubSub() { override fun onMessage(channel: String?, message: String?) { fluxSink.next(message) } }
ポイントはFlux.create<T>
で作られるFluxをTaskSubscriberのコンストラクタに渡してTaskSubscriberのonMessageでデータを流すようにしているところ。
コンテンツが更新されたらTaskService#publishUpdateTask
を呼び出しチャンネルに通知をPublishする。
ここまでくればTaskServiceを必要なところで使えばOK。
API HandlerでSever-Send-EventsのServerResponseを返す
bodyToServerSentEvents
の拡張関数を使えばSever-Send-EventsのServerResponseを返せる。
@Component class TaskHandler(private val taskBackendClient: TaskBackendClient, private val taskService: TaskService) { // --- fun create(req: ServerRequest): Mono<ServerResponse> { return ok().json().body( req.bodyToMono(CreateTaskInbound::class.java).map { p -> TaskModel(taskBackendClient.createTask(p.title)).also { taskService.publishUpdateTask() } }) } // --- fun fetchTaskCount(req: ServerRequest): Mono<ServerResponse> { return ok().json().bodyToServerSentEvents( taskService.subscribeTaskCount() .map { TaskCount(it) } ) } }
text/event-streamをacceptするroutingをRouterFunctionDslに追加する
@Configuration class TaskRoutes(private val taskHandler: TaskHandler) { @Bean fun taskRouter() = router { "/api".nest { accept(TEXT_EVENT_STREAM).nest { GET("/task-count", taskHandler::fetchTaskCount) } } } }
まとめ
- Spring BootでServer Sent EventsのPush型APIをつくってみた
- ReactorをサポートしているのとServer Sent EventsのAPIを作りやすい仕組みがSpring Boot 2.0から整っているのありがたい
- ReactorにProcessorが用意されているがRx javaと機能差異がどれくらいあるか気になる
コード
動くコードあります。