CompletableFuture任务执行与结果收集详解
时间:2025-08-05 14:27:29 356浏览 收藏
IT行业相对于一般传统行业,发展更新速度更快,一旦停止了学习,很快就会被行业所淘汰。所以我们需要踏踏实实的不断学习,精进自己的技术,尤其是初学者。今天golang学习网给大家整理了《CompletableFuture任务执行与结果收集详解》,聊聊,我们一起来看看吧!
1. 问题背景与挑战
在异步编程中,CompletableFuture是处理并发任务的强大工具。然而,当面临需要严格顺序执行的异步任务链,并且需要收集每个任务的结果时,可能会遇到一些挑战。例如,业务场景可能要求前一个任务完成后,后一个任务才能开始,同时我们希望将所有任务的计算结果汇总到一个集合中。
考虑一个耗时的业务处理函数,它返回一个CompletionStage
import java.time.LocalDateTime; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; public class SequentialTaskProcessor { private CompletionStageprocess(int a) { return CompletableFuture.supplyAsync(() -> { System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a); // 模拟长时间运行的业务处理 try { Thread.sleep(10); // 增加延迟以观察效果 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return a + 10; }).whenCompleteAsync((e, t) -> { if (t != null) System.err.printf("!!! error processing '%d' !!!\n", a); System.err.printf("%s finish %d\n", LocalDateTime.now(), e); }); }
我们的目标是多次调用process函数,确保它们按顺序执行,并将每次的结果收集到一个List
1.1 常见误区:thenApplyAsync与内部join()
一种直观的尝试是使用thenApplyAsync并在其内部调用process(element).toCompletableFuture().join()。
import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; // ... (process方法同上) public void firstApproach() { Listarr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage > result = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { result = result.thenApplyAsync((ret) -> { // 在thenApplyAsync内部阻塞等待前一个CompletableFuture完成 Integer a = process(element).toCompletableFuture().join(); ret.add(a); return ret; }); } List
computeResult = result.toCompletableFuture().join(); System.out.println("First approach results: " + computeResult); }
问题分析: 虽然这种方法能够实现顺序执行并收集结果,但它效率低下。thenApplyAsync本身会在一个线程池中执行其回调,而回调内部的process(element).toCompletableFuture().join()又会阻塞这个线程,直到process方法返回的CompletableFuture完成。这意味着一个逻辑步骤可能间接占用两个线程资源(一个用于thenApplyAsync的回调,另一个用于process内部的异步任务),造成线程资源的浪费和不必要的阻塞。观察输出日志,会发现dispatch和finish的时间戳是严格顺序的,但线程利用率不高。
1.2 常见误区:thenCombineAsync的并发陷阱
另一种尝试是使用thenCombineAsync,期望它能将前一个阶段的结果与新任务的结果结合:
// ... (process方法同上) public void secondApproach() { Listarr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStage > result = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { // process(element) 在这里被立即调用,而非等待前一个阶段完成 result = result.thenCombineAsync(process(element), (array, ret) -> { array.add(ret); return array; }); } List
computeResult = result.toCompletableFuture().join(); System.out.println("Second approach results: " + computeResult); }
问题分析: 这种方法会导致任务并发执行,而非顺序执行。thenCombineAsync的第二个参数CompletionStage other在方法调用时就会被评估并启动。这意味着在循环中,所有的process(element)调用几乎是同时发起的,它们会并发执行。观察输出日志,会发现dispatch的时间戳是交错的,这违反了顺序执行的要求。thenCombineAsync适用于两个独立的异步任务都完成后再进行合并的场景,而不是链式顺序执行的场景。
2. 解决方案:顺序链式执行与结果收集
为了实现任务的顺序执行并高效地收集结果,我们需要利用CompletableFuture提供的更高级的组合方法,特别是thenCompose。
2.1 方案一:使用外部列表收集结果
这种方法通过thenCompose确保任务顺序执行,并使用thenAccept将结果添加到循环外部维护的列表中。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.IntStream; // ... (process方法同上) public class SequentialTaskProcessor { // ... process 方法 ... public void solutionOne() { Listarr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); // 初始化一个表示链式操作开始的CompletableFuture,其结果类型为Void CompletionStage loopStage = CompletableFuture.completedFuture(null); final List resultList = new ArrayList<>(); // 外部结果列表 for (Integer element : arr) { loopStage = loopStage // thenCompose确保前一个阶段完成后,才执行process(element) .thenCompose(v -> process(element)) // thenAccept将process的结果添加到外部列表中,并返回CompletionStage .thenAccept(resultList::add); } // 阻塞等待所有任务完成 loopStage.toCompletableFuture().join(); System.out.println("Solution One results: " + resultList); } public static void main(String[] args) { SequentialTaskProcessor processor = new SequentialTaskProcessor(); System.out.println("--- Running Solution One ---"); processor.solutionOne(); System.out.println("\n--- Running Solution Two ---"); processor.solutionTwo(); } }
原理详解:
- CompletionStage
loopStage = CompletableFuture.completedFuture(null);:我们从一个已完成的CompletableFuture开始,其结果类型为Void。这提供了一个初始的“钩子”来启动任务链。 - loopStage = loopStage.thenCompose(v -> process(element)):thenCompose是这里的关键。它接收一个函数,该函数返回一个新的CompletionStage。这意味着process(element)只会在loopStage(即前一个任务)完成后才会被调用并开始执行。这确保了任务的严格顺序性。thenCompose的作用是将CompletionStage
(来自loopStage)和CompletionStage (来自process)的结果扁平化为一个新的CompletionStage 。 - .thenAccept(resultList::add):在process(element)完成并产生结果后,thenAccept会异步地将该结果添加到resultList中。thenAccept本身返回一个CompletionStage
,这使得loopStage可以继续作为链的下一个开始点,而不必传递一个累积的列表。 - loopStage.toCompletableFuture().join():最后,我们阻塞等待整个任务链的最终阶段完成。此时,resultList将包含所有任务的顺序结果。
这种方法简洁且高效,避免了不必要的阻塞和线程浪费。
2.2 方案二:在链中传递并累积列表
另一种方法是在CompletableFuture链中直接传递并累积结果列表。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.IntStream; // ... (process方法同上) public class SequentialTaskProcessor { // ... process 方法 ... public void solutionTwo() { Listarr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); // 初始化一个携带空列表的CompletableFuture CompletionStage > listStage = CompletableFuture.completedFuture(new ArrayList<>()); for (Integer element : arr) { listStage = listStage // thenCompose确保前一个阶段完成后,才执行process(element) .thenCompose(list -> process(element) // thenAccept将process的结果添加到当前列表 .thenAccept(list::add) // thenApply将CompletionStage
转换回CompletionStage > .thenApply(v -> list) ); } // 阻塞等待所有任务完成,并获取最终的列表 List
resultList = listStage.toCompletableFuture().join(); System.out.println("Solution Two results: " + resultList); } // ... main 方法 ... }
原理详解:
- CompletionStage
- > listStage = CompletableFuture.completedFuture(new ArrayList<>());:我们从一个包含空列表的CompletableFuture开始,这个列表将作为结果的累积器。
- listStage = listStage.thenCompose(list -> ...):同样使用thenCompose来确保顺序执行。这里的list参数是前一个阶段传递过来的结果列表。
- process(element).thenAccept(list::add):在thenCompose的函数内部,我们启动process(element)任务。当它完成时,使用thenAccept将结果添加到当前list中。
- .thenApply(v -> list):这是关键一步。thenAccept返回的是CompletionStage
,但为了将list传递给下一个迭代,我们需要将其结果类型转换回CompletionStage - >。thenApply(v -> list)实现了这一点:它在thenAccept完成后被调用,并简单地返回当前的list对象,从而将列表传递给链中的下一个thenCompose。
- List
resultList = listStage.toCompletableFuture().join();:最终,整个链完成时,listStage的结果就是包含了所有累积结果的列表。
3. 总结与注意事项
两种解决方案都能够有效地实现异步任务的顺序执行和结果收集,并且都避免了线程阻塞和并发执行的问题。
- 方案一(外部列表):
- 优点:代码逻辑相对直观,loopStage只关心任务的完成状态(Void),结果列表在外部维护。
- 适用场景:当任务链的中间结果不需要在CompletableFuture链中传递,只需最终汇总时。
- 方案二(链中传递列表):
- 优点:结果列表直接作为CompletableFuture链的一部分进行传递和累积,整个操作封装在一个CompletableFuture中,最终结果直接从CompletableFuture获取。
- 适用场景:当需要将累积的结果作为链中下一个任务的输入,或者更倾向于将所有状态变化封装在CompletableFuture链内部时。
注意事项:
- 异常处理:在实际应用中,需要为CompletableFuture链添加适当的异常处理机制,例如使用exceptionally、handle等方法来处理任务执行过程中可能出现的错误。
- 线程池管理:CompletableFuture默认使用ForkJoinPool.commonPool()。对于长时间运行或IO密集型任务,建议为supplyAsync、thenApplyAsync等方法指定自定义的Executor,以更好地控制线程资源,避免阻塞公共线程池。
- 任务原子性:确保process方法内部的业务逻辑是线程安全的,如果它操作共享资源,需要额外的同步机制。本文的重点在于CompletableFuture的链式调用,而非process方法本身的线程安全性。
通过理解thenCompose的扁平化特性和thenAccept/thenApply的组合使用,我们可以更灵活、高效地构建复杂的异步任务流,满足各种顺序执行和结果收集的需求。
到这里,我们也就讲完了《CompletableFuture任务执行与结果收集详解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
457 收藏
-
429 收藏
-
305 收藏
-
258 收藏
-
241 收藏
-
485 收藏
-
357 收藏
-
105 收藏
-
386 收藏
-
132 收藏
-
163 收藏
-
450 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 511次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 498次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习