开发者社区 > 博文 > 京东自适应数据倾斜处理算法
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

京东自适应数据倾斜处理算法

  • 京东零售技术
  • 2022-01-27
  • IP归属:北京
  • 41880浏览

    本文讨论了京东Spark计算引擎研发团队关于自主研发并落地数据倾斜解决方案,助力京东大规模离线计算场景的探索和实践。近年来,大数据技术在各行各业的应用越来越广泛,Spark自UCBerkeley的AMP实验室诞生到如今3.0版本的发布,已过十年,俨然已经成为大数据计算领域名副其实的老项目。虽经过不断的迭代和优化,Spark功能日趋成熟与完善,但在性能及稳定性方面,仍然还有很多可以提升的地方。数据倾斜问题作为大数据计算领域中的一个顽疾,就是其中重点之一。我们希望在京东超大规模数据体量及复杂业务场景的背景下,通过自研数据倾斜优化算法,解决现有问题,打造稳定高效的JDSpark计算引擎,助力京东大促过程中的一些应用实践,能够给大家提供一些思路和启发,同时也欢迎大家多多交流,给我们提出宝贵建议。



    什么是数据倾斜




    数据倾斜是分布式计算领域中最为棘手的问题之一,它是指在分布式的数据并行处理过程中,由于数据分片不均匀,导致大量数据集中到一个或少数计算节点上,导致少数任务的处理速度远低于平均速度,甚至直接导致OOM内存溢出,进而拖慢整个计算环节。


    数据倾斜的发生往往与数据Shuffle紧密相连,在Join(连接)、Aggregate(聚合)、Window(窗口)等计算中,必须从各个前序计算节点中将相同key相关的数据拉取到某个节点上进行计算,当数据分布不均存在hot key时,其对应计算节点上的数据量便很可能超过其他节点。



    社区版实现




    通过对线上任务的分析,我们发现约80%的数据倾斜是由Join计算导致,倾斜Stage严重影响了任务的时效性,解决SkewedJoin迫在眉睫。所幸的是,从3.0开始,Spark引入了Adaptive Query Execution(AQE)机制,可以通过分析已完成Stage的统计信息,来动态的调整后续Stage的执行,以达到更佳的优化效果。图1展示了AQE的基本架构。而SkewedJoin的优化处理,正是AQE中最重要的应用之一。


    jdls1.jpg

    图1 Adaptive Query Execution


    图2、3来自于Databricks介绍AQE的博客。其中图2展示了两表Join情况下的数据倾斜,其中左侧Table A发生了严重的数据倾斜,其分区A0的数据量远大于其他三个分区A1~A3;在计算过程中,A0-B0对应的任务Task0的计算压力将大于其他三个Task,进而拖慢整个Stage。



    jdls2.jpg

    图2 数据倾斜


    基于AQE框架,社区版本是这样处理2表Join场景下的数据倾斜:在前序Stage完成后,AQE可以获取其各个分区大小;其中的OptimizeSkewedJoin优化规则,将首先分析JoinType(如Inner、Left、Right、LeftSemi等)来判断左右两侧是否可以进行Split分割处理,判断的依据是Split之后再执行Join是否会破坏数据正确性;若可处理,则进一步分析分区大小的分布,检测是否存在倾斜分区(默认逻辑为分区大小超过中位数的5倍);分析结束后,若存在可处理的倾斜分区(图2中的A0),则对其进行分割(分割为A0-0,A0-1),并将对应分区(B0)进行Duplicate复制处理,这样Task0(A0 - B0)将被分割成为两个Task(A0-0 - B0;A0-1 - B0)。处理后的Stage将由4个Task变为5个Task,但各个Task的计算压力得到了平滑,整个Stage的执行效率得到了提升。

    在实际中,我们发现这里有三点值得注意的技术细节:

    1、 JoinType约束:目前的Split+Duplicate处理机制,会对JoinType产生约束;具体见表1,如果图3中的JoinType为RightOuter,则不能对A0进行分割,否则会导致数据正确性问题。

    2、 处理开销:对应非倾斜分区会被多次读取,如图3中的B0被读取两次。

    3、 组合爆炸:以Table A Inner Join Table B为例,若左右两侧的分区A0和B0均发生了倾斜,且分别被分割成M份和N份,则Task0最终会被拆分成M X N个Task,当M X N数值过大时,可能会导致一定的性能回归。


    jdls3.jpg

    图3 基于AQE处理数据倾斜


    jdls4.jpg

    表1 JoinType约束



    业界的改进




    通过观察线上任务,社区版数据倾斜处理能力可以明显提升命中任务的时效性,但其依然存在一些不足:

    1、模式限制:目前社区版本仅支持2表Join的场景,且匹配模式非常严格(见图4,左侧为SortMergeJoin,右侧为ShuffledHashJoin);当前支持的模式中,两侧的输入不能存在聚合、映射等其他节点。

    2、语义限制:即上文所提的JoinType约束,目前在Split+Duplicate处理机制下,类似Table A leftJoin skewed Table B的情况是不能被处理的。


    jdls5.jpg

    图4 目前社区版所支持的数据倾斜模式


    目前业界主要的优化方向在于放宽模式限制,即在AQE框架下支持更多的模式,以如图5为例,假设Shuffle2中发生数据倾斜,但由于Shuffle1端存在Aggregate算子,导致匹配不上图4中的模式。目前业界主要有两种实践方向:

    方案1:主动引入额外的Shuffle将复杂Stage转化为社区版可以处理的简单模式。在Aggregate节点后,主动插入Shuffle,导致Shuffle3与Shuffle2之间的Join计算满足图4中的模式;。

    方案2:扩展并维护倾斜模式集,需要不断将线上遇到的倾斜模式添加进集合,如图5这种单侧存在聚合的模式。


    jdls6.jpg

    图5 左侧存在聚合节点的数据倾斜



    京东自适应数据倾斜算法




    业界的改进均可以拓宽数据倾斜处理的覆盖面,但我们在引入京东平台时,发现了其中的一些缺陷:

    方案1:主动引入的Shuffle有时是可以避免的,如图5中的Shuffle3可以通过调整数据倾斜算法来避免的。

    方案2:除了图5中的单边聚合模式,还需要支持多表Join模式、含Union的模式等;特别的,倾斜模式之间可能是关联的,比如Union节点的每个子节点分属不同模式,或多表Join中的某些节点又包含Aggregate节点;这导致很难枚举所有可能的模式,且难以维护,很容易陷入头痛医头脚痛医脚的困境。

    通过仔细分析平台任务的倾斜模式,并深入调研业界已有的改进工作,我们归纳出了一种较为通用的优化算法。不再使用模式匹配的方法,而是分析节点语义,搜索出可以分割的倾斜叶子节点,并进行分割和组合处理。其处理逻辑如下:

    OptimizeSkewedJoinV2 :

        Check operator types

        { Non-Skewed SMJ/SHJ/Aggregrate/Window, BHJ/Sort/Project/Filter/… }

    Find Splittable Shuffles:


    jdls7.jpg


    Split Splittable Skewed partitions & Duplicate other partitions

    Check Combinatorial Explosion & Re-split if necessary

    Mark all SMJ/SHJ/Aggregate/Window Skewed


    步骤一:出于保证数据正确性的考虑,利用白名单(包含Sort、Project、BHJ、Non-Skewed SMJ/SHJ/Aggregate/Window等)对全部子孙节点进行类型检查,若满足要求,则可以进行后续优化;

    步骤二:语义分析,搜索Splittable叶子节点:

    • 遇到SMJ/SHJ (Inner/Cross)节点,搜索左右子节点;

    • 遇到SMJ/SHJ (LeftOuter/LeftSemi/LeftAnti)节点,搜索左子节点;

    • 遇到SMJ/SHJ (RightOuter)节点,搜索右子节点;

    • 遇到SMJ/SHJ (FullOuter)节点,停止当前路径搜索;

    • 遇到Aggregate/Window节点,停止当前路径搜索;

    • 遇到其他节点,则搜索其子节点; 

    • 所有最终到达的Shuffle节点均为Splittable;

    步骤三:利用分区信息,分析Splittable叶子节点是否发生倾斜;若发生倾斜,则对其进行分割,并复制其他分区数据;

    步骤四:对组合数进行检测,当超过阈值(默认1000)时,会对分割数进行压缩;

    步骤五:对分割后的数据进行组合遍历,生成最终的Task。并将树结构中的SMJ/SHJ/Aggregate/Window节点标记为Skewed状态,以避免EnsureRequirements引入新的Shuffle;


    jdls8.jpg

    图6 在复杂Stage中搜索Splittable叶子结点


    以图6为例,步骤二中筛选出Shuffle 0/2/4具备可分割性,当其中发生数据倾斜时,则可以被处理;步骤三中,对于每个分区,会通过遍历所有的Split组合生成优化后的任务;

    假设对于分区0,Shuffle 0/2/4均发生了数据倾斜,并均被分割成了100份,则通过组合遍历生成一百万(100X100X100)个新Task,但这会导致Shuffle 1/3/5中分区0的数据均被读取了一百万次;步骤四会对组合数进行压缩,将分割方案由100X100X100压缩至10X10X10。



    线上案例




    目前我们设计的新算法已经上线一段时间,应用于保障关键链路中的计算任务。


    jdls9.jpg

    图7 四表Join倾斜


    图7为一个典型的线上案例,在计算过程中发生了4表Join倾斜(2SMJ+1BHJ),其中最左侧的Shuffle数据发生了倾斜;


    jdls10.jpg

    图8 四表Join倾斜优化相关日志


    图8中的日志记录了整个处理过程,从Stage的Project节点开始,首先发现两个SMJ(#2002,#1996)和三个Shuffle输入(#12,#9,#11);分析中发现JoinType均为LeftOuter,故最左侧的Shuffle(#12)满足Splittable条件;接着对Shuffle(#12)进行分析,发现多个倾斜分区,于是进行分割处理;


    jdls11.jpg


    优化前Stage 20共包含3000个Task,耗时长达1~2小时;优化后共生成3051个Task,耗时优化至11分钟,显著提高了时效性。



    总结与展望




    京东Spark平台承担了海量的计算任务,常有几千行的大SQL出没,计算逻辑千差万别。我们对线上倾斜模式进行分析,并对业界处理方法进行借鉴,归纳总结出一种较为通用的处理算法。不过目前仍然存在不少的待优化项需要继续进行攻克:

    1、目前的算法需要所有叶子节点均为Shuffle/Broadcast类型,尚不支持Bucket Join。后续需要支持DataScan类型的叶子节点,以支持Bucket Join的推广。

    2、突破JoinType限制:通过对线上任务的分析,目前仍然有一定量的类似A leftJoin skewed B、A leftJoin skewed B leftJoin C的倾斜模式。目前Split+Duplicate处理机制不能处理这种倾斜模式,需要突破当前限制,进一步拓展覆盖度。

    3、目前我们的工作主要围绕Join开展,未来需要进一步支持Window和Aggregate算子引发的数据倾斜。

    在不断提升平台整体性能的同时,我们会继续深度钻研数据倾斜、数据膨胀等异常场景,保障长尾任务的稳定性,为京东大数据平台的湖仓一体升级工程打造坚实可依赖的高效引擎基础。

    我们还会同开源社区和业界同行保持密切的技术交流,在支撑好内部计算任务的同时,也会把相关优化工作回馈给社区,共建Spark生态。


    作者:技数中心郑瑞峰