Logstashを使ってmysqlのデータをElasticsearchに同期する

公開日時
更新日時

mysqlのデータをElasticsearchのインデックスとして登録する際に、Logstashを使うと手軽に登録ができる。

ただ、データの新規追加、更新時、削除時にうまく同期させるには工夫が必要だったので対応方法をまとめておく。

mysql

更新日時の追加

どこまで同期済みかを判定するために更新日時カラムを用意しておく。

何かしらのフレームワークを使っている場合はupdated_atが自動で登録されていると思うが、ない場合は以下のsqlを実行してupdated_atを追加する。

ALTER TABLE {table} ADD updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

削除フラグの追加

mysql側でデータを削除した場合にLogstashでは削除を検知できないので、削除フラグを追加しておく。

ALTER TABLE {table} ADD is_deleted tinyint(1) NOT NULL DEFAULT '0';

Logstash

updated_atが最終取り込み日時よりも新しいデータを取り込み対象とするようにstatementを記述する。

scheduleにはcron風の実行頻度条件を記述できる。以下の例では5秒に一回取り込みが実施される。

input {
  jdbc {
    jdbc_driver_library => "/path/mysql-connector-java-8.0.17.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://{host}:3306/{db}"
    jdbc_user => {user}
    jdbc_password => {pass}
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(updated_at) AS unix_ts_in_secs FROM {table} WHERE (UNIX_TIMESTAMP(updated_at) > :sql_last_value AND updated_at < NOW()) ORDER BY updated_at ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  elasticsearch {
      index => "{index}"
      document_id => "%{[@metadata][_id]}"
  }
}

これでmysql側に変更があった場合は自動でElasticsearchに同期される。

elasticsearch

検索の際はis_deletedがfalseのデータを対象とするように検索条件を設定する。

{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "is_deleted": "false"
          }
        },
        {
          "match": {
            "name": "test"
          }
        }
      ],
      "must_not": [],
      "should": []
    }
  },
  "from": 0,
  "size": 10,
  "sort": [],
  "aggs": {}
}

参考


Related #elasticsearch

docker-composeでElasticsearchの検証環境を作る

手軽に全文検索エンジンの検証環境ができた

検証用のElasticsearchにUnassigned shardsが発生した

レプリカ数を0に変更して対応した