ElasticsearchのScroll APIをためしてみた

気になっていたElasticsearchのScroll APIの使用感を記録します。最近の開発でScroll APIを採用したい欲求がありましたが、使用感を調べる前で採用は見送りました。このままだと気になったまま使わないことになりそうなので、この機会にまとめます。

www.elastic.co

※ version 2.4をつかいました。

Scroll APIは通常のSearch requestのoffset/limitでページング取得をしないため処理中のデータ抜けが防げるメリットがあります。またScroll APIは初回リクエスト時の結果をスナップショットすることで安定した応答速度を担保します。
スナップショットをとるためリアルタイムのデータ処理の利用には向いていません。(スナップショットの挙動について試してみたので後述しています)

どんなふうに使うか?

通常のクエリとscroll=1mを加えたリクエストを送ります。(size=1にしています)

curl -XGET 'http://localhost:9200/_search?scroll=1m&size=1&pretty' -d '
{
"query" : { "match" : { "category_id" : 100 } }
}'

次のような検索結果(1件)と合わせて_scroll_idが返ってきます。

{
  "_scroll_id" : "cXVlcnlUaGVuRmV0Y2g7NTs4OkZRQjk1VGJIUmxhRm5RVlBnVWotYXc7OTpGUUI5NVRiSFJsYUZuUVZQZ1VqLWF3OzEwOkZRQjk1VGJIUmxhRm5RVlBnVWotYXc7MTE6RlFCOTVUYkhSbGFGblFWUGdVai1hdzsxMjpGUUI5NVRiSFJsYUZuUVZQZ1VqLWF3OzA7",
・・・
  "hits" : {
    "total" : 52,
・・・
  }
}

2件目の取得を行うために/_search/scrollのエンドポイントへscroll_idをRequest Bodyに加えてリクエストします。クエリは必要ありません。

curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '
{
"scroll_id": "cXVlcnlUaGVuRmV0Y2g7NTs4OkZRQjk1VGJIUmxhRm5RVlBnVWotYXc7OTpGUUI5NVRiSFJsYUZuUVZQZ1VqLWF3OzEwOkZRQjk1VGJIUmxhRm5RVlBnVWotYXc7MTE6RlFCOTVUYkhSbGFGblFWUGdVai1hdzsxMjpGUUI5NVRiSFJsYUZuUVZQZ1VqLWF3OzA7"
}'
  • 2回目以降はscroll_idを送ることで初回のリクエスト時に送った検索条件の結果が返ってきます。
  • 3回目以降も同様にscroll_idを送ることで3件目、4件目、5件目・・・と結果を取得できます。
  • 初回にsize=1としたため、2回目以降の結果も1件になります。
  • またscroll=1mとしたことで初回にリクエストした検索条件の結果を1分間の有効期限でスナップショットが取られます。

使い終わったscroll_idは破棄をする

スナップショットを残して置くのはコストがかかるためscrollが終われば次のようにscroll_idをクリアします。

curl -XDELETE localhost:9200/_search/scroll -d '
{
    "scroll_id" : ["cXVlcnlUaGVuRmV0Y2g7NTs4OkZRQjk1VGJIUmxhRm5RVlBnVWotYXc7OTpGUUI5NVRiSFJsYUZuUVZQZ1VqLWF3OzEwOkZRQjk1VGJIUmxhRm5RVlBnVWotYXc7MTE6RlFCOTVUYkhSbGFGblFWUGdVai1hdzsxMjpGUUI5NVRiSFJsYUZuUVZQZ1VqLWF3OzA7"]
}'

複数のscroll_idをまとめてクリアもできます。

Scroll APIを使うときのメモ

  • Scroll APIの初回のリクエストはscroll_idを取得するためのものではなく、検索結果に加えてscroll_idが返ってくる。
  • Aggregationを含んだリクエストの場合、Aggregationの結果は初回のみ返ってくる。
  • ソート条件に制約がなければsort orderは_docにすることで安定した応答速度が得られる。

