开发者社区 > 博文 > 如何不停机完成海量数据迁移?
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

如何不停机完成海量数据迁移?

  • 京东云开发者
  • 2026-04-10
  • IP归属:北京
  • 118浏览

    1、背景

    1.1 业务背景

    近期在处理业务需求时,涉及储区储位的迁移工作。由于业务中存在“一品多储位、一储位多品”的复杂关系,且数据量较大,原由库存维护的储位数据需要整体迁移至履约团队进行统一管理。本次迁移的核心技术难点在于在有业务流量使用,不停机的数据库层面的数据迁移,如何保证数据一致性等

    1.2 数据迁移概述

    随着业务的发展,系统重构与数据迁移几乎都会遇到。无论是分库分表、数据异构、老系统重构,还是大表结构变更,停机会影响业务,不停机数据迁移都是保障业务连续性的关键环节。尤其对于订单、商品、支付等有状态的核心数据,其一致性和可用性直接关系到公司业务的稳定运行。

    数据迁移的常见场景包括:

    • 分库分表
    • 数据异构(如 MySQL → ES)
    • 系统重构(新建表结构替换老系统)
    • 大表结构变更
    • 数据库迁移(同构或异构,如 MySQL → MySQL、MySQL → JED、MySQL → 大数据平台等)

    1.3 技术选型与场景分析

    根据迁移的同构/异构性质、数据量大小以及业务对停机的容忍度,可以选择不同的技术方案。

    下表总结了常见场景的推荐方案及核心考量点:

    数据迁移选型需综合考量数据库类型(同构/异构)数据规模一致性要求

    场景
    推荐方案
    同构、在线、大数据量
    云服务DTS
    异构、在线、大数据量
    CDC组件 + 同步引擎
    异构、定制化需求
    自研程序 + Canal / Debezium
    复杂转换逻辑/非主流库
    ETL工具(Kettle、DataX)
    同构、小数据量、可停机
    逻辑导出导入(mysqldump、pg_dump)

    方案如何选择采取

    1. 判断同构/异构
      • 若为同构迁移(如 MySQL → MySQL),优先考虑数据库原生工具或云厂商的 DTS 服务。
      • 若为异构迁移,则进入下一步。
    2. 评估业务场景与迁移目标
      • 如果是上云场景,或在主流数据库之间迁移(如 Oracle → MySQL),强烈推荐使用云厂商的 DTS 服务,它能解决数据一致性、增量同步、切换可视化等绝大多数痛点。
      • 如果数据转换逻辑非常复杂,或目标数据库为非主流存储,则应考虑 ETL 工具(如 Kettle)或自研方案,以获得最大的灵活性。

    若不采取DTS,CDC的方式,结合业界常用技术组件,迁移过程中,借助消息队列或者binlog去做,避免侵入业务,降低复杂性。

    1.4 数据迁移核心挑战

    挑战维度
    具体问题
    影响程度
    数据一致性
    迁移过程中数据丢失或不一致

    业务连续性
    迁移导致服务停机
    极高
    数据量级
    海量数据迁移时间窗口

    关联关系
    表之间的复杂的业务逻辑

    回滚能力
    出现问题如何快速恢复
    极高

    二、迁移方案总览

    2.1 整体架构图

    2.2 数据迁移过程

    迁移过程概述:

    1. 初始化目标表:创建储区储位表。
    2. 存量迁移:用源表的数据初始化目标表。
    3. 存量数据校验:执行一次校验,并且修复数据,此时用源表数据修复目标表数据。
    4. 业务双写:业务代码开启双写,同时写目标表和源表,此时读源表,数据以源表为准。
    5. 增量校验:开启增量校验和数据修复,业务校验,保持一段时间。
    6. 切换双写顺序:此时读目标表,并且先写目标表,数据以目标表为准。
    7. 保持增量校验和数据修复。
    8. 切换为目标表单写,停掉源表写入,读写以目标表为准。

    三、迁移实施方案

    3.1 初始化目标表:创建储区储位表

    CREATE TABLE `station_sku_cell` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
      `org_code` varchar(20) NOT NULL COMMENT '商家编码',
      `station_id` bigint(20) NOT NULL COMMENT '中台门店id',
      `sku_id` bigint(20) NOT NULL COMMENT '商品skuId',
      `sku_name` varchar(200) NOT NULL DEFAULT '' COMMENT '商品名称',
      `out_sku_id` varchar(50) DEFAULT NULL COMMENT '商家商品编号',
      `upc` varchar(1024) DEFAULT NULL COMMENT '条形码',
      `area_code` varchar(64) NOT NULL COMMENT '储区编码',
      `cell_code` varchar(64) NOT NULL COMMENT '储位编码',
      `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
      `create_pin` varchar(50) NOT NULL DEFAULT '' COMMENT '创建人',
      `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
      `update_pin` varchar(50) NOT NULL DEFAULT '' COMMENT '更新人',
      `sys_version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
      `yn` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除标示位',
      `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '时间戳',
      PRIMARY KEY (`id`),
      KEY `idx_org_station_cell` ( `org_code`,`station_id`,`cell_code`),
      KEY `idx_org_station_sku` ( `org_code`,`station_id`,`sku_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='门店商品与储位关系表';
    

    3.2 存量数据迁移

    初始化表之后,怎么把源表数据导入源表数据呢,这里涉及到实时性,在低峰期的时候,通过调用的接口的同步目标表数据。如果不在低峰期,通过动态监控到MySQL的CPU,磁盘等,当cpu不高的时候写入,避免对业务产生影响。更新的时候,要注意避免旧的数据把新数据覆盖(比对版本号or更新时间)。

    方案
    实现方式
    特点
    方案一:历史备份
    使用数据库现有备份文件恢复数据
    • 优点:对源系统无影响 • 缺点:数据可能不是最新的
    方案二:源表导出
    直接导出源表数据(SQL/CSV等格式)
    • 优点:操作简单直接 • 缺点:可能影响源库性能
    方案三:接口调用
    通过业务接口,按商家、门店维度逐个获取并同步写入
    • 优点:数据实时性最好、业务逻辑完整、可按维度控制、便于问题排查、可灰度发布 • 缺点:速度相对较慢、对源系统有压力

    部分核心代码展示

    /**
    * 同步储位信息
    *
    * @param orgCode 中台商家编码
    * @param stationIds 门店ids
    */
    ServiceResponse<Void> syncStationSkuCell(String orgCode, List<Integer> stationIds);
    
    public void syncStationSkuCell(String orgCode, List<Integer> stationIds) {
            long nextId = 0;
            long size = 0;
            while (true) {
                logger.info("商品与储位关系-开始同步,nextId:{}, size:{}", nextId, size);
                // 分页查询数据
                List<StationSkuStock> stockList = stationSkuStockDao.selectByCellNotNullWithPagination(orgCode, stationIds, nextId);
                if (CollectionUtils.isEmpty(stockList)) {
                    logger.info("商品与储位关系-同步完成");
                    return;
                }
                // 计算下一次 nextId
                nextId = stockList.get(stockList.size() - 1).getId();
                size += stockList.size();
                // 构造待验证的记录列表
                List<Map<String, Object>> checkRecords = stockList.stream()
                        .map(stock -> {
                            Map<String, Object> record = new HashMap<>();
                            record.put("orgCode", stock.getOrgCode());
                            record.put("stationId", stock.getStationId());
                            record.put("skuId", stock.getSkuId());
                            record.put("cellCode", stock.getChuWei());
                            return record;
                        })
                        .collect(Collectors.toList());
                // 批量查询已存在的记录
                List<StationSkuCell> existingRecords = stationSkuCellDao.batchCheckExists(checkRecords);
    
                Set<String> existingKeys = new HashSet<>();
                if (CollectionUtils.isNotEmpty(existingRecords)) {
                    // 构造已存在记录的 Set 用于快速比对
                    existingKeys = existingRecords.stream()
                            .map(record -> buildKey(record.getOrgCode(), record.getStationId(), record.getSkuId(), record.getCellCode()))
                            .collect(Collectors.toSet());
                }
                // 过滤掉已存在的记录
                Set<String> finalExistingKeys = existingKeys;
                List<StationSkuCell> needInsertCellList = stockList.stream()
                        .filter(stock -> !finalExistingKeys.contains(buildKey(stock.getOrgCode(), stock.getStationId(), stock.getSkuId(), stock.getChuWei())))
                        .map(stock -> {
                            StationSkuCell cell = new StationSkuCell();
                            cell.setOrgCode(stock.getOrgCode());
                            cell.setStationId(stock.getStationId());
                            cell.setSkuId(stock.getSkuId());
                            cell.setSkuName(stock.getSkuName());
                            cell.setOutSkuId(stock.getOutSkuId());
                            cell.setUpc(stock.getUpc());
                            if (StringUtils.isEmpty(stock.getChuQu())) {
                                StationCellInfo cellParam = new StationCellInfo();
                                cellParam.setStationId(stock.getStationId());
                                cellParam.setAreaCode(stock.getChuWei());
                                cellParam.setType(StationCellTypeEnum.CELL.getCode());
                                cellParam.setYn(false);
                                StationCellInfo stationCellInfo = stationCellInfoDao.getStationCellInfo(cellParam);
                                if (stationCellInfo != null) {
                                    cell.setAreaCode(stationCellInfo.getBelongAreaCode());
                                } else {
                                    cell.setAreaCode(stock.getChuWei());
                                }
                            } else {
                                cell.setAreaCode(stock.getChuQu());
                            }
                            cell.setCellCode(stock.getChuWei());
                            cell.setCreateTime(new Date());
                            cell.setCreatePin(Constant.WMS);
                            cell.setUpdateTime(new Date());
                            cell.setUpdatePin(Constant.WMS);
                            return cell;
                        })
                        .collect(Collectors.toList());
                // 批量插入
                if (CollectionUtils.isNotEmpty(needInsertCellList)) {
                    stationSkuCellDao.batchInsert(needInsertCellList);
                }
            }
    

    3.3 存量数据校验

    初始化数据完成后,建议立即进行数据校验与修复,主要原因如下:

    1. 备份数据滞后性:若使用历史备份(如昨天的备份),初始化后的目标库会缺失自备份以来的所有生产数据变更。
    2. 导出时间窗口:从数据导出到导入完成这段时间内,源库可能持续发生变更,导致目标库数据落后。

    增量校验与修复方案

    通过比对表的update_time字段进行增量数据同步:

    • 校验逻辑:筛选目标表中update_time晚于数据导出时间点的数据行。这些行代表在初始化窗口期内已发生变更,需要与源库对齐。
    • 修复策略:直接用源表的对应行数据覆盖目标表。

    3.4 业务开启双写,以源表为准

    如何实现源表与目标表的数据双写?

    两大实现方向对比:

    方案类型
    实现方式
    优缺点
    可行性
    侵入式方案
    直接修改业务代码,在写完源表后增加写目标表的逻辑
    • 缺点:工作量大、需排查所有业务代码、测试成本高、容易出错
    ❌ 不可行/代价高
    非侵入式方案
    通过数据库中间件/ORM框架的AOP机制实现
    • 优点:对业务代码无侵入、统一管控、易于维护
    ✅ 推荐采用

    推荐非侵入式方案:

    总结:采用非侵入式方案,通过ORM框架的AOP机制拦截数据变更操作,在不修改业务代码的前提下实现双写。

    另外,双写可能出现的问题,

    1. 写目标表成功,写入源表失败,放到死信队列
    2. 写目标表失败,写源表成功,放到死信队列

    结合业务不追求强一致性的时候,通过最终一致性去修复数据。

    3.5 增量校验:开启增量校验和数据修复,业务校验,保持一段时间。

    增量校验保持双写持续进行的同时,对最新变更的数据进行实时或准实时的一致性验证,一旦发现数据不一致,立即触发自动修复机制

    方式1️⃣:利用更新时间戳

    • 利用更新时间戳的思路很简单,就是定时查询每一张表,然后根据更新时间戳来判断某一行数据有没有发生变化。
    // 记录上一次同步的最大时间戳
    Timestamp lastTime = getLastSyncTime(); // 从数据库或缓存中读取
    
    while (true) {
        try {
            // 1. 查询自上次同步以来发生过变更的数据
            // SELECT * FROM source_table WHERE update_time >= ? ORDER BY update_time ASC
            List<Row> updatedRows = sourceDao.findUpdatedRows(lastTime);
            
            // 2. 遍历每一条变更的数据
            for (Row sourceRow : updatedRows) {
                // 3. 根据主键在目标库中查找对应数据
                Object primaryKey = sourceRow.getId(); // 获取主键值
                Row targetRow = targetDao.findById(primaryKey);
                
                // 4. 比较源数据和目标数据是否一致
                if (!isEquals(sourceRow, targetRow)) {
                    // 5. 如果不一致,执行修复
                    if (targetRow == null) {
                        // 目标库没有这条数据 -> 插入
                        targetDao.insert(sourceRow);
                    } else {
                        // 目标库有数据但内容不一致 -> 更新
                        targetDao.update(sourceRow);
                    }
                    log.info("修复数据: id={}", primaryKey);
                }
            }
            
            // 6. 更新下一次查询的起始时间戳
            if (!updatedRows.isEmpty()) {
                // 取本次查询到的所有数据中最大的更新时间
                Timestamp maxUpdateTime = updatedRows.stream()
                    .map(Row::getUpdateTime)
                    .max(Timestamp::compareTo)
                    .get();
                lastTime = maxUpdateTime;
                saveLastSyncTime(lastTime); // 持久化记录
            }
            
            // 7. 暂停1秒,避免对数据库造成过大压力
            Thread.sleep(1000);
            
        } catch (Exception e) {
            log.error("同步过程发生异常", e);
            // 异常处理:可以等待一段时间后重试
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    // 比较两个Row是否相等
    boolean isEquals(Row source, Row target) {
        if (source == null && target == null) return true;
        if (source == null || target == null) return false;
        
        // 比较关键字段(排除update_time等可能自动更新的字段)
        return Objects.equals(source.getId(), target.getId())
            && Objects.equals(source.getName(), target.getName())
            && Objects.equals(source.getStatus(), target.getStatus())
            // ... 其他需要比较的字段
            ;
    }
    

    方式2️⃣:通过binlog校验(推荐)

    基于行的 binlog 模式,监听 binlog 的方案,用主键同时查询源表和目标表的当前状态进行比对,确保基于最新数据做决策,不一致时用源表覆盖目标表

    执行步骤概述:

    1. 监听 Binlog
       ↓
    2. 收到变更事件(INSERT/UPDATE/DELETE)
       ↓
    3. 从事件中提取主键 ID
       ↓
    4. 根据主键同时查询源表和目标表
       ↓
    5. 比对两表数据
       ↓
    6. 发现不一致 → 用源表数据修复目标表
       ↓
    7. 记录校验结果和修复日志
    

    📝 伪代码实现

    // 1. 初始化 Binlog 监听器
    binlogListener = new BinlogListener()
    
    // 2. 注册事件处理器
    binlogListener.onEvent((event) -> {
        // 提取事件中的主键
        rowId = event.getPrimaryKey()
        tableName = event.getTableName()
        
        // 3. 根据主键查询源表和目标表
        sourceRow = querySourceById(tableName, rowId)
        targetRow = queryTargetById(tableName, rowId)
        
        // 4. 数据比对
        if (!compareData(sourceRow, targetRow)) {
            // 5. 发现不一致,用源表修复目标表
            repairTarget(tableName, sourceRow)
            
            // 6. 记录修复日志
            logRepair(tableName, rowId)
        }
    })
    
    // 7. 启动监听
    binlogListener.start()
    

    方案3️⃣:对源表和目标表的 Binlog进行校验

    你可能会想到一个更“聪明”的方案:既然源表和目标表都有 Binlog,那为什么不直接比较两者的 Binlog 呢?如果 Binlog 不一致,不就说明目标表出问题了吗?

    理论上可行,但实践中存在两个问题:

    1. 时间差问题(双写延迟)

    • 同一笔数据变更,源表和目标表的 Binlog到达时间可能相差很大
    • 你可能先收到源表的 Binlog,过了很久才收到目标表的;也可能反过来
    • 这意味着你必须缓存其中一端的 Binlog,等待另一端的 Binlog 到达后才能进行比较,大大增加了系统复杂度

    2. 顺序乱序问题(并发写入)

    • 如果对同一行数据连续进行两次更新,Binlog 的接收顺序可能与产生顺序不一致
    • 例如:你可能先收到源表第一次更新的 Binlog,后收到目标表第二次更新的 Binlog
    • 要解决这个问题,你必须引入消息队列等组件对 Binlog 进行排序,确保消费顺序与产生顺序完全一致

    结论:不可取。虽然这个方案能减少对数据库的直接查询压力,但它引入了极高的系统复杂度

    • 需要处理时间差 → 引入缓存机制
    • 需要保证顺序 → 引入消息队列排序
    • 整体架构变得臃肿且难以维护

    3.6 切换双写顺序,此时读目标表,并且先写目标表,数据以目标表为准。

    采用的是灰度可回滚的双写策略

    我们将双写顺序配置为‘先目标后源’,但这不是一刀切的,而是按流量灰度逐步推进

    同时,我们始终保持增量校验和自动修复在后台运行。

    这套机制的核心价值在于:

    第一,可回滚——发现问题能立即切回安全模式;

    第二,可灰度——风险可控地逐步放量;

    第三,自愈能力——即使出现不一致,校验修复也能自动处理。

    这就好比给迁移过程上了‘双保险’,既保证了探索新架构的勇气,又留好了退路。

    四、回滚方案设计

    4.1 什么情况下需要回滚?

    迁移过程中可能遇到各类突发问题:

    • 数据不一致率飙升:超出预期阈值
    • 目标库性能瓶颈:写入延迟过大,拖累业务
    • 业务反馈异常:用户侧出现错误或数据错乱
    • 系统资源过载:CPU、IOPS 达到极限

    此时,能够快速、无损地将流量切回原架构,是保护业务的首要任务。

    4.2 可回滚的核心设计:双写顺序切换,ducc配置

    我们的方案支持动态切换双写顺序,通过ducc配置无需重启应用:

    模式
    写入顺序
    读取来源
    适用场景
    阶段一
    先源表,后目标表
    读源表
    迁移初期、发现问题时快速回滚
    阶段二
    先目标表,后源表
    读目标表
    灰度验证、逐步放量

    切换原理:双写代理层通过 AOP 拦截所有数据变更操作,根据配置中心的开关决定执行顺序。该开关支持灰度粒度(按商家、门店、流量百分比),可精细控制切换范围。

    回滚后处理

    • 增量校验:即使回滚,校验服务仍基于 Binlog 实时比对,发现不一致则用源表修复目标表
    • 死信队列补偿:回滚前可能已有部分数据写入目标表,通过补偿任务最终对齐

    4.3 灰度切换策略

    为避免“一刀切”风险,我们采用**灰度放量**策略:

    1. 1% 灰度:选择少量商家/门店开启“先目标后源”模式

    2. 观察期:持续校验 1-2天,确认无误

    3. 商家门店逐步放大:5% → 20% → 50% → 100%

    五、监控与告警体系

    5.1 核心监控指标

    维度
    指标
    说明
    阈值建议
    数据一致性
    实时校验不一致数
    每分钟发现的不一致行数
    >10 告警
    双写性能
    双写平均耗时
    源表和目标表写入总耗时
    基线+50% 告警
    死信队列积压
    补偿队列未处理消息数
    >3 告警
    双写失败率
    任一表写入失败的比例
    >1% 告警
    系统资源
    源/目标库 CPU
    数据库 CPU 使用率
    >80% 告警
    业务影响
    核心接口 RT
    业务接口响应时间
    基线+30% 告警
    错误率
    业务请求错误比例
    >0.5% 告警

    5.2 告警策略

    P0 级告警(立即处理):

    • 数据不一致数持续增长且修复失败
    • 双写失败率 > 5%
    • 核心业务接口错误率 > 2%
    • 触发自动回滚 or 京me通知相关人员

    P1 级告警(当日处理):

    • 死信队列积压 > 3,需要修复数据
    • 数据库 CPU > 85%,需要暂停写入

    六、总结

    本文围绕储区储位数据迁移,提出了一套可灰度、可观测、可回滚、自愈性的不停机迁移方案。

    核心设计包括:双写顺序,回滚设计;增量校验、死信队列补偿异常等实现业务无感知恢复。

    该方案不仅适用于MySQL异构宽表迁移,还可推广至同构数据库迁移、异构数据同步、分库分表、大表变更及系统重构等场景。

    以上仅为个人实践总结,如有不妥之处,欢迎批评指正,共同探讨更优方案。