都在建议你不要直接使用 @Async 注解,为什么?

共 9262字,需浏览 19分钟

 ·

2021-11-18 10:39

来源:cnblogs.com/wlandwl/p/async.html

本文讲述@Async注解,在Spring体系中的应用。本文仅说明@Async注解的应用规则,对于原理,调用逻辑,源码分析,暂不介绍。对于异步方法调用,从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法。调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行。

在项目应用中,@Async调用线程池,推荐使用自定义线程池的模式。自定义线程池常用方案:重新实现接口AsyncConfigurer。

简介

应用场景

同步: 同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。

异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。

例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕;如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。

在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的业务子线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。

Spring 已经实现的线程池

  1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
  2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。
  3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
  4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
  5. ThreadPoolTaskExecutor :最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装。

异步的方法有:

  1. 最简单的异步调用,返回值为void
  2. 带参数的异步调用,异步方法可以传入参数
  3. 存在返回值,常调用返回Future

Spring中启用@Async

// 基于Java配置的启用方式:
@Configuration
@EnableAsync
public class SpringAsyncConfig { ... }

// Spring boot启用:
@EnableAsync
@EnableTransactionManagement
public class SettlementApplication {
    public static void main(String[] args) {
        SpringApplication.run(SettlementApplication.classargs);
    }
}

@Async应用默认线程池

Spring应用默认的线程池,指在@Async注解在使用时,不指定线程池的名称。查看源码,@Async的默认线程池为SimpleAsyncTaskExecutor。

无返回值调用

基于@Async无返回值调用,直接在使用类,使用方法(建议在使用方法)上,加上注解。若需要抛出异常,需手动new一个异常抛出。

/**
     * 带参数的异步调用 异步方法可以传入参数
     *  对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉
     * @param s
     */

    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }

有返回值Future调用

/**
     * 异常调用返回Future
     *  对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
     *  或者在调用方在调用Futrue.get时捕获异常进行处理
     *
     * @param i
     * @return
     */

    @Async
    public Future asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            future = new AsyncResult("error");
        } catch(IllegalArgumentException e){
            future = new AsyncResult("error-IllegalArgumentException");
        }
        return future;
    }

有返回值CompletableFuture调用

CompletableFuture 并不使用@Async注解,可达到调用系统线程池处理业务的功能。

JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口
/**
     * 数据查询线程池
     */

    private static final ThreadPoolExecutor SELECT_POOL_EXECUTOR = new ThreadPoolExecutor(10205000,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("selectThreadPoolExecutor-%d").build());

// tradeMapper.countTradeLog(tradeSearchBean)方法表示,获取数量,返回值为int
 // 获取总条数
        CompletableFuture countFuture = CompletableFuture
                .supplyAsync(() -> tradeMapper.countTradeLog(tradeSearchBean), SELECT_POOL_EXECUTOR);
// 同步阻塞
    CompletableFuture.allOf(countFuture).join();
// 获取结果
 int count = countFuture.get();

默认线程池的弊端

在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。Executors各个方法的弊端:

  • newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  • newCachedThreadPool和newScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。

@Async应用自定义线程池

自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。自定义线程池有如下模式:

  • 重新实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

通过查看Spring源码关于@Async的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。且重新实现 public Executor getAsyncExecutor()方法。

实现接口AsyncConfigurer

@Configuration
 public class AsyncConfiguration implements AsyncConfigurer {
     @Bean("kingAsyncExecutor")
     public ThreadPoolTaskExecutor executor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         int corePoolSize = 10;
         executor.setCorePoolSize(corePoolSize);
         int maxPoolSize = 50;
         executor.setMaxPoolSize(maxPoolSize);
         int queueCapacity = 10;
         executor.setQueueCapacity(queueCapacity);
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         String threadNamePrefix = "kingDeeAsyncExecutor-";
         executor.setThreadNamePrefix(threadNamePrefix);
         executor.setWaitForTasksToCompleteOnShutdown(true);
         // 使用自定义的跨线程的请求级别线程工厂类19         int awaitTerminationSeconds = 5;
         executor.setAwaitTerminationSeconds(awaitTerminationSeconds);
         executor.initialize();
         return executor;
     }
 
     @Override
     public Executor getAsyncExecutor() {
         return executor();
     }
 
     @Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return (ex, method, params) -> ErrorLogger.getInstance().log(String.format("执行异步任务'%s'", method), ex);
     }
 }

继承AsyncConfigurerSupport

@Configuration  
@EnableAsync  
class SpringAsyncConfigurer extends AsyncConfigurerSupport {  
  
    @Bean  
    public ThreadPoolTaskExecutor asyncExecutor() {  
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();  
        threadPool.setCorePoolSize(3);  
        threadPool.setMaxPoolSize(3);  
        threadPool.setWaitForTasksToCompleteOnShutdown(true);  
        threadPool.setAwaitTerminationSeconds(60 * 15);  
        return threadPool;  
    }  
  
    @Override  
    public Executor getAsyncExecutor() {  
        return asyncExecutor;  
}  

  @Override  
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return (ex, method, params) -> ErrorLogger.getInstance().log(String.format("执行异步任务'%s'", method), ex);
}
}

配置自定义的TaskExecutor

由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。

所以可以在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class便可。

比如:

Executor.class:ThreadPoolExecutorAdapter->ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor

这样的模式,最终底层为Executor.class,在替换默认的线程池时,需设置默认的线程池名称为TaskExecutor

TaskExecutor.class:ThreadPoolTaskExecutor->SchedulingTaskExecutor->AsyncTaskExecutor->TaskExecutor

这样的模式,最终底层为TaskExecutor.class,在替换默认的线程池时,可不指定线程池名称。

@EnableAsync
 @Configuration
 public class TaskPoolConfig {
     @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
     public Executor taskExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          //核心线程池大小
         executor.setCorePoolSize(10);
         //最大线程数
         executor.setMaxPoolSize(20);
         //队列容量
         executor.setQueueCapacity(200);
         //活跃时间
         executor.setKeepAliveSeconds(60);
         //线程名字前缀
         executor.setThreadNamePrefix("taskExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         return executor;
     }
    @Bean(name = "new_task")
     public Executor taskExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          //核心线程池大小
         executor.setCorePoolSize(10);
         //最大线程数
         executor.setMaxPoolSize(20);
         //队列容量
         executor.setQueueCapacity(200);
         //活跃时间
         executor.setKeepAliveSeconds(60);
         //线程名字前缀
         executor.setThreadNamePrefix("taskExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         return executor;
     }
 }

多个线程池

@Async注解,使用系统默认或者自定义的线程池(代替默认线程池)。可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("new_task")

@Async部分重要源码解析

源码-获取线程池方法

源码-设置默认线程池defaultExecutor,默认是空的,当重新实现接口AsyncConfigurer的getAsyncExecutor()时,可以设置默认的线程池。

源码-都没有找到项目中设置的默认线程池时,采用spring 默认的线程池

程序汪资料链接

程序汪接的7个私活都在这里,经验整理

Java项目分享 最新整理全集,找项目不累啦 06版

堪称神级的Spring Boot手册,从基础入门到实战进阶

卧槽!字节跳动《算法中文手册》火了,完整版 PDF 开放下载!

卧槽!阿里大佬总结的《图解Java》火了,完整版PDF开放下载!

字节跳动总结的设计模式 PDF 火了,完整版开放下载!


欢迎添加程序汪个人微信 itwang009  进粉丝群或围观朋友圈

浏览 13
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报