Appearance
MySQL 数据怎么同步到 ES?
很多业务会把 MySQL 作为主存储,把 ES 作为搜索引擎。这样做的关键问题是:MySQL 数据变化后,怎么可靠地同步到 ES?
这篇以 Canal 为例,讲一条常见同步链路。
为什么不是直接写 ES?
如果业务写入时同时写 MySQL 和 ES,看起来简单:
text
业务服务 -> MySQL
业务服务 -> Elasticsearch但这会带来一致性问题:
- MySQL 写成功,ES 写失败怎么办?
- ES 写成功,MySQL 回滚怎么办?
- 服务重试导致 ES 重复写怎么办?
- 多个服务都要写 ES,逻辑散落在哪里?
更常见的做法是让 MySQL 成为权威数据源,通过 binlog 捕获变化,再同步到 ES:
text
业务服务 -> MySQL -> binlog -> Canal -> 同步程序 / MQ -> Elasticsearch这样业务写入链路更干净,ES 最终追上 MySQL。
Canal 是什么?
Canal 会伪装成 MySQL 的从库,订阅 MySQL binlog。MySQL 表发生 insert、update、delete 后,Canal 可以拿到对应变更事件。
同步程序消费这些事件,再转换成 ES 的写入请求。
MySQL 需要开启 binlog
自建 MySQL 需要开启 binlog,并使用 ROW 模式:
ini
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1ROW 模式会记录每行数据变化,更适合同步到 ES。STATEMENT 模式只记录 SQL,解析复杂且容易受上下文影响。
创建 Canal 账号:
sql
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;生产环境要使用强密码,并限制来源 IP。
Canal 关键配置
Canal instance 的核心配置大概包括:
properties
canal.instance.mysql.slaveId=1234
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=app_db\.article_.*
canal.instance.filter.black.regex=mysql\.slave_.*几个点要注意:
- slaveId 不要和已有 MySQL 从库重复。
- filter.regex 只订阅需要同步的库表,减少无用流量。
- 如果只关心部分表,不要用
.*\\..*全量订阅。 - Canal 和 MySQL 网络要稳定,否则会增加同步延迟。
同步程序怎么写?
同步程序一般做这些事:
- 从 Canal 拉取一批 binlog 事件。
- 解析库名、表名、事件类型、主键和字段值。
- 把 MySQL 行转换成 ES 文档。
- 使用 bulk API 批量写入 ES。
- 成功后 ack Canal 消息。
- 失败时不 ack,或进入重试和死信流程。
伪代码:
text
while running:
message = canal.getWithoutAck(batchSize)
try:
bulkRequests = convert(message.entries)
es.bulk(bulkRequests)
canal.ack(message.batchId)
catch error:
canal.rollback(message.batchId)
retry_or_alarm(error)关键是:ES 写入成功后再 ack。否则同步程序崩溃时,可能丢失还没写入 ES 的变更。
insert/update/delete 怎么映射?
常见规则:
- insert:向 ES index 文档。
- update:按同一个
_idindex 覆盖文档,或 partial update。 - delete:按
_id删除 ES 文档。
建议 ES 文档 _id 使用 MySQL 主键或稳定业务 ID:
text
_id = articleId这样重复消费同一条 insert/update 是幂等的,最多覆盖同一文档,不会产生重复数据。
如果一个 ES 文档由多张 MySQL 表拼出来,比如文章主表 + 标签表 + 作者表,同步会复杂很多。此时通常需要:
- 明确哪张表变化会影响哪个 ES 文档。
- 变更时回查 MySQL 拼完整文档。
- 对同一文档的并发更新做版本控制或顺序控制。
为什么建议回查 MySQL 拼完整文档?
binlog 事件里通常只有发生变化的行。对于复杂搜索文档,ES 里需要的是完整视图。
比如文章 ES 文档包含:
json
{
"articleId": "1001",
"title": "ES 同步",
"authorName": "小林",
"tags": ["搜索", "数据库"],
"publishTime": "2025-05-16 10:00:00"
}如果标签表变了,只靠标签表 binlog 不一定能拿到完整文章标题、作者信息。更稳的方式是根据 articleId 回查 MySQL,重新构建完整 ES 文档,然后覆盖写入。
这样牺牲一些查询 MySQL 的成本,换来同步逻辑简单和文档一致性更好。
顺序和版本问题
binlog 本身是有顺序的,但同步链路中如果引入 MQ、多线程消费、批量写 ES,就可能发生乱序。
比如:
text
update title = A
update title = B如果 B 先写入 ES,A 后写入 ES,最终 ES 就会变成旧数据。
解决思路:
- 同一主键路由到同一消费分区,保证单 key 顺序。
- ES 文档里保存 updateTime 或 version,旧版本不覆盖新版本。
- 使用 MySQL binlog 位点、时间戳或业务版本号做冲突判断。
简单业务里,同步程序单线程或按主键分区就够了;高并发场景必须认真设计顺序。
删除怎么处理?
物理删除时,binlog delete 事件可以直接删除 ES 文档:
http
DELETE /articles/_doc/1001如果业务使用软删除,比如 deleted = 1,ES 可以有两种选择:
- 同步 deleted 字段,查询时过滤
deleted = 0。 - 收到软删除事件后直接删除 ES 文档。
搜索系统通常更喜欢第二种,避免所有查询都带一个删除过滤条件。但如果后台需要搜索已删除数据,就要保留字段。
全量同步和增量同步
第一次接入 ES 时,通常要先做全量同步,再接增量。
一种常见流程:
- 记录当前 binlog 位点或当前时间。
- 从 MySQL 全量查询数据,批量写入 ES。
- 从记录的位点开始消费 Canal 增量。
- 增量追平后,对外切换搜索接口。
如果没有位点控制,只靠 updateTime 追增量,也要保证所有更新都会刷新 updateTime。
Bulk 写入要控制大小
ES bulk 不是越大越好。单批太小,请求开销大;单批太大,内存、网络和线程池压力大。
经验上可以从这些范围压测:
- 单批 500 到 5000 条。
- 单批大小控制在几 MB 到十几 MB。
- 观察 ES bulk 线程池、拒绝数、写入延迟和 GC。
失败要能拆分重试,不能一批里一条失败就无限重试整批。
一致性怎么校验?
同步链路上线后,要有校验机制:
- 对比 MySQL 和 ES 总数。
- 按时间范围抽样对比。
- 随机抽主键回查 MySQL 和 ES。
- 记录同步延迟。
- 对失败事件报警。
- 定期做补偿任务。
ES 是最终一致系统,不代表可以不校验。没有校验的同步,早晚会悄悄漂移。
小结
MySQL 同步 ES 的核心思想是:MySQL 做权威数据源,ES 做搜索视图。
落地时要抓住这些点:
- MySQL 开启 ROW binlog。
- Canal 订阅必要表。
- 同步程序把行变化转换成 ES 文档。
- ES
_id使用稳定业务主键,保证幂等。 - 复杂文档建议回查 MySQL 拼完整视图。
- 注意消费顺序、失败重试、删除语义和一致性校验。
同步链路一旦设计好,业务就可以放心把搜索能力交给 ES,而不是把复杂查询硬塞给 MySQL。