1、背景
1.1 业务背景
近期在处理业务需求时,涉及储区储位的迁移工作。由于业务中存在“一品多储位、一储位多品”的复杂关系,且数据量较大,原由库存维护的储位数据需要整体迁移至履约团队进行统一管理。本次迁移的核心技术难点在于在有业务流量使用,不停机的数据库层面的数据迁移,如何保证数据一致性等。
1.2 数据迁移概述
随着业务的发展,系统重构与数据迁移几乎都会遇到。无论是分库分表、数据异构、老系统重构,还是大表结构变更,停机会影响业务,不停机数据迁移都是保障业务连续性的关键环节。尤其对于订单、商品、支付等有状态的核心数据,其一致性和可用性直接关系到公司业务的稳定运行。
数据迁移的常见场景包括:
- 分库分表
- 数据异构(如 MySQL → ES)
- 系统重构(新建表结构替换老系统)
- 大表结构变更
- 数据库迁移(同构或异构,如 MySQL → MySQL、MySQL → JED、MySQL → 大数据平台等)

1.3 技术选型与场景分析
根据迁移的同构/异构性质、数据量大小以及业务对停机的容忍度,可以选择不同的技术方案。
下表总结了常见场景的推荐方案及核心考量点:
数据迁移选型需综合考量数据库类型(同构/异构)、数据规模、一致性要求。
| 场景 | 推荐方案 |
| 同构、在线、大数据量 | 云服务DTS |
| 异构、在线、大数据量 | CDC组件 + 同步引擎 |
| 异构、定制化需求 | 自研程序 + Canal / Debezium |
| 复杂转换逻辑/非主流库 | ETL工具(Kettle、DataX) |
| 同构、小数据量、可停机 | 逻辑导出导入(mysqldump、pg_dump) |
方案如何选择采取
- 判断同构/异构
- 若为同构迁移(如 MySQL → MySQL),优先考虑数据库原生工具或云厂商的 DTS 服务。
- 若为异构迁移,则进入下一步。
- 评估业务场景与迁移目标
- 如果是上云场景,或在主流数据库之间迁移(如 Oracle → MySQL),强烈推荐使用云厂商的 DTS 服务,它能解决数据一致性、增量同步、切换可视化等绝大多数痛点。
- 如果数据转换逻辑非常复杂,或目标数据库为非主流存储,则应考虑 ETL 工具(如 Kettle)或自研方案,以获得最大的灵活性。
若不采取DTS,CDC的方式,结合业界常用技术组件,迁移过程中,借助消息队列或者binlog去做,避免侵入业务,降低复杂性。
1.4 数据迁移核心挑战
| 挑战维度 | 具体问题 | 影响程度 |
| 数据一致性 | 迁移过程中数据丢失或不一致 | 高 |
| 业务连续性 | 迁移导致服务停机 | 极高 |
| 数据量级 | 海量数据迁移时间窗口 | 中 |
| 关联关系 | 表之间的复杂的业务逻辑 | 高 |
| 回滚能力 | 出现问题如何快速恢复 | 极高 |
二、迁移方案总览
2.1 整体架构图

2.2 数据迁移过程