kotlin + 公式Elasticsearch ClientでScroll APIをためしてみる

せっかくなのでkotlinでコードからScroll APIをためしてみました。使ったクライアントは公式のElasticsearch Clientです。

www.elastic.co

※ version 2.4.3をつかいました

スナップショットは本当に有効なのか?

scroll=1mと設定してインデックスされたデータをscroll取得している間に、新しいソースをインデックスしても取得結果のtotal件数に変化がないか試してみました。

以下のような流れで検証します。

scrollIdが取得できれば再帰的にログ出力を繰り返し、その間に新しいソースを1件追加していきます。

実行した結果は次のようになりました。

[INFO ] totalCount={104}, id={AVkna34Rhpv5RJ12skTc}   // 初回取得時のtotal件数は104[INFO ] complete add source id={AVkqcnqbMJXjH5tvLcGB}  //新しいソースの追加が成功
[INFO ] totalCount={104}, id={AVkna_83hpv5RJ12skTd}   // total件数は初回取得時の104件から変わらずスナップショットが有効であることが確認できた
[INFO ] complete add source id={AVkqcntGMJXjH5tvLcGC}
[INFO ] totalCount={104}, id={AVknbG17hpv5RJ12skTf}
・・・

Scroll APIの仕様のとおりスナップショットが有効の状態であれば新しいソースを追加したとしてもスナップショットを指すscroll_idでリクエストをすると全体の件数は変わらないことが確認できました。

まとめ

  • ページング処理ではないため取りこぼしがない。(そもそも初回時点のスナップショットのデータを対象にスクロールするので取りこぼしはないと思われる)
  • offset/limitのページング処理のコードがなくなり、コードがシンプルになった。
  • スナップショットの有効期限が切れた場合、SearchContextMissingExceptionがスローされる。
  • SearchContextMissingExceptionを捕捉して新規スクロールを始める必要があるが、例外が起きた時点のソースIDを始点にスクロールを開始する・・・なかなか例外処理は複雑。
  • 取得したscroll_idを用いた次のスクロールに有効期限を添えることで常にスナップショットの有効期限を更新すれば例外処理を避けることができそう。
  • いまのプロジェクトに導入したくなってきたが、本番での処理時間を測定した上で最適な有効期限の設定をする必要があるため様子見する。

ソースを公開しています

今回検証したソースコードを公開しています。

github.com

Kotlinで快適なJSONパース。Klaxon: JSON for Kotlinを使ってみた。

前回の記事ではMoshiライブラリを使ったJSON文字列からのオブジェクト変換、オブジェクトからのJSON文字列変換の話でした。

naruto-io.hatenablog.com

JSONが複雑な構造でもあってもMoshiのCustom Type Adaptersを使って@ToJsonと@FromJsonを実装すればJSON←→オブジェクトの変換が難なく行えます。

難なく行えますと書きましたが、Moshiのカスタムアダプタを利用する難点もあります。
次のような難点が考えられます。

  • 複数のカスタムアダプタの実装が必要になると骨が折れる作業となる
  • 複雑ではないJSON構造でもカスタムアダプタが必要な場合には実装コストがかかる
  • 実装したカスタムアダプタの整合性テストが必要になる

カスタムアダプタを実装して整合性テストまでのコストを考えると多くのカスタムアダプタの実装は避けたいところです。
開発の過程でJSONパースの処理は多く登場します。JSON文字列をオブジェクトに変換して値を取り出すような良くあるケースでカスタムアダプタの実装は避けたいなぁ・・・と考え調べていたところ「Klaxon: JSON for Kotlin」を見つけました。

github.com

Klaxon: JSON for Kotlin

Klaxonはkotlin製のJSONパースライブラリです。簡易的な使い方の紹介をします。

次のようなJSON構造を例にします。

{
    "name": "Sakib Sami",
    "age": 23
}

次は使い方です。

