您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
从多线程设计模式到对 CompletableFuture 的应用
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
从多线程设计模式到对 CompletableFuture 的应用
wy****
2024-06-26
IP归属:北京
100浏览
最近在开发 **延保服务** 频道页时,为了提高查询效率,使用到了多线程技术。为了对多线程方案设计有更加充分的了解,在业余时间读完了《图解 Java 多线程设计模式》这本书,觉得收获良多。本篇文章将介绍其中提到的 **Future 模式**,以及在实际业务开发中对该模式的应用,而这些内容对于本书来说只是冰山一角,还是推荐大家有时间去阅读原书。 ### 1\. Future 模式:“先给您提货单” 我们先来看一个场景:假如我们去蛋糕店买蛋糕,下单后,店员会递给我们提货单并告知“请您傍晚来取蛋糕”。到了傍晚我们拿着提货单去取蛋糕,店员会先和我们说“您的蛋糕已经做好了”,然后将蛋糕拿给我们。 如果将下单蛋糕到取蛋糕的过程抽象成一个方法的话,那么意味着这个方法需要花很长的时间才能获取执行结果,**与其一直等待结果,不如先拿着一张“提货单”**,到我们需要取货的时候,再通过它去取,而获取“提货单”的过程是几乎不耗时的,而这个提货单对象就被称为 `Future`,后续便可以通过它来获取方法的返回值。用 Java 来表示这个过程的话,需要使用到 `FutureTask` 和 `Callable` 两个类,如下: ```java public class Example { public static void main(String[] args) throws InterruptedException, ExecutionException { // 预定蛋糕,并定义“提货单” System.out.println("我:预定蛋糕"); FutureTask<String> future = new FutureTask<>(() -> { System.out.println("店员:请您傍晚来取蛋糕"); Thread.sleep(2000); System.out.println("店员:您的蛋糕已经做好了"); return "Holiland"; }); // 开始做蛋糕 new Thread(future).start(); // 去做其他事情 Thread.sleep(1000); System.out.println("我:忙碌中..."); // 取蛋糕 System.out.println("我:取蛋糕 " + future.get()); } } // 运行结果: // 我:预定蛋糕 // 店员:请您傍晚来取蛋糕 // 我:忙碌中... // 店员:您的蛋糕已经做好了 // 我:取蛋糕 Holiland ``` 方法的调用者可以将任务交给其他线程去处理,无需阻塞等待方法的执行,这样调用者便可以继续执行其他任务,并能通过 `Future` 对象获取执行结果。 它的运行原理如下:创建 `FutureTask` 实例时,`Callable` 对象会被传递给构造函数,当线程调用 `FutureTask` 的 `run` 方法时,`Callable` 对象的 `call` 方法也会被执行。调用 `call` 方法的线程会同步地获取结果,并通过 `FutureTask` 的 `set` 方法来记录结果对象,如果 `call` 方法执行期间发生了异常,则会调用 `setException` 方法记录异常。最后,通过调用 `get` 方法获取方法的结果,**注意这里可能会抛出方法执行时产生的异常**。 ```java public void run() { // ... try { // “提货任务” Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 调用 callable 的 call 方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 捕获并设置异常 setException(ex); } if (ran) // 为结果赋值 set(result); } } finally { // ... } } protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 将结果赋值给 outcome 全局变量,供 get 时获取 outcome = v; // 修改状态为 NORMAL STATE.setRelease(this, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 将异常赋值给 outcome 变量,供 get 时抛出 outcome = t; // 修改状态为 EXCEPTIONAL STATE.setRelease(this, EXCEPTIONAL); // final state finishCompletion(); } } public V get() throws InterruptedException, ExecutionException { int s = state; // 未完成时阻塞等一等 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; // 正常结束的话能正常获取到结果 if (s == NORMAL) return (V)x; // 否则会抛出异常,注意如果执行中出现异常,调用 get 时会被抛出 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } ``` 现在对 **Future 模式** 已经有了基本的了解:它通过 `Future` 接口来表示未来的结果,实现 **调用者与执行者之间的解耦**,**提高系统的吞吐量和响应速度**,那在实践中对该模式是如何使用的呢? ### 2\. 对 Future 模式的实践 因为 **延保服务** 频道页访问量大且对接口性能要求较高,单线程处理并不能满足性能要求,所以应用了 **Future 模式** 来提高查询效率,但是并没有借助上文所述的 `FutureTask` 来实现,而是使用了 `CompletableFuture` 工具类,它们的实现原理基本一致,但是后者提供的方法和对 **链式编程** 的支持使代码更加简洁,实现更加容易(相关 API 参考见文末)。 如下是使用 `CompletableFuture` 异步多线程查询订单列表的逻辑,根据配置的 `pageNo` 分多条线程查询各页的订单数据: ```java List<OrderListInfo> result = new ArrayList<>(); // 并发查询订单列表 List<CompletableFuture<List<OrderListInfo>>> futureList = new ArrayList<>(); try { // 配置需要查询的页数 pageNo,并发查询不同页码的订单 for (int i = 1; i <= pageNo; i++) { int curPageNo = i; CompletableFuture<List<OrderListInfo>> future = CompletableFuture.supplyAsync( () -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor); futureList.add(future); } // 等待所有线程处理完毕,并封装结果值 for (CompletableFuture<List<OrderListInfo>> future : futureList) { result.addAll(future.get()); } } catch (Exception e) { log.error("并发查询用户订单信息异常", e); } ``` 这段代码中对异常的处理能进行优化:第 15 行代码,如果某条线程查询订单列表时发生异常,那么在调用 `get` 方法时会抛出该异常,被 `catch` 后返回空结果,即使有其他线程查询成功,这些订单结果值也会被忽略掉,可以针对这一点进行优化,如下: ```java List<OrderListInfo> result = new ArrayList<>(); // 并发查询订单列表 List<CompletableFuture<List<OrderListInfo>>> futureList = new ArrayList<>(); try { // 配置需要查询的页数 pageNo,并发查询不同页码的订单 for (int i = 1; i <= pageNo; i++) { int curPageNo = i; CompletableFuture<List<OrderListInfo>> future = CompletableFuture .supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor) // 添加异常处理 .exceptionally(e -> { log.error("查询用户订单信息异常", e); return Collections.emptyList(); }); futureList.add(future); } // 等待所有线程处理完毕,并封装结果值 for (CompletableFuture<List<OrderListInfo>> future : futureList) { result.addAll(future.get()); } } catch (Exception e) { log.error("并发查询用户订单信息异常", e); } ``` 优化后针对查询发生异常的任务打印异常日志,并返回空集合,这样即使单线程查询失败,也不会影响到其他线程查询成功的结果。 `CompletableFuture` 还提供了 `allOf` 方法,它返回的 `CompletableFuture` 对象在所有 `CompletableFuture` 执行完成时完成,相比于对每个任务都调用 `get` 阻塞等待任务完成的实现可读性更好,改造后代码如下: ```java List<OrderListInfo> result = new ArrayList<>(); // 并发查询订单列表 CompletableFuture<List<OrderListInfo>>[] futures = new CompletableFuture[pageNo]; // 配置需要查询的页数 pageNo,并发查询不同页码的订单 for (int i = 1; i <= pageNo; i++) { int curPageNo = i; CompletableFuture<List<OrderListInfo>> future = CompletableFuture .supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor) // 添加异常处理 .exceptionally(e -> { log.error("查询用户订单信息异常", e); return Collections.emptyList(); }); futures[i - 1] = future; } try { // 等待所有线程处理完毕 CompletableFuture.allOf(futures).get(); for (CompletableFuture<List<OrderListInfo>> future : futures) { List<OrderListInfo> orderInfoList = future.get(); if (CollectionUtils.isEmpty(orderInfoList)) { result.addAll(orderInfoList); } } } catch (Exception e) { log.error("处理用户订单结果信息异常", e); } ``` > **Tips**: `CompletableFuture` 的设计初衷是支持异步编程,所以应尽量避免在`CompletableFuture` 链中使用 `get()/join()` 方法,因为这些方法会阻塞当前线程直到`CompletableFuture` 完成,应该在必须使用该结果值时才调用它们。 #### 相关的模式:命令模式 命令模式能将操作的调用者和执行者解耦,它能很容易的与 **Future 模式** 结合,以查询订单的任务为例,我们可以将该任务封装为“命令”对象的形式,执行时为每个线程提交一个命令,实现解耦并提高扩展性。在命令模式中,命令对象需要 **支持撤销和重做**,那么这便在查询出现异常时,提供了补偿处理的可能,命令模式类图关系如下: ![命令模式.png](https://s3.cn-north-1.jdcloud-oss.com/shendengbucket1/2024-06-19-15-0819U8P6L19YQhoDB9f.png) ### 3.《图解Java多线程设计模式》书籍推荐 我觉得本书算得上是一本老书:05 年出版的基于 JDK1.5 的Java多线程书籍,相比于目前我们常用的 JDK1.8 和时髦的 JDK21,在读之前总会让人觉得有一种过时的感觉。但是当我读完时,发现其中的模式能对应上代码中的处理逻辑:对 `CompletableFuture` 的使用正对应了其中的 **Future 模式**(异步获取其他线程的执行结果)等等,所以我觉得模式的应用不会局限于技术的新老,它是在某种情况下,研发人员共识或通用的解决方案,在知晓某种模式,采用已有的技术实现它是容易的,而反过来在只掌握技术去探索模式是困难且没有方向的。 同时,我也在考虑一个问题:对于新人学习多线程技术来说,究竟适不适合直接从模式入门呢?因为我对设计模式有了比较多的实践经验,所以对“模式”相关的内容足够敏感,如果新人没有这些经验的话,这对他们来说会不会更像是一个个知识点的堆砌呢?好在的是,本书除了模式相关的内容,对基础知识也做足了铺垫,而且提出的关于多线程编程的思考点也是非常值得参考和学习的,以线程互斥和协同为例,书中谈到:在对线程进行互斥处理时需要考虑 **“要保护的东西是什么”**,这样便能够 **清晰的确定锁的粒度**;对于线程的协同,书中提到的是需要考虑 **“放在中间的东西是什么”**,直接的抛出这个观点是不容易理解的,“中间的东西”是在多线程的 **生产者和消费者模式** 中提出的,部分线程负责生产,生产完成后将对象放在“中间”,部分线程负责消费,消费时取的便是“中间”的对象,而合理规划这些中间的东西便能 **消除生产者和消费者之间的速度差异**,提高系统的吞吐量和响应速度。而再深入考虑这两个角度时,线程的互斥和协同其实是内外统一的:为了让线程协调运行,必须执行互斥处理,以防止共享的内容被破坏,而线程的互斥是为了线程的协调运行才进行的必要操作。 *** ### 附:CompletableFuture 常用 API #### 使用 supplyAsync 方法异步执行任务,并返回 CompletableFuture 对象 如下代码所示,调用 `CompletableFuture.supplyAsync` 静态方法异步执行查询逻辑,并返回一个新的 `CompletableFuture` 对象 ```java CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> doQuery(), executor); ``` #### 使用 join 方法阻塞获取完成结果 如下代码所示,在封装结果前,调用 `join` 方法阻塞等待获取结果 ```java futureList.forEach(CompletableFuture::join); ``` 它与 `get` 方法的主要区别在于,`join` 方法抛出的是未经检查的异常 `CompletionException`,并将原始异常作为其原因,这意味着我们可以不需要在方法签名中声明它或在调用 `join` 方法的地方进行异常处理,而 `get` 方法会抛出 `InterruptedException` 和 `ExecutionException` 异常,我们必须对它进行处理,`get` 方法源码如下: ```java public T get() throws InterruptedException, ExecutionException { Object r; if ((r = result) == null) r = waitingGet(true); return (T) reportGet(r); } ``` #### 用 thenApply(Function) 和 thenAccept(Consumer) 等回调函数处理结果 如下是使用 `thenApply()` 方法对 `CompletableFuture` 的结果进行转换的操作: ```java CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(greeting -> greeting + " World"); ``` #### 使用 exceptionally() 处理 CompletableFuture 中的异常 `CompletableFuture` 提供了`exceptionally()` 方法来处理异常,这是一个非常重要的步骤。如果在 `CompletableFuture` 的运行过程中抛出异常,那么这个异常会被传递到最终的结果中。如果没有适当的异常处理,那么在调用 `get()` 或 `join()` 方法时可能会抛出异常。 ```java CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Exception occurred"); } return "Hello, World!"; }).exceptionally(e -> "An error occurred"); ``` #### 使用 allOf() 和 anyOf() 处理多个 CompletableFuture 如果有多个 `CompletableFuture` 需要处理,可以使用 `CompletableFuture.allOf()` 或者 `CompletableFuture.anyOf()`。`allOf()` 在所有的 `CompletableFuture` 完成时完成,而 `anyOf()` 则会在任意一个 `CompletableFuture` 完成时完成。 #### complete()、completeExceptionally()、cancel() 方法 `CompletableFuture` 的运行是在调用了 `complete()`、`completeExceptionally()`、`cancel()` 等方法后才会被标记为完成。如果没有正确地完成 `CompletableFuture`,那么在调用 `get()` 方法时可能会永久阻塞。这三个方法在 Java 并发编程中有着重要的应用。以下是这三个方法的常见使用场景: - `complete(T value)`: 此方法用于显式地完成一个 `CompletableFuture`,并设置它的结果值。这在你需要在某个计算完成时,手动设置 `CompletableFuture` 的结果值的场景中非常有用。例如,你可能在一个异步操作完成时,需要设置 `CompletableFuture` 的结果值。 ```java CompletableFuture<String> future = new CompletableFuture<>(); // Some asynchronous operation future.complete("Operation Result"); ``` - `completeExceptionally(Throwable ex)`: 此方法用于显式地以异常完成一个 `CompletableFuture`。这在你需要在某个计算失败时,手动设置 `CompletableFuture` 的异常的场景中非常有用。例如,你可能在一个异步操作失败时,需要设置 `CompletableFuture` 的异常。 ```java CompletableFuture<String> future = new CompletableFuture<>(); // Some asynchronous operation future.completeExceptionally(new RuntimeException("Operation Failed")); ``` - `cancel(boolean mayInterruptIfRunning)`: 此方法用于取消与 `CompletableFuture` 关联的计算。这在你需要取消一个长时间运行的或者不再需要的计算的场景中非常有用。例如,你可能在用户取消操作或者超时的情况下,需要取消 `CompletableFuture` 的计算。 ```java CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // Long running operation }); // Some condition future.cancel(true); ``` 这些方法都是线程安全的,可以从任何线程中调用。 #### 使用 thenCompose() 处理嵌套的 CompletableFuture 如果在处理 `CompletableFuture` 的结果时又创建了新的`CompletableFuture`,那么就会产生嵌套的 `CompletableFuture`。这时可以使用 `thenCompose()` 方法来避免 `CompletableFuture` 的嵌套,如下代码所示: ```java CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); ``` #### 使用 thenCombine() 处理两个 CompletableFuture 的结果 ```java CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2); ``` *** 欢迎大家在京东APP内搜索 **“京东延保”** 跳转延保服务页\~
上一篇:不存在的场景真的不存在吗?
下一篇:无任何数学公式理解大模型基本原理
wy****
文章数
32
阅读量
2476
作者其他文章
01
高性能MySQL实战(一):表结构
最近因需求改动新增了一些数据库表,但是在定义表结构时,具体列属性的选择有些不知其所以然,索引的添加也有遗漏和不规范的地方,所以我打算为创建一个高性能表的过程以实战的形式写一个专题,以此来学习和巩固这些知识。1. 实战我使用的 MySQL 版本是 5.7,建表 DDL 语句如下所示:根据需求创建 接口调用日志 数据库表,请大家浏览具体字段的属性信息,它们有不少能够优化的点。CREATE TABLE
01
分布式服务高可用实现:复制
1. 为什么需要复制我们可以考虑如下问题:当数据量、读取或写入负载已经超过了当前服务器的处理能力,如何实现负载均衡?希望在单台服务器出现故障时仍能继续工作,这该如何实现?当服务的用户遍布全球,并希望他们访问服务时不会有较大的延迟,怎么才能统一用户的交互体验?这些问题其实都能通过 “复制” 来解决:复制,即在不同的节点上保存相同的副本,提供数据冗余。如果一些节点不可用,剩余的节点仍然可以提供数据服务
01
高性能MySQL实战(三):性能优化
这篇主要介绍对慢 SQL 优化的一些手段,而在讲解具体的优化措施之前,我想先对 EXPLAIN 进行介绍,它是我们在分析查询时必要的操作,理解了它输出结果的内容更有利于我们优化 SQL。为了方便大家的阅读,在下文中规定类似 key1 的表示二级索引,key_part1 表示联合索引的第一部分,unique_key1 则表示唯一二级索引,primary_key 表示主键索引。高性能MySQL实战(一
01
从2PC和容错共识算法讨论zookeeper中的Create请求
最近在读《数据密集型应用系统设计》,其中谈到了zookeeper对容错共识算法的应用。这让我想到之前参考的zookeeper学习资料中,误将容错共识算法写成了2PC(两阶段提交协议),所以准备以此文对共识算法和2PC做梳理和区分,也希望它能帮助像我一样对这两者有误解的同学。1. 2PC(两阶段提交协议)两阶段提交 (two-phase commit) 协议是一种用于实现 跨多个节点的原子事务(分布
wy****
文章数
32
阅读量
2476
作者其他文章
01
高性能MySQL实战(一):表结构
01
分布式服务高可用实现:复制
01
高性能MySQL实战(三):性能优化
01
从2PC和容错共识算法讨论zookeeper中的Create请求
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号