用了SpringBoot的发布订阅模式,代码看着高级多了

共 14867字,需浏览 30分钟

 ·

2022-11-27 19:09




大家好,我是小富~


在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……


这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。


e31e1e582e5c56da708ec1e25eb8f2c8.webp下单之后的逻辑

所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。


93076387489c1273f85afef1b2a7db83.webp观察者模式

一开始,我们都是自己去写观察者模式。




自己实现观察者模式

e89ae5215b071aa0a6c24b7ead0aeb3d.webp观察者简图



观察者



  • 观察者定义接口



/**
 * @Author: fighter3
 * @Description: 观察者接口
 * @Date: 2022/11/7 11:40 下午
 */

public interface OrderObserver {

    void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}



  • 具体观察者




    @Slf4j
    public class OrderMetricsObserver implements OrderObserver {

        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] metrics");
        }
    }




    @Slf4j
    public class OrderLogObserver implements OrderObserver{

        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] log.");
        }
    }




    @Slf4j
    public class OrderNotifyObserver implements OrderObserver{

        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] notify.");
        }
    }



    • 业务通知观察者


    • 日志记录观察者


    • 监控埋点观察者




被观察者



  • 消息实体定义



@Data
public class PlaceOrderMessage implements Serializable {
    /**
     * 订单号
     */

    private String orderId;
    /**
     * 订单状态
     */

    private Integer orderStatus;
    /**
     * 下单用户ID
     */

    private String userId;
    //……
}



  • 被观察者抽象类



public abstract class OrderSubject {
    //定义一个观察者列表
    private List<OrderObserver> orderObserverList = new ArrayList<>();
    //定义一个线程池,这里参数随便写的
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6126, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

    //增加一个观察者
    public void addObserver(OrderObserver o) {
        this.orderObserverList.add(o);
    }

    //删除一个观察者
    public void delObserver(OrderObserver o) {
        this.orderObserverList.remove(o);
    }

    //通知所有观察者
    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
        for (OrderObserver orderObserver : orderObserverList) {
            //利用多线程异步执行
            threadPoolExecutor.execute(() -> {
                orderObserver.afterPlaceOrder(placeOrderMessage);
            });
        }
    }
}


这里利用了多线程,来异步执行观察者。



  • 被观察者实现类



/**
 * @Author: fighter3
 * @Description: 订单实现类-被观察者实现类
 * @Date: 2022/11/7 11:52 下午
 */

@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {

    /**
     * 下单
     */

    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //添加观察者
        this.addObserver(new OrderMetricsObserver());
        this.addObserver(new OrderLogObserver());
        this.addObserver(new OrderNotifyObserver());
        //通知观察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}




测试




    @Test
    @DisplayName("下单")
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }



  • 测试执行结果



2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver        : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618  INFO 20235 --- [           main] cn.fighter3.obverser.OrderServiceImpl    : [placeOrder] end.
2022-11-08 00:11:13.618  INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver  : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver    : [afterPlaceOrder] log.


可以看到,观察者是异步执行的。




利用Spring精简

可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。


1f859c149b8128770c698e4ea5944167.webpSpring精简观察者模式



观察者实现类:定义成Bean



  • OrderLogObserver




    @Slf4j
    @Service
    public class OrderLogObserver implements OrderObserver {

        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] log.");
        }
    }




  • OrderMetricsObserver





@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {

    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] metrics");
    }
}



  • OrderNotifyObserver



@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {

    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] notify.");
    }
}




被观察者:自动注入Bean



  • OrderSubject




    public abstract class OrderSubject {
        /**
         * 利用Spring的特性直接注入观察者
         */

        @Autowired
        protected List<OrderObserver> orderObserverList;

        //定义一个线程池,这里参数随便写的
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6126, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

        //通知所有观察者
        public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
            for (OrderObserver orderObserver : orderObserverList) {
                //利用多线程异步执行
                threadPoolExecutor.execute(() -> {
                    orderObserver.afterPlaceOrder(placeOrderMessage);
                });
            }
        }
    }




  • OrderServiceImpl