迁移过程概述:
- 初始化目标表:创建储区储位表。
- 存量迁移:用源表的数据初始化目标表。
- 存量数据校验:执行一次校验,并且修复数据,此时用源表数据修复目标表数据。
- 业务双写:业务代码开启双写,同时写目标表和源表,此时读源表,数据以源表为准。
- 增量校验:开启增量校验和数据修复,业务校验,保持一段时间。
- 切换双写顺序:此时读目标表,并且先写目标表,数据以目标表为准。
- 保持增量校验和数据修复。
- 切换为目标表单写,停掉源表写入,读写以目标表为准。
三、迁移实施方案
3.1 初始化目标表:创建储区储位表
CREATE TABLE `station_sku_cell` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`org_code` varchar(20) NOT NULL COMMENT '商家编码',
`station_id` bigint(20) NOT NULL COMMENT '中台门店id',
`sku_id` bigint(20) NOT NULL COMMENT '商品skuId',
`sku_name` varchar(200) NOT NULL DEFAULT '' COMMENT '商品名称',
`out_sku_id` varchar(50) DEFAULT NULL COMMENT '商家商品编号',
`upc` varchar(1024) DEFAULT NULL COMMENT '条形码',
`area_code` varchar(64) NOT NULL COMMENT '储区编码',
`cell_code` varchar(64) NOT NULL COMMENT '储位编码',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`create_pin` varchar(50) NOT NULL DEFAULT '' COMMENT '创建人',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`update_pin` varchar(50) NOT NULL DEFAULT '' COMMENT '更新人',
`sys_version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
`yn` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除标示位',
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '时间戳',
PRIMARY KEY (`id`),
KEY `idx_org_station_cell` ( `org_code`,`station_id`,`cell_code`),
KEY `idx_org_station_sku` ( `org_code`,`station_id`,`sku_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='门店商品与储位关系表';
3.2 存量数据迁移
初始化表之后,怎么把源表数据导入源表数据呢,这里涉及到实时性,在低峰期的时候,通过调用的接口的同步目标表数据。如果不在低峰期,通过动态监控到MySQL的CPU,磁盘等,当cpu不高的时候写入,避免对业务产生影响。更新的时候,要注意避免旧的数据把新数据覆盖(比对版本号or更新时间)。
| 方案 | 实现方式 | 特点 |
| 方案一:历史备份 | 使用数据库现有备份文件恢复数据 | • 优点:对源系统无影响 • 缺点:数据可能不是最新的 |
| 方案二:源表导出 | 直接导出源表数据(SQL/CSV等格式) | • 优点:操作简单直接 • 缺点:可能影响源库性能 |
| 方案三:接口调用 | 通过业务接口,按商家、门店维度逐个获取并同步写入 | • 优点:数据实时性最好、业务逻辑完整、可按维度控制、便于问题排查、可灰度发布 • 缺点:速度相对较慢、对源系统有压力 |
部分核心代码展示
/**
* 同步储位信息
*
* @param orgCode 中台商家编码
* @param stationIds 门店ids
*/
ServiceResponse<Void> syncStationSkuCell(String orgCode, List<Integer> stationIds);
public void syncStationSkuCell(String orgCode, List<Integer> stationIds) {
long nextId = 0;
long size = 0;
while (true) {
logger.info("商品与储位关系-开始同步,nextId:{}, size:{}", nextId, size);
// 分页查询数据
List<StationSkuStock> stockList = stationSkuStockDao.selectByCellNotNullWithPagination(orgCode, stationIds, nextId);
if (CollectionUtils.isEmpty(stockList)) {
logger.info("商品与储位关系-同步完成");
return;
}
// 计算下一次 nextId
nextId = stockList.get(stockList.size() - 1).getId();
size += stockList.size();
// 构造待验证的记录列表
List<Map<String, Object>> checkRecords = stockList.stream()
.map(stock -> {
Map<String, Object> record = new HashMap<>();
record.put("orgCode", stock.getOrgCode());
record.put("stationId", stock.getStationId());
record.put("skuId", stock.getSkuId());
record.put("cellCode", stock.getChuWei());
return record;
})
.collect(Collectors.toList());
// 批量查询已存在的记录
List<StationSkuCell> existingRecords = stationSkuCellDao.batchCheckExists(checkRecords);
Set<String> existingKeys = new HashSet<>();
if (CollectionUtils.isNotEmpty(existingRecords)) {
// 构造已存在记录的 Set 用于快速比对
existingKeys = existingRecords.stream()
.map(record -> buildKey(record.getOrgCode(), record.getStationId(), record.getSkuId(), record.getCellCode()))
.collect(Collectors.toSet());
}
// 过滤掉已存在的记录
Set<String> finalExistingKeys = existingKeys;
List<StationSkuCell> needInsertCellList = stockList.stream()
.filter(stock -> !finalExistingKeys.contains(buildKey(stock.getOrgCode(), stock.getStationId(), stock.getSkuId(), stock.getChuWei())))
.map(stock -> {
StationSkuCell cell = new StationSkuCell();
cell.setOrgCode(stock.getOrgCode());
cell.setStationId(stock.getStationId());
cell.setSkuId(stock.getSkuId());
cell.setSkuName(stock.getSkuName());
cell.setOutSkuId(stock.getOutSkuId());
cell.setUpc(stock.getUpc());
if (StringUtils.isEmpty(stock.getChuQu())) {
StationCellInfo cellParam = new StationCellInfo();
cellParam.setStationId(stock.getStationId());
cellParam.setAreaCode(stock.getChuWei());
cellParam.setType(StationCellTypeEnum.CELL.getCode());
cellParam.setYn(false);
StationCellInfo stationCellInfo = stationCellInfoDao.getStationCellInfo(cellParam);
if (stationCellInfo != null) {
cell.setAreaCode(stationCellInfo.getBelongAreaCode());
} else {
cell.setAreaCode(stock.getChuWei());
}
} else {
cell.setAreaCode(stock.getChuQu());
}
cell.setCellCode(stock.getChuWei());
cell.setCreateTime(new Date());
cell.setCreatePin(Constant.WMS);
cell.setUpdateTime(new Date());
cell.setUpdatePin(Constant.WMS);
return cell;
})
.collect(Collectors.toList());
// 批量插入
if (CollectionUtils.isNotEmpty(needInsertCellList)) {
stationSkuCellDao.batchInsert(needInsertCellList);
}
}
3.3 存量数据校验
初始化数据完成后,建议立即进行数据校验与修复,主要原因如下:
- 备份数据滞后性:若使用历史备份(如昨天的备份),初始化后的目标库会缺失自备份以来的所有生产数据变更。
- 导出时间窗口:从数据导出到导入完成这段时间内,源库可能持续发生变更,导致目标库数据落后。
增量校验与修复方案
通过比对表的update_time字段进行增量数据同步:
- 校验逻辑:筛选目标表中
update_time晚于数据导出时间点的数据行。这些行代表在初始化窗口期内已发生变更,需要与源库对齐。 - 修复策略:直接用源表的对应行数据覆盖目标表。
3.4 业务开启双写,以源表为准
如何实现源表与目标表的数据双写?
两大实现方向对比:
| 方案类型 | 实现方式 | 优缺点 | 可行性 |
| 侵入式方案 | 直接修改业务代码,在写完源表后增加写目标表的逻辑 | • 缺点:工作量大、需排查所有业务代码、测试成本高、容易出错 | ❌ 不可行/代价高 |
| 非侵入式方案 | 通过数据库中间件/ORM框架的AOP机制实现 | • 优点:对业务代码无侵入、统一管控、易于维护 | ✅ 推荐采用 |
推荐非侵入式方案:
总结:采用非侵入式方案,通过ORM框架的AOP机制拦截数据变更操作,在不修改业务代码的前提下实现双写。
另外,双写可能出现的问题,
- 写目标表成功,写入源表失败,放到死信队列
- 写目标表失败,写源表成功,放到死信队列
结合业务不追求强一致性的时候,通过最终一致性去修复数据。
3.5 增量校验:开启增量校验和数据修复,业务校验,保持一段时间。
增量校验在保持双写持续进行的同时,对最新变更的数据进行实时或准实时的一致性验证,一旦发现数据不一致,立即触发自动修复机制。
方式1️⃣:利用更新时间戳
- 利用更新时间戳的思路很简单,就是定时查询每一张表,然后根据更新时间戳来判断某一行数据有没有发生变化。
// 记录上一次同步的最大时间戳
Timestamp lastTime = getLastSyncTime(); // 从数据库或缓存中读取
while (true) {
try {
// 1. 查询自上次同步以来发生过变更的数据
// SELECT * FROM source_table WHERE update_time >= ? ORDER BY update_time ASC
List<Row> updatedRows = sourceDao.findUpdatedRows(lastTime);
// 2. 遍历每一条变更的数据
for (Row sourceRow : updatedRows) {
// 3. 根据主键在目标库中查找对应数据
Object primaryKey = sourceRow.getId(); // 获取主键值
Row targetRow = targetDao.findById(primaryKey);
// 4. 比较源数据和目标数据是否一致
if (!isEquals(sourceRow, targetRow)) {
// 5. 如果不一致,执行修复
if (targetRow == null) {
// 目标库没有这条数据 -> 插入
targetDao.insert(sourceRow);
} else {
// 目标库有数据但内容不一致 -> 更新
targetDao.update(sourceRow);
}
log.info("修复数据: id={}", primaryKey);
}
}
// 6. 更新下一次查询的起始时间戳
if (!updatedRows.isEmpty()) {
// 取本次查询到的所有数据中最大的更新时间
Timestamp maxUpdateTime = updatedRows.stream()
.map(Row::getUpdateTime)
.max(Timestamp::compareTo)
.get();
lastTime = maxUpdateTime;
saveLastSyncTime(lastTime); // 持久化记录
}
// 7. 暂停1秒,避免对数据库造成过大压力
Thread.sleep(1000);
} catch (Exception e) {
log.error("同步过程发生异常", e);
// 异常处理:可以等待一段时间后重试
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
// 比较两个Row是否相等
boolean isEquals(Row source, Row target) {
if (source == null && target == null) return true;
if (source == null || target == null) return false;
// 比较关键字段(排除update_time等可能自动更新的字段)
return Objects.equals(source.getId(), target.getId())
&& Objects.equals(source.getName(), target.getName())
&& Objects.equals(source.getStatus(), target.getStatus())
// ... 其他需要比较的字段
;
}
方式2️⃣:通过binlog校验(推荐)
基于行的 binlog 模式,监听 binlog 的方案,用主键同时查询源表和目标表的当前状态进行比对,确保基于最新数据做决策,不一致时用源表覆盖目标表
执行步骤概述:
1. 监听 Binlog
↓
2. 收到变更事件(INSERT/UPDATE/DELETE)
↓
3. 从事件中提取主键 ID
↓
4. 根据主键同时查询源表和目标表
↓
5. 比对两表数据
↓
6. 发现不一致 → 用源表数据修复目标表
↓
7. 记录校验结果和修复日志
📝 伪代码实现
// 1. 初始化 Binlog 监听器
binlogListener = new BinlogListener()
// 2. 注册事件处理器
binlogListener.onEvent((event) -> {
// 提取事件中的主键
rowId = event.getPrimaryKey()
tableName = event.getTableName()
// 3. 根据主键查询源表和目标表
sourceRow = querySourceById(tableName, rowId)
targetRow = queryTargetById(tableName, rowId)
// 4. 数据比对
if (!compareData(sourceRow, targetRow)) {
// 5. 发现不一致,用源表修复目标表
repairTarget(tableName, sourceRow)
// 6. 记录修复日志
logRepair(tableName, rowId)
}
})
// 7. 启动监听
binlogListener.start()
方案3️⃣:对源表和目标表的 Binlog进行校验
你可能会想到一个更“聪明”的方案:既然源表和目标表都有 Binlog,那为什么不直接比较两者的 Binlog 呢?如果 Binlog 不一致,不就说明目标表出问题了吗?
理论上可行,但实践中存在两个问题:
1. 时间差问题(双写延迟)
- 同一笔数据变更,源表和目标表的 Binlog到达时间可能相差很大
- 你可能先收到源表的 Binlog,过了很久才收到目标表的;也可能反过来
- 这意味着你必须缓存其中一端的 Binlog,等待另一端的 Binlog 到达后才能进行比较,大大增加了系统复杂度
2. 顺序乱序问题(并发写入)
- 如果对同一行数据连续进行两次更新,Binlog 的接收顺序可能与产生顺序不一致
- 例如:你可能先收到源表第一次更新的 Binlog,后收到目标表第二次更新的 Binlog
- 要解决这个问题,你必须引入消息队列等组件对 Binlog 进行排序,确保消费顺序与产生顺序完全一致
结论:不可取。虽然这个方案能减少对数据库的直接查询压力,但它引入了极高的系统复杂度:
- 需要处理时间差 → 引入缓存机制
- 需要保证顺序 → 引入消息队列排序
- 整体架构变得臃肿且难以维护
3.6 切换双写顺序,此时读目标表,并且先写目标表,数据以目标表为准。
采用的是灰度可回滚的双写策略。
我们将双写顺序配置为‘先目标后源’,但这不是一刀切的,而是按流量灰度逐步推进。
同时,我们始终保持增量校验和自动修复在后台运行。
这套机制的核心价值在于:
第一,可回滚——发现问题能立即切回安全模式;
第二,可灰度——风险可控地逐步放量;
第三,自愈能力——即使出现不一致,校验修复也能自动处理。
这就好比给迁移过程上了‘双保险’,既保证了探索新架构的勇气,又留好了退路。
四、回滚方案设计
4.1 什么情况下需要回滚?
迁移过程中可能遇到各类突发问题:
- 数据不一致率飙升:超出预期阈值
- 目标库性能瓶颈:写入延迟过大,拖累业务
- 业务反馈异常:用户侧出现错误或数据错乱
- 系统资源过载:CPU、IOPS 达到极限
此时,能够快速、无损地将流量切回原架构,是保护业务的首要任务。
4.2 可回滚的核心设计:双写顺序切换,ducc配置
我们的方案支持动态切换双写顺序,通过ducc配置无需重启应用:
| 模式 | 写入顺序 | 读取来源 | 适用场景 |
| 阶段一 | 先源表,后目标表 | 读源表 | 迁移初期、发现问题时快速回滚 |
| 阶段二 | 先目标表,后源表 | 读目标表 | 灰度验证、逐步放量 |
切换原理:双写代理层通过 AOP 拦截所有数据变更操作,根据配置中心的开关决定执行顺序。该开关支持灰度粒度(按商家、门店、流量百分比),可精细控制切换范围。
回滚后处理
- 增量校验:即使回滚,校验服务仍基于 Binlog 实时比对,发现不一致则用源表修复目标表
- 死信队列补偿:回滚前可能已有部分数据写入目标表,通过补偿任务最终对齐
4.3 灰度切换策略
为避免“一刀切”风险,我们采用**灰度放量**策略:
1. 1% 灰度:选择少量商家/门店开启“先目标后源”模式
2. 观察期:持续校验 1-2天,确认无误
3. 商家门店逐步放大:5% → 20% → 50% → 100%
五、监控与告警体系
5.1 核心监控指标
| 维度 | 指标 | 说明 | 阈值建议 |
| 数据一致性 | 实时校验不一致数 | 每分钟发现的不一致行数 | >10 告警 |
| 双写性能 | 双写平均耗时 | 源表和目标表写入总耗时 | 基线+50% 告警 |
| 死信队列积压 | 补偿队列未处理消息数 | >3 告警 | |
| 双写失败率 | 任一表写入失败的比例 | >1% 告警 | |
| 系统资源 | 源/目标库 CPU | 数据库 CPU 使用率 | >80% 告警 |
| 业务影响 | 核心接口 RT | 业务接口响应时间 | 基线+30% 告警 |
| 错误率 | 业务请求错误比例 | >0.5% 告警 |
5.2 告警策略
P0 级告警(立即处理):
- 数据不一致数持续增长且修复失败
- 双写失败率 > 5%
- 核心业务接口错误率 > 2%
- 触发自动回滚 or 京me通知相关人员
P1 级告警(当日处理):
- 死信队列积压 > 3,需要修复数据
- 数据库 CPU > 85%,需要暂停写入
六、总结
本文围绕储区储位数据迁移,提出了一套可灰度、可观测、可回滚、自愈性的不停机迁移方案。
核心设计包括:双写顺序,回滚设计;增量校验、死信队列补偿异常等实现业务无感知恢复。
该方案不仅适用于MySQL异构宽表迁移,还可推广至同构数据库迁移、异构数据同步、分库分表、大表变更及系统重构等场景。
以上仅为个人实践总结,如有不妥之处,欢迎批评指正,共同探讨更优方案。






