您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
一种通用的业务监控触发方案设计
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
一种通用的业务监控触发方案设计
jd****
2023-08-21
IP归属:北京
6800浏览
# 一、背景 <span style="font-family: 宋体;" data-slate-string="true"> 业务监控是指通过技术手段监控业务代码执行的最终结果或者状态是否符合预期,实现业务监控主要分成两步:一、在业务系统中选择节点发送消息触发业务监控;二、系统在接收到</span><span data-slate-string="true">mq</span><span style="font-family: 宋体;" data-slate-string="true">消息或者定时任务调度时,根据消息中或者任务中的业务数据查询业务执行的结果或状态并与业务预期的结果相对比。目前供销系统的方案如下:</span> ![](https://apijoyspace.jd.com/v1/files/KWN1B7TXWuYq0KMLIlzl/link) <span data-slate-string="true"> 由业务系统发送消息触发规则中心的校验任务,校验逻辑和报警规则通过规则中心的groovy脚本代码实现,该方案的缺点如下:</span> <span style="margin-right: 0.5em;" data-w-e-reserve="true">1.</span><span style="font-family: 宋体;" data-slate-string="true">业务监控代码掺杂在正常的业务代码中,业务监控的代码侵入性高;</span> <span style="margin-right: 0.5em;" data-w-e-reserve="true">2.</span><span style="font-family: 宋体;" data-slate-string="true">业务监控消息触发代码可复用性极低,各个应用都要维护一套代码,后期若要增加或维护某个功能时成本大;</span> <span style="margin-right: 0.5em;" data-w-e-reserve="true">3.</span><span style="font-family: 宋体;" data-slate-string="true">增加业务监控的开发工作量,开发人员需要开发和维护与业务监控功能无关的代码,如:消息触发降级功能、性能监控、异步触发等功能;</span> <span style="font-family: 宋体;" data-slate-string="true"> 为解决上述问题,本文提出了一种通用的业务监控触发方案。</span> # 二、方案介绍 <span data-slate-string="true">1\. 通用mq消息体:</span> ```java public class BusinessCheckMessage { /** * 监控类型 */ private String businessType; /** * 业务监控需要的参数 */ private Object data; /** * 业务方 */ private String businessSource; /** * 当前所属的topic */ private String topic; } ``` 其中, businessType用于区分业务监控的类型,如:终止合作、提单等; data用于存储和业务相关的关键数据,如订单id、商家id等; businessSource用于区分不同业务方的业务,如:万商的提单、供销的提单等; topic用于隔离消息,如:业务监控任务执行快的可以用主题A、执行慢的的可以用主题B等; 2.自定义注解 + 切面 以供销系统业务监控为例,接近50%的场景是将方法体中的参数作为业务数据来触发业务监控,针对此场景,本文<span style="font-size:10.5pt;mso-bidi-font-size:11.0pt; line-height:120%;font-family:宋体;mso-ascii-font-family:Calibri;mso-hansi-font-family: Calibri;mso-bidi-font-family:Times New Roman;mso-font-kerning:1.0pt; mso-ansi-language:EN-US;mso-fareast-language:ZH-CN;mso-bidi-language:AR-SA">采用</span>**<span style="font-size:10.5pt;mso-bidi-font-size:11.0pt; line-height:120%;font-family:宋体;mso-ascii-font-family:Calibri;mso-hansi-font-family: Calibri;mso-bidi-font-family:Times New Roman;mso-font-kerning:1.0pt; mso-ansi-language:EN-US;mso-fareast-language:ZH-CN;mso-bidi-language:AR-SA">注解+切面</span>**<span style="font-size:10.5pt;mso-bidi-font-size:11.0pt; line-height:120%;font-family:宋体;mso-ascii-font-family:Calibri;mso-hansi-font-family: Calibri;mso-bidi-font-family:Times New Roman;mso-font-kerning:1.0pt; mso-ansi-language:EN-US;mso-fareast-language:ZH-CN;mso-bidi-language:AR-SA">解耦业务监控代码和正常业务代码,降低业务监控代码对正常的业务代码的</span><span style="font-size:10.5pt; mso-bidi-font-size:11.0pt;line-height:120%;font-family:宋体;mso-ascii-font-family: Calibri;mso-hansi-font-family:Calibri;mso-bidi-font-family:Times New Roman; mso-font-kerning:1.0pt;mso-ansi-language:EN-US;mso-fareast-language:ZH-CN; mso-bidi-language:AR-SA">侵入,其中自定义注解负责获取业务监控需要用到的方法入参中的相关数据,切面负责组装通用mq数据模型并完成消息的发送。</span>自定义注解定义如下: ```java public @interface BusinessCheckPoint { /** * 业务监控类型 */ String businessType(); /** * 业务方 */ String businessSource(); /** * 要发送的消息的topic */ String businessTopic(); /** * 方法参数的第几个参数作为消息内容,从0开始 */ int dataIndex(); /** * 在执行业务流程前发送消息 * 默认在业务流程执行后发送消息 */ boolean beforeOperate() default false; } ``` 其中, businesstype用于获取业务监控类型; businessSource用于获取业务方; businessTopic用于获取当前要发送的消息主体; dataIndex用于获取方法体参数中的数据,从0开始; beforeOperate用于获取消息发送的时间,在业务流程执行后发送消息还是业务流程执行前发消息; 3.侵入式触发业务监控 考虑到业务系统可能会在复杂场景下触发业务监控,本文也提供了通用的解决方案,具体如何使用见后续的实战介绍。 # 三、关键代码 1.切面(根据自定义注解获取业务信息+消息体构造) ```java /** * @Description 业务监控切面 */ @Slf4j @Aspect public class BusinessCheckAspect { @Autowired private BusinessCheckHandler businessCheckHandler; @Pointcut("@annotation(com.jd.gmall.monitor.annotation.BusinessCheckPoint)") protected void pointCut() {} @Around("pointCut()") public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable { try { boolean isBeforeOperate = IsBeforeOperate(joinPoint); //事前触发业务监控 if (isBeforeOperate) { sparkBusinessCheck(joinPoint); } //执行业务 Object result = joinPoint.proceed(); //事后触发业务监控 if (!isBeforeOperate) { sparkBusinessCheck(joinPoint); } return result; } catch (Exception e) { log.error("业务执行异常,", e); throw e; } } /** * 业务监控消息构建 * * @return */ protected void sparkBusinessCheck(ProceedingJoinPoint joinPoint) { try { // 获取标注注解. BusinessCheckPoint businessCheckPoint = getMethodAnnotation(joinPoint, BusinessCheckPoint.class); if (businessCheckPoint == null) { return; } //消息发送 businessCheckHandler.sendMessage(build(businessCheckPoint, joinPoint)); } catch (Exception e) { log.error("触发业务监控异常,", e); } } /** * 获取方法枚举. */ public <T extends Annotation> T getMethodAnnotation(ProceedingJoinPoint pjp, Class<T> annotationClass) { if (!(pjp.getSignature() instanceof MethodSignature)) { return null; } MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); return method.getAnnotation(annotationClass); } /** * 构建消息 * @param businessCheckPoint * @param joinPoint * @return */ public Message build(BusinessCheckPoint businessCheckPoint, ProceedingJoinPoint joinPoint) { //方法入参 Object[] args = joinPoint.getArgs(); //方法参数的第几个参数作为消息内容 int dataIndex = businessCheckPoint.dataIndex(); if (args == null || dataIndex >= args.length) { return null; } //业务监控类型 String businessType = businessCheckPoint.businessType(); //业务方 String businessSource = businessCheckPoint.businessSource(); //要发送的消息的topic String businessTopic = businessCheckPoint.businessTopic(); //业务id String businessId = String.valueOf(System.currentTimeMillis()); //业务数据 Object data = args[dataIndex]; BusinessCheckMessage businessCheckMessage = BusinessCheckMessage.builder() .businessType(businessType) .businessSource(businessSource) .data(data) .build(); return new Message(businessTopic, JSON.toJSONString(businessCheckMessage), businessId); } /** * 根据注解判断是否在业务执行前发送消息 * @param joinPoint * @return */ public boolean IsBeforeOperate(ProceedingJoinPoint joinPoint) { try { // 获取标注注解. BusinessCheckPoint businessCheckPoint = getMethodAnnotation(joinPoint, BusinessCheckPoint.class); if (businessCheckPoint == null) { return false; } return businessCheckPoint.beforeOperate(); } catch (Exception e) { log.error("根据注解判断是否在业务执行前发送消息异常,异常信息", e); return false; } } } ``` 2.消息发送服务 ```java /** * @Description 消息发送服务 */ @Slf4j public class BusinessCheckHandlerImpl implements BusinessCheckHandler { /** * topic对应的MessageProducer */ private static Map<String, Producer> messageProducerMap = new HashMap<>(); /** * 线程池 */ private static ThreadPoolTaskExecutor commonExecutor; /** * 降级开关 */ private static boolean businessCheckSwitch = true; /** * 向规则中心异步发送消息触发业务监控任务 * 失败时不抛异常 * @param message */ protected void sendSync(Message message) { if (!businessCheckSwitch) { log.error("降级,不触发业务监控"); return; } if (message == null) { return; } CompletableFuture.runAsync(() -> { try { Producer messageProducer = messageProducerMap.get(message.getTopic()); if (messageProducer == null) { return; } messageProducer.send(message); if (log.isInfoEnabled()) { log.info("向规则中心发送MQ消息成功:{}", JSON.toJSONString(message)); } } catch (Exception e) { log.error("向规则中心发送MQ消息失败:{}", JSON.toJSONString(message), e); } }, commonExecutor.getThreadPoolExecutor()); } /** * 触发业务监控 * @param businessCheckMessage */ @Override public void sparkBusinessCheck(BusinessCheckMessage businessCheckMessage) { try { if (businessCheckMessage == null || StringUtils.isBlank(businessCheckMessage.getTopic())) { log.error("触发业务监控失败,参数不合法,businessCheckMessage = {}", JSON.toJSONString(businessCheckMessage)); return; } Message message = new Message(businessCheckMessage.getTopic(), JSON.toJSONString(businessCheckMessage), String.valueOf(System.currentTimeMillis())); sendSync(message); } catch (Throwable e) { log.error("触发业务监控失败,参数:{}", JSON.toJSONString(businessCheckMessage), e); } } /** * 向规则中心异步发送消息触发业务监控任务 * 失败时不抛异常 * @param message */ @Override public void sendMessage(Message message) { if (message == null) { return; } sendSync(message); } } ``` # 四、实战介绍 1.引入依赖 ```java <dependency> <groupId>com.jd</groupId> <artifactId>business.check</artifactId> <version>1.0.0</version> </dependency> ``` 2.初始化切面 ```java <bean id="businessCheckAspect" class="com.jd.gmall.monitor.aspect.BusinessCheckAspect"/> ``` 3.Producer及线程池赋值 ```java <bean id="businessCheckHandler" class="com.jd.gmall.monitor.service.impl.BusinessCheckHandlerImpl"> <property name="messageProducerMap"> <map> <entry key="gx_bussiness_check" value-ref="businessCheckProducer" /> </map> </property> <property name="commonExecutor" ref="asyncTaskThreadPoolTaskExecutor"/> </bean> ``` 其中, messageProducerMap类型为Map\<String, Producer>,用于指定topic对应的Producer; commonExecutor用于指定异步发送消息时用到的线程池(建议自行创建线程池); 4.业务监控消息发送 场景一: 简单场景下可使用自定义注解来发送消息,如下所示 ![图片.png](https://s3.cn-north-1.jdcloud-oss.com/shendengbucket1/2023-05-16-16-03hcn43Degw0IEiG46.png) 业务监控类型 = "100" 消息主题 = "gx\_business\_check" 业务方 = "fx" 消息体中的业务数据data = req 场景二: 复杂场景下,可在服务中注入sdk中的消息发送服务,如下所示: ![图片.png](https://s3.cn-north-1.jdcloud-oss.com/shendengbucket1/2023-05-16-15-57YjkkQMktTaDxFIl.png)场景二与场景一发送的消息内容一致。 5.业务监控降级不发送消息 sdk中的类BusinessCheckHandlerImpl中定义了控制降级的方法: ```java public static void setBusinessCheckSwitch(boolean businessCheckSwitch) { BusinessCheckHandlerImpl.businessCheckSwitch = businessCheckSwitch; } ``` 此处给出了通过ducc控制降级的方法: ```java @LafValue("business.check.switch") public void setBusinessCheckSwitch(boolean businessCheckSwitch) { BusinessCheckHandlerImpl.setBusinessCheckSwitch(businessCheckSwitch); } ``` businessCheckSwitch:true,开启消息发送;false,降级
上一篇:从头到尾说一次 Spring 事务管理(器)
下一篇:Java NIO 图解 Netty 服务端启动的过程
jd****
文章数
1
阅读量
170
作者其他文章
01
一种通用的业务监控触发方案设计
一、背景 业务监控是指通过技术手段监控业务代码执行的最终结果或者状态是否符合预期,实现业务监控主要分成两步:一、在业务系统中选择节点发送消息触发业务监控;二、系统在接收到mq消息或者定时任务调度时,根据消息中或者任务中的业务数据查询业务执行的结果或状态并与业务预期的结果相对比。目前供销系统的方案如下: 由业务系统发送消息触发规则中心的校验任务,校验逻辑和报警规则通过规则中心的groovy脚本代码实
jd****
文章数
1
阅读量
170
作者其他文章
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号