Skip to content

MySQL 数据怎么同步到 ES?

很多业务会把 MySQL 作为主存储,把 ES 作为搜索引擎。这样做的关键问题是:MySQL 数据变化后,怎么可靠地同步到 ES?

这篇以 Canal 为例,讲一条常见同步链路。

为什么不是直接写 ES?

如果业务写入时同时写 MySQL 和 ES,看起来简单:

text
业务服务 -> MySQL
业务服务 -> Elasticsearch

但这会带来一致性问题:

  • MySQL 写成功,ES 写失败怎么办?
  • ES 写成功,MySQL 回滚怎么办?
  • 服务重试导致 ES 重复写怎么办?
  • 多个服务都要写 ES,逻辑散落在哪里?

更常见的做法是让 MySQL 成为权威数据源,通过 binlog 捕获变化,再同步到 ES:

text
业务服务 -> MySQL -> binlog -> Canal -> 同步程序 / MQ -> Elasticsearch

这样业务写入链路更干净,ES 最终追上 MySQL。

Canal 是什么?

Canal 会伪装成 MySQL 的从库,订阅 MySQL binlog。MySQL 表发生 insert、update、delete 后,Canal 可以拿到对应变更事件。

同步程序消费这些事件,再转换成 ES 的写入请求。

MySQL 需要开启 binlog

自建 MySQL 需要开启 binlog,并使用 ROW 模式:

ini
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

ROW 模式会记录每行数据变化,更适合同步到 ES。STATEMENT 模式只记录 SQL,解析复杂且容易受上下文影响。

创建 Canal 账号:

sql
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

生产环境要使用强密码,并限制来源 IP。

Canal 关键配置

Canal instance 的核心配置大概包括:

properties
canal.instance.mysql.slaveId=1234
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=app_db\.article_.*
canal.instance.filter.black.regex=mysql\.slave_.*

几个点要注意:

  • slaveId 不要和已有 MySQL 从库重复。
  • filter.regex 只订阅需要同步的库表,减少无用流量。
  • 如果只关心部分表,不要用 .*\\..* 全量订阅。
  • Canal 和 MySQL 网络要稳定,否则会增加同步延迟。

同步程序怎么写?

同步程序一般做这些事:

  1. 从 Canal 拉取一批 binlog 事件。
  2. 解析库名、表名、事件类型、主键和字段值。
  3. 把 MySQL 行转换成 ES 文档。
  4. 使用 bulk API 批量写入 ES。
  5. 成功后 ack Canal 消息。
  6. 失败时不 ack,或进入重试和死信流程。

伪代码:

text
while running:
  message = canal.getWithoutAck(batchSize)
  try:
    bulkRequests = convert(message.entries)
    es.bulk(bulkRequests)
    canal.ack(message.batchId)
  catch error:
    canal.rollback(message.batchId)
    retry_or_alarm(error)

关键是:ES 写入成功后再 ack。否则同步程序崩溃时,可能丢失还没写入 ES 的变更。

insert/update/delete 怎么映射?

常见规则:

  • insert:向 ES index 文档。
  • update:按同一个 _id index 覆盖文档,或 partial update。
  • delete:按 _id 删除 ES 文档。

建议 ES 文档 _id 使用 MySQL 主键或稳定业务 ID:

text
_id = articleId

这样重复消费同一条 insert/update 是幂等的,最多覆盖同一文档,不会产生重复数据。

如果一个 ES 文档由多张 MySQL 表拼出来,比如文章主表 + 标签表 + 作者表,同步会复杂很多。此时通常需要:

  • 明确哪张表变化会影响哪个 ES 文档。
  • 变更时回查 MySQL 拼完整文档。
  • 对同一文档的并发更新做版本控制或顺序控制。

为什么建议回查 MySQL 拼完整文档?

binlog 事件里通常只有发生变化的行。对于复杂搜索文档,ES 里需要的是完整视图。

比如文章 ES 文档包含:

json
{
  "articleId": "1001",
  "title": "ES 同步",
  "authorName": "小林",
  "tags": ["搜索", "数据库"],
  "publishTime": "2025-05-16 10:00:00"
}

如果标签表变了,只靠标签表 binlog 不一定能拿到完整文章标题、作者信息。更稳的方式是根据 articleId 回查 MySQL,重新构建完整 ES 文档,然后覆盖写入。

这样牺牲一些查询 MySQL 的成本,换来同步逻辑简单和文档一致性更好。

顺序和版本问题

binlog 本身是有顺序的,但同步链路中如果引入 MQ、多线程消费、批量写 ES,就可能发生乱序。

比如:

text
update title = A
update title = B

如果 B 先写入 ES,A 后写入 ES,最终 ES 就会变成旧数据。

解决思路:

  • 同一主键路由到同一消费分区,保证单 key 顺序。
  • ES 文档里保存 updateTime 或 version,旧版本不覆盖新版本。
  • 使用 MySQL binlog 位点、时间戳或业务版本号做冲突判断。

简单业务里,同步程序单线程或按主键分区就够了;高并发场景必须认真设计顺序。

删除怎么处理?

物理删除时,binlog delete 事件可以直接删除 ES 文档:

http
DELETE /articles/_doc/1001

如果业务使用软删除,比如 deleted = 1,ES 可以有两种选择:

  • 同步 deleted 字段,查询时过滤 deleted = 0
  • 收到软删除事件后直接删除 ES 文档。

搜索系统通常更喜欢第二种,避免所有查询都带一个删除过滤条件。但如果后台需要搜索已删除数据,就要保留字段。

全量同步和增量同步

第一次接入 ES 时,通常要先做全量同步,再接增量。

一种常见流程:

  1. 记录当前 binlog 位点或当前时间。
  2. 从 MySQL 全量查询数据,批量写入 ES。
  3. 从记录的位点开始消费 Canal 增量。
  4. 增量追平后,对外切换搜索接口。

如果没有位点控制,只靠 updateTime 追增量,也要保证所有更新都会刷新 updateTime。

Bulk 写入要控制大小

ES bulk 不是越大越好。单批太小,请求开销大;单批太大,内存、网络和线程池压力大。

经验上可以从这些范围压测:

  • 单批 500 到 5000 条。
  • 单批大小控制在几 MB 到十几 MB。
  • 观察 ES bulk 线程池、拒绝数、写入延迟和 GC。

失败要能拆分重试,不能一批里一条失败就无限重试整批。

一致性怎么校验?

同步链路上线后,要有校验机制:

  • 对比 MySQL 和 ES 总数。
  • 按时间范围抽样对比。
  • 随机抽主键回查 MySQL 和 ES。
  • 记录同步延迟。
  • 对失败事件报警。
  • 定期做补偿任务。

ES 是最终一致系统,不代表可以不校验。没有校验的同步,早晚会悄悄漂移。

小结

MySQL 同步 ES 的核心思想是:MySQL 做权威数据源,ES 做搜索视图。

落地时要抓住这些点:

  • MySQL 开启 ROW binlog。
  • Canal 订阅必要表。
  • 同步程序把行变化转换成 ES 文档。
  • ES _id 使用稳定业务主键,保证幂等。
  • 复杂文档建议回查 MySQL 拼完整视图。
  • 注意消费顺序、失败重试、删除语义和一致性校验。

同步链路一旦设计好,业务就可以放心把搜索能力交给 ES,而不是把复杂查询硬塞给 MySQL。