外部发布使用:https://joyspace.jd.com/pages/KfwqlMUoXQaGR3oGX4dq
联合作者:
陈美航 京东集团-京东零售-平台产品与研发中心-数据资产与应用部-数据研发部-数据资产研发组
张越 京东集团-京东零售-平台产品与研发中心-集团数据计算平台部-实时平台研发部
0 前言
在流量领域的转化分析、搜索推广算法及AI等数据分析应用场景中,流量资产的质量直接影响到业务的监测和运营。作为流量资产的基石,流量数仓在应对快速变化和多样化的业务需求时,如何在提高效率、优化用户体验和控制成本方面做到最佳?本文将方案设计、链路优化、湖架构新特性研发、大促保障及监控设计等多角度全方位介绍湖仓一体技术在流量资产的探索和实践。
1 转型前,流量数仓有哪些痛点?
1.1 架构负担重
原流量数仓采用典型的Lambda架构,虽然能够给用户提供离/在线两种时效数据,但长期以来存在离线、实时数据对不齐,多种计算组件和架构范式导致的成本负担高等问题,用户很难专注于解决业务问题实现价值。
从成本上看,
- 离线、实时链路相互独立运转,计算资源double,增加成本约100W/年;
- 离线链路为优化时效,采用小时级加工方式代替原T+1批处理,但确引入导数据冗余存储及同一份数据多次copy问题,存储及计算成本增加近40W/年;
从效率上看,资产模型的开发迭代需要在离线和实时研发人员间拉通对齐,整体效率不高;
从体验上看,因离线和实时链路计算&存储异构,常常因为数据间有gap,或者表/流对不齐,需要下游用户花费更多的时间和精力排查解决,使用体验差;
1.2 模型割裂
不同类的用户交互行为模型,如曝光、点击、浏览、订单等,又按照埋点上报端来划分为主站APP、M、PC、WEIXIN及各垂站APP等,再考虑实时和离线,同一类的交互模型将衍生出10余张表或流模型,共40+模型,这将带来很多问题。
体验差,效率低:
- 完整的用户行为分析需要接入以上全部模型,接入时和后期迭代运维工作量巨大;
- 各模型间字段命名不统一,列对不齐,各端上报的埋点规则也不一致,取数开发成本非常高;
冗余计算,成本高:
- 业务使用的字段需要从诸如json_param,event_param等扩展列取值,下游数千用户重复计算提取;
1.3 时效性不完备
仅有流式秒级和T+1天级时效数据,没有提供近实时数据能力,考虑到秒级数据的成本,一些用户只能选择使用T+1数据。流量数仓GDM层T+1时效具备时间日常一般为凌晨2点,下游接入后,在面对内部复杂业务场景时,计算处理时间捉襟见肘,数据SLA难以保障,大促或活动流量高峰时,影响更为明显。
2 为什么选择湖仓框架来解决以上问题?
湖仓架构(Delta Lake, Hudi等)具有许多诸如支持更新、存算分离、支持非结构化和结构化多种数据类型等特性,但应用到流量资产场景时,我们最关注以下四个关键特性:
- 端到端分钟级能力;
- 支持事务,保障数据在多方并发读写时的一致性,写入任务异常时的回滚清理;
- MVCC功能,提供轻量级时间旅行能力,在处理小文件问题时尤为重要;
- 数据重新布局能力,有效降低存储和查询成本;
3 湖仓链路架构设计
模型精简统一:曝光、点击、浏览、订单等不同用户交互行为内的各端及实时、离线模型,数据统一,模型名称统一,字段名称统一,由之前40+模型简化为4个逻辑模型,8个实体模型;
用户交互类型 | 时效 | 旧模型 | 新模型 |
曝光 | 实时 | ja2015_311210_ep,inner_ja2015_311210_ep,mo_j2011_1_ep,inner_m_ep,inner_wxapp_ep,inner_app_ep,exp_log.100000 | gdm_jdr_sch_d14_traffic_expo_di (Topic) |
离线 | gdm_m14_wireless_exposure_osp_eid_log,gdm_m14_wireless_exposure_log,gdm_m14_m_exposure_log, gdm_m14_wxapp_exposure_log,gdm_m14_unify_exposure_log gdm_m14_pc_exposure_log | gdm_jdr_sch_d14_traffic_expo_di | |
准实时(分钟级) | 无 | ||
点击 | 实时 | ja2015_311210_cl,inner_m_cl_ja2015_311210,mo_j2011_1_cl, inner_m_cl, inner_wxapp_cl, inner_app_cl, other.000000 | gdm_jdr_sch_d14_traffic_click_di (Topic) |
离线 | gdm_m14_wireless_click_osp_eid_log,gdm_m14_wireless_click_log,gdm_m14_m_click_log,gdm_m14_wxapp_click_log,gdm_m14_unify_online_log_mark,gdm_m14_pc_click_log | gdm_jdr_sch_d14_traffic_click_di | |
准实时(分钟级) | 无 | ||
浏览 | 实时 | ja2015_311210_pv,inner_m_pv_ja2015_311210,mo_j2011_1_pv,inner_m_pv,inner_wxapp_pv,inner_app_pv,www.100000 | gdm_jdr_sch_d14_traffic_pageview_di (Topic) |
离线 | gdm_m14_wireless_online_log, gdm_m14_m_online_log, gdm_m14_wxapp_online_log, gdm_m14_unify_online_log, gdm_m14_pc_online_log | gdm_jdr_sch_d14_traffic_pageview_di | |
准实时(分钟级) | 无 | ||
订单 | 实时 | ja2015_311210_sr_od, inner_m_sr_od_ja2015_311210, mo_j2011_1_sr_od, inner_m_sr, inner_wxapp_od, inner_app_sr_od, order.100000 | gdm_jdr_sch_d14_traffic_order_di (Topic) |
离线 | gdm_m14_wireless_order_log, gdm_m14_m_order_log, gdm_m14_wxapp_order_log, gdm_m14_unify_order_log, gdm_m14_ol_sale_ord | gdm_jdr_sch_d14_traffic_order_di | |
准实时(分钟级) | 无 |
标准且易用:将数据ETL过程封装成埋点解析SDK,对埋点上报数据进行统一解析,形成含义清晰的标准字段(例如shp -> shop_id , rtm -> receive_ts等),埋点上报的复杂字段及解析逻辑对下游用户透明;
数据语义保证:通过Flink任务集成解析SDK,将原始数据加工成最终模型,并事务生产到JDQ,同时下游通过Flink任务写入Hudi表中,保证链路处理语义为Exactly-Once;
优化效率及成本:Hudi表按照业务域分区,方便下游裁剪数据按需取数;数据分钟级可见,存算成本需要比原表低;
容灾:设计冷备批修复链路,应对所有可能出现异常导致的数据不可用情况。
新链路架构图如下:
4 转型的挑战与优化
在流量资产湖仓一体化转型的过程中,也遇到了很多的挑战,平台侧基于开源Hudi,在底层引擎做了大量的优化,数资团队也与数据湖平台团队展开紧密联动,“遇山开路,逢水搭桥”!下面着重介绍下在实际落地过程中,都遇到了哪些挑战,以及是如何解决的
4.1 数据湖团队 -- 从底层引擎的视角,面临的挑战与优化
4.1.1 数据湖多模态 IO 能力
挑战:京东某些HDFS集群在热点时段繁忙度较高。此时Flink流写会出现timeout重试甚至异常重启,导致写入性能均值吞吐受限,且扩Flink资源对性能提升效果不明显
解决方案:自研湖表多模态IO能力,将湖表物理存储划分为缓冲层和持久层,流量数据先写高性能缓冲层,极大提升写入性能与任务稳定性,且吞吐能够横向扩展。
开源Hudi版本,其元数据和物理存储是一对一的关系,数据湖团队创新性的设计了多模态IO的能力,结合Hudi自身元数据,构建统一的IO抽象层与Hudi逻辑表视图能力,实现湖表分层存储。对底层物理存储而言,以可插拔的方式集成不同的存储介质,例如HDFS、OSS、Kafka、HBase和Redis等,再结合Hudi元数据,构建统一的Hudi逻辑表视图,对外暴露逻辑表,实现对查询、写入端的透明;不同存储介质间,扩展Hudi表服务能力,即Clustering/Compaction 实现在完成常规湖表文件整理工作的同时,同步完成不同物理介质间的数据“搬运”工作。
湖表数据缓冲层,即为湖表多模态IO的一个具体的落地场景,将湖表物理存储划分为以高性能HDFS为基础的数据缓冲层和以共享HDFS为基础的数据持久层,热数据先写入到缓冲层,后续通过Compaction和Clustering等表服务搬运至持久层,以满足海量数据入湖场景下对于性能和稳定性的强烈诉求。这里需要注意的是,数据写入缓冲层并commit后即对下游可见,无论后续“搬运工作”何时开始与结束都不影响数据可见性与时效,另外因为复用了湖表自身的Compaction和Clustering表服务能力,来实现缓冲层到持久层的数据搬运工作,该行为同样遵循湖表的commit机制与MVCC快照隔离设计,因此湖表开启数据缓冲层后,同样具有原子性、事务性保障以及Exactly Once语义。
下面是一个普通湖表读写流程与开启湖表缓冲层功能后的湖表读写流程之间的对比
开启湖表多模态IO能力,后Hudi写入吞吐提升100%,CP时长减少95%,稳定性提升97%
4.1.2 湖表动态分区策略
挑战:业务分区倾斜严重,最大分区与最小分区数据量相差730万倍,需解决数据倾斜导致的写入性能和小文件问题
解决方案:在Flink流写Hudi的核心拓扑链路上,新增一个自定义Partitioner策略,根据湖表数据特征来路由数据,从而缓解数据倾斜,控制文件数量
4.1.3 跨引擎无锁并发 Append
挑战:随着流量数据量的持续提升以及大促场景下的骤增,受Flink机房资源限制,单个Flink集群扩容出现瓶颈,此时需要将任务拆分多个,并发Append同一张流量湖表。
解决方案:解决元数据冲突、JM与TM通信冲突、以及Hive元数据同步的冲突,实现轻量级的无锁并发Append,解决扩展性问题。
4.1.4 数据湖元数据 Timeline 优化
挑战:对于历史数据回溯入湖等批量操作,会提交“多且大”的元数据信息,从而影响正常元数据即Timeline访问的性能。
解决方案:针对通过湖表Replace-Commit的方式,来进行历史数据回溯,并在元数据访时,增加智能跳过、懒加载等机制,从而提升元数据访问性能。
4.1.5 异步增量排序Clustering
实时写入的数据文件通常较小且无序,数据湖团队扩展社区Clustering的能力,实现数据增量排序、合并,从而在合并小文件的同事,提升数据压缩比。在增量排序后,存储能够节省60%!
4.2 数资团队 -- 从业务应用所做的优化及保障
流量资产是零售数据量最大主题,没有之一,日常千亿级别的量级,大促期间更是成倍上涨,由此带来整条数据链路上各个环节的架构组件都需要承受更大的计算压力和网络带宽冲击,以下通过链路性能优化、稳定性优化及监控等多方面介绍数资团队在转型过程中所做工作。
4.2.1 链路性能优化
4.2.1.1 反序列化优化
新模型仍旧以JSON为数据格式,随着比旧模型多了120+字段,行级的反序列化成本很高,尤其数据量级增大之后,往往成为整个任务的性能瓶颈。
为了处理好反序列化(Deser)的开销,我们做了两项工作:
- Flink流式处理过程中消息的Deser操作一般在Source算子中执行,该算子决定了整个任务的处理性能上限,受限于Flink Source Subtask和JDQ分区1对N的关系,无法通过扩大Source算子的并行度来提升性能,因此我们换了一种思路,将Deser操作从Source算子中抽出来,Source算子只负责从JDQ拉取字节级的消息数据,通过注册一个专门Deser的Lookup Table(为什么使用Lookup Table? Lookup Table可直接返回RowData类型,可直接用于入湖Sink算子,避免其他转换开销)来执行Deser操作,该Lookup算子并行度可按性能要求调整。
- 针对曝光数据,受限于Flink单任务瓶颈,我们采用拆任务方式来提高入湖性能,但拆分需要根据消息所属业务域来进行,为避免拆分后的任务仍需要序列化整体数据,我们将消息业务域字段写入JDQ 消息 Headers中,拆分后的任务只需要解析Headers来过滤掉不需要的业务域数据,而不是通过Deser整个消息后来过滤。
4.2.1.2 写入性能优化
处理分区倾斜处理,按业务域分区后,曝光模型有些分区(S和Y分区)的倾斜度高达730万倍
如果不做任何处理直接入湖,入湖后的文件数将会是
File num = Sink并行度 * 表分区数
例如表分区数为 54,Sink并行度2000,那么一次commit产生的文件数至少为108000个,数据量最小的Y分区单个文件大小为KB级别,会形成非常多的小文件,无论是对NS还是后续Clustering操作都形成巨大压力,单个subtask要写54个分区的文件,在多个分区文件句柄间切换也非常耗性能。
优化:根据各个分区的大小来设计动态分区策略,数据量较大的分区,比如S,H,P,单独路由到特定的subtask集合,集合的大小根据该分区对应的数据量大小确定,S 分区占据了近一半的数据,因此它的subtask集合占据了Sink算子并行度的一半,而其他非常小的分区则三两成队路由到单独的subtask(尽量减少单个subtask写分区数量,减少写入句柄切换)。按此方式优化后,一次commit写入的文件约为6000+,减少了94%,写入性能提升了1倍多。
流写HDFS优化,HDFS的读写参数默认为批读批写服务,比如默认写parquet block size为128M,但遇到流写场景,block size过大会影响流式写入性能,因此将此参数write.parquet.block.size调整至10~20M,形成微块写入,性能有显著提升,提升将近1倍,但也要考虑因此带来的下游读数性能下降,目前可通过Clustering来规避该问题。
4.2.2 链路稳定优化
4.2.2.1 JDQ集群稳定优化
挑战:因曝光数据量过大,导致下游接入业务消费速度无法跟上生产,进而大量冷读磁盘数据导致JDQ集群磁盘繁忙引起整体性能抖动,转而影响正常消费的入湖任务,导致入湖时效延迟。
优化方案:合理限制曝光流生产速度,并将下游按重要等级划分,依据不同等级限速,保障JDQ集群以合理的负载扛过大促洪峰阶段,待消峰后再逐步放开限速。
4.2.2.2 Clustering稳定优化
挑战: 异步增量Clustering需要做分区内全局数据排序,期间会产生大量Shuffle操作,日常会借助Spark RSS服务来稳定Shuffle操作,但大促洪峰时大量频繁的Shuffle即使使用RSS服务也会有Fetch Failed问题,甚至会对RSS服务造成冲击。
优化方案:平台侧研发了Clustering关闭排序功能的动态开关,可避免Shuffle,数资侧通过DUCC来控制开关,从而在大促洪峰期间可视数据量级大小判断是否关闭排序功能,防止Clustering Fetched Failed问题,降低RSS服务的风险。
4.2.3 流转批处理
挑战:在处理实时流表时,我们需要一种机制来感知流表的业务时间水位线,并判断何时 T - 1 的数据已准备就绪,以便正确启动下游的离线加工任务。
解决方案:我们通过流量湖表的流转批机制来实现这一目标。具体做法先改造Flink Hudi Writer,使其能够将业务时间记录在元数据中,再通过流转批服务定期扫描湖表元数据中记录的业务时间水位线。当所有分区的水位线最小值都超过 T 的零点时,并且对应的 Clustering 文件合并任务已完成时,这就表示 T - 1 的数据已准备就绪,此时下游依赖 的加工任务就可以运行。
4.2.4 湖链路容灾及监控
4.2.4.1 监控
除常规的Flink任务、JDQ生产消费监控外,我们额外针对hudi表设置了数据水位线的监控,具体方式和流转批逻辑类似,包括了流写的和异步clustering的水位线监控,分别对应图中的缓冲层(多模态IO具体实现)和持久层。
此外,Flink入湖任务checkpoint间隔为15min,如果仅使用JDQ消费积压监控来判断任务运行情况,则可能会出现故障延迟发现情况。例如T1时刻入湖作业出现故障,我们设置的报警会在T2时刻之后才因积压超过阈值而触发,(T1,T2)这段能及时发现问题处理问题的时间被浪费掉。
实时计算平台在Flink 1.16镜像提供了业务延迟监控解决了以上问题,并支持以此metric配置对应报警,能够第一时间发现任务异常情况。
4.2.4.2 容灾
平台侧研发了《Append模式下的跨引擎并发写》能力,支持COW表纯Append写入模式下Flink和Spark并发写入(不同分区),利用该能力业务侧搭建了湖表冷备容灾链路,并通过只修复异常期间数据来缩短整体修数时效,以曝光数据为例,修复全天数据有之前5~6小时缩短至2~3小时。
5 收益
5.1 SLA和数据时效提升
T+1 SLA提升,数据量级越大SLA提升越明显,给下游带来的Buffer越大,日常时效提升如下表所示,
模型 | Hive(旧模型) | Hudi(新模型) | SLA提前时间 |
订单 | 00:25 | 00:20 | 5min |
浏览 | 00:45 | 00:30 | 15min |
点击 | 01:10 | 00:30 | 40min |
曝光 | 02:00 | 00:45 | 75min |
新增近实时数据,为兼顾性能和成本考虑,目前设定延迟为15min,待进一步优化后有望减小至10min内。
5.2 存算成本节降
以数据量曝光表和点击表为例:
曝光表 | Hive(旧模型) | Hudi(新模型) | 节约 | 备注 |
计算链路成本(元) | 6546/天 | 8250/天 | -1704/天 | 预期内的高时效及更多字段解析带来的成本上升 |
存储成本(元) | 61257/天 | 34668/天 | 26589/天 | 1.湖仓技术数据重分布能力带来更高数据压缩比,100T -> 42.3T 将近60% 2.旧模型包含单分区和多分区表 |
总成本(元) | 67803/天 | 42918/天 | 24885/天 | 合900万/年 |
点击表 | Hive(旧模型) | Hudi(新模型) | 节约 | 备注 |
计算链路成本(元) | 645/天 | 1961/天 | -1316/天 | 预期内的高时效及更多字段解析带来的成本上升 |
存储成本(元) | 14623/天 | 3656/天 | 10967/天 | 1.湖仓技术数据重分布能力带来更高数据压缩比,12T -> 5T 将近60% 2.旧模型包含单分区和多分区表 |
总成本(元) | 15268/天 | 5617/天 | 9651/天 | 合352万/年 |
两个数据量级最大的表预计节约存算成本1200+万/年。
5.3 查询成本节降
验证场景1:约60个字段length求总和(旧模型共约60个字段,新模型共180+字段)。
场景1结果:
Hive(旧模型) | Hudi(新模型) | 节约 | |
计算费用(元) | 249 | 195 | 27.7% |
耗时(min) | 69 | 59 | 14.5% |
验证场景2:抽样统计从旧模型迁移到新模型的24个任务,对比历史(5月)与迁移后(9月)成本表现。
场景2结果:
Hive(旧模型) | Hudi(新模型) | 节约 | |
计算费用(元) | 11161元/7天(5月统计) | 6658元/7天(9月统计) | 40.3% |
结论:新Hudi表模型查询成本及耗时优于旧Hive表模型,主要归功于Hudi的数据重新排序分布带来的文件压缩以及新模型在字段预先提取,避免下游重复计算。
5.4 业务反馈
- 秒送业务反馈,接入湖仓架构表后SLA从11点提前到8点,彻底告别大促T+2看数历史。
- 用增业务反馈,新模型融合了所有端上数据,字段统一,使用方便。
- 搜推业务反馈,SLA相比之前至少提前1小时,数据链路执行时效非常稳定。
...
6 总结与展望
流量资产在湖仓一体架构的转型落地,解决流量资产在原Lambda架构下离在线数据对不齐,计算及存储架构范式不统一等问题,流批数据同源同模型,并额外提供了近实时增量数据,提升用户接入体验和效率,拓宽业务使用场景,目前总入湖数据量达120PB,数资团队和数据湖团队通过一系列优化确保该链路在首次大促整体运行平稳。
6.1 大促表现
****大促期间,湖仓一体流量新模型全时段平稳运行、无延迟****
以数据量最大的曝光模型为例,双十一大促期间曝光总量同比增长 79.7%;其中大促晚八高潮期,曝光峰值达 7.7亿条/min,均值达 4.5亿条/min。
备战压测 | 大促实际峰值 | |
入湖性能 | 8亿+条/min | 7.7亿/min |
入湖消费带宽 | 30+GB/s | 23.5GB/s |
大促(11月12日)SLA,相比旧模型,新模型时效更为稳定和日常差距不大。
模型 | Hive(旧模型) | Hudi(新模型) | SLA提前时间 |
订单 | 00:25 | 00:20 | 5min |
浏览 | 01:55 | 00:36 | 79min |
点击 | 01:25 | 00:42 | 43min |
曝光 | 02:45 | 00:54 | 111min |
6.2 未来规划
除了本次在流量资产应用湖仓一体的能力外,数资团队与数据湖团队还会继续合作,在以下一些场景加强建设和应用。
生产环境流读流量数据实践
如果能实现流读->入湖完整串起整条链路,则为批流一体打下基础,但目前还未在生产环境验证过流读,主要原因为1.部分场景不具备流读能力(数据量过大处理不过来,或者上游并发写入),2.流读的窗口期,依赖上游的表配置,如何确保下游能流读完整也是要着重考虑的。
纯外键关联PartialUpdate能力
当前的PartialUpdate能力是基于主键进行局部更新,但在数仓模型开发中存在大量需要通过外键关联更新的场景, 为方便业务使用一般会在事实宽表中冗余存储维表的主键和维度列,如果维表发生变更,一般需要重刷事实表记录,而基于外键的PartialUpdate利用湖表主键、外键索引的能力实现高效的流式局部更新,减轻额外计算层关联开销,加速数仓外键关联场景的模型加工效率。
Hudi秒级别实时能力
数据在入湖时,先写入数据文件和元数据,提交事务成功后数据才可见,数据文件的写入比较慢,因而一般为分钟级提交。这里可以扩展湖表多模态IO的能力,将数据写入到Kafka、HBase、Redis等高速存储中,在湖表的元数据中记录高速存储的位点、key等元信息。在读取时再持久存储与高速存储的内容联合到一起,从而使Hudi实现秒级别时延。