通过实例理解CompletableFuture并发框架


通过实例理解CompletableFuture并发框架

CompletableFuture实现FutureCompletionStage;
CompletionStage的定义是作为一个用于异步执行中的处理阶段,适用于lambda表达式计算过程中。Future定义是作为异步返回值容器。下面通过一系列实列先来熟悉CompletableFuture能够完成的功能。

实现功能

  1. 初始化一个完成的CompletableFuture

/**
 * 初始化一个完成的CompletableFuture
 */
static void completableFutureExample0() {
    CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("完成结果");
    //是否完成
    if (completableFuture.isDone()) {
        System.out.println("completableFuture已经完成");
    }

    //getNow()返回
    String returnMsg = completableFuture.getNow(null);
    if("完成结果".equals(returnMsg)){
        System.out.println("completableFuture.getNow()");
    }
}
  1. 创建一个简单的异步stage

/**
 * 创建一个简单的异步stage
 */
static void completableFutureExample1() {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        //是否是守护线程
        System.out.println(Thread.currentThread().getName() + "是否是守护线程:" + Thread.currentThread().isDaemon());
    });
    boolean done = future.isDone();
    System.out.println("future.isDone() = " + future.isDone());
}
  1. 对完成的CompletableFuture继续同步进行处理
/**
 * 对完成的CompletableFuture继续同步进行处理
 */
static void completableFutureExample2() {
    //thenApply如果没有指定执行线程池那么就会在当前线程中执行
    CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApply(s -> {
        System.out.println("当前线程为" + Thread.currentThread().getName());
        return s.toUpperCase();
    });
    String result = message.getNow(null);
    System.out.println("返回结果为" + result );
}
  1. 对完成的CompletableFuture继续异步进行处理(有返回结果)
/**
 * 对完成的CompletableFuture继续异步进行处理
 */
static void completableFutureExample3() {
    //thenApplyAsync默认使用ForkJoinPool.commonPool()线程池
    CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        System.out.println("当前线程为" + Thread.currentThread().getName());
        return s.toUpperCase();
    });
    String result = message.getNow(null);
    System.out.println("返回结果为" + result );
}
  1. 异步消费结果(无返回值)
  • thenAccept(Consumer<? super T> action)、thenAcceptAsync(Consumer<? super T> action,Executor executor)
/**
 * 异步消费结果(无返回值)
 */
static void completableFutureExample4(){
    CompletableFuture.completedFuture("value").thenAccept(item ->{
       System.out.println(Thread.currentThread().getName() + ":" + item.toUpperCase());
    });
}
  1. 计算时出现异常
/**
 * 取消&异常处理
 */
static void completableFutureExample5() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.completedFuture(Lists.newArrayList(10, 100, 1000))
            .thenApplyAsync(item -> {
                item.forEach(i -> {
                    if (i.equals(1000)) {
                        int i1 = i / 0;
                    }
                });
                return -1;
            });
    //join()不会显示抛出异常
    future.join();

    //get()方法会抛出异常
    future.get();

    //cancel()方法会取消计算
    future.cancel(true);
}
  1. BiConsumer同时处理两个stage结果
/**
 * BiConsumer支持对两个Stage的结果进行操作
 */
static void completableFutureExample6() {
    String original = "message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original)
            .thenApply(String::toUpperCase)
            .thenAcceptBoth(CompletableFuture.completedFuture("value").thenApply(String::toUpperCase),
                    //BiConsumer
                    (v1, v2) -> result.append(v1 + "-" + v2));
    System.out.println("result:" + result.toString());
}

实现原理

Future接口介绍

JDK5新增了Future接口,用于描述一个异步计算的结果。但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

CompletableFuture类介绍

CompletableFuture提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

对于阻塞或者轮询方式,依然可以通过 CompletableFuture 类的 CompletionStage和Future接口方式支持。CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台,所以我们可以通过实现多个 CompletionStage 命令,并且将这些命令串联在一起的方式实现多个命令之间的触发。

扩展知识

参考文档

通过实例理解 JDK8 的 CompletableFuture


  TOC