SpringBoot @Async异步注解上下文透传

程序员考拉

共 21160字,需浏览 43分钟

 ·

2021-07-09 14:55

公众号关注 “GitHub今日热榜
设为 “星标”,带你挖掘更多开发神器!






上一篇文章说到,之前使用了@Async注解,子线程无法获取到上下文信息,导致流量无法打到灰度,然后改成 线程池的方式,每次调用异步调用的时候都手动透传 上下文(硬编码)解决了问题。


后面查阅了资料,找到了方案不用每次硬编码,来上下文透传数据了。


方案一:


继承线程池,重写相应的方法,透传上下文。


方案二:(推荐)


线程池ThreadPoolTaskExecutor,有一个TaskDecorator装饰器,实现这个接口,透传上下文。


方案一:继承线程池,重写相应的方法,透传上下文。


1、ThreadPoolTaskExecutor  spring封装的线程池


@Bean(ExecutorConstant.simpleExecutor_3)
    public Executor asyncExecutor3() {
        MyThreadPoolTaskExecutor taskExecutor = new MyThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setThreadNamePrefix(threadNamePrefix_3);
        taskExecutor.initialize();
        return taskExecutor;
    }

    //------- 继承父类 重写对应的方法 start
    class MyCallable<T> implements Callable<T> {
        private Callable<T> task;
        private RequestAttributes context;

        public MyCallable(Callable<T> task, RequestAttributes context) {
            this.task = task;
            this.context = context;
        }

        @Override
        public T call() throws Exception {
            if (context != null) {
                RequestContextHolder.setRequestAttributes(context);
            }

            try {
                return task.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
    class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor{

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return super.submit(new MyCallable(task, RequestContextHolder.currentRequestAttributes()));
        }

        @Override
        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
            return super.submitListenable(new MyCallable(task, RequestContextHolder.currentRequestAttributes()));
        }
    }
    //------- 继承父类 重写对应的方法 end


1、MyCallable是继承Callable,创建MyCallable对象的时候已经把Attributes对象赋值给属性context了(创建MyCallable对象的时候因为实在当前主线程创建的,所以是能获取到请求的Attributes),在执行call方法前,先执行了RequestContextHolder.setRequestAttributes(context); 【把这个MyCallable对象的属性context 设置到setRequestAttributes中】 所以在执行具体业务时,当前线程(子线程)就能取得主线程的Attributes


2、MyThreadPoolTaskExecutor类是继承了ThreadPoolTaskExecutor 重写了submit和submitListenable方法


为什么是重写submit和submitListenable这两个方法?


@Async AOP源码的方法位置是在:AsyncExecutionInterceptor.invoke

doSubmit方法能看出来


无返回值调用的是线程池方法:submit()

有返回值,根据不同的返回类型也知道:


  1. 返回值类型是:Future.class 调用的是方法:submit()

  2. 返回值类型是:ListenableFuture.class 调用的方法是:submitListenable(task)

  3. 返回值类型是:CompletableFuture.class调用的是CompletableFuture.supplyAsync这个在异步注解中暂时用不上的,就不考虑重写了。


public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
      throw new IllegalStateException(
          "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = () -> {
      try {
        Object result = invocation.proceed();
        if (result instanceof Future) {
          return ((Future<?>) result).get();
        }
      }
      catch (ExecutionException ex) {
        handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
      }
      catch (Throwable ex) {
        handleError(ex, userDeclaredMethod, invocation.getArguments());
      }
      return null;
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
  }

  @Nullable
  protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          return task.call();
        }
        catch (Throwable ex) {
          throw new CompletionException(ex);
        }
      }, executor);
    }
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
      return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    else if (Future.class.isAssignableFrom(returnType)) {
      return executor.submit(task);
    }
    else {
      executor.submit(task);
      return null;
    }
  }


2、ThreadPoolExecutor 原生线程池


ThreadPoolExecutor线程池代码如下:


