fluentd + logstash_formatの難点をrecord_reformerで解決した話

「アプリケーションで出力したログをelasticsearchにインデックスするときにログ時間のフィールド名を@timestampにしたい。」
こんなときにfluentdのlogstash_formatを使うと少々はまります。
今回はそのハマりポイントと解決についてのお話です。

なぜログ時間のフィールド名を@timestampにしたいか

先々にkibanaでもログを取り込むことを考えていました。(kibanaではログ時間のTime Fieldの扱いが@timestampとなっています。)
アプリケーションから出力されるフィールドはtimeのままで@timestampにしたくありません。
logstash_formatを使えばtimeフィールドを@timestampに変換してくれますが難点があります。

logstash_formatを使わない理由

logstash_formatはfluentdでsource化したログをlogstashのログ形式にフォーマットしてくれるプラグインです。

例えば以下のログをfluentdでsource化してelasticsearchにインデックスするとインデックスは以下のようになります。

ログ

{"time":"2016-12-06T10:43:10Z","post_id":1000,"category_id":200,"user_id":123}

elasticsearchでindex確認

$ curl -XGET "http://localhost:9200/localhost.api-2016.12.06/logstash_action_log/AVjnaIabuvtAHVSVViec?pretty"
{
  "_index" : "localhost.api-2016.12.06",
  "_type" : "logstash_action_log",
  "_id" : "AVjnaIabuvtAHVSVViec",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "post_id" : 1000,
    "category_id" : 200,
    "user_id" : 123,
    "@timestamp" : "2016-12-06T10:43:10+00:00"
  }
}

ここに1つの難点があります。logstash_formatを使うとアプリケーションログのtimeは@timestampに変換してくれます。(便利)
だけど、インデックス名がlocalhost.api-yyyyMMddとなってしまいます。
fluentdの設定からはインデックス名のprefixは指定できますが、時刻部分はコントロールできません。(type名は指定できます)

fluentdの設定でいうと以下の部分です。

<match api.logstash_action_log>
  type copy
  <store>
〜〜省略〜〜
    type_name logstash_action_log ← indexのtype名

    logstash_format true ←ココでフォーマットを有効化
    logstash_prefix "localhost.api" ←ココでindex名のprefixを指定
  </store>
</match>

インデックス名はコントロールしたいけど、timeフィールドは@timestampに変換してほしい。
logstash_formatだとインデックス名はコントロールできない。

そこでrecord_reformerの出番です。

github.com

record_reformer

record_reformerはfluentdでsource化したログのフィールド名を新しいフィールド名に書き換えることができます。
ログ値も同様に書き換えられます。

以下、利用例です。

fluentdの設定

<match foo.**>
  type record_reformer
  remove_keys remove_me
  renew_record false
  enable_ruby false

  tag reformed.${tag_prefix[-2]}
  <record>
    hostname ${hostname}
    input_tag ${tag}
    last_tag ${tag_parts[-1]}
    message ${message}, yay!
  </record>
</match>

書き換え前のログ

foo.bar {
  "remove_me":"bar",
  "not_remove_me":"bar",
  "message":"Hello world!"
}

record_reformerが書き換え後のログ

reformed.foo {
  "not_remove_me":"bar",
  "hostname":"YOUR_HOSTNAME",
  "input_tag":"foo.bar",
  "last_tag":"bar",
  "message":"Hello world!, yay!",
}

record_reformerが解決してくれる

見事に今回の課題をrecord_reformerが解決してくれました。
record_reformerで書き換えたmatch節からelaseticsearchにcopyするmatch節に繋げるのがコツです。
最終的な設定は以下のようになりました。

<source>
# 省略
</source>

<match api.reformer_action_log>
  type record_reformer
  enable_ruby true
  tag api.action_log.reformer
  <record>
  @timestamp ${time.strftime('%Y-%m-%dT%H:%M:%S%z')}
  </record>
</match>

<match api.action_log.reformer>
  type copy
  <store>
    @type elasticsearch
    host "#{ENV['ELASTICSEARCH_HOST']}"
    port "#{ENV['ELASTICSEARCH_PORT']}"

    index_name api
    type_name reformer_action_log

    include_time_key true
    flush_interval 1s
  </store>
</match>
  • match api.reformer_action_logでtimeフィールドを@timestampのフィールド名に再フォーマット
  • match api.reformer_action_logtag api.action_log.reformerでelasticsearchへ転送するmatch定義へフォワード
  • match api.action_log.reformerではlogstash_formatは使わずにindex名が指定できています

elasticsearchでindex確認

$ curl -XGET "http://localhost:9200/api/reformer_action_log/AVjnaJoIuvtAHVSVVied?pretty"
{
  "_index" : "api",
  "_type" : "reformer_action_log",
  "_id" : "AVjnaJoIuvtAHVSVVied",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "post_id" : 1000,
    "category_id" : 200,
    "user_id" : 123,
    "@timestamp" : "2016-12-06T10:43:10+0000"
  }
}

インデックスが定義で指定したreformer_action_logに、timeフィールドは@timestampのフィールド名になりました!

ソースを公開しています

今回のソースはgithubに公開しています。logstashの設定も入れているので合わせて確認できます。

github.com


同じ課題にぶち当たっている人の手助けになれば嬉しいです。