您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
Flink单元测试最佳实践
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
Flink单元测试最佳实践
自猿其说Tech
2022-01-17
IP归属:未知
2276浏览
测试
计算机编程
### 1 前言 测试是每个软件开发过程中不可或缺的一部分, 目前在flink开发过程中,缺少本地测试环境,开发完成后在bdp平台进行topic订阅,任务创建及调试,调试过程耗时效率低下,不易排查问题,针对此情况,flink单元测试可有效的解决一些问题,提升开发效率,主要有三种测试方式,其一主要测无状态的自定义函数,其二主要测有状态的自定义算子,其三是集成测试,现将这三种单元测试具体使用场景记录如下。 ### 2 测试 为进行集成测试,需加载一些配置依赖 ```xml <!-- flink 单元测试 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <scope>test</scope> <classifier>tests</classifier> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_2.11</artifactId> <version>1.13.3</version> <scope>test</scope> </dependency> ``` #### 2.1 测试用户自定义函数 ##### 2.1.1 单测无状态,无时间限制的UDF 以无状态的MapFunction为例 ```java @Slf4j public class TransPlanBillMapFunction implements MapFunction<JdwData, ClubOrderBillDto> { private static final long serialVersionUID = 8040205166533131242L; @Override public ClubOrderBillDto map(JdwData jdwData) throws Exception { Map<String, String> allMap = CharSequenceMapUtils.getAllJdwDataParam(jdwData); ClubOrderBillDto build = ClubOrderBillDto.builder() .transNeedCode(MapUtils.getString(allMap, TRANS_NEED_CODE)) .beginOrgCode(MapUtils.getString(allMap, BEGIN_ORG_CODE)) .build(); build.setSource(TRANS_PLAN_BILL); // log.info(">>TransPlanBillMapFunction-allMap:{}, build:{}", JSON.toJSONString(allMap), JSON.toJSONString(build)); return build; } } ``` 通过传参并验证数据准确性 ```java public class TransPlanBillMapFunctionTest { TransPlanBillMapFunction mapFunction = new TransPlanBillMapFunction(); @Test public void TransPlanBillMapFunctionTest() throws Exception { JdwData jdwData = new JdwData(); Map<CharSequence, CharSequence> value = new HashMap<>(); value.put("trans_need_code", "TN21110192912491"); value.put("begin_org_code", "600"); jdwData.setCur(value); jdwData.setTab("club_order_operate"); ClubOrderBillDto dto = ClubOrderBillDto.builder() .transNeedCode("TN21110192912491") .beginOrgCode("600").build(); ClubOrderBillDto result = mapFunction.map(jdwData); Assert.assertEquals(dto.getTransNeedCode(),result.getTransNeedCode()); Assert.assertEquals(dto.getBeginOrgCode(),result.getBeginOrgCode()); } } ``` ##### 2.1.2 对有状态或及时UDF和自定义算子进行单元测试(重点) 对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互,对此flink提供了一些测试工具,可测试我们自定义的函数和算子: - OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子) - KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子) - TwoInputStreamOperatorTestHarness (适用于两个 DataStream 的 ConnectedStreams 算子) - KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子) 现针对具体的涉及到状态的算子进行举例: 在日常开发工作中,我们用到flink的主要场景就是将不同数据源根据一个key值,进行合流,扩展出一个维度的数据,这是开发的重点功能,所以重点就是针对这个方法做单元测试,KeyedOneInputStreamOperatorTestHarness 现将项目中比较常用的场景示例如下: ```java SingleOutputStreamOperator<ClubOrderBillDto> transPlanBillUinon = clubOrderBill .filter(new TransNeedCodeEmptyFilter()).name("TransNeedCodeEmptyFilter").uid("TransNeedCodeEmptyFilter") .slotSharingGroup("default") .union(transPlanBill) .keyBy(ClubOrderBillDto::getTransNeedCode) .flatMap(new ClubOrderBillFlatMap()).name("ClubOrderBillFlatMap").uid("ClubOrderBillFlatMap") .slotSharingGroup("default") .filter(new CreateTimeEmptyFilter()).name("CreateTimeEmptyFilter").uid("CreateTimeEmptyFilter") .slotSharingGroup("default"); ``` ClubOrderBillFlatMap 这个方法主要是将tranPlanBill 和clubOrderBill 两条数据 ,根据transNeedCode字段key 进行合并数据, 此为job中重要实现功能。 ```java public class ClubOrderBillFlatMap extends RichFlatMapFunction<ClubOrderBillDto, ClubOrderBillDto> { private static final long serialVersionUID = -736468321533586125L; private transient ValueState<ClubOrderBillDto> valueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(ParallelismUtil.loadByKey("ClubOrderBillWideJobNew","day"))) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .cleanupInRocksdbCompactFilter(6000L).build(); ValueStateDescriptor<ClubOrderBillDto> stateDescriptor = new ValueStateDescriptor<>("ClubOrderBillFlatMap", ClubOrderBillDto.class); stateDescriptor.enableTimeToLive(ttlConfig); valueState = getRuntimeContext().getState(stateDescriptor); } @Override public void flatMap(ClubOrderBillDto inDto, Collector<ClubOrderBillDto> collector) throws Exception { String tableName = inDto.getSource(); ClubOrderBillDto existDto = valueState.value(); existDto = existDto == null?new ClubOrderBillDto():existDto; switch (tableName) { case CLUB_ORDER_BILL: { processClubOrderBill(collector, inDto, existDto); break; } case TRANS_PLAN_BILL: { processTransPlanBill(collector, inDto, existDto); break; } default: { log.error("数据table不存在:{}, {}", tableName, JSON.toJSONString(inDto)); throw new RuntimeException("数据table不存在:" + tableName + JSON.toJSONString(inDto)); } } } /** * trans_work_item_extend 合并表处理 * * @param collector * @param inDto * @param existDto * @throws IOException */ public void processTransPlanBill(Collector<ClubOrderBillDto> collector, ClubOrderBillDto inDto, ClubOrderBillDto existDto) throws IOException { existDto.setBeginOrgCode(inDto.getBeginOrgCode()); existDto.setTransNeedCode(inDto.getTransNeedCode()); valueState.update(existDto); collector.collect(existDto); } /** * trans_work_item_extend 合并表处理 * * @param collector * @param inDto * @param existDto * @throws IOException */ public void processClubOrderBill(Collector<ClubOrderBillDto> collector, ClubOrderBillDto inDto, ClubOrderBillDto existDto) throws IOException { existDto.setId(inDto.getId()); existDto.setClubOrderCode(inDto.getClubOrderCode()); existDto.setClubJobCode(inDto.getClubJobCode()); existDto.setTransNeedCode(inDto.getTransNeedCode()); existDto.setWaybillCode(inDto.getWaybillCode()); existDto.setClubLeaderCode(inDto.getClubLeaderCode()); existDto.setClubLeaderNumber(inDto.getClubLeaderNumber()); valueState.update(existDto); collector.collect(existDto); } } ``` 现写测试类进行这个flatMap方法的数据验证,通过测试类可以验证出数据是否准确的进行合并及state中的值是否正确。 ```java public class ClubOrderBillFlatMapTest { private KeyedOneInputStreamOperatorTestHarness<String, ClubOrderBillDto, ClubOrderBillDto> testHarness; private ClubOrderBillFlatMap clubOrderBillFlatMap; @Before public void setupTestHarness() throws Exception { clubOrderBillFlatMap = new ClubOrderBillFlatMap(); testHarness = new KeyedOneInputStreamOperatorTestHarness<>( new StreamFlatMap<>(clubOrderBillFlatMap), x -> "TN21110192912491", Types.STRING); testHarness.open(); } @Test public void MyStatefulFlatMap() throws Exception{ //club_order_bill 赋值 getClubOrderBill(); //trans_plan_bill 赋值 getTransPlanBill(); //构建期望的返回值 buildClubOrderBillDto(); //获取结果 ClubOrderBillDto result = testHarness.extractOutputValues().get(testHarness.extractOutputValues().size() - 1); //比较 Assert.assertEquals(buildClubOrderBillDto(), result); System.out.println(testHarness.getOutput()); } /** * 构造clubOrderBill数据源 * @throws Exception */ public void getClubOrderBill() throws Exception { ClubOrderBillDto dto = ClubOrderBillDto.builder() .id("XTL231273436942") .clubJobCode("CJ21110101961279") .clubOrderCode("XTL231273436942") .clubLeaderCode("XTL231273436942") .beginNodeCode("WG0000964") .beginNodeName("周口项城网格站") .transNeedCode("TN21110192912491") .createTime("2021-11-10 12:20:20") .build(); dto.setSource("club_order_bill"); testHarness.processElement(dto, 5l); } /** * 构造transPlanBill数据源 * @throws Exception */ private void getTransPlanBill() throws Exception { ClubOrderBillDto dto = ClubOrderBillDto.builder() .transNeedCode("TN21110192912491") .beginOrgCode("600").build(); dto.setSource("trans_plan_bill"); testHarness.processElement(dto, 5l); } /** * 构造期望数据 * @return */ private ClubOrderBillDto buildClubOrderBillDto() { ClubOrderBillDto dto = ClubOrderBillDto.builder() .id("XTL231273436942") .clubJobCode("CJ21110101961279") .clubOrderCode("XTL231273436942") .clubLeaderCode("XTL231273436942") .beginNodeCode("WG0000964") .beginNodeName("周口项城网格站") .transNeedCode("TN21110192912491") .createTime("2021-11-10 12:20:20") .beginOrgCode("600") .build(); return dto; } } ``` #### 2.2 测试flink作业 Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。通过集成测试,可有效解决我们作业各个流程中的问题,提升代码质量。 现将我们用到的一个两张表数据关联的job测试用例展示如下: ```java public class ClubOrderBillWideJobTest { @ClassRule public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberSlotsPerTaskManager(2) .setNumberTaskManagers(4) .build()); @Test public void testClubOrderBillWideJob() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并发度 env.setParallelism(1); CollectSink.values.clear(); // club_order_bill SingleOutputStreamOperator<ClubOrderBillDto> clubOrderBill = env.fromElements(getClubOrderBill()); // trans_plan_bill SingleOutputStreamOperator<ClubOrderBillDto> transPlanBill = env.fromElements(getTransPlanBill()); //处理数据 SingleOutputStreamOperator<ClubOrderBillCPo> cpoInfo = handleData(clubOrderBill, transPlanBill); //落入sink cpoInfo.addSink(new CollectSink()); env.execute("ClubOrderBillWideJobTest"); log.info("testClubOrderBillWideJob 结果为:{}", JSONObject.toJSONString(CollectSink.values)); //数据比较 Assert.assertEquals(CollectSink.values.get(0).getTransNeedCode(), buildClubOrderBillCPo().getTransNeedCode()); } // 落sink private static class CollectSink implements SinkFunction<ClubOrderBillCPo> { public static final List<ClubOrderBillCPo> values = Collections.synchronizedList(new ArrayList<>()); @Override public void invoke(ClubOrderBillCPo value) throws Exception { values.add(value); } } } ``` 关于MiniClusterWithClientResource的一些备注: - 为了不将整个 pipeline 代码从生产复制到测试,请将你的 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。 - 这里使用 CollectSink 中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,你可以使用测试的 sink 将数据写入临时目录的文件中,或者测试数据库。 - 建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。 - 优先使用 @ClassRule 而不是 @Rule,这样多个测试可以共享同一个 Flink 集群。这样做可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。 ### 3 总结 之前在开发过程中常常会隐藏一些bug很难被发现,现在通过flink单元测试可以有效的解决一些问题,提升开发效率,也希望通过分享的方式来加深自己对flink的认识,同时也可以对有需要的同学提供一些帮助,以上仅个人的一些浅谈,希望更多的大神指教学习,也欢迎一些对flink有兴趣的同学一起讨论学习。 ------------ ###### 自猿其说Tech-京东物流技术发展部 ###### 作者:马俊杰
原创文章,需联系作者,授权转载
上一篇:直播线上实时翻译和流式字幕技术实践与应用
下一篇:京东云PostgreSQL在GIS场景的应用分享
相关文章
安全测试之探索windows游戏扫雷
Jmeter压测实战:Jmeter二次开发之JSF采样器实现
Laputa自动化测试框架介绍
自猿其说Tech
文章数
426
阅读量
2163575
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2163575
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号