开发者社区 > 博文 > 大表治理之分表实践:带你手搓一个简单的数据库分表组件
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

大表治理之分表实践:带你手搓一个简单的数据库分表组件

  • jd****
  • 2026-06-11
  • IP归属:北京
  • 2浏览

    导语

    手搓一个简单的数据库分表组件,KISS原则,简单够用。困于其他组件之局限而自起,无它,够用足矣。


    背景

    在大型系统中,流量大,数据量大,很容易出现大表,一般来说,大表有以下几方面特征:

    数据量维度:通常认为单表数据量超过1000万行或10GB以上可以视为大表。

    性能维度:当表的查询、插入、更新、删除操作开始出现明显性能下降。

    维护维度:当对表进行DDL操作(如添加索引、修改字段)需要很长时间(如超过1小时)。

    业务维度:根据具体业务场景,影响核心业务流程的表即使数据量不大也可能被视为大表。


    以库存流水表为例,一天的流水量是3400万行,大小为31GB。



    大表会伴随着产生读写性能下降、服务耗时长、DDL、DML时间长、备份恢复难度大、统计信息不准确等弊端,需要进行大表治理优化。


    治理方案

    大表的治理优化方案

    一般有几个思路:

    分库分表:按业务维度(仓号、SKU、主键ID等)水平拆分。

    表分区:按时间、哈希或范围分区,提高查询效率。

    冷热分离:冷数据归档结转,超过一定时间的热数据则转为冷数据存储。

    索引优化:删除冗余和低效索引,减少索引维护成本,提升单表读写性能。

    读写分离:主库写操作,从库读操作,减轻主库压力。


    在本系统的治理中,除了表分区方式暂未使用,其他优化策略均有使用,尤其是在分库分表方面,通过水平拆分数据量降维最直接高效,也是本文阐述的重点,其他策略不重点赘述。


    分表组件选择

    分库方面,之前已接入分库,按仓号分库,但流水表尚未分表,单表的流水量依然很大,其中一个分库下20天的单表存量数据行在四五千万行左右。

    本次是在分库的基础上进行分表。在分库分表的组件选择上,业界常用的分库分表组件有Sharding-JDBC、Mycat‌、Cobar‌、TDDL‌、Atlas‌等,他们的优劣势各不相同,但这些分库分表插件在SQL语法支持上都存在自己的局限性。

    例如:

    Sharding-JDBC

    ‌SQL语法限制‌:

    不支持复杂的SQL语法,如OR、UNION和UNION ALL等操作

    不支持HAVING子句

    不支持子查询的完整支持

    不支持DISTINCT聚合操作

    不支持SELECT LAST_INSERT_ID()语句

    不支持CASE WHEN语句

    不支持VALUES后有多行数据的INSERT语句

    不支持dual虚拟表


    Mycat

    ‌SQL语法限制‌:

    不支持跨库多表关联查询和子查询

    不支持CREATE TABLE LIKE xxx和CREATE TABLE SELECT xxx等语句

    不支持SELECT FOR UPDATE、SELECT LOCK IN SHARE MODE等语句

    不支持跨分片的UPDATE/DELETE操作

    不支持多表UPDATE或UPDATE分片键

    不支持跨分片的ORDER BY LIMIT操作


    Cobar

    ‌SQL语法限制‌:

    不支持跨库情况下的JOIN、分页、排序、子查询操作

    不支持SAVEPOINT操作

    SET语句执行会被忽略(事务和字符集设置除外)

    分库情况下INSERT语句必须包含拆分字段列名

    分库情况下UPDATE语句不能更新拆分字段的值

    仅支持部分SQL数据节点


    TDDL

    ‌SQL语法限制‌:

    不支持JOIN、多表查询等语法

    不支持跨库JOIN操作

    不支持存储过程、函数等复杂SQL操作


    Atlas

    ‌SQL语法限制‌:

    社区维护较久,功能相对有限

    不支持跨库JOIN等复杂操作


    这些组件在SQL语法支持上的局限性主要源于分库分表场景下对SQL解析和路由的复杂性,以及各组件架构设计的差异。


    正因为这些SQL语法的局限性,在之前分库时,部门没再继续使用Sharding-jdbc作为分库分表的插件,而是部门自研了一个适合自己的分库插件。关于自研这件事,造轮子目的很直接简单,并不是为了证明什么,仅仅是为了适配自己的场景,组件设计和实现也并不复杂,远远不如行业里的哪些插件强大,根据KISS原则,够用+1而已。

    在分表时,依然选择了部门自研这个条路,原因同样简单:无它,够用+1而已。


    组件设计

    基于 MyBatis 插件机制实现的分表拦截器,采用 策略模式 + 责任链模式 的架构设计。



    ┌─────────────────────────────────────────────────────────────┐

    │ MyBatis 插件链 │

    ├─────────────────────┬───────────────────────────────────────┤

    │ SQLMarkingPlugin │ SQLBasedDatabaseProtectorPlugin │

    ├─────────────────────┼───────────────────────────────────────┤

    │ DataSourceRouting │ TableShardingInterceptor │

    │ Interceptor │ (分表拦截器) │

    └─────────────────────┴───────────────────────────────────────┘





    模块划分

    拦截处理模块:intercept() 方法,负责拦截 SQL 执行。

    策略模块:Policy 枚举,定义不同的读写策略。

    SQL处理模块:负责 SQL 的解析、修改和重写。

    路由计算模块:与 ShardingSuffixCalculator 交互,计算分表后缀。

    参数处理模块:处理 SQL 参数映射。


    组件研发

    分表策略

    分表规则

    • 基于仓库编号(warehouseNo)进行分表
    • 支持多种分表策略:单写单读、单写双读、双写单读、双写双读
    • 分表数量可配置,如st_stock_stream:128表示分为128张表

    分表后缀计算

    // 分表后缀计算流程
    1. 从SQL参数中提取 warehouseNo
    2. 查询配置表,看是否已有分配的分表
    3. 如果有,直接使用
    4. 如果没有,通过负载均衡算法选择一个分表
    5. 使用分布式锁确保分配的唯一性
    

    读写策略






    四种策略模式

    1. doubleWriteAndReadOld:双写读旧表
      • 写操作:同时写入旧表和新表
      • 读操作:只读旧表
    2. doubleWriteAndReadNew:双写读新表
      • 写操作:同时写入旧表和新表
      • 读操作:只读新表(分表)
    3. singleWriteAndReadOld:单写读旧表
      • 写操作:只写入新表(分表)
      • 读操作:只读旧表
    4. singleWriteAndReadNew:单写读新表
      • 写操作:只写入新表(分表)
      • 读操作:只读新表(分表)
    5. singleWriteAndDoubleRead:单写双读
      • 写操作:只写入新表(分表)
      • 读操作:同时读旧表和新表,结果合并

    SQL处理机制




    SQL解析与重写

    1. 识别SQL中的表名
    2. 判断是否需要分表处理
    3. 提取分表键(warehouseNo)
    4. 计算分表后缀
    5. 重写SQL,替换表名
    6. 处理参数映射

    特殊SQL处理

    • COUNT查询SELECT COUNT(*) FROM tableSELECT SUM(countNumForSharding) FROM (SELECT COUNT(*) AS countNumForSharding FROM table_0 UNION ALL ...)
    • SUM查询SELECT SUM(column) FROM tableSELECT SUM(sumNumForSharding) FROM (SELECT SUM(column) AS sumNumForSharding FROM table_0 UNION ALL ...)
    • ORDER BY/LIMIT:在双读模式下需要特殊处理


    双写的处理


    核心功能

    该方法的核心功能是:将单条写入 SQL 转换为双写 SQL,即同时写入原始表和分表。

    实现逻辑详解

    1. 前置检查

    String sql= boundSql.getSql().trim();
    log.trace("TableShardingInterceptor#changeStatementForDoubleWrite, SQL: {}", sql);
    // 检查是否在忽略列表中if (StringUtils.isNotBlank(ignoredTables)) {
        for (String ignoredTable : StringUtils.split(ignoredTables, COMMA)) {
            if (StringUtils.containsIgnoreCase(sql, ignoredTable)) {
                return; // 忽略配置的表,不进行双写
            }
        }
    }
    

    逻辑说明

    • 首先获取原始 SQL 并记录日志
    • 检查当前 SQL 涉及的表是否在忽略列表中
    • 如果是忽略的表,则直接返回,不进行双写处理

    2. 分表处理

    if (StringUtils.isNotBlank(shardingTables)) {
        StatementChangeResultstatementChangeResult= changeSql(sql, metaParam);
        if (statementChangeResult.isSqlChanged()) {
            // 构建双写 SQL
            sql = statementChangeResult.getSql().concat(DOUBLE_WRITE_JOINER).concat(boundSql.getSql().trim());
            
            // 更新 BoundSql 中的 SQL
            ReflectionUtils.setFieldValue(BoundSql.class, boundSql, "sql", sql);
            
            // 复制参数映射
            boundSql.getParameterMappings().addAll(boundSql.getParameterMappings());
        }
    }
    

    逻辑说明

    • 检查是否配置了分表
    • 调用changeSql方法处理 SQL,获取分表后的 SQL
    • 如果 SQL 发生变化(即需要分表),则构建双写 SQL
    • 使用DOUBLE_WRITE_JOINER;)连接分表 SQL 和原始 SQL
    • 更新BoundSql对象中的 SQL
    • 复制参数映射,确保双写 SQL 的参数正确

    3. 双写 SQL 构建示例

    原始 SQL

    INSERT INTO st_stock_stream (id, warehouse_no, sku, quantity) VALUES (?, ?, ?, ?)
    

    双写 SQL

    INSERT INTO st_stock_stream_5 (id, warehouse_no, sku, quantity) VALUES (?, ?, ?, ?) ; INSERT INTO st_stock_stream (id, warehouse_no, sku, quantity) VALUES (?, ?, ?, ?)
    

    使用场景

    1. 数据迁移场景

    在从单表架构向分表架构迁移的过程中,使用双写策略可以确保:

    • 新数据同时写入旧表和新表
    • 保证迁移期间的数据一致性
    • 支持平滑迁移,可随时回滚
    2. 策略配置

    双写策略通过 policy 配置项控制:

    • doubleWriteAndReadOld:双写读旧表
    • doubleWriteAndReadNew:双写读新表


    关键点

    1. SQL 连接符

    public static final String DOUBLE_WRITE_JOINER=" ; ";
    

    使用分号作为 SQL 语句分隔符,这是 SQL 支持的多语句执行格式。

    2. 参数映射处理

    boundSql.getParameterMappings().addAll(boundSql.getParameterMappings());
    

    由于双写 SQL 包含两套相同的参数,需要将参数映射列表复制一份,确保参数正确绑定。

    3. 反射更新

    ReflectionUtils.setFieldValue(BoundSql.class, boundSql, "sql", sql);
    

    使用反射机制更新 BoundSql 对象中的 SQL,这是 MyBatis 插件开发中的常用技巧。


    执行流程图


    changeStatementForDoubleWrite 方法是实现平滑数据迁移的关键组件,它使得系统可以在不中断服务的情况下,从单表架构平滑过渡到分表架构,大大降低了系统升级的风险。


    双读的处理



    用于实现双读策略下的 SQL 处理。在双读模式下,系统会同时查询原始表和分表,并将结果合并,确保在数据迁移过程中能够读取到完整的数据。

    方法

    private static void changeStatementForDoubleRead(BoundSql boundSql, MetaObject metaParam)
    

    核心功能

    该方法的核心功能是:将单表查询 SQL 转换为双读 SQL,即同时查询原始表和分表,并处理结果合并。

    实现逻辑详解

    1. 前置检查

    String sql= boundSql.getSql().trim();
    log.trace("TableShardingInterceptor#changeStatementForDoubleRead, SQL: {}", sql);
    // 检查是否在忽略列表中if (StringUtils.isNotBlank(ignoredTables)) {
        for (String ignoredTable : StringUtils.split(ignoredTables, COMMA)) {
            if (StringUtils.containsIgnoreCase(sql, ignoredTable)) {
                return; // 忽略配置的表,不进行双读
            }
        }
    }
    

    逻辑说明

    • 获取原始 SQL 并记录日志
    • 检查当前 SQL 涉及的表是否在忽略列表中
    • 如果是忽略的表,则直接返回,不进行双读处理

    2. 分表处理与SQL重写

    if (StringUtils.isNotBlank(shardingTables)) {
        StatementChangeResultstatementChangeResult= changeSql(sql, metaParam);
        if (statementChangeResult.isSqlChanged()) {
            sql = mergeSqlForDoubleRead(statementChangeResult.getSql(), boundSql.getSql().trim());
            
            // 处理ORDER BY子句
            if (StringUtils.containsIgnoreCase(sql, ORDER_BY)) {
                intorderByIndex= StringUtils.lastIndexOfIgnoreCase(sql, ORDER_BY);
                StringorderByAndLimitSql= StringUtils.substring(sql, orderByIndex);
                StringsqlBeforeOrderBy= StringUtils.substring(sql, 0, orderByIndex);
                
                // 移除表别名前缀,避免UNION ALL后的列引用错误
                orderByAndLimitSql = orderByAndLimitSql.replaceAll("\\w+\\.", StringUtils.EMPTY);
                sql = StringTools.getJointCodeWithSpace(sqlBeforeOrderBy, orderByAndLimitSql);
            }
            
            // 处理COUNT查询
            if (StringUtils.containsIgnoreCase(sql, COUNT_NUM_FOR_SHARDING)) {
                sql = "SELECT SUM( " + COUNT_NUM_FOR_SHARDING + " ) FROM (" + sql + ") ".concat(TEMP_TABLE_NAME);
            }
            
            // 处理SUM查询
            if (StringUtils.containsIgnoreCase(sql, SUM_NUM_FOR_SHARDING)) {
                sql = "SELECT SUM( " + SUM_NUM_FOR_SHARDING + " ) FROM (" + sql + ") ".concat(TEMP_TABLE_NAME);
            }
            
            // 更新BoundSql
            ReflectionUtils.setFieldValue(BoundSql.class, boundSql, "sql", sql);
            appendParameterMappings(boundSql);
        }
    }
    

    关键技术点解析

    1. SQL合并机制

    private static String mergeSqlForDoubleRead(String changedSql, String originalSql) {
        StringnewSql= StringUtils.removeEnd(originalSql, SEMICOLON);
        newSql = newSql.trim().replaceAll(RegExpConstants.SQL_ORDER_BY, StringUtils.SPACE);
        newSql = newSql.trim().replaceAll(RegExpConstants.SQL_LIMIT, StringUtils.SPACE);
        return appendSelectColumns(newSql, changedSql);
    }
    

    处理步骤

    1. 移除原始SQL末尾的分号
    2. 移除ORDER BY子句(后续会重新处理)
    3. 移除LIMIT子句(后续会重新处理)
    4. 调用appendSelectColumns合并SELECT列

    2. ORDER BY处理

    if (StringUtils.containsIgnoreCase(sql, ORDER_BY)) {
        intorderByIndex= StringUtils.lastIndexOfIgnoreCase(sql, ORDER_BY);
        StringorderByAndLimitSql= StringUtils.substring(sql, orderByIndex);
        StringsqlBeforeOrderBy= StringUtils.substring(sql, 0, orderByIndex);
        
        // 关键:移除表别名前缀,避免"Unknown column"错误
        orderByAndLimitSql = orderByAndLimitSql.replaceAll("\\w+\\.", StringUtils.EMPTY);
        sql = StringTools.getJointCodeWithSpace(sqlBeforeOrderBy, orderByAndLimitSql);
    }
    

    问题背景: 在UNION ALL查询中,如果ORDER BY子句包含表别名(如m.update_time),会导致"SQL Unknown column"错误,因为UNION ALL后的结果集不再有原始表的别名。

    3. COUNT查询处理

    if (StringUtils.containsIgnoreCase(sql, COUNT_NUM_FOR_SHARDING)) {
        sql = "SELECT SUM( " + COUNT_NUM_FOR_SHARDING + " ) FROM (" + sql + ") ".concat(TEMP_TABLE_NAME);
    }
    

    转换示例

    • 原始SELECT COUNT(*) FROM table
    • 双读SELECT COUNT(*) AS countNumForSharding FROM table_0 UNION ALL SELECT COUNT(*) AS countNumForSharding FROM table
    • 最终SELECT SUM(countNumForSharding) FROM (SELECT COUNT(*) AS countNumForSharding FROM table_0 UNION ALL SELECT COUNT(*) AS countNumForSharding FROM table) tempForSharding

    4. SUM查询处理

    与COUNT查询类似,将多个分表的SUM结果再求和。

    5. 参数映射处理

    private static void appendParameterMappings(BoundSql boundSql) {
        Stringsql= boundSql.getSql().trim();
        intlimitIndex= StringUtils.lastIndexOfIgnoreCase(sql, LIMIT);
        
        if (limitIndex < 0) {
            // 无LIMIT,直接复制所有参数
            boundSql.getParameterMappings().addAll(boundSql.getParameterMappings());
            return;
        }
        
        // 处理LIMIT参数
        StringlimitOffset= StringUtils.substring(sql, limitIndex);
        intcountMatches= StringUtils.countMatches(limitOffset, SQL_PLACEHOLDER);
        
        if (countMatches == 2) {
            // LIMIT ?, ? (offset, pageSize)
            List<ParameterMapping> originalParameterMappings = Lists.newArrayList(boundSql.getParameterMappings());
            // 移除最后两个参数(LIMIT参数)
            boundSql.getParameterMappings().remove(boundSql.getParameterMappings().size() - 1);
            boundSql.getParameterMappings().remove(boundSql.getParameterMappings().size() - 1);
            // 添加完整的参数列表
            boundSql.getParameterMappings().addAll(originalParameterMappings);
        }
        // ... 其他情况处理
    }
    

    SQL转换示例

    1. 简单查询

    原始SQL

    SELECT * FROM st_stock_stream WHERE warehouse_no = ? AND deleted =0
    

    双读SQL

    SELECT * FROM st_stock_stream_5 WHERE warehouse_no = ? AND deleted =0 UNION ALL SELECT * FROM st_stock_stream WHERE warehouse_no = ? AND deleted =0
    

    2. 带ORDER BY和LIMIT的查询

    原始SQL

    SELECT * FROM st_stock_stream WHERE warehouse_no = ? ORDERBY update_time DESC LIMIT 10
    

    双读SQL

    SELECT * FROM st_stock_stream_5 WHERE warehouse_no = ? UNION ALL SELECT * FROM st_stock_stream WHERE warehouse_no = ? ORDERBY update_time DESC LIMIT 10
    

    3. COUNT查询

    原始SQL

    SELECT COUNT(*) FROM st_stock_stream WHERE warehouse_no = ?
    

    双读SQL

    SELECT SUM(countNumForSharding) FROM (
        SELECT COUNT(*) AS countNumForSharding FROM st_stock_stream_5 WHERE warehouse_no = ? 
        UNION ALL 
        SELECTCOUNT(*) AS countNumForSharding FROM st_stock_stream WHERE warehouse_no = ?
    ) tempForSharding
    

    执行流程图


    在从单表向分表架构迁移的过程中,使用双读策略可以:

    • 确保读取到完整的数据(旧表+新表)
    • 支持平滑迁移,无需停机
    • 提供回滚能力


    路由计算



    分表路由的一个具体实现,可以扩展实现为其他方式。


    路由算法

    1. 配置优先:首先检查是否已有配置的分表
    2. 负载均衡:选择分配仓库最少的分表
    3. 分布式锁:使用分布式锁确保分配的唯一性
    4. 容错机制:支持重试。

    参数处理


    参数映射处理

    • 双写模式下,参数列表需要复制一份
    • 双读模式下,需要根据LIMIT子句调整参数
    • 保持参数与SQL占位符的一致性

    性能优化


    优化策略

    1. 缓存机制:使用ThreadLocal缓存分表后缀
    2. 批量处理:支持批量操作的分表
    3. 索引优化:分表键上有索引,提高查询效率
    4. 连接池优化:每个分表使用独立的连接池

    可扩展性


    扩展点

    1. 自定义分表策略:通过实现ShardingSuffixCalculator接口
    2. 灵活配置:通过mybatis-config.xml配置分表规则
    3. 策略模式:支持多种读写策略,可动态切换

    监控与日志


    监控指标

    • SQL重写耗时
    • 分表路由计算耗时
    • 分表命中率
    • 分布式锁竞争情况

    日志记录

    • 分表键提取日志
    • SQL重写前后对比
    • 路由计算结果
    • 异常情况记录


    组件研发过程问题和解决方案

    1、双读,排序字段缺失报错

    案例:

    报错信息:

    ERROR 1250 (42000): Table 'm' from one of the SELECTs cannot be used in field list
    

    解决方案:将排序字段放在子查询的select column中

    select 1, m.update_time from wms_stock.st_stock_stream m UNION ALL select 1, m.update_time from wms_stock.st_stock_stream_0  m order by m.update_time asc limit 1;
    


    2、双读,select count 报错

    案例:

    select count(1) from wms_stock.st_stock_stream UNION ALL select count(1) from wms_stock.st_stock_stream_1
    

    解决方案:先将select count 通过UNION ALL 合并,然后外层进行sum


    3、sum() 函数 UNION ALL 后需要再次 sum

    案例:

    select sum(stock_qty) from wms_stock.st_stock UNION ALL select sum(stock_qty) from wms_stock.st_stock_1
    

    解决方案:每个sum 语句作为子查询,外层再次sum,累加所有的子数量,作为总数量。