强大的CompletableFuture
共 18919字,需浏览 38分钟
·
2021-08-19 14:20
异步计算
异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。打个比方,你拿了一袋子衣 服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好。同时,你可以去做其他的事情。洗衣服对于你就是一个异步的过程。
在开发中如何使用异步呢?
Future 接口
Future是JDK5新增的接口,用于描述一个异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果。使用Future以异步的方式执行一个耗时的操作,完成上面洗衣服的例子。
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 创建ExecutorService,通过它你可以向线程池提交任务
ExecutorService executor = Executors.newCachedThreadPool();
//2.向executorService提交一个callable对象
Future<String> future = executor.submit(()->{
try {
System.out.println("把衣服交给干洗店");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "洗完了";
});
//3.可以做其他事情了
//goShopping();
//4.获取异步执行的结果
System.out.println(future.get());
System.out.println("finish!");
}
}
CompletableFuture
为什么有了Future, JDK8之后又提供了CompletableFuture,因为Future有其局限性; 比如以下情况
将两个异步计算合并为一个; 等待多个Future的所有任务都完成 仅等待Future集合中最快结束的任务完成,并返回它的结果 Future完成后,需要做一些事情。
CompletableFuture即实现了Future接口,又实现了CompletionStage接口, CompletionStage也是JDK8新增的接口,他给CompletableFuture提供了函数式编程的能力,通过他,我们可以在一个CompletableFuture结果上多次流式调用,得到最后的结果。
创建异步对象
CompletableFuture提供了runAsync, supplyAsync静态方法创建一个异步操作。runAsync没有返回值的场景,如执行一个简单的异步操作。supplyAsync可以获得返回结果的场景,如计算某些数据返回等。同时,这两种方法都可以通过传入executor设置使用的线程池,如果不传,则使用默认的ForkJoinPool.common线程池。
//无返回值 runAsync()
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//有返回值supplyAsync()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步调用,无返回值的情况
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName());
});
completableFuture.get();
//异步调用,有返回值的情况
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
System.out.println("构建任务");
return calc(15, 10);
});
System.out.println(future.get());
}
public static Integer calc(Integer a, Integer b) {
try {
//模拟长时间的任务
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return a / b;
}
}
ForkJoinPool.commonPool-worker-1
构建任务
1
完成时回调
whenComplete
CompletableFuture提供了一种异步编排的功能,使用whenComplete ,当Future任务完成后,调用一个回调。
//whenComplete 不带Async,表示同步,与上一个Future用同一个线程执行
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
//whenCompleteAsync,异步,表示任务执行完后另起线程异步执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
//whenCompleteAsync,异步,表示任务执行完后交给指定线程池异步执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable>action, Executor executor)
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。whenComplete是执行当前任务的线程继续执行whenComplete的任务。whenCompleteAsync是提交给其他线程池来执行。
whenComplete的参数是一个BiConsumer, BiConsumer接收两个参数,一个是上次任务的结果,一个是上次任务的异常。所以可以获取到任务的执行结果和异常,同时,CompletableFuture可以对异常进行处理,当发生异常时,给定一个默认返回。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
return 10/0;
}).whenComplete((res, exception)->{
System.out.println(res);
System.out.println(exception);
}).exceptionally(throwable -> 10);
System.out.println(future.get());
null
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
10
handle
handle方法和whenComplete大致相同。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
不同在于whenComplete传入的是BiConsumer, 而handle方法传入的是BiFunction。懂了!就是handle方法有返回值,可以对CompletableFuture任务的执行结果做处理,得到新结果返回,而whenComplete不能。
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
return 10/2;
}).handle((res, exception)->{
if (res!=null) {
//handle方法可以修改结果返回
return res * res;
}
return 0;
});
System.out.println(future2.get()); //get()返回结果25
任务串行
串行即一个任务完成后继续执行下一个任务。CompletableFuture提供了下面几种方法。
thenRun/thenRunAsync
一个任务执行完后开始执行后面的任务,我们可以看到传入的参数是个Runnbale, 多以后面的任务不依赖前面的任务执行结果。
// 与前任务是要同一线程
public CompletableFuture<Void> thenRun(Runnable action)
// 另起一线程执行
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
例如
CompletableFuture.runAsync(()->{
System.out.println("task 1 finish!");
}).thenRun(()->{
System.out.println("task 2 finish!");
}).get();
thenAccept/thenAcceptAsync
thenAccept与runAsync的区别在于它的参数是一个Consumer,可以回去上一个任务的返回结果。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
CompletableFuture.supplyAsync(()->{
System.out.println("task 1 finish!");
return "Hello";
}).thenAccept((res)->{
System.out.println("this is task 2, task 1 return :"+res);
}).get();
----------Console print---------------
task 1 finish!
this is task 2, task 1 return :Hello
thenApply/thenApplyAsync
似曾相识。thenApply与thenAccept的区别,就是有返回值。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
String s = CompletableFuture.supplyAsync(() -> {
System.out.println("task 1 finish!");
return "Hello";
}).thenApply((res) -> {
System.out.println("this is task 2, task 1 return :" + res);
return res + " world";
}).get();
System.out.println(s);
----------Console print---------------
task 1 finish!
this is task 2, task 1 return :Hello
Hello world
两任务组合
runAfterBoth/runAfterBothAsync
组合两个CompletableFuture,执行不需要依赖他们的结果,当连个CompletableFuture都执行完后,执行action。
//套路都一样,这里就只粘上自定义线程池的runAfterBothAsy了。
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
例如
CompletableFuture.runAsync(()->{
System.out.println("CompletableFuture 2.");
}).runAfterBoth(CompletableFuture.runAsync(()->{
System.out.println("CompletableFuture 1.");
}), ()->{
System.out.println("all finish!");
}).join();
thenAcceptBoth/thenAcceptBothAsync
组合两个CompletableFuture, 获取他们的返回结果,然后执行action,无返回值。
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
CompletableFuture.supplyAsync(()->{
System.out.println("CompletableFuture 1.");
return 1;
}).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
System.out.println("CompletableFuture 2.");
return 2;
}), (f1, f2)->{
System.out.println("Result of CompletableFuture 1 is "+f1);
System.out.println("Result of CompletableFuture 2 is "+f2);
System.out.println("all finish!");
}).join();
----------------Console Print-----------------
CompletableFuture 1.
CompletableFuture 2.
Result of CompletableFuture 1 is 1
Result of CompletableFuture 2 is 2
all finish!
thenCombine/thenCombineAsync
组合两个CompletableFuture, 获取他们的返回结果,然后执行action,有返回值。
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
看到这,我相信也不用在写demo了...
runAfterEither/runAfterEitherAsync
组合两个CompletableFuture,当其中一个CompletableFuture都执行完后,再执行action。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
acceptEither/acceptEitherAsync
组合两个CompletableFuture,当其中一个CompletableFuture都执行完后,可以接受他们的返回结果,再执行action,无返回值。
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor)
applyToEither/applyToEitherAsync
组合两个CompletableFuture,当其中一个CompletableFuture都执行完后,可以接受他们的返回结果,再执行action,无返回值。
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor)
多任务
上面都是两个任务,如果需要多个怎么办?
allOf是多个任务都完成后再执行后面的操作;anyOf是任意一个任务完成,就可以执行后面的操作。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
例如
CompletableFuture exportSheet1 = CompletableFuture.runAsync(()->{
System.out.println("export sheet1");
});
CompletableFuture exportSheet2 = CompletableFuture.runAsync(()->{
System.out.println("export sheet2");
});
CompletableFuture exportSheet3 = CompletableFuture.runAsync(()->{
System.out.println("export sheet3");
});
CompletableFuture.allOf(exportSheet1, exportSheet2, exportSheet3).join();
总结
CompletableFuture功能还是挺强大的,提供的方法众多,我们这样看起来,无外乎就是异步任务有无返回结果,任务需不需要拿到上个任务的返回结果,对需不需要返回结果再处理都返回。总结到这里,希望对大家有所帮助。