通过实例理解CompletableFuture并发框架
CompletableFuture实现Future、CompletionStage;
CompletionStage的定义是作为一个用于异步执行中的处理阶段,适用于lambda表达式计算过程中。Future定义是作为异步返回值容器。下面通过一系列实列先来熟悉CompletableFuture能够完成的功能。
实现功能
- 初始化一个完成的CompletableFuture
/**
* 初始化一个完成的CompletableFuture
*/
static void completableFutureExample0() {
CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("完成结果");
//是否完成
if (completableFuture.isDone()) {
System.out.println("completableFuture已经完成");
}
//getNow()返回
String returnMsg = completableFuture.getNow(null);
if("完成结果".equals(returnMsg)){
System.out.println("completableFuture.getNow()");
}
}
- 创建一个简单的异步stage
/**
* 创建一个简单的异步stage
*/
static void completableFutureExample1() {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//是否是守护线程
System.out.println(Thread.currentThread().getName() + "是否是守护线程:" + Thread.currentThread().isDaemon());
});
boolean done = future.isDone();
System.out.println("future.isDone() = " + future.isDone());
}
- 对完成的CompletableFuture继续同步进行处理
/**
* 对完成的CompletableFuture继续同步进行处理
*/
static void completableFutureExample2() {
//thenApply如果没有指定执行线程池那么就会在当前线程中执行
CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApply(s -> {
System.out.println("当前线程为" + Thread.currentThread().getName());
return s.toUpperCase();
});
String result = message.getNow(null);
System.out.println("返回结果为" + result );
}
- 对完成的CompletableFuture继续异步进行处理(有返回结果)
/**
* 对完成的CompletableFuture继续异步进行处理
*/
static void completableFutureExample3() {
//thenApplyAsync默认使用ForkJoinPool.commonPool()线程池
CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
System.out.println("当前线程为" + Thread.currentThread().getName());
return s.toUpperCase();
});
String result = message.getNow(null);
System.out.println("返回结果为" + result );
}
- 异步消费结果(无返回值)
- thenAccept(Consumer<? super T> action)、thenAcceptAsync(Consumer<? super T> action,Executor executor)
/**
* 异步消费结果(无返回值)
*/
static void completableFutureExample4(){
CompletableFuture.completedFuture("value").thenAccept(item ->{
System.out.println(Thread.currentThread().getName() + ":" + item.toUpperCase());
});
}
- 计算时出现异常
/**
* 取消&异常处理
*/
static void completableFutureExample5() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.completedFuture(Lists.newArrayList(10, 100, 1000))
.thenApplyAsync(item -> {
item.forEach(i -> {
if (i.equals(1000)) {
int i1 = i / 0;
}
});
return -1;
});
//join()不会显示抛出异常
future.join();
//get()方法会抛出异常
future.get();
//cancel()方法会取消计算
future.cancel(true);
}
- BiConsumer同时处理两个stage结果
/**
* BiConsumer支持对两个Stage的结果进行操作
*/
static void completableFutureExample6() {
String original = "message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original)
.thenApply(String::toUpperCase)
.thenAcceptBoth(CompletableFuture.completedFuture("value").thenApply(String::toUpperCase),
//BiConsumer
(v1, v2) -> result.append(v1 + "-" + v2));
System.out.println("result:" + result.toString());
}
实现原理
Future接口介绍
JDK5新增了Future接口,用于描述一个异步计算的结果。但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
CompletableFuture类介绍
CompletableFuture提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
对于阻塞或者轮询方式,依然可以通过 CompletableFuture 类的 CompletionStage和Future接口方式支持。CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台,所以我们可以通过实现多个 CompletionStage 命令,并且将这些命令串联在一起的方式实现多个命令之间的触发。