您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
探索Reactor线程模型及其在JMQ和JSF中的应用
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
探索Reactor线程模型及其在JMQ和JSF中的应用
自猿其说Tech
2022-10-11
IP归属:未知
472浏览
计算机编程
### 1 前言 Reactor模式也叫反应器模式,是基于事件驱动的线程模型,通常基于在I/O多路复用实现,是处理并发I/O比较常见的一种模式。中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程在多路复用器上不断轮询;一旦有I/O事件到来或是准备就绪,多路复用器返回并将相应I/O事件分发到对应的处理器中。很多开源的IO相关组件如Netty、Redis在使用Reactor模型。本文分三部分部分:Reactor线程模型的介绍,开源框架中Reactor线程模型的应用和京东JMQ和JSF中,Reactor的应用。 ### 2 Reactor线程模型 在使用JAVA NIO和多线程来进行高并发Java服务端应用程序设计时,通常是基于Reactor线程模型来设计的。Reactor,即包含一个Java NIO的多路复用选择器Selector的反应堆,当有反应时,即该Selector所管理的某个客户端连接有IO事件过来时,则在当前线程或者分配到其他线程来处理该IO事件。 Reactor线程模型通常由接收客户端连接请求的acceptor线程和处理客户端的IO请求的IO线程两部分组成,而acceptor线程和IO处理线程可以是同一个线程,也可以是不同的线程或者是各自对应一个线程池。 #### 2.1 单线程Reactor模型 单线程Reactor模型是指由一个线程绑定一个Java NIO的多路复用选择器Selector,由该线程来处理该Selector的所有IO事件,包括新客户端连接建立请求,即监听套接字的IO事件,和已经建立连接的客户端套接字的所有IO事件请求,如数据读写。在单线程Reactor模型中,Reactor模型中的新客户端连接建立的acceptor线程和已经建立连接的客户端套接字的IO请求处理的IO线程是同一个线程。 ![](//img1.jcloudcs.com/developer.jdcloud.com/b26b982d-86fd-4d0a-b0b4-9994eff8ab5420221011170745.png) 由于该Selector所管理的所有客户端连接和服务端的监听套接字的IO请求都是由该线程来处理,所以如果当前线程正在处理某个客户端的数据读写IO请求,则无法处理当前的新建立连接的客户端请求,导致客户端无法建立连接或者连接超时。同时,由于只使用一个线程,所以也不能充分利用多核CPU。 所以虽然通过使用单线程,规避了线程竞争和线程上下文切换,但是由于单个线程处理能力有限,所以单线程Reactor模型也无法支撑高并发客户端请求的场景,Java服务端应用程序设计很少使用到该模型。 #### 2.2 单Acceptor线程多IO线程的Reactor模型 多线程Reactor模型与单线程Reactor模型的主要区别是已经建立连接的客户端套接字的IO事件是在另外一个线程或者另外一个线程池来处理的,这种线程也称为IO线程,而处理监听套接字的新客户端连接请求的线程则还是一个独立的Acceptor线程。 具体工作过程如图所示:使用在一个独立Acceptor线程通过监听套接字监听客户端的连接请求,当有新的客户端连接到来时,Acceptor线程会为该客户端创建对应的channel,然后从IOThreadPool中取出一个IOThread,将channel注册到取出来的IOThread的Selector对象上。由该IOThread上的Selector对象监听channel后续的读写IO事件并进行处理。 ![](//img1.jcloudcs.com/developer.jdcloud.com/7f5aa21c-5648-4a9b-9ea4-0c3462e5b85f20221011170811.png) 以上示意图是针对一个已建立连接的客户端套接字的所有IO请求可以是始终在IO线程池中的一个IO线程中处理。不过也可以是使用一个专门的IO线程来负责监视所有客户端的channels,当某个客户端有IO事件到来时,将该客户端的此次IO事件封装为一个Runnable的任务,交给另外的线程池处理,即某个客户端的所有IO事件会在不同中线程处理。不过推荐是始终在一个IO线程中处理,这样就不存在多个线程对该客户端套接字进行并发操作的场景,即不需要通过加锁之类的操作来对该客户端套接字对应的channel对象引用进行线程同步。 这种已建立连接的客户端的所有IO事件都是在同一个IO线程中处理的方式也是netty4的IO线程模型,通过客户端channel与线程的绑定来避免线程竞争来提高性能。 #### 2.3 多Acceptor线程多IO线程Reactor模型 多Acceptor线程多IO线程Reactor模型实际与单Acceptor线程多IO线程Reactor模型差不多,主要一个区别就是用于接收客户端的连接请求的线程不再是只使用一个acceptor线程,而是使用一个包含多个acceptor线程的线程池,从而解决单个acceptor线程在处理多个不同的端口的高并发客户端连接建立请求时的性能瓶颈,即当服务端需要同时监听多个端口时,可以使用一个包含多个Acceptor线程的线程池。 该模型的工作过程如图所示:使用多个Acceptor线程来处理客户端的连接请求。 ![](//img1.jcloudcs.com/developer.jdcloud.com/1d806be6-38e7-4f11-8644-2aff7886c3d920221011172540.png) #### 2.4 主从Reactor多线程模型 主从Reactor多线程模型是在单Acceptor线程多IO线程模型和多Acceptor线程多IO线程模型的基础上,结合业务线程形成的一种新的线程模型,也是Reactor线程模型在实际应用中用的最多的线程模型。IO线程读取完数据后,计算任务会提交给业务线程来处理。 ##### 2.4.1 基于单Acceptor线程的主从多线程模型 工作模式如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/aa43d9c1-357c-4d12-9d64-dfff36ff61eb20221011172740.png) ##### 2.4.2 基于多Acceptor线程的主从多线程模型 工作模式如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/7475ead7-9885-4cbf-93cd-242b31634b9420221011172758.png) ### 3 开源框架中Reactor线程模型的应用 上文只是介绍了Reactor的一些概念,实际编写代码的时候,要使用上面的线程模型还是有一定的难度,至少不是那么方便,实际中的使用还是要借助一些优秀的开源框架,那么开源框架中是如何使用的呢? #### 3.1 Netty中的Reactor Netty框架的线程模型就是使用的Reactor线程模型,并且提供了易用的接口和参数,可以轻松实现Reactor的多种模型。 ##### 3.1.1 单线程模型 如下代码就能轻松实现单线程模型,共用一个EventLoopGroup,并且线程数设置为1 ```java public class Server { public static void main(String[] arg) { EventLoopGroup reactorGroup = new NioEventLoopGroup(1); //服务类 ServerBootstrap b = new ServerBootstrap(); b = b.group(reactorGroup); b = b.option(ChannelOption.SO_BACKLOG, 128); b = b.childOption(ChannelOption.SO_KEEPALIVE, true); //设置niosocket工厂 b.channel(NioServerSocketChannel.class); // 后续代码省略 } } ``` 线程nioEventGroup2-1负责创建连接和io操作。 ![](//img1.jcloudcs.com/developer.jdcloud.com/96ef6802-029b-43e6-8f55-f174efde87a020221011173302.png) ##### 3.1.2 单Acceptor线程多IO线程的模型 代码如下 ```java public class Server { public static void main(String[] arg) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); // 构造函数中不指定数的话,默认会创建当前机器CPU核数的两倍数量的EventLoop EventLoopGroup IOGroup = new NioEventLoopGroup(); //服务类 ServerBootstrap b = new ServerBootstrap(); b = b.group(acceptorGroup,IOGroup); b = b.option(ChannelOption.SO_BACKLOG, 128); b = b.childOption(ChannelOption.SO_KEEPALIVE, true); //设置niosocket工厂 b.channel(NioServerSocketChannel.class); // 后续代码省略 } } ``` 线程可视化如下图所示 ![](//img1.jcloudcs.com/developer.jdcloud.com/120a144a-d323-4c92-8ac6-e00c8687eaf620221011173350.png) ##### 3.1.3 多Acceptor线程模型多IO线程模型 事实上,据我观察,Netty并不能支持这种线程模型,如下代码中,在线程可视化中观察到的Acceptor线程仍然只有一个。 ```java public class Server { public static void main(String[] arg) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(4); EventLoopGroup IOGroup = new NioEventLoopGroup(8); //服务类 ServerBootstrap b = new ServerBootstrap(); b = b.group(acceptorGroup,IOGroup); b = b.option(ChannelOption.SO_BACKLOG, 128); b = b.childOption(ChannelOption.SO_KEEPALIVE, true); //设置niosocket工厂 b.channel(NioServerSocketChannel.class); // 后续代码省略 } } ``` #### 3.2 Jetty中的Reactor 在看Jetty的代码的时候,发现Jetty不支持单线程模型,本来单线程模型在真实场景就没人用的。实际上,Jetty也是不支持单Acceptor线程多IO线程的Reactor模型和多Acceptor线程多IO线程Reactor模型,因为Jetty的acceptor线程,selector线程,worker线程都会创建。所以在Jetty中,直接就是基于单Acceptor线程的主从多线程模型和基于多Acceptor线程的主从多线程模型。 ##### 3.2.1 基于单Acceptor线程的主从多线程模型 Jetty默认创建一个acceptor线程,两个selector线程,n-1-2 个worker线程,n为QueuedThreadPool的大小。在创建Server的时候,如果构造函数不传QueuedThreadPool,那么Server会自己构建一个QueuedThreadPool。 ```java import http.HTTPService; import org.eclipse.jetty.server.*; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.thread.QueuedThreadPool; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class ExampleServer { public static Server createServer(int port) { Server server = new Server(); ServerConnector connector = new ServerConnector(server); connector.setPort(port); server.setConnectors(new Connector[]{connector}); ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); context.addServlet(new ServletHolder(new HTTPService.HelloServlet()), "/hello"); HandlerCollection handlers = new HandlerCollection(); handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); server.setHandler(handlers); return server; } public static void main(String[] args) throws Exception { int port = 8080; Server server = createServer(port); server.start(); server.join(); } } ``` 上面的代码跑起来之后,线程模型就是单Acceptor线程的主从多线程模型。此处Acceptor是一个线程,IOThread是两个线程,业务线程是5个线程。 创建Server的时候,也可以传入自定义的QueuedThreadPool,如下 ```java QueuedThreadPool workers = new QueuedThreadPool(4); Server server = new Server(workers); ``` 需要注意的是,自定义QueuedThreadPool的时候,线程配置至少需要大于等于4,否则会启动失败。因为“Acceptor”、"IOThread"和"业务处理"会共用QueuedThreadPool线程池,"Acceptor"占用一个,“IOThrea”占用两个,“业务处理”至少占用一个,所以线程池至少需要4个及以上。 线程可视化如下,mainReactor就是指Acceptor,subReactorThreadPool就是指IOThreadPool。 ![](//img1.jcloudcs.com/developer.jdcloud.com/295a6d4a-d48a-4e4a-b5a0-ee3a48331a1220221011173858.png) 因为都是从QueuedThreadPool中取出来的,mainReactor线程根据名称一下就看出来了,如何知道subReactor是两个呢? 在接受请求的时候,ServerConnector创建完socket之后,会选择select,打断点,可以看到SelectorManager里面有两个ManagedSelector。 ![](//img1.jcloudcs.com/developer.jdcloud.com/90a2fc04-e090-4a3d-9c81-8fd2cd78047d20221011175455.png) ##### 3.2.2 基于多Acceptor线程的主从多线程模型 上面讲的这都是Jetty的默认的默认,实际上一切都可以配置,mainReactor和subReactor最好单独配置,别占用worker线程池。如下代码,配置了两个mainReactor线程,四个subReactor线程,8个worker线程。 ```java import http.HTTPService; import org.eclipse.jetty.server.*; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.thread.QueuedThreadPool; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class ExampleServer { public static Server createServer(int port) { QueuedThreadPool workers = new QueuedThreadPool(8); Server server = new Server(workers); Executor executors = Executors.newFixedThreadPool(6); ServerConnector connector = new ServerConnector(server, executors, null, null, 2, 4, new HttpConnectionFactory()); connector.setPort(port); server.setConnectors(new Connector[]{connector}); ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); context.addServlet(new ServletHolder(new HTTPService.HelloServlet()), "/hello"); HandlerCollection handlers = new HandlerCollection(); handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); server.setHandler(handlers); return server; } public static void main(String[] args) throws Exception { int port = 8080; Server server = createServer(port); server.start(); server.join(); } } ``` 线程可视化如下所示: ![](//img1.jcloudcs.com/developer.jdcloud.com/95d0522b-e774-4753-ab2c-e29c27aa43e720221011175534.png) 这就是Reactor线程模型介绍中提到的最实用、性能最好的基于多Acceptor线程的主从多线程模型。在Netty线程模型中,无论如何设置bossGroup,mainReactor一直是一个线程。此线程模型,Rcceptor是多线程,IOThread也是多线程,但是它俩共用了一个线程池,业务线程BIZThreadPool是自己的线程池。 ### 4 京东JMQ和JSF中的线程模型 JMQ和JSF的网络传输是基于NIO的,并没有直接基于JDK中提供的NIO接口来编程,而是基于Netty这款优秀的开源框架来进行开发的。 #### 4.1 JMQ中的线程模型 jmq中的核心组件是,NettyServer,UML图如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/a474f56a-54b4-4034-83f3-ab85787fb75020221011175601.png) com.jd.jmq.common.network.netty.NettyServer#doStart进行启动,会在方法com.jd.jmq.common.network.netty.NettyServer#configure中进行Netty参数的配置。 ```java protected void configure(NettyServerConfig serverConfig, InetSocketAddress address) { if (bossLoopGroup == null) { bossLoopGroup = createEventLoopGroup(config.getSelectorThreads(), new NamedThreadFactory("BossLoopGroup")); createBossLoopGroup = true; } bootStrap.group(bossLoopGroup, ioLoopGroup) .channel(config.isEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option( ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) //.option(ChannelOption.SO_TIMEOUT, config.getSoTimeout()) // NIO事件通知机制不支持so_timeout .option(ChannelOption.SO_REUSEADDR, serverConfig.isReuseAddress()) .option(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()) .option(ChannelOption.SO_LINGER, serverConfig.getSoLinger()) .option(ChannelOption.SO_RCVBUF, serverConfig.getSocketBufferSize()) .option(ChannelOption.SO_SNDBUF, serverConfig.getSocketBufferSize()) .option(ChannelOption.SO_BACKLOG, serverConfig.getBacklog()).localAddress(address) .childHandler(childHandler()); } ``` 单独配置了bossLoopGroup和ioLoopGroup,在创建业务handler的时候,调用了com.jd.jmq.common.network.netty.NettyTransport#childHandler方法,初始化channel的时候,会传入自定义的workerGroup线程池。如果没有定义workerGroup,则直接在ioLoopGroup中处理业务逻辑。 ```java protected class PipelineChain extends ChannelInitializer { @Override protected void initChannel(Channel channel) throws Exception { if (workerGroup != null) { channel.pipeline().addLast(workerGroup, getChannelHandlers()); } else { channel.pipeline().addLast(getChannelHandlers()); } } } ``` 感兴趣的朋友可以验证一下,在创建com.jd.jmq.common.network.netty.NettyTransport#createEventLoopGroup方法中,线程数是可以根据io.netty.eventLoopThreads参数配置的,如果没有配置,会在1和线程数*2直接取最大值,但是无论如何配置,bossLoopGroup应该都只有一个线程。 #### 4.2 JSF中的线程模型 JSF中,网络传输组件是:JSFServerTransport,配置组件是:ServerTransportConfig,业务handle是BaseServerHandler。类图如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/fa0b5604-8438-48ae-83b3-3d5150e710b920221011175712.png) ServerTransportConfig中的方法initParentEventLoopGroup用来初始化parentGroup ```java private synchronized EventLoopGroup initParentEventLoopGroup() { int threads; if (this.parentNioEventThreads == 0) { threads = Math.max(4, Constants.CPU_CORES / 2); } else { threads = this.parentNioEventThreads; } NamedThreadFactory threadName = new NamedThreadFactory("JSF-SEV-BOSS", this.isDaemon()); Object eventLoopGroup; if (this.isUseEpoll()) { eventLoopGroup = new EpollEventLoopGroup(threads, threadName); } else { eventLoopGroup = new NioEventLoopGroup(threads, threadName); } return (EventLoopGroup)eventLoopGroup; } ``` 方法initChildEventLoopGroup用来初始化childGroup ```java private synchronized EventLoopGroup initChildEventLoopGroup() { int threads = this.childNioEventThreads > 0 ? this.childNioEventThreads : Math.max(8, Constants.DEFAULT_IO_THREADS); NamedThreadFactory threadName = new NamedThreadFactory("JSF-SEV-WORKER", this.isDaemon()); EventLoopGroup eventLoopGroup = null; if (this.isUseEpoll()) { eventLoopGroup = new EpollEventLoopGroup(threads, threadName); } else { eventLoopGroup = new NioEventLoopGroup(threads, threadName); } return (EventLoopGroup)eventLoopGroup; } ``` 在BaseServerHandler的构造函数中,会调用com.jd.jsf.gd.server.BusinessPool#getBusinessPool方法来初始化业务线程池。 ```java public BaseServerHandler(ServerTransportConfig transportConfig) { this.serverTransportConfig = transportConfig; this.bizThreadPool = BusinessPool.getBusinessPool(this.serverTransportConfig); } public static ThreadPoolExecutor getBusinessPool(ServerTransportConfig transportConfig) { int port = transportConfig.getPort(); ThreadPoolExecutor pool = (ThreadPoolExecutor)poolMap.get(port); if (pool == null) { pool = (ThreadPoolExecutor)CommonUtils.putToConcurrentMap(poolMap, port, initPool(transportConfig)); } return pool; } private static synchronized ThreadPoolExecutor initPool(ServerTransportConfig transportConfig) { final int port = transportConfig.getPort(); int maxPoolSize = transportConfig.getServerBusinessPoolSize(); String poolType = transportConfig.getServerBusinessPoolType(); int minPoolSize; char aliveTime; if ("fixed".equals(poolType)) { minPoolSize = maxPoolSize; aliveTime = 0; } else { if (!"cached".equals(poolType)) { throw new IllegalConfigureException(21401, "server.threadpool", poolType); } minPoolSize = 20; maxPoolSize = Math.max(minPoolSize, maxPoolSize); aliveTime = '\uea60'; } String queueType = transportConfig.getPoolQueueType(); int queueSize = transportConfig.getPoolQueueSize(); boolean isPriority = "priority".equals(queueType); BlockingQueue<Runnable> configQueue = ThreadPoolUtils.buildQueue(queueSize, isPriority); NamedThreadFactory threadFactory = new NamedThreadFactory("JSF-BZ-" + port, true); RejectedExecutionHandler handler = new RejectedExecutionHandler() { private int i = 1; public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (this.i++ % 7 == 0) { this.i = 1; BusinessPool.LOGGER.warn("[JSF-23002]Task:{} has been reject for ThreadPool exhausted! pool:{}, active:{}, queue:{}, taskcnt: {}", new Object[]{r, executor.getPoolSize(), executor.getActiveCount(), executor.getQueue().size(), executor.getTaskCount()}); } RejectedExecutionException err = new RejectedExecutionException("[JSF-23003]Biz thread pool of provider has bean exhausted, the server port is " + port); ProviderErrorHook.getErrorHookInstance().onProcess(new ProviderErrorEvent(err)); throw err; } }; LOGGER.debug("Build " + poolType + " business pool for port " + port + " [min: " + minPoolSize + " max:" + maxPoolSize + " queueType:" + queueType + " queueSize:" + queueSize + " aliveTime:" + aliveTime + "]"); return new ThreadPoolExecutor(minPoolSize, maxPoolSize, (long)aliveTime, TimeUnit.MILLISECONDS, configQueue, threadFactory, handler); } ``` ### 5 总结 本文开篇介绍了线程模型的概念,接着着重介绍了非常优秀的开源网络编程框架Netty和Jetty是如何使用Reactor线程模型的,通过JDK提供的线程可视化工具,观察了Reactor线程模式运行过程中线程的状态,还通过debug的方式观察了Jetty运行过程中的线程状态。最后介绍了在JMQ和JSF中如何基于Netty框架来实现高性能的网络编程。希望能通过这篇文章,让大家更好的理解线程模型,在工作中基于多线程开发的时候,能开发出性能更好的程序。 ------------ ###### 自猿其说Tech-JDL京东物流技术与数据智能部 ###### 作者:高圆庆
原创文章,需联系作者,授权转载
上一篇:mysql基于binlake同步ES积压解决方案
下一篇:代码评审的价值和规范
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2163890
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2163890
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号