//------- ThreadPoolExecutor 继承父类 重写对应的方法 start
    class MyRunnable implements Runnable {
        private Runnable runnable;
        private RequestAttributes context;

        public MyRunnable(Runnable runnable, RequestAttributes context) {
            this.runnable = runnable;
            this.context = context;
        }

        @Override
        public void run() {
            if (context != null) {
                RequestContextHolder.setRequestAttributes(context);
            }
            try {
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }

    class MyThreadPoolExecutor extends ThreadPoolExecutor{
        @Override
        public void execute(Runnable command) {
            if(!(command instanceof MyRunnable)){
                command = new MyRunnable(command,RequestContextHolder.currentRequestAttributes())
            }
            super.execute(command);
        }

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    }
    //------- ThreadPoolExecutor 继承父类 重写对应的方法 end


像ThreadPoolExecutor主要重写execute方法,在启动新线程的时候先把Attributes取到放到MyRunnable对象的一个属性中,MyRunnable在具体执行run方法的时候,把属性Attributes赋值到子线程中,当run方法执行完了在把Attributes清空掉。


为什么只要重写了execute方法就可以了?


ThreadPoolExecutor大家都知道主要是由submit和execute方法来执行的。


看ThreadPoolExecutor类的submit具体执行方法是由父类AbstractExecutorService#submit来实现。


具体代码在下面贴出来了,可以看到submit实际上最后调用的还是execute方法,所以我们重写execute方法就好了。


submit方法路径及源码:


public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }


方案二:(推荐)ThreadPoolTaskExecutor线程池


实现TaskDecorator接口,把实现类设置到taskExecutor.setTaskDecorator(new MyTaskDecorator());


//------- 实现TaskDecorator 接口 start

    @Bean(ExecutorConstant.simpleExecutor_4)
    public Executor asyncExecutor4() {
        MyThreadPoolTaskExecutor taskExecutor = new MyThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setThreadNamePrefix(threadNamePrefix_4);
        taskExecutor.setTaskDecorator(new MyTaskDecorator());
        taskExecutor.initialize();
        return taskExecutor;
    }

    class MyTaskDecorator implements TaskDecorator{

        @Override
        public Runnable decorate(Runnable runnable) {
            try {
                RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
                return () -> {
                    try {
                        RequestContextHolder.setRequestAttributes(attributes);
                        runnable.run();
                    } finally {
                        RequestContextHolder.resetRequestAttributes();
                    }
                };
            } catch (IllegalStateException e) {
                return runnable;
            }
        }
    }
    //------- 实现TaskDecorator 接口 end


为什么设置了setTaskDecorator就能实现透传数据了?


主要还是看taskExecutor.initialize()方法,主要是重写了ThreadPoolExecutor的execute方法,用装饰器模式 增强了Runnable接口,源代码如下:


@Nullable
  private ThreadPoolExecutor threadPoolExecutor;

  //初始化方法
  public void initialize() {
    if (logger.isDebugEnabled()) {
      logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
      setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
  }

  @Override
  protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler)
 
{

    BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

    ThreadPoolExecutor executor;

    //判断是否设置了,taskDecorator装饰器
    if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(
          this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
          queue, threadFactory, rejectedExecutionHandler) {
        @Override
        public void execute(Runnable command) {
          //执行装饰器方法包装Runnable接口
          Runnable decorated = taskDecorator.decorate(command);
          if (decorated != command) {
            decoratedTaskMap.put(decorated, command);
          }
          super.execute(decorated);
        }
      };
    }
    else {
      executor = new ThreadPoolExecutor(
          this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
          queue, threadFactory, rejectedExecutionHandler);

    }

    if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
    }
    //把初始化好的ThreadPoolExecutor线程池赋值给 当前类属性threadPoolExecutor
    this.threadPoolExecutor = executor;
    return executor;
  }


总结


无论是方案1还是方案2,原理都是先在当前线程获取到Attributes,然后把Attributes赋值到Runnable的一个属性中,在起一个子线程后,具体执行run方法的时候,把Attributes设置给当子线程,当run方法执行完了,在清空Attributes。


方案2实现比较优雅,所以推荐使用它。



出处:cnblogs.com/x-kq/p/14911497.html










关注GitHub今日热榜,专注挖掘好用的开发工具,致力于分享优质高效的工具、资源、插件等,助力开发者成长!








点个在看,你最好看



浏览 124
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报