中文说明 | English Readme
同步 MariaDB/MySQL 表的数据到 ElasticSearch, 不支持物理删除(物理删除需要根据 binlog 才能处理), 建议使用逻辑删除(业务系统使用逻辑删除本身就是一件很自然的事), 比如在表中添加「deleted(1 已删除, 默认是 0)」字段.
基于 jdk 8 和 spring boot
- 支持多表(分表), 暂不支持分库
- 支持 路由(routing)
- 支持 数据版本(index-version)
- 支持 索引模板(template)
- 支持 嵌套类型(nested)
git clone https://github.com/liuanxin/mysql2es.git
cd mysql2es
# 查看所有分支 --> 对应 es 版本
echo "`git branch`"
git checkout ${es_version}
mvn clean package -DskipTests
# 更改 application-prod.yml 成你自己的配置
nohup java -jar -Dspring.profiles.active=prod target/mysql2es.jar >/dev/null 2>&1 &
或者
# 添加你的配置文件到任意地方, 如: ~/application.yml(建议基于 application-prod.yml 修改即可)
nohup java -jar -Dspring.config.location=~/application.yml target/mysql2es.jar >/dev/null 2>&1 &
# 日志在 ~/logs/mysql2es.log 中
建议先在 ElasticSearch 中建好 index 的 scheme(如果想基于数据库表字段类型来生成可以在配置中设置 scheme
为 true
).
系统启动后会根据定时规则来同步数据, 同步时基于 sql 拼接增量字段来获取分页数据并批量写入 ElasticSearch 直到没有数据为止, 最后的记录会存起来(mysql 或临时文件, 前者会自动生成数据库表, 后者默认在 /tmp 下, 使用 -Djava.io.tmpdir=/path 修改) 供下次同步时使用(如果到了下次运行时间, 但上次还没有运行结束将会顺延)
相关的配置如下:
spring:
# mysql 配置见: org.springframework.boot.autoconfigure.jdbc.DataSourceProperties 和 com.zaxxer.hikari.HikariConfig
datasource:
... 数据库配置 ...
# es 配置见: org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientProperties
elasticsearch.rest:
uris: 192.168.1.2:9200
db2es:
# 是否启用同步, 不设置则默认是 true
enable: false
# 增量数据保存在哪(temp_file 或 mysql), 不设置则保存在临时文件(默认在 /tmp 下, 使用 -Djava.io.tmpdir=/path 自定义位置), 只支持保存在临时文件或 mysql(会自动生成 t_db_to_es 表)
increment-type: mysql
# 可以不设定, 定时任务的表达式, 默认是每分钟执行一次
cron: 0/5 * * * * *
# 是否启用数据补偿, 默认是 false, 当使用时间戳进行同步时用到, 如果长事务比较多, 会出现同步服务处理过了某个时间, 应用服务器才提交事务. 这会导致数据不一致
enable-compensate: true
# 数据补偿用到的定时任务的表达式
compensate-cron: 1 * * * * *
# 同步时间跟当前时间的间隔在这个值以内时才开始进行数据补偿, 单位: 秒. 默认 1200(20 分钟)
begin-interval-second: 600
# 数据补偿时往前的时间数, 单位: 秒. 默认 300(5 分钟)
compensate-second: 60
# 如果设置为 false 则写入时不做 es 的版本检查, 默认是 true, 依赖索引中的 version-column 列
version-check: false
relation:
-
# 同步时间跟当前时间的间隔在这个值以内时才开始进行数据补偿, 当配置了这个值将不再使用上面全局的配置, 单位: 秒
begin-interval-second: 1800
# 数据补偿时往前的时间数, 当配置了这个值将不再使用上面全局的配置, 单位: 秒
compensate-second: 1500
# *** 必须设定且要有主键. 主键会生成 es 中 /index/type/id 的 id, 如果是多列主键会用 "-" 拼接, 可以使用 % 做为通配来匹配多张表(当分表时)
table: t_order
# *** 必须设定. 表示用来做数据增量操作时用, 一般使用自增 id 或 time(更新时间戳)
increment-column: id
# 从版本 6.0 开始, type 替换成了 _doc, 索引名直接对应数据库表名
# 可以不设定. 表示 es 中 /index/type/id 的 index, 不设定将会从数据库表名生成(t_some_one ==> some-one), 6.0 开始 index name 必须是小写
index: order
# 可以不设定. 是否在启动时基于 数据库表结构 生成 es 的 scheme, 默认是 false, 建议先在 es 中建立好索引的 scheme
scheme: true
# 可以不设定. 自定义的 sql 语句(不要用 ORDER BY 和 LIMIT, 会基于 increment-column 自动添加), 不设定将会基于 table 来拼装
sql: "select o.id, o.order_no, o.order_status, ifnull(o.price,0) price, ifnull(o.sum,0) sum,
o.create_time, unix_timestamp(o.update_time) update_time, ifnull(o.pay_time,0) pay_time,
ifnull(o.ship_time,0) ship_time, ifnull(o.receipt_time,0) receipt_time, ifnull(o.success_time,0) success_time,
oa.receiver, oa.province, oa.city, oa.area, oa.address, oa.phone
from t_order o
left join t_order_address oa on o.id = oa.order_id"
# 可以不设定. 如果 sql 有使用多个表 join, 主表的别名
table-alias: o
# 可以不设定. 一次从数据库获取 及 同步进 es 的条数, 默认是 1000
limit: 2000
# 可以不设定. 用来生成 index 的 id 列, 不设置将会自动从表中获取, 当表中有主键又有多列唯一索引, 想用唯一索引来做 index 的 id 时可以使用此配置
id-column: c1,c2
# 可以不设定. 当想在 index 的 id 上加前缀时使用
id-prefix:
# 可以不设定. true 表示将表名的通配数据做为 id 的一部分(比如上面的 table 使用 t_order_% 通配, 则表 t_order_2016 同步时 2016 将做为 id 的前缀), 默认是 true
pattern-to-id: false
# 可以不设定. 当想在 index 的 id 上加后缀时使用
id-suffix:
# 可以不设定. 数据保存进 es 时用来做分片的字段, 多个用逗号隔开
route-column: c1,c2
# 可以不设定. 数据保存进 es 时用来表示版本的字段, id 或 时间戳
version-column: updated
# 可以不设定. 使用 es 模板时用来做数据的字段名
template-column: created
# 可以不设定. 使用 es 模板时用来做数据的字段名使用的模式, 当用在日期字段时有用
template-pattern: yyyy_MM
# 属性是否转为驼峰, true 则将表中的 user_name 转换成 userName, 默认是 false
column-lower-camel: true
# 可以不设定. 默认将会从表字段生成, 只设置特殊情况即可
mapping:
# 表字段(如果有别名则使用别名): es 列
c_type: type
# 可以不设定. 上面的 sql 中不想写入索引的字段(如果字段有别名则用别名)
ignore-column: c1,c2
# 可以不设定. sql 中 limit start,1000 里的 start 超出这个值就将 sql 优化成 inner join 的方式, 默认是 2000
big-count-to-sql: 10000
# 可以不设定. 主键名, 当表数据很多时使用 <LIMIT 1000万, 1000> 效率会很慢, 会基于这个字段优化 sql 语句, 默认是 id
primary-key: id
# 原来的 sql: SELECT ... FROM t_product WHERE time > '2010-01-01 00:00:01' LIMIT 1000万, 1000
# 优化的 sql: SELECT ... FROM t_product c inner join (SELECT id FROM t_product WHERE time > '2010-01-01 00:00:01' LIMIT 1000万, 1000) t on t.id = c.id
# 原先的 sql 执行时先通过索引找到 id, 再去存数据的物理块取记录, 最后在结果集里偏移 1000万 后再取 1000 条, 所以效率好不了
# 优化的 sql 括号中偏移 1000万 全都是基于覆盖索引, 之后再用 id 联表取数据, 因此这样是很快的
# 与主表 一对一 关联的子表数据, 最终会跟主表数据平级(在上面的 sql 中使用 left join 也可以, 如果 left join 的 sql 查询性能不如单表查询时, 可以使用此种方式)
relation-mapping:
address:
main-field: id
sql: SELECT receiver, province, city, area, address, phone FROM t_order_address
child-field: order_id
# 与主表 多对一 关联的子表数据, 最终会组装成 List 属性(对应 nested 结构)
nested-mapping:
# 如果有 t_order 和 t_order_item 表, t_order 主键是 id, t_order_item 关联字段是 order_id, 则 main-field 是 id, child-field 是 order_id
item:
main-field: id
sql: "SELECT oi.sku_id, p.name, oi.unit_price, oi.num, oi.total
FROM t_order_item oi LEFT JOIN t_product p ON ps.product_id = p.id"
child-field: order_id
-
table: t_product
increment-column: update_time
cron 的说明如下
.------------------- second (0 - 59) if (0/10) then (0, 10, 20, 30, 40, 50) run . .---------------- minute (0 - 59) . . .------------- hour (0 - 23) . . . .---------- day of month (1 - 31) . . . . .------- month (1 - 12) OR jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec . . . . . .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat . . . . . . ? * * * * * 如 0/5 * * * * * 表示每 5 秒运行一次
es 索引相关的 scheme 示例如下
DELETE /order
PUT /order
{
"settings": {
"number_of_shards": "5",
"number_of_replicas": "0",
"analysis": {
"normalizer": {
"self_normalizer": {
"type": "custom",
"filter": ["trim", "lowercase"]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"order_no": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"order_status": {
"type": "integer"
},
"create_time": {
"type": "date",
"format": "epoch_millis||yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS"
},
"pay_time": {
"type": "date",
"format": "epoch_millis||yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS"
},
"receiver": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"address": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"phone": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"item": {
"type": "nested",
"properties": {
"sku_id": {
"type": "long"
},
"name": {
"type": "text",
"normalizer": "self_normalizer"
},
"sku_desc": {
"type": "keyword",
"normalizer": "self_normalizer"
}
}
}
}
}
}
POST /_aliases
{
"actions" : [
{ "remove" : { "index" : "order", "alias" : "old_order_query" } },
{ "add" : { "index" : "order", "alias" : "new_order_query" } }
]
}
基于模板按月建索引, 对应用程序使用别名的 es 示例如下
DELETE /_template/order
PUT /_template/order
{
"index_patterns": [ "order_*" ],
"aliases": {
"order_query": {}
},
"settings": {
"number_of_shards": "1",
"number_of_replicas": "0",
"analysis": {
"normalizer": {
"self_normalizer": {
"type": "custom",
"filter": ["trim", "lowercase"]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"order_no": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"order_status": {
"type": "integer"
}
}
}
}