京东Spark作为京东内部的统一分析计算引擎,目前服务于京东零售、京东物流、京东数字科技、京喜事业群、京东国际、京东安联、京东工业等各子集团,其承载着诸多核心业务线,其任务量逐年攀升,其任务增长趋势如图1所示,面对日益增长的业务量以及复杂、多变的业务场景,京东Spark计算引擎研发团队面临着诸多挑战,其中典型问题包括:稳定性挑战、计算效率提升挑战、降本增效挑战、满足业务个性化需求挑战。京东Spark计算引擎研发团队通过对Apache Spark的技术改进及架构升级,提供了一套高可用、高性能服务,以满足内部大量离线、实时分析型的查询需求。
在京东Spark任务中,Spark SQL的任务量占比较高(90%+),在生产场景中,我们发现有很多SQL业务场景需要一张小表(相对小)Inner / Left Outer Join 一张大表。这里的小表是指不会触发BroadcastHashJoin,默认走SortMergeJoin,了解Spark的同学会知道,SortMergeJoin 的实现逻辑:首先,两张表会分别根据join条件确定每条记录的key,基于该key做shuffle write将可能join到一起的记录分到同一个分区中;其次,在shuffle read阶段将两个表中相同分区的数据进行拉取并进行sort merge;最终,将满足join条件的数据进行join输出,其中满足join条件与具体SQL join策略相关,对于上面提及的场景,满足join条件是以左侧小表为基准的,即在两表join时只会将大表侧满足与小表侧数据相等条件的数据保留,对于大表侧不符合join条件的数据,我们认为是无效数据,由于无效数据参与shuffle、sort merge计算,一方面影响任务时效,另一方面无效的shuffle数据会增加集群磁盘IO、网络IO等负载,同时浪费大量的计算资源。下面是京东某重要业务线的一个真实案例,如图2所示,其中左侧某商品SKU表(小表)与右侧某商品SKU类目表(大表)进行Inner Join,通过Spark DAQ执行计划我们可以看出,二者走的是SortMergeJoin,其中左侧小表过滤后参与计算的数据量大概15亿条、shuffle量大概是168MB;而右侧大表的数据量大概是1558亿条、shuffle量高达2.5TB,通过最终join输出,可以看到满足join条件的数据仅7000多万,结合上面的分析, 说明大表侧有大部分数据是不满足join条件的,但这些无效数据都参与shuffle和sort merge环节,不但影响任务时效,还会增加集群负载等。本文基于上述问题进行深度分析,并重点介绍我们在Spark内核所做的优化。
本文讨论京东Spark计算引擎研发团队基于Bloom Filter算法的Runtime Filter Join优化机制,助力京东大促场景的探索和实践。Runtime Filter Join是京东Spark计算引擎研发团队最近推出的新特性,其原理是两个表进行join时,动态基于小表(相对小)构建BloomFilter来对大表侧进行过滤,过滤掉大表一侧不会被命中(不满足join条件)的输入数据,从而大幅减少磁盘IO、网络IO和计算的技术,并能够提升任务的整体执行时间。在详细介绍Runtime Filter Join特性之前,先对Spark SQL 的整体架构做一下概述, 由下图3可知,无论使用DataFrame、 SQL 语句还是使用DataSet ,都会经过如下步骤转换成 DAG 对 RDD 的操作:
1. 先解析 SQL,生成 Unresolved Logical Query Plan
2. 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan
3. Optimizer根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
4. Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan
5. 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
6. Spark 以 DAG 的方法执行上述 Physical Plan
7. 在Spark 3.0版本以后,引入AQE(Adaptive Query Execution),其在执行 DAG 的过程中,会根据运行时信息动态调整执行计划从而提高执行效率。
JD Spark Runtime Filter Join的技术实现包括:基于逻辑执行计划(Logical Plan)数据裁剪和基于物理执行计划(Physical Plan)数据裁剪,其中前者是在逻辑执行计划(Logical Plan)优化过程中(上述步骤3),动态基于小表数据构建BloomFilter,并将该BloomFilter 算子插入到大表侧对其进行过滤;后者是对前者场景的补充,如果未能在逻辑执行计划优化中动态插入BloomFilter算子对大表进行过滤,在物理执行计划优化中(上述步骤7),通过AQE基于小表构建BloomFilter并对大表进行过滤,二者的主要区别是前者是在大表shuffle 前进行过滤,后者是在大表shuffle后进行过滤,其具体实现架构如图4所示,为了描述方便我们这里结合一个case进行阐述,下面以Table-A Inner Join Table-B为例,其中Table-A是一张小表:
基于Bloom Filter算法的Runtime Filter Join优化机制的执行流程如下:
在逻辑执行计划过程中,我们实现了相应的Rule,首先判断小表Table-A 是否可以广播,如果可以广播会走BroadcastHashJoin,因BroadcastHashJoin不会引入shuffle、sort,往往性能表现良好,因此对于满足BroadcastHashJoin的场景,保持Spark原生计算逻辑;否则,会默认走SortMergeJoin;
2. 对于SortMergeJoin,上述Rule会根据表数据文件大小和schema信息对两个表的Row count进行评估,并基于两表的Row count信息进行代价评估(例如:Table-A-Row-count/Table-B-Row-count小于一定阈值等),当小表Row count与大表Row count满足一定条件,上述Rule会基于Table-A的join keys动态构建BloomFilter,我们定义为RuntimeBloomFilter,并作为Filter算子动态插入到Table-B的过滤条件中,因RuntimeBloomFilter会在大表shuffle前进行过滤,提前过滤掉大表侧join时不会被命中的数据,从而减少大表的shuffle量;
3. 在物理执行计划过程中,同样实现了相应的Rule,该Rule会根据Join策略的不同,检查SortMergeJoin的Left或Right节点中,是否在逻辑执行计划中已基于小表join keys构建了BloomFilter对大表侧进行过滤,如果存在,该步骤会自动跳过;否则,我们会基于AQE在执行计划中,基于小表join keys动态构建BloomFilter,这里定义其为ShuffleBloomFilter,并作为Filter算子动态插入读取大表shuffle数据后面,对shuffle数据过滤,从而减少大表join 前sort的数据量、减少spill数据量。这里与以上优化的不同点在于,其作为AQE的一条规则且依赖于AQE功能的开启,该项优化主要是用于优化中间Stage。
基于上述架构设计在实现中面临的挑战及技术攻坚:
1. BloomFilter算子支持codegen
为了实现上述架构设计,我们内部自定义了BloomFilter算子,为了提升该算子的处理性能,内部实现的BloomFilter算子支持codegen。
2. 支持多join keys场景且多个join keys只需一个BloomFilter
两张表join时,可能存在多个关联键,例如:Table-A a Inner Join Table-B b on a.col1=b.col1 and a.col2=b.col2,如果基于小表每个join key都构建一个BloomFilter并分别作为Filter算子对大表侧进行过滤,会导致多次读取小表,增加IO,另外由于BloomFilter是以广播向量的方式存在,如果产生过多的BloomFilter会带来driver和executor的内存压力,为此我们采用XxHash64 对join keys 进行处理,以获取一个新的散列值,基于新的散列值构建BloomFilter并对大表侧过滤,这样能够有效提升性能及减少OOM发生。
选取XxHash64 处理多join keys的原因:首先,XxHash64支持codegen,能够与上述实现的BloomFilter算子有机结合,另外基于HashBenchmark验证可知,XxHash64在处理基本数据类型相对Murmur3Hash、HiveHash均有不错的表现,且京东线上的join keys一般是string类型,下面是验证效果:
3. 设计构建BloomFilter的timeout fallback机制
结合上面的Runtime Filter Join的执行流程可知,两表join时如果触发了RuntimeBloomFilter或ShuffleBloomFilter特性,均会先基于小表构建相应的BloomFilter,然后再作为Filter算子对大表进行过滤,这就要求对大表的处理需要等待基于小表构建BloomFilter完后才执行,极端情况下可能会导致任务延迟问题,这在京东复杂、多变且SLA要求严格的背景下,如果想全场景铺开Runtime Filter Join特性会面临很大的挑战。为了解决上述问题,我们设计构建BloomFilter的timeout fallback机制,即在规定时间内未能完成基于“小表”构建BloomFilter,Spark执行计划会自动回退到原始处理逻辑,这样能够有效规避因大表被误判成小表,导致构建BloomFilter耗时过长所引起的性能回归问题等。
4. 设计Rule 支持BloomFilter 谓词下推
Spark SQL在多个join场景的一个典型优化就是谓词下推,即可以通过内部Rule优化,将某一组join的谓词下推到其他join,从而过滤掉其他join的无效数据,提升性能。通过分析京东线上业务场景,发现大部分任务存在多个join场景,例如:A as a join B as b on a.col1=b.col1 join C as c on a.col1=c.col1。然而,本文引入的BloomFilter算子是一种基于子查询的封装,基于Spark SQL 现有的内置Rule不能将其进行谓词下推,为此我们内部实现了相应的Rule支持BloomFilter算子的谓词下推,在满足多个join且关联键相同的情况下,该Rule能够基于某一组join构建的BloomFilter下推到其他join,最终实现基于一个BloomFilter过滤多组join的能力,下面结合一个案例进行展示,其中 tb1 和 tb2是两个大表,tb3是小表,其中tb1数据量10000条,tb2数据量是9000条,tb3数据量是10条。spark.range(100000) .select(col("id").as("a"), col("id").as("b"), col("id").as("c")) .write.format(tableFormat).mode(SaveMode.Overwrite) .saveAsTable("tb1")spark.range(9000) .select(col("id").as("a"), col("id").as("b"), col("id").as("c")) .write.format(tableFormat).mode(SaveMode.Overwrite) .saveAsTable("tb2")spark.range(10) .select(col("id").as("a"), col("id").as("b"), col("id").as("c")) .write.format(tableFormat).mode(SaveMode.Overwrite) .saveAsTable("tb3")set spark.sql.autoBroadcastJoinThreshold=-1;sql(s""" |SELECT tb1.a, | tb2.b |FROM tb1 | Inner JOIN tb3 | ON tb1.a = tb3.a AND tb3.b < 4 | Inner JOIN tb2 | ON tb1.a = tb2.a |""".stripMargin)
为了验证效果,我们这里关闭BroadcastHashJoin (spark.sql.autoBroadcastJoinThreshold=-1),执行上面语句,Spark会优先处理 tb1 Inner JOIN tb3 ON tb1.a = tb3.a AND tb3.b < 4,基于Bloom Filter算法的Runtime Filter Join优化机制会自动识别出tb3是一张小表,并基于tb3构建RuntimeBloomFilter对大表tb1进行过滤,然后进行SortMergeJoin,其DAG如图5所示,由图5可知tb1的数据量得到有效过滤,只保留满足join条件的4条数据,这在生产环境中的收益是显著的,具体上线效果在京东实际场景中的收益中会阐述。
如果不引入BloomFilter谓词下推优化,我们的优化到此已经结束,上述计算结果最终再与tb2进行SortMergeJoin,这时我们发现tb2的数据会全部参与计算,其中9000条数据均会进行shuffle并最终与第一组join的结果进行join匹配,但满足join条件的数据也仅4条,即tb2存在大量无效数据参与shuffle、sort,对于上述场景我们自定义的Rule能够有效将基于tb3构建RuntimeBloomFilter下推到第二组join里,并对tb2进行谓词下推,其具体的优化效果如图6所示,由图可知tb2的数据量也得到有效过滤,只保留了满足join条件的4条数据,最终实现了基于一组join的BloomFilter谓词下推到其他join,从而减少shuffle量,减少集群负载,提升任务时效,同时节省大量的计算资源。
基于TPC-DS 10TB 基准测试验证,如图7所示,99个查询中有9个被Runtime Filter Join特性命中,其中shuffle量减少1.5%~73.7%。
下面是京东线上的优化案例:其为某重要业务线的某商品SKU属性表(小表)与某商品SKU扩展属性模型表(大表)进行Left Outer Join,因业务需要两张表在join前会根据相关字段进行group by,最终再通过商品SKU编号进行关联,涉及业务较为复杂,这里不展开描述,该业务场景是非常符合本文的Runtime Filter Join优化,下面是优化前后的对比效果:
由上面优化前后的对比效果图8和图9可知,1)优化前,该线上任务的shuffle量大概是:16TB+,任务运行时间:4.4小时;2)基于Runtime Filter Join优化后,shuffle量减少至:200MB+,任务运行时间缩短至:4分钟,计算性能提升:95%+,同时节省了大量的计算资源。基于Bloom Filter算法的Runtime Filter Join优化机制在京东的线上优化案例不胜枚举,限于篇幅原因这里不一一展开。目前,基于Bloom Filter算法的Runtime Filter Join优化机制已全面上线(默认关闭),基于京东自研Spark版本相对Spark社区版本,命中任务平均处理数据量(shuffle量)减少72%、性能提升53%。
本文讨论京东Spark计算引擎研发团队基于Bloom Filter算法的Runtime Filter Join优化机制,助力京东大促场景的探索和实践。目前上线效果显著,但仍有一些不足及待提升项:1、目前表Row count主要是通过表文件大小及其schema信息进行判断,存在一定误差,后续会尝试开启CBO进行精准判断;
2、目前是基于小表动态构建BloomFilter,需要读取小表的数据信息,后续会考虑基于列式文件存储的一些特性来丰富整个架构,例如:如果要基于小表的全量数据构建BloomFilter,且存储格式是ORC或Parquet等,可直接基于其Data文件内部的Index BloomFilter构建全表的BloomFilter,这样可以进一步提升性能;
3、目前优化的主要是两表关联的场景,后续需要增强对多表关联等复杂场景的支持。
我们后续计划将Spark Runtime Filter Join 技术与数据湖技术相结合,助力京东湖仓一体相关场景的落地及实践。同时,我们会继续加强行业内技术交流,在分享内部技术实践与经验的同时,聆听大家的反馈和建议,在满足内部业务增长需要的同时,相关技术会反馈给社区,共同建设Spark生态。
作者:技数中心魏秀利