您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
JSF本地联调工具实践
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
JSF本地联调工具实践
自猿其说Tech
2022-01-11
IP归属:未知
87160浏览
计算机编程
### 1 背景 在项目开发中,研发同学会经历下面的过程: ![](//img1.jcloudcs.com/developer.jdcloud.com/de16720e-8074-4679-91b5-dd14423a7c1120220111171400.jpg) 但是针对团队的测试环境,涉及到几个痛点: - 多人联调使用时尤其是跨部门联调时,强依赖环境稳定,频繁启停发布或者由于某些原因没有启动成功,影响的不是一个人的,而是多方的。 - jsf接口逻辑较多,联调环节存在隐晦的bug时,大部分做法是打点日志重启再次调用排查,如果没有发现问题,继续反复加日志重启排查。这样的过程是苦恼且费时的,终于找到问题了,进行修复,但是修复完成又懒得把排查时添加的日志删了。这一系列问题是由于不能打断点。 - 联调的双方有一方没有测试环境,想联调只能上uat。 上面列举的痛点,大家或多或少都有体会,尤其是测试同学体更多一些。其实在项目周期中,将问题暴露的越早,项目时间越可控。为了不在痛,引出本章的主角:jsf本地联调工具。 ### 2 设计初衷 对于设计工具,有两个设计准则: - 非侵入,不影响项目代码 - 简单,只需要简单的配置就可以看到效果 目的就是要做到简单好用。 ### 3 方案 首先看下整体架构图,这里面分成三层: - 黄色代表jsf调用者 - 蓝色代表redis中转者 - 绿色代表jsf提供者 为什么要使用中间件作为中转,这是由于我们内网环境下,两台电脑是无法pin通的,所以无法进行直连,所以需要中转来做数据传输。 ![](//img1.jcloudcs.com/developer.jdcloud.com/951e4afe-013c-4500-a51d-6dc28e67dbc320220111171453.jpg) 看过架构图大家会对工具有个大体的了解,利用redis的发布订阅模式来完成jsf请求到响应的数据传递工作。除此之外我们要解决的问题还有很多,比如如何做到非侵入、如何在线程中发出请求后同步得到结果、数据传输过程中使用哪种序列化方式、怎么做到简单配置即可使用等等,针对这些问题我列了个思维脑图,方便整理思路。 下面是对工具的思维脑图: ![](//img1.jcloudcs.com/developer.jdcloud.com/f6497502-2348-438a-a1d6-c13e5085e8c220220111171510.jpg) 通过思维脑图,我列出了解决这些问题的办法,下面就一一讲解。 #### 3.1 连通方式 ##### 3.1.1 中间件 针对本地环境无法连通的难点,有句老话:逢山开路,遇水搭桥。这是一种精神,也是这个难点的解决方案。 内网环境下,两个电脑无法连通,但是每台电脑都可以和中间件连通。通过中间件作为连通两个本地环境的桥梁,做到连通。 ![](//img1.jcloudcs.com/developer.jdcloud.com/7a4edb9b-78df-4aaa-b3dd-1b502f969e2a20220111171532.jpg) 通过上图可以分析,中间件需要具备的能力就是发布订阅,那么能想到的就是jmq和redis。与其每次使用工具时都去配置topic、用户名、密码、链接地址等等,还不如在组件里集成好,并固定一组请求和响应topic,让使用者无感的使用。基于这个思路,我们继续进行假设:现在有A机器调用者,B机器和C机器是提供者,B和C都在本地启动了,A想和B进行联调,如何保证B可以接收到消息呢? 这就分两种情况: - 使用jmq的情况,两台提供者同时启动,如果用户相同,那么相当于在同一组消费topic,A请求打到B机器还是C机器无法确定。但是jmq支持广播模式,可以实现B机器和C机器同时受到消息。那如何确定B才是真是本次请求的目标机器呢?可以在jsf请求消息体里携带目的机器的ip信息,当B和C同时接到消息后,判断是否与自身ip匹配,匹配就说明这次请求是自己该处理的,否则不处理即可。 ![](//img1.jcloudcs.com/developer.jdcloud.com/4c99bae8-cd74-4208-a20a-82f9cc145a8a20220111171552.jpg) - 使用redis的情况,redis的订阅发布模式,本身就是广播模式,只要订阅的机器都可以收到请求。多个提供者时处理办法和上面的一致即可。 这样我们通过jmq或者redis都可以实现想要的效果了,那么到底选择哪中更加合适呢?那就要从jmq和redis特点来说明: - 先说jmq中间件,jmq的topic需要在jmq平台进行申请,如果联调双方有一方无法使用工具里集成的jmq环境进行连通,那就要自定义jmq环境,并且还需要在jmq平台申请topic。再一点,也是重要的一点,jmq在广播模式下,由于消费者是拉取模式,如果第一次判断队列里没有数据,则第二次轮询的时间会比较慢。这样就会导致使用这个jsf联调工具时,一次请求响应的时间比较长,测试结果,一次请求响应可以达到30s以上,这还是比较难接受的。 - 再说redis,redis发布订阅模式不需要通过平台申请topic这一步。redis会判断订阅的topic是否存在,不存在就自己创建一个,当没有订阅者时会自己删除topic,这个实现的目的就是节约内存。同时redis基于内存实现,响应速度非常快,经过测试,可以达到秒级甚至更快。 通过上面的分析我决定使用redis作为本工具的中间件。 ##### 3.1.2 序列化 这是几乎所有通过网络传输都需要考虑的问题,我们使用jsf时,jsf支持的序列化方式很多,我在公司使用比较多的就是默认的方式msgpack和hessian。msgpack携带的数据更少,性能更好。而hessian更加灵活,但是性能不如msgpack。如果使用redis,redis本身支持的序列化方式里没有msgpack和hessian,但是使用redistemplete时,可以通过扩展的方式对序列化进行自定义,我们可以自己根据msgpack和hessian进行序列化。但是问题由来了,redistemplete在springboot启动时,需要定义好针对哪种类型数据使用哪种序列化方式,也就是很难做到像jsf那样,根据请求。目前做法是针对msgpack和hessian通过不同的topic区分,请求时根据携带的序列化方式发送给不同序列化方式的topic。 redis序列化扩展需要实现RedisSerializer<T>泛型接口,实现serialize和deserialize方法即可。我结合了jsf提供序列化工具进行扩展。 ```java public class RedisJsfSerializer<T> implements RedisSerializer<T> { private final JavaType javaType; private final Codec codec; public RedisJsfSerializer(Class<T> type, Constants.CodecType codecType) { this.javaType = getJavaType(type); this.codec = CodecFactory.getInstance(codecType); } @Override public byte[] serialize(T t) throws SerializationException { if (t == null) { return new byte[0]; } try { return this.codec.encode(t); } catch (Exception ex) { throw new SerializationException("Could not write codec: " + ex.getMessage(), ex); } } @Override public T deserialize(byte[] bytes) throws SerializationException { try { return (T)codec.decode(bytes, javaType.getRawClass()); } catch (Exception e) { return (T)codec.decode(bytes, javaType.getRawClass().getTypeName()); } } protected JavaType getJavaType(Class<?> clazz) { return TypeFactory.defaultInstance().constructType(clazz); } } ``` 构造中传入的是jsf定义的序列化类型,通过CodecFactory.getInstance(codecType)获取对应的序列化器。 这里需要注意的是,如果是msypack方式对ResponseMessage进行反序列化时,jsf提供了一个反序列化模板,如果不使用反序列化模板,则反序列化会报错,所以在上面deserialize方法中对应做了处理。 有了序列化器,就可以针对jsf请求和响应提供hessian和msgpack两种序列化处理。 ```java public class RedisSerializerConfiguration { @Bean public RedisSerializer<RequestMessage> hessianRequestSerializer() { return new RedisJsfSerializer<>(RequestMessage.class, Constants.CodecType.hessian); } @Bean public RedisSerializer<ResponseMessage> hessianResponseSerializer() { return new RedisJsfSerializer<>(ResponseMessage.class, Constants.CodecType.hessian); } @Bean public RedisSerializer<RequestMessage> msgpackRequestSerializer() { return new RedisJsfSerializer<>(RequestMessage.class, Constants.CodecType.msgpack); } @Bean public RedisSerializer<ResponseMessage> msgpackResponseSerializer() { return new RedisJsfSerializer<>(ResponseMessage.class, Constants.CodecType.msgpack); } @Bean public RedisSerializer<String> stringRedisSerializer() { return new StringRedisSerializer(); } } ``` 通过上面的配置,就可以在监听redis topic和发送topic根据序列化方式使用不同的序列化对象处理了。 ##### 3.1.3 异步转同步 我们在发送一个请求之后要同步等待结果的返回,现在发送请求变成的想redis发送一条消息,返回是需要监听响应topic拿到的。如何做到同步等待响应结果呢? 我第一个想到就是CountDownLatch,先通过伪代码看下思路: ```java //创建 CountDownLatch countDownLatch = new CountDownLatch(1); //监听响应topic subscribe(responseMessage -> countDownLatch.countDown()); //发送请求topic publish(requestMessage); //等待结果 countDownLatch.await(10, TimeUnit.SECONDS); ``` 这样就可以在收到响应的时候结束等待,继续流程了。 这里使用CountDownLatch有个好处就是可以设置等待时长,这个与jsf设置超时时长很相似。 #### 3.2 触发时机 通过上面的准备工作我们基本捋清了工具使用的关键技术。下面就要考虑在什么时机触发我们的工具,让他帮我们完成调用工作。 这里就说道了jsf的扩展性,每次jsf调用过程都会经过一些列过滤器,无论是调用者还是提供者。 借助jsf官方文档上的图来看下。 ![](//img1.jcloudcs.com/developer.jdcloud.com/c773e73e-8441-450b-adab-d63ad6fc503220220111171809.png) 既然是这样,我们扩展一个filter,每次调用者调用方法时,就可以通过自定义filter拦截请求,完成通过中间件通信的目的。 #### 3.3 易用性 首先就是不需要使用者去了解内部细节,引入就能用。之前可以看到项目里会有引入通用jar包后,需要手动显式在xml中配置对应bean才能正常使用这个功能。而我并不想这样,通过springbootstarter方式直接将对应bean引入到容器中,这不乏是一种好的方案。 其次就是jsf中的组件是否由于spring托管,只有被spring容器托管,才能通过简单配置来控制插件行为,比如插件是否开启。 最简单的验证方式就是项目启动后查看jsf内置filter是否可以从spring容器中拿到。 这里用一个集成jsf的springboot项目启动后,从容器中获取jsf内置系统时间检查过滤器。并未能获取到。 ![](//img1.jcloudcs.com/developer.jdcloud.com/602a7b91-04ae-457f-bb6c-1a09dcc6454320220111171915.png) 这个问题也可以解决,通过提供一个静态工具类,在容器启动时拿到spring应用上下文对象就可以了。例如: ```java @Slf4j public class SpringUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if(SpringUtils.applicationContext == null) { SpringUtils.applicationContext = applicationContext; } } public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 通过name获取 Bean. * @param name * @return */ public static <T> T getBean(String name,Class<T> clazz){ if(getApplicationContext() == null) { return null; } try { return getApplicationContext().getBean(name,clazz); } catch (Exception e) { log.warn("通过spring获取配置参数异常", e); } return null; } } ``` 这样我们通过这个工具就可以从容器中拿到配置参数,被任何非spring容器中的类使用了。 #### 3.4 整体流程图 说明: - 虚线部分是jsf原调用流程,工具不会通过的。 - 蓝线部分是工具走的路线,线上的描述序号为执行顺序。 ![](//img1.jcloudcs.com/developer.jdcloud.com/d3cd0697-d82b-475d-a514-73e51f51bb1320220111172006.jpg) 4 功能实现 由于篇幅有限,这里只展示关键代码。 - 提供者订阅 ```java @Configuration @ConditionalOnProperty(prefix = "jsf.plugin.filter.local.invoke",name = "enable",havingValue = "true") @EnableConfigurationProperties({JsfLocalInvokeProperties.class}) public class JsfFilterRedisConfiguration { ... /** * jsf 提供者接收消息监听器适配器 * @param messageSubscribe * @param requestMessageRedisSerializer * @return */ @Bean public MessageListenerAdapter messageListenerAdapter(MessageSubscribe<RequestMessage> messageSubscribe,RedisSerializer<RequestMessage> requestMessageRedisSerializer) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageSubscribe, "receiveMessage"); messageListenerAdapter.setSerializer(requestMessageRedisSerializer); messageListenerAdapter.afterPropertiesSet(); return messageListenerAdapter; } /** * redis 订阅容器 * @param redisConnectionFactory * @param jsfLocalInvokeProperties * @param messageListenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,JsfLocalInvokeProperties jsfLocalInvokeProperties,MessageListenerAdapter messageListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); if(RoleEnum.PRODUCER.getCode().equals(jsfLocalInvokeProperties.getRole())) { container.addMessageListener(messageListenerAdapter, new PatternTopic(LOCAL_INVOKE_REDIS_CUSTOMER_TOPIC)); } return container; } } ``` - 调用者调用Filter ```java @Slf4j @Extensible(value = "jsfLocalInvokeFilter",order = 10) @AutoActive(consumerSide = true) public class JsfLocalInvokeFilterByRedis extends AbstractFilter { private volatile JsfLocalInvokeProperties jsfLocalInvokeProperties; public static final String REGISTRY_CLASS_NAME = "com.jd.jsf.service.RegistryService"; private volatile RedisMessageListenerContainer redisMessageListenerContainer; private volatile RedisSerializer<ResponseMessage> redisSerializer; private volatile PublishMessageBean<RequestMessage> publishMessageBean; @Override public ResponseMessage invoke(RequestMessage requestMessage) { if(getJsfLocalInvokeProperties() == null || !getJsfLocalInvokeProperties().isEnable() || !addAddressAndPort(requestMessage)) { return this.getNext().invoke(requestMessage); } //排除注册、心跳接口 if(REGISTRY_CLASS_NAME.equals(requestMessage.getClassName())) { return this.getNext().invoke(requestMessage); } CountDownLatch countDownLatch = new CountDownLatch(1); //hessian对alias要求group:version 没有就补全 if(!verifyAlias(requestMessage.getAlias())) { requestMessage.setAlias(requestMessage.getAlias() + LOCAL_INVOKE_JSF_VERSION); requestMessage.getInvocationBody().addAttachment(LOCAL_INVOKE_JSF_VERSION_ADDED, 1); } log.info("开启内网jsf调用,requestMessage = {}", JSON.toJSONString(requestMessage)); //topic之间关联值 UUID uuid = UUID.randomUUID(); //添加生产者监听 AtomicReference<ResponseMessage> messageAtomicReference = new AtomicReference<>(); MessageListenerAdapter listenerAdapter = addListener(message -> { log.debug("收到生产者回复,message = {}", message); messageAtomicReference.set(message); countDownLatch.countDown(); }); requestMessage.getInvocationBody().addAttachment(LOCAL_INVOKE_UUID, uuid.toString()); //发送消费者topic getPublishMessageBean().publish(LOCAL_INVOKE_REDIS_CUSTOMER_TOPIC, requestMessage); //等结果 try { countDownLatch.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { log.warn("JsfLocalInvokeFilter等待提供者结果被中断", e); } //拿到结果 ResponseMessage responseMessage; if (messageAtomicReference.get() != null) { responseMessage = messageAtomicReference.get(); } else { //给默认值 responseMessage = MessageBuilder.buildResponse(requestMessage); } //停止并取消订阅生产者监听 removeListener(listenerAdapter); return responseMessage; } ... } ``` - jsf提供者接收请求 ```java @Slf4j public class ProviderMessageSubscribe implements MessageSubscribe<RequestMessage>{ private final PublishMessageBean<ResponseMessage> publishMessageBean; public ProviderMessageSubscribe(PublishMessageBean<ResponseMessage> publishMessageBean) { this.publishMessageBean = publishMessageBean; } @Override public void receiveMessage(RequestMessage requestMessage) { modifyAlias(requestMessage); log.info("生产者收到消息:{}", JSON.toJSONString(requestMessage)); ProviderConfig providerConfig = getProviderConfig(requestMessage); if(providerConfig != null && isSameIp(requestMessage)) { log.debug("ProviderTopicListener invokeJsf find jsf interface"); ProviderProxyInvoker invoker = new ProviderProxyInvoker(providerConfig); ResponseMessage responseMessage = invoker.invoke(requestMessage); publishMessageBean.publish(LOCAL_INVOKE_REDIS_PROVIDER_TOPIC, responseMessage); } else { log.warn("没有提供者或非调用者配置的ip,不处理"); } } private void modifyAlias(RequestMessage requestMessage) { String alias = requestMessage.getAlias(); boolean versionAdded = requestMessage.getInvocationBody().getAttachments().containsKey(LOCAL_INVOKE_JSF_VERSION_ADDED); if(versionAdded) { requestMessage.setAlias(alias.substring(0, alias.indexOf(LOCAL_INVOKE_JSF_VERSION))); } } private ProviderConfig getProviderConfig(RequestMessage requestMessage) { List<ProviderConfig> providerConfigs = JSFContext.getProviderConfigs(); for (ProviderConfig providerConfig : providerConfigs) { if(requestMessage.getAlias().equals(providerConfig.getAlias()) && requestMessage.getClassName().equals(providerConfig.getInterfaceId())) { return providerConfig; } } return null; } private boolean isSameIp(RequestMessage requestMessage) { final Object attachmentsObj = requestMessage.getInvocationBody().getAttachment(LOCAL_INVOKE_REMOTE_ATTACHMENT); if(attachmentsObj == null) { return true; } try { HashSet<String> attachmentSet = (HashSet<String>)attachmentsObj; return attachmentSet.contains(JSFContext.getLocalHost()); } catch (Exception e) { return true; } } } ``` ### 5 结语 这个小工具其实并不复杂,也没有多么高大上,只是为了解决研发过程中的小问题,方便大家使用。其实每位研发在开发的过程可能遇到不顺手的工具亦或者是工具平台某些功能不好用,这都是正常的,毕竟没有十全十美的。只要有槽点,不停留在槽点上,而是利用现有资源思考是否解决掉,并着手去实现。这样既可以解决问题,又可以锻炼自身技术,双重帮助。 对于方案或者实现细节可能会有瑕疵或者考虑不周的地方,欢迎大家指点,一起讨论,感谢阅读。 ------------ ###### 自猿其说Tech-京东物流技术发展部 ###### 作者:吕顺
原创文章,需联系作者,授权转载
上一篇:从源码角度分析VueRouter路由实现
下一篇:京东探索研究院NLP水平超越微软 织女Vega v1模型位居GLUE榜首
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2149963
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2149963
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号