val parser: Parser = Parser()
val stringBuilder: StringBuilder = StringBuilder("{\"name\":\"Sakib Sami\", \"age\":23}")
val json: JsonObject = parser.parse(stringBuilder) as JsonObject
println("Name : ${json.string("name")}, Age : ${json.int("age")}") // Name : Sakib Sami, Age : 23

KlaxonのParsrにJSON文字列の入力ストリームを渡しJsonObjectでキャストします。あとはキャストしたJsonObjectのdata classに用意されたstiringやintメソッドに取得したいJSON構造のkeyを渡せば値を参照することができます。
stringやintなどのメソッドはJSON値の型に合わせたメソッドが用意されています。その他にlongやbooleanなどがあり、値が新たなJSON構造であればobj、配列の場合はarrayのメソッドを利用します。
as JsonObjectのようにJSON値をキャストする型はobjなどのメソッドと対になっています。詳しくはドキュメントを参照してください。

AggregationBuildersクラスをJSON化してパースしてみる

以前の記事で紹介したAggregationBuildersクラスのオブジェクト構造をJSON文字列にして、KlaxonでJSONパースした例を紹介します。
AggregationBuildersクラスのオブジェクト構造は次のように表せます。

{
    "aggs_post_id": {  "terms": { "field": "post_id", "size": 1000 }, "aggregations": {
            "aggs_category_id": { "terms": { "field": "category_id", "size": 20 }, "aggregations": {
                    "aggs_user_id": { "terms": { "field": "user_id", "size": 2500 } } } } }
}

上記のようなJSONをKlaxonでパースして値を検証してみました。

val parser: Parser = Parser()
val jsonObject = parser.parse(json.byteInputStream()) as JsonObject

should("valid aggs_post_id.terms") {
    val aggsPostIdTerms = (jsonObject.get("aggs_post_id") as JsonObject).get("terms") as JsonObject

    aggsPostIdTerms.string("field") shouldBe "post_id"
    aggsPostIdTerms.int("size") shouldBe 1000
}

should("valid aggs_category_id.terms") {
    val aggsCategoryId = ((jsonObject.get("aggs_post_id") as JsonObject).get("aggregations") as JsonObject).get("aggs_category_id") as JsonObject
    val aggCategoryIdTerms = aggsCategoryId.get("terms") as JsonObject

    aggCategoryIdTerms.string("field") shouldBe "category_id"
    aggCategoryIdTerms.int("size") shouldBe 20
}

should("valid aggs_user_id.terms") {
    val aggsUserId = ((((jsonObject.get("aggs_post_id") as JsonObject).get("aggregations") as JsonObject).get("aggs_category_id") as JsonObject).get("aggregations") as JsonObject).get("aggs_user_id") as JsonObject
    val aggUserIdTerms = aggsUserId.get("terms") as JsonObject

    aggUserIdTerms.string("field") shouldBe "user_id"
    aggUserIdTerms.int("size") shouldBe 2500
}

構造が深いためas JsonObjectが連続して可読性が損なわれていますが、json構造を辿れることが伝えたく敢えて上記のように書きました。

fieldは文字列なのでstringメソッドで、sizeは数値なのでintメソッドで取得できます。

aggsPostIdTerms.string("field") shouldBe "post_id"
aggsPostIdTerms.int("size") shouldBe 1000

APIのレスポンスをKlaxonでパースして値を検証してみる

次のようなレスポンスを返すAPIを想定してKlaxonでパースして値を検証してみます。

[
    {
        "postId": 1324231431,
        "categoryId": 11,
        "user": { "userId": 1413241, "name": "John", "age": 20 }
    },
    {
        "postId": 1321231341,
        "categoryId": 22,
        "user": { "userId": 1453124, "name": "Amy", "age": 25 }
    },
    {
        "postId": 1329709858,
        "categoryId": 33,
        "user": { "userId": 1409709, "name": "Jessica", "age": 38 }
    }
]

kotlintestを使ってSpecテストをしていますのでそれぞれの値の検証意図はSpecのnameを参照してください。

init {
    given("GET: /test/content_list") {

        target = TestController()
        mvc = MockMvcBuilders.standaloneSetup(target).build()

        val response = mvc.perform(MockMvcRequestBuilders.get("/test/content_list"))
                .andExpect(MockMvcResultMatchers.status().isOk()).andReturn().response.contentAsString

        `when`("response is ok") {

            val array = Parser().parse(response.byteInputStream()) as JsonArray<JsonObject>

            then("レスポンスに含まれるPostは `3つ`") {
                val postIds = array.long("postId")
                postIds.size shouldBe 3
            }

            then("categoryIdが30以上のPostは `1つ`でpostIdは `1329709858`") {
                val post = array.filter {
                    it.long("categoryId")!! > 30L
                }
                post.size shouldBe 1
                post.get(0).long("postId") shouldBe 1329709858L
            }

            then("categoryIdが30以下のPostでuserのageが20以上のうち最後のレスポンスのuserのnameは `Amy`") {
                val post = array.filter {
                    it.long("categoryId")!! < 30
                }.findLast{
                    it.obj("user")!!.int("age")!! > 20
                }!!

                post.obj("user")!!.string("name") shouldBe "Amy"
            }
        }
    }
}

APIのレスポンスをパースすることでJsonArrayが取得できます。配列が取れればkotlinのコレクションが使えるのでfilterやmap、findLastのメソッドを利用して値を検証することでテストコードの可読性があがります。
さらにkotlintestのBehaviorSpecでテストコード全体を仕上げたので、それぞれのテストの意図も伝わりやすいです。
JSONパースにコストをかけずJSON構造を上から辿っていく間隔で値の取り出しをできるKlaxonはテストコードで重宝しそうです。

ソースを公開しています

ソースコードを公開しています。Moshiライブラリの検証が入っているプロジェクトに入れました。良ければ参考にしてみてください。

github.com

KotlinでtoJsonとfromJsonのJSONパース。MoshiのCustom Type Adaptersを使ってオブジェクトのテストを快適に。

最近はkotlinで開発しています。これまで他の言語で出来ていたこともkotlinではどうやって出来るのか?、調査したり試したりすることは楽しいですね。新しい言語に触れる醍醐味とも言えます。
kotlinをサーバサイドのメイン言語に使っています。数ある処理の中でもJSONの扱いは必須です。
今のプロジェクトではMoshiライブラリをメインで使っています。今回はkotlinでMoshiライブラリを使った話です。

Moshi

MoshiはJavaで開発されたJSONライブラリです。

github.com

ライブラリの特徴は公式のほうを参照いただくとして、メインの話をしたいのが「Custom Type Adapters」です。
Moshiは自作のアダプタクラスに@ToJson と @FromJsonのアノテーションがついたメソッドを実装すればプリミティブな型以外でも追加した型をオブジェクトからjson文字列へ(toJson)、json文字列からオブジェクトへ(fromJson)の変換処理を自作することができます。

テストを書くときにCustom Type Adaptersを活用する

今のプロジェクトではelasticsearchを使っていて検索パラメータを定義するビルダークラスの整合性の検証にMoshiのCustom Type Adaptersを活用しました。
カスタムアダプタを使いelasticsearchのAggregationBuildersクラスをkotlinのdata classへ変換し検索パラメータの検証を行いました。

ビルダークラスはgetAggregationsQuery()をコールすることで取得できます。

object AggregationModel {
    fun getAggregationsQuery() = AggregationBuilders.terms("aggs_post_id").field("post_id").size(100)
                .subAggregation(AggregationBuilders.terms("aggs_category_id").field("category_id").size(200)
                        .subAggregation(AggregationBuilders.terms("aggs_user_id").field("user_id").size(300)))!!
}

このビルダークラスをjson文字列で表すと次のような構造になります。

{
    "aggs_post_id": {  "terms": { "field": "post_id", "size": 100 }, "aggregations": {
            "aggs_category_id": { "terms": { "field": "category_id", "size": 200 }, "aggregations": {
                    "aggs_user_id": { "terms": { "field": "user_id", "size": 300 } } } } }
}
  • ネストが深い(key1.aggregations.key2.aggregations.key3.aggregations.... と幾重にも定義できる)
  • keyの文字列はこちらで決めれる( aggs_post_id, aggs_category_id、aggs_user_id)
  • key.aggregationsの指定は任意

kotlinのdata classで表すと次のように定義できます。

data class Aggregations(
        val name: String,
        val terms: Terms,
        val aggregations: Aggregations?
)

data class Terms(
        val field: String,
        val size: Long
)

今回のやりこと

今回、やりたいことを明確にします。

AggregationBuildersクラスをjson文字列変換し、MoshiのCustom Type Adaptersを使い、kotlinのdata classに変換してテストを快適にしたい。

です。

AggregationBuildersクラスはjson文字列に変換できる

AggregationBuildersはXContentFactoryでパラメータ構造を文字列に変換できます。

val aggregations = AggregationModel.getAggregationsQuery()

val builder = XContentFactory.jsonBuilder()
builder.startObject()
aggregations.toXContent(builder, org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS)
builder.endObject()

val json = builder.string()

val json = builder.string()ではビルドしたパラメータがjson文字列として格納されています。こちらのjson文字列を実装したカスタムアダプタでkotlinのdata classに変換します。

AggregationAdapterクラスを実装する

次のように定義することでMoshiがサポートしている型ではないAggregations型を追加できます。

class AggregationAdapter : JsonAdapter<Aggregations>() {

    companion object {
        val FACTORY: Factory = object : Factory {
            override fun create(type: Type, annotations: Set<out Annotation>, moshi: Moshi): JsonAdapter<*>? {
                if (type === Aggregations::class.java) {
                    return AggregationAdapter()
                }
                return null
            }
        }
    }

次のページを参考にさせていただきました。
Retrofit2.0に備えてKotlinで始めるMoshi(JSONパーサ) - Qiita

今回はJsonWriterとJsonReaderを使って@ToJson、@FromJsonを実装しました。

@ToJson
override fun toJson(writer: JsonWriter?, value: Aggregations?)

@FromJson
override fun fromJson(reader: JsonReader?): Aggregations?

keyの値に"aggs_post_id"など独自定義ができたり、ネストが深かったりとJsonWriterとJsonReaderを使うことで複雑な構造であっても変換処理を実装することができます。

実際のコードは省略してしまっていますが公開していますので、よろしければ参照してください。役立つと嬉しいです。
github.com

実装したアダプタクラスを使いdata class化することでアサーションも見通しが良くなりました。

val adapter = Moshi.Builder().add(AggregationAdapter()).build().adapter(Aggregations::class.java)

val actual = adapter.fromJson(json)

actual.name shouldBe "aggs_post_id"
actual.terms.field shouldBe "post_id"
actual.terms.size shouldBe 0L

actual.aggregations?.name shouldBe "aggs_category_id"
actual.aggregations?.terms?.field shouldBe "category_id"
actual.aggregations?.terms?.size shouldBe 0L

actual.aggregations?.aggregations?.name shouldBe "aggs_user_id"
actual.aggregations?.aggregations?.terms?.field shouldBe "user_id"
actual.aggregations?.aggregations?.terms?.size shouldBe 0L

まとめ

開発においてjsonの扱いは必須です。
kotlinでもMoshiのCustom Type Adaptersを使うことで独自の型の変換を学べました。
JsonWriterとJsonReaderのを使い変換処理をコード化することは慣れが必要で扱いにくいと感じてしまうかもしれません。

APIのResponse Bodyからオブジェクトへの変換をする、よくあるパターンです。
今回の学びでkotlinでも臆することなく向き合えるようになりました!