您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
优雅的使用线程池以及源码剖析
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
优雅的使用线程池以及源码剖析
自猿其说Tech
2021-08-09
IP归属:未知
584浏览
计算机编程
### 背景 今天是10.24号,天气晴,你正在摸鱼,突然间接到一个需求,由于系统升级,说要同步数据,方案就是把老系统需要同步的数据(订单)发送到MQ中,新系统再去拉取这个MQ,很简单的方案,你要做的就是,把老系统的数据封装成MQ消息,调用发送MQ接口,发送到MQ中。 #### 1.1 开始思考 你现在有了一个发MQ消息的接口,调用一次接口发送一条MQ消息,那么什么时候调用这个接口呢?同步一条数据就调用一次接口,现在老系统有一万条数据,我就要for循环分别调用一万次这个接口,调用一次接口花费600ms,调用一万次就要花费600ms * 10000 = 6000s = 600min = 10h **笑容逐渐变态** 心想,真好,这样的话我就可以摸鱼10个小时了,今天的行云面板就先安排上了! <center>![](//img1.jcloudcs.com/developer.jdcloud.com/38bdf56f-d7d4-491c-9bbf-2e2353d1803c20210809142555.png)</center> 老板,真不是我不干活,而是程序它不中用啊,你看,我代码都写好了,是他自己非要跑10个小时的!冤枉啊老大~ #### 1.2 开始反思 被领导谈话过后,回到工位开始思考,究竟是代码的问题还是你的问题,现在有1万条数据,而你是串行调用的,能不能让它并行调用呢,听说有个叫线程池的东西很厉害的样子,听说隔壁组的小杰最近一直研究这个,我去问(逼)问(迫)他吧,好在你是个实干派,说干就干! ### 2 新手入门版 你来到了小杰的工位,发现他也正在用for循环摸鱼…… “用线程池?那当然是首选Executors工具类啊,简单粗暴,就连名字都这么短小精悍方便记忆,我们直接指定创建多少个线程就好啦,代码大概长这样” ![](//img1.jcloudcs.com/developer.jdcloud.com/2587d356-9e53-41e0-a07a-5a84b91d7a8920210809142654.jpg) ![](//img1.jcloudcs.com/developer.jdcloud.com/5f4c3008-3637-4a3e-817d-f6888dc06e7220210809142707.png) ![](//img1.jcloudcs.com/developer.jdcloud.com/450245dd-225e-40c7-a418-48e557ee695720210809142721.png) ”这简直太方便啦“ “不过但是这样的话,会有几个问题” #### 2.1 不足 ##### 2.1.1 不能自定义参数 你看,我们到现在,就指定了一个参数,可是线程池其实一共有七个参数呢,其他的我们都控制不了,这点就很不方便,同时也不有助于我们理解线程池的原理,出问题不好排查 ##### 2.1.2 容易OOM ![](//img1.jcloudcs.com/developer.jdcloud.com/d2cfe39f-a857-4fe4-9278-e561e19ed97d20210809142917.png) 其实线程池里面有一个任务队列,用来存储待执行的任务,我们如果用图中的这个线程池的话,他的队列(**LinkedBlockingQueue**)大小是接近无限大的 ![](//img1.jcloudcs.com/developer.jdcloud.com/030de10c-3e3a-4c0c-b2fd-d7e8bb84951e20210809142939.png) 这就意味着理论情况下,我们可以无限的往里放任务,一直堆在这个队列里,但是我们内存是有限的,有可能在堆任务的时候就OOM了 ”那我们怎么能够自定义参数呢?怎么避免OOM呢?“ “用带参数的构造函数就好啦,给阻塞队列设置规定大小“ 总的来说,就是根据业务需求自己来控制线程池的七个参数 ### 3 铂金钻石版 如果我们要是想自定义这几个参数的话,就要用 ThreadPoolExecutor 原生的7个参数的构造函数,他们大概长这样,根据我的观察,一般我们项目中的业务用这样的就够了 ,自己定义参数 ![](//img1.jcloudcs.com/developer.jdcloud.com/9ac6943f-275a-4427-a119-39f2af8b7ba120210809143006.png) 我们先来回顾下这七个参数是干嘛的 #### 3.1 参数详解 - corePoolSize:核心线程数 - maximumPoolSize:最大线程数 - keepAliveTime:非核心线程最大空闲时间 - unit:空闲时间的单位 - workQueue:任务队列,用来存放还未处理完的任务 - threadFactory(可自定义):线程工厂,用来生产线程的 - RejectedExecutionHandler(可自定义):拒绝策略:已经达到最大线程数了并且阻塞队列也满了,这个时候还要来任务的话,那么就用拒绝策略拒绝后面的任务 #### 3.2 总体流程 ![](//img1.jcloudcs.com/developer.jdcloud.com/35337bf7-d892-4e94-b7a5-bccd1bae47ac20210809143128.png) #### 3.3 不足 ##### 3.3.1 无法监控每个线程的具体运行相关情况 发没发现,这样确实可以跑出来对应的结果,不过我总觉得有点怪怪的,站在一个旁观者的角度来看,这里有一堆任务,我把他们丢进去了线程池里面,它会有一堆线程来处理,可是我也不知道具体的线程处理情况,也不知道每个任务对应的线程的运行时间是多少,那在每个任务运行前和运行后我能不能打印点自定义的日志?这些我都不能做呀,束缚住了我的手脚 ##### 3.3.2 无异常处理 这么多任务,要是其中有一个跑着跑着有异常怎么处理?线程池的异常梳理机制是什么?我想记录异常日志怎么记录? **只要思想不滑坡,方法总比困难多!** <center>![](//img1.jcloudcs.com/developer.jdcloud.com/50874f42-e43f-446c-a78d-e487465bffab20210809143219.png)</center> ### 4 王者乱杀版 我们想到的问题,前辈们肯定是都想到了呀,要不然这个线程池能被广泛的应用在各种业务场景下你说是不? 这一切的一切,都要继承ThreadPoolExecutor这个类,重写父类的几个方法 #### 4.1 拓展接口 - 任务执行前 ```java protected void beforeExecute(Thread t, Runnable r) { } ``` - 任务执行后/有异常 ```java protected void afterExecute(Runnable r, Throwable t) { } ``` - 线程池结束后 ```java protected void terminated() { } ``` #### 4.2 自定义线程池 ```java public class MyThreadPoolExecutor extends ThreadPoolExecutor { public static Logger log = LoggerFactory.getLogger(MyThreadPoolExecutor .class); private AtomicLong numTasks = new AtomicLong(); private AtomicLong totalTime = new AtomicLong(); private ThreadLocal<Long> startTime = new ThreadLocal<Long>(); public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { log.info("Thread {},Runnable {}",r.toString(),r.toString()); startTime.set(System.currentTimeMillis()); } @Override protected void afterExecute(Runnable r, Throwable t) { numTasks.incrementAndGet(); long taskTime = System.currentTimeMillis() - startTime.get(); totalTime.addAndGet(taskTime); log.info("Runnable {},这次任务执行时间是{},截至目前为止总耗时是{},异常是{}", r.toString(),taskTime,totalTime.get(),t.getMessage()); } @Override protected void terminated() { log.info("terminated 线程关闭 总耗时是{},总任务数是 {},平均耗时是{}",totalTime.get(),numTasks.get(),totalTime.get()/numTasks.get()); } @Override public void execute(Runnable command) { super.execute(command); } } ``` 我们可以这样就可以在任务执行前后输出自己想输出的业务日志了/记录时间等相关信息 #### 4.3 线程池配置类 ```java @Configuration public class MyThreadPoolExecutorConfig { public static Logger log = LoggerFactory.getLogger(MyThreadPoolExecutorConfig.class); @Value("${threadPoolExecutorConfig.corePoolSize}") private int corePoolSize; @Value("${threadPoolExecutorConfig.maximumPoolSize}") private int maximumPoolSize; @Value("${threadPoolExecutorConfig.keepAliveTime}") private int keepAliveTime; @Value("${threadPoolExecutorConfig.capacity}") private int capacity; @Value("${threadPoolExecutorConfig.namePrefix}") private String namePrefix; class MyThreadPoolFactory implements ThreadFactory{ private final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; MyThreadPoolFactory(String name) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); if (null == name || name.isEmpty()) { name = "pool"; } namePrefix = name + "-thread-" + poolNumber.getAndIncrement(); } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()){ t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY){ t.setPriority(Thread.NORM_PRIORITY); } return t; } } class MyRejectedExecutionHandler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //打印被拒绝的任务 log.info("任务被拒绝 {},CompletedTaskCount已完成执行的大致任务总数 {},ActiveCount正在主动执行任务的线程的大致数量{}," + "CorePoolSize核心线程数{},LargestPoolSize曾经同时进入池中的最大线程数 {},MaximumPoolSize允许的最大线程数{}," + "QueueSize{},TaskCount已安排执行的大致任务总数{}",r.toString(),executor.getCompletedTaskCount(), executor.getActiveCount(),executor.getCorePoolSize(),executor.getLargestPoolSize(),executor.getMaximumPoolSize(), executor.getQueue().size(),executor.getTaskCount()); } } @Bean public ExecutorService getThreadPool(){ log.info("初始化线程池 corePoolSize {},maximumPoolSize{},keepAliveTime {},capacity{},namePrefix{}",corePoolSize,maximumPoolSize,keepAliveTime,capacity,namePrefix); return new MyThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS, new ArrayBlockingQueue<>(capacity),new MyThreadPoolFactory(namePrefix),new MyRejectedExecutionHandler()); } } ``` 前面我们讲过,线程池工厂/拒绝策略,我们都可以自定义的,图中就是自定义之后的样子,除此之外,我们一般喜欢把重要的参数抽取出来写在配置文件里面,便于查看与修改 ![](//img1.jcloudcs.com/developer.jdcloud.com/796be2c8-1382-40f2-902c-d50453bd460b20210809143514.png) #### 4.4 测试代码 ![](//img1.jcloudcs.com/developer.jdcloud.com/dbd7c527-ff76-4515-88bb-f24937160d8e20210809143554.png) 为了测试自定义处理异常,改了下测试代码,让处理orderNo为”order_1“的订单号的线程抛异常 ![](//img1.jcloudcs.com/developer.jdcloud.com/ce2d411a-bbd8-4f84-9949-12770977011220210809143636.png) 捕获住的异常如下,就可以自己处理了 ![](//img1.jcloudcs.com/developer.jdcloud.com/38ef063a-9e50-4cc5-a5fe-f503dbe0b33620210809143701.png) ![](//img1.jcloudcs.com/developer.jdcloud.com/87980095-a969-4ec6-8ef8-bac08055403a20210809143712.png) ### 5 源码分析 ”太棒了!这样我就可以改造一个属于我自己的线程池了!不过我还有一个问题,这个线程池的原理是啥啊?为什么可以做到线程复用?为什么重写那三个方法就可以达到这样的效果?线程池是不是还有什么秘密瞒着我?小杰你说!“ 摊牌了我不装了,其实我也不知道为啥 不过 源码面前,了无秘密 #### 5.1 位运算 ##### 5.1.1 线程池状态与线程数量 线程池通过一个变量ctl同时来管理自己的状态和线程数量 这就是大佬的设计思想,非常巧妙,里面也有位运算的相关知识 **数据结构预与算法在这里体现的淋漓尽致,要是基础不扎实的话,理解起来就相当费劲啦,可见其重要性** ![](//img1.jcloudcs.com/developer.jdcloud.com/f04acf12-17a6-4c22-a9c1-5d19763e3bbe20210809143923.png) 我们来分别看一下,这都是啥啥啥,平常CRUD的时候怎么没见过呢 ##### 5.1.2 线程池状态 如图所示,线程池一共五种状态 - RUNNING :能够接受新的任务也能处理阻塞队列里的任务 - SHUTDOWN: 调用shutdown()方法之后,这时线程池不能接受新任务,但是可以处理阻塞队列里的任务 - STOP:调用shutdownNow()方法之后,这时线程池不能接受新任务,终止正在处理的任务 - TIDYING:阻塞队列里任务为空且线程池中的线程数量为空时,线程池会是这个状态 - TERMINATED:线程池彻底终止的状态,线程池会从TIDYING变成这个状态 ##### 5.1.3 位运算之左移 - Integer.SIZE :32 (Integer是32位的) - COUNT_BITS : 32-3=29 - 1 << 29 代表二进制的1左移29位 ![](//img1.jcloudcs.com/developer.jdcloud.com/977f7aeb-76bb-4c05-b13c-1279c3578d0820210809144018.png) 根据观察我们可以知道,线程池的五种状态其高三位是不一致的,分别是 - RUNNING : 111 - SHUTDOWN: 000 - STOP: 001 - TIDYING: 010 - TERMINATED:011 还记得一开始的 **COUNT_BITS = Integer.SIZE - 3 **吗? 为什么要-3而不是-2或者-4呢?**因为就是留出这五种状态需要前三位来表示** 话说回来,这个左移,具体是怎么移动的呢?就拿 1<<29举个例子吧 ![](//img1.jcloudcs.com/developer.jdcloud.com/e5a52114-ae5f-482f-9064-cf1560ee028e20210809144106.png) 这是正数的左移,那么负数的左移又要怎么办呢? ##### 5.1.4 负数左移 首先,我们要知道,负数在计算机中是以其正值的补码形式表示 - 原码:负数的绝对值 - 反码:对源码取反 - 补码:反码+1 回到我们刚才说的:-1 << 29 ![](//img1.jcloudcs.com/developer.jdcloud.com/f8b32824-0143-47bb-a5d6-cc365be3894f20210809144225.png) 其中的左移,就是右边补0 ##### 5.1.5 位运算之与运算或运算取反运算 位运算的左移我们看完了,我们再来看下这个 - 获取线程池状态 ```java private static int runStateOf(int c) { return c & ~CAPACITY; } ``` - 获取线程池中的线程数量 ```java private static int workerCountOf(int c) { return c & CAPACITY; } ``` - 初始化ctl ```java private static int ctlOf(int rs, int wc) { return rs | wc; } ``` 这些又是什么意思呢? - ~CAPACITY : 代表CAPACITY取反 ,CAPACITY的二进制 0变成1 ,1变成0 - c & CAPACITY : 代表与操作,不同取0,相同不变 ![](//img1.jcloudcs.com/developer.jdcloud.com/95ab5798-6c3b-48b6-862a-44b9140ec21620210809144423.png) - rs | wc: 代表或操作,有1则是1 ![](//img1.jcloudcs.com/developer.jdcloud.com/784eccf1-8f39-4483-964f-0d7348abdad120210809144517.png) 这就是位运算在线程池里的应用啦 #### 5.2 execute源码分析 ```java public void execute(Runnable command) { //先判断提交的任务是不是空的 if (command == null) throw new NullPointerException(); //获得线程池的ctl(根据这个可以获取线程池状态或者线程池数量) int c = ctl.get(); //判断线程池数量是否小于核心线程数 if (workerCountOf(c) < corePoolSize) { //如果小于核心线程数 //添加worker if (addWorker(command, true)) //添加成功,返回 return; //添加失败,重新获取ctl c = ctl.get(); } // 到这个if时,有两个原因 // 1. 线程池的线程数量已经大于等于核心线程数 // 2.添加worker失败 //当线程池处于Running状态,就要把任务添加至阻塞队列,如果添加成功的话 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检查线程池的状态是否为running,因为添加任务需要时间,这个时间内线程池的状态可能会变化 // 如果不为running的话,就要从阻塞队列中删除这个任务,然后执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 到这个if时,有两种情况 // 1. 线程池状态为runnning //2. 线程池状态不为runnign,但是删除失败 // 判断 当前线程池是否有线程,如果没有的话 else if (workerCountOf(recheck) == 0) // 添加一个线程,来确保线程池有线程可以执行任务 addWorker(null, false); } // 到这个if时,有两个原因 // 1.走到这里说明添加阻塞队列失败, // 2. 线程池不是runninng状态 else if (!addWorker(command, false)) //创建非核心线程也失败的话,执行拒绝策略 reject(command); } ``` 流程图大概如下图所示 ![](//img1.jcloudcs.com/developer.jdcloud.com/8b4aff04-c0df-4c55-a7eb-d4d9c6c0f7e320210809144721.png) #### 5.3 addWorker源码分析 ```java private boolean addWorker(Runnable firstTask, boolean core) { //标志位,一会儿会跳过来 retry: for (;;) { int c = ctl.get(); // 获取线程池状态 int rs = runStateOf(c); // 1. 如果线程池状态是非running状态 // 且 // 2.非( 线程池是SHUTDOWN状态并且firstTask为空并且阻塞队列为不为空 ) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //判断线程池线程总数量 int wc = workerCountOf(c); //如果wc大于CAPACITY 或者 // 当 core 为true时,wc大于 corePoolSize // 当 core 为false时,wc大于 maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // wc数量超的话,则添加失败 return false; //cas 线程池数量加1 if (compareAndIncrementWorkerCount(c)) //如果成功,则跳到开始 retry处且往下执行,不在进入大循环 break retry; // 到这里就说明,cas失败了 // 失败有两个原因,1.线程数量变了 2.线程池状态变了,不能再添加任务了 // 重新获取ctl c = ctl.get(); // Re-read ctl // 判断当前线程池状态是否和刚才的状态一致,如果不一致 if (runStateOf(c) != rs) //继续大循环 continue retry; } } //线程启动成功的标志 boolean workerStarted = false; //线程添加成功的标志 boolean workerAdded = false; Worker w = null; try { //将提交的任务封装进worker w = new Worker(firstTask); //得到worker中的线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { //得到线程池状态 int rs = runStateOf(ctl.get()); // 1.如果线程池状态是running状态 // 2. 或者线程池状态是SHUTDOWN并且firstTask为空 // 这样的话代表可以添加线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive())//预先检查 t 是否可启动 throw new IllegalThreadStateException(); //将worker添加至workers中 (这是个set集合,真正的线程池) workers.add(w); //判断线程数量 int s = workers.size(); // 更新最大的线程数 if (s > largestPoolSize) largestPoolSize = s; //添加worker成功 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { //启动worker中的线程 t.start(); workerStarted = true; } } } finally { //如果启动失败 if (! workerStarted) addWorkerFailed(w); } // 返回启动是否成功 return workerStarted; } ``` ##### 5.3.1 addWorker流程图 ![](//img1.jcloudcs.com/developer.jdcloud.com/35e33e79-e8ec-4db5-bc9e-3565ad10111020210809144809.png) #### 5.4 worker源码 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //线程 final Thread thread; //要执行的任务 Runnable firstTask; //完成的任务 volatile long completedTasks; Worker(Runnable firstTask) { //禁止中断直到 runWorker setState(-1); // inhibit interrupts until runWorker //用户提交的任务 this.firstTask = firstTask; //通过创建一个线程,传入的this是woker自身 worker继承了Runnable 那么这个线程在t.start就是调用重写的run()方法了 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //真正运行任务的方法 public void run() { runWorker(this); } // 0表示解锁状态 // 1 表示锁定状态 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { // 如果获取锁成功,cas将 state设置为从0到1 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放锁,将state设置为0 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; // state是1且 当前线程t不等于空且t没有被中断 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 中断线程 t.interrupt(); } catch (SecurityException ignore) { } } } } ``` #### 5.5 runWorker源码解析 ```java final void runWorker(Worker w) { //得到当前线程 Thread wt = Thread.currentThread(); //这是我们提交的任务 Runnable task = w.firstTask; w.firstTask = null; // 这里设置sate为0,为了响应中断 w.unlock(); // allow interrupts //线程退出的原因 boolean completedAbruptly = true; try { //线程复用的密码就在这里,是一个while循环,判断如果提交的任务不为空或者队列里有任务的话 while (task != null || (task = getTask()) != null) { // 上锁 w.lock(); // 判断是否需要设置中断标识 // 1. 当线程池状态大于等于STOP // 2. 当线程池状态小于STOP 但是线程已经被打断,并且wt没有被中断,则设置中断流程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行前的函数,用户可以自己拓展 beforeExecute(wt, task); Throwable thrown = null; try { //任务自己的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行后的函数,用户可以自己拓展 afterExecute(task, thrown); } } finally { task = null; //完成任务数+1 w.completedTasks++; //释放锁 w.unlock(); } } // 线程正常退出 completedAbruptly = false; } finally { //销毁线程 processWorkerExit(w, completedAbruptly); } } ``` ##### 5.5.1 runWorker流程图 ![](//img1.jcloudcs.com/developer.jdcloud.com/37d3c142-40c6-4110-a604-7664dccc15db20210809144947.png) #### 5.6 getTask 源码分析 ```java private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 获取ctl int c = ctl.get(); // 得到线程池状态 int rs = runStateOf(c); // 状态大于等于SHUTDOWN 且 (状态大于stop或者阻塞队列为空) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 线程数-1 decrementWorkerCount(); return null; } //得到线程池线程数量 int wc = workerCountOf(c); // 是否设置超时时间 allowCoreThreadTimeOut默认是false 是否允许核心线程超时回收 //判断线程池数量是否大于核心线程数,如果大于的话 timed为true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 1. wc > 最大线程数 或者 阻塞队列拉取时间超时 都为true // 2. 线程数大于1 或者 阻塞队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // cas 使线程数减一 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //这里 timed为ture的时候,采用带超时时间的获取元素的方法, 否则采取一直阻塞的方法 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //获取到任务就返回 if (r != null) return r; // 超时获取不到任务 timedOut设置为true timedOut = true; } catch (InterruptedException retry) { // 获取任务事线程被中断,继续重试 timedOut = false; } } } ``` ##### 5.6.1 getTask 流程图 ![](//img1.jcloudcs.com/developer.jdcloud.com/a267e537-75a9-4f94-853e-53c73cd91bf220210809145043.png) #### 5.7 processWorkerExit源码分析 ```java private void processWorkerExit(Worker w, boolean completedAbruptly) { //completedAbruptly = true 代表线程执行出了异常 // completedAbruptly = tfalse 线程正常结束 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 线程数量减一 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; //上锁 mainLock.lock(); try { //任务完成数 completedTaskCount += w.completedTasks; // 线程集合里删掉该woker workers.remove(w); } finally { // 释放锁 mainLock.unlock(); } // 判断是否需要结束线程池 tryTerminate(); // 获得ctl int c = ctl.get(); // 线程池状态是 running 时 if (runStateLessThan(c, STOP)) { // 如果线程不是异常结束的话 if (!completedAbruptly) { // 如果允许核心线程超时取消的话 min = 0 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果 min = 0 且阻塞队列里还有任务 则保留一个线程来处理 if (min == 0 && ! workQueue.isEmpty()) min = 1;/**/ // 确保当前线程数量至少要为min if (workerCountOf(c) >= min) return; // replacement not needed } // 添加worker addWorker(null, false); } } ``` ##### 5.7.1 processWorkerExit 流程图 ![](//img1.jcloudcs.com/developer.jdcloud.com/3ac1c6ac-3b4d-4d23-af63-832d22eb94be20210809145139.png) ### 6 最后 其上就是我们用线程池execute方法提交任务时的总体流程,里面东西其实非常之多,不得不佩服当时设计者的精妙思想,但是,这还不是线程池的全部,还有两个关闭线程池的方法,还有异常梳理没有详细展开说,为什么线程池有时会吞异常?线程池处理异常到底有哪几种方法?submit方法和execute方法有什么区别等等 大家是喜欢直接讲源码的呢? 还是喜欢有开头这种场景带入的类型的呢?大家要是对后半段源码感兴趣的话 ![](//img1.jcloudcs.com/developer.jdcloud.com/9fffbdd1-2595-485b-b63b-62484631e83220210809145237.png) ------------ ###### 自猿其说Tech-JDL京东物流技术发展部 ###### 部作者:中台技术部 邢焕杰
原创文章,需联系作者,授权转载
上一篇:领域建模之数据模型设计方法论
下一篇:AspectJ浅析系列(一)- 初识AOP
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2163995
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2163995
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号