@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {

    /**
     * 实现类里也要注入一下
     */

    @Autowired
    private List<OrderObserver> orderObserverList;

    /**
     * 下单
     */

    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //通知观察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}


这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。




Spring Event实现发布/订阅模式

Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。




自定义事件



  • PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。



public class PlaceOrderEvent extends ApplicationEvent {

    public PlaceOrderEvent(PlaceOrderEventMessage source) {
        super(source);
    }
}



  • PlaceOrderEventMessage:事件消息,定义了事件的消息体。



@Data
public class PlaceOrderEventMessage implements Serializable {
    /**
     * 订单号
     */

    private String orderId;
    /**
     * 订单状态
     */

    private Integer orderStatus;
    /**
     * 下单用户ID
     */

    private String userId;
    //……
}




事件监听者


事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。


43a560f98906ca9eb614a9c5c8ba8f08.webp事件监听者实现




实现ApplicationListener接口


实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。



  • OrderLogListener



@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent{

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}



  • OrderMetricsListener



@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent{

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] metrics");
    }
}



  • OrderNotifyListener



@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent{

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] notify.");
    }
}





使用@EventListener注解


使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。



  • OrderLogListener




    @Slf4j
    @Service
    public class OrderLogListener  {

        @EventListener
        public void orderLog(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }




  • OrderMetricsListener




    @Slf4j
    @Service
    public class OrderMetricsListener {

        @EventListener
        public void metrics(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] metrics");
        }
    }




  • OrderNotifyListener




    @Slf4j
    @Service
    public class OrderNotifyListener{

        @EventListener
        public void notify(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] notify.");
        }
    }






异步和自定义线程池


异步执行


异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:



  • OrderLogListener



@Slf4j
@Service
public class OrderLogListener  {

    @EventListener
    @Async
    public void orderLog(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}


当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。




@SpringBootApplication
@EnableAsync
public class DailyApplication {

    public static void main(String[] args) {
        SpringApplication.run(DairlyLearnApplication.classargs);
    }

}


自定义线程池


使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。


自定义线程池有三种方式:


e35b373ef6774284f47636c44035b1f4.webp@Async自定义线程池

  • 实现接口AsyncConfigurer


  • 继承AsyncConfigurerSupport


  • 配置由自定义的TaskExecutor替代内置的任务执行器

我们来看看三种写法:



  • 实现接口AsyncConfigurer



@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean("fighter3AsyncExecutor")
    public ThreadPoolTaskExecutor executor() {
        //Spring封装的一个线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //随便写的一些配置
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(30);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("fighter3AsyncExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}




  • 继承AsyncConfigurerSupport



@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //随便写的一些配置
        threadPool.setCorePoolSize(10);
        threadPool.setMaxPoolSize(30);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        return threadPool;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}



  • 配置自定义的TaskExecutor




    @Slf4j
    @Service
    public class OrderLogListener  {

        @EventListener
        @Async("asyncExecutor")
        public void orderLog(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }



    • 配置线程池




      @Configuration
      public class TaskPoolConfig {

          @Bean(name = "asyncExecutor")
          public Executor taskExecutor() {
              ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
              //随便写的一些配置
              executor.setCorePoolSize(10);
              executor.setMaxPoolSize(20);
              executor.setQueueCapacity(200);
              executor.setKeepAliveSeconds(60);
              executor.setThreadNamePrefix("asyncExecutor-");
              executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
              return executor;
          }
      }




    • 使用@Async注解的时候,指定线程池,推荐使用这种方式,因为在项目里,尽量做到线程池隔离,不同的任务使用不同的线程池





异步和自定义线程池这一部分只是一些扩展,稍微占了一些篇幅,大家可不要觉得Spring Event用起来很繁琐。






发布事件


发布事件也非常简单,只需要使用Spring 提供的ApplicationEventPublisher来发布自定义事件。



  • OrderServiceImpl




    @Service
    @Slf4j
    public class OrderServiceImpl implements OrderService {
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;

        /**
         * 下单
         */

        @Override
        public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
            log.info("[placeOrder] start.");
            PlaceOrderResVO resVO = new PlaceOrderResVO();
            //消息
            PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
            //发布事件
            applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
            log.info("[placeOrder] end.");
            return resVO;
        }
    }



在Idea里查看事件的监听者也比较方便,点击下面图中的图标,就可以查看监听者。


e6654c09ab82dc99a4bbd96f4d543137.webp查看监听者9329db6dae3b677a95468e0e9b50a233.webp监听者



测试


最后,我们还是测试一下。




    @Test
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }



  • 执行结果



2022-11-08 10:05:14.415  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] start.
2022-11-08 10:05:14.424  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] end.
2022-11-08 10:05:14.434  INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener    : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435  INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener   : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436  INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener       : [afterPlaceOrder] log.


可以看到,异步执行,而且用到了我们自定义的线程池。




小结

这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?




··········  END  ··············



在看



点赞



转发

,是对我最大的鼓励

       
浏览 47
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报