Logstashを使ってmysqlのデータをElasticsearchに同期する
mysqlのデータをElasticsearchのインデックスとして登録する際に、Logstashを使うと手軽に登録ができる。
ただ、データの新規追加、更新時、削除時にうまく同期させるには工夫が必要だったので対応方法をまとめておく。
mysql
更新日時の追加
どこまで同期済みかを判定するために更新日時カラムを用意しておく。
何かしらのフレームワークを使っている場合はupdated_atが自動で登録されていると思うが、ない場合は以下のsqlを実行してupdated_atを追加する。
ALTER TABLE {table} ADD updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
削除フラグの追加
mysql側でデータを削除した場合にLogstashでは削除を検知できないので、削除フラグを追加しておく。
ALTER TABLE {table} ADD is_deleted tinyint(1) NOT NULL DEFAULT '0';
Logstash
updated_atが最終取り込み日時よりも新しいデータを取り込み対象とするようにstatementを記述する。
scheduleにはcron風の実行頻度条件を記述できる。以下の例では5秒に一回取り込みが実施される。
input {
jdbc {
jdbc_driver_library => "/path/mysql-connector-java-8.0.17.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://{host}:3306/{db}"
jdbc_user => {user}
jdbc_password => {pass}
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(updated_at) AS unix_ts_in_secs FROM {table} WHERE (UNIX_TIMESTAMP(updated_at) > :sql_last_value AND updated_at < NOW()) ORDER BY updated_at ASC"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
elasticsearch {
index => "{index}"
document_id => "%{[@metadata][_id]}"
}
}
これでmysql側に変更があった場合は自動でElasticsearchに同期される。
elasticsearch
検索の際はis_deletedがfalseのデータを対象とするように検索条件を設定する。
{
"query": {
"bool": {
"must": [
{
"term": {
"is_deleted": "false"
}
},
{
"match": {
"name": "test"
}
}
],
"must_not": [],
"should": []
}
},
"from": 0,
"size": 10,
"sort": [],
"aggs": {}
}