不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂

愿天堂没有BUG

共 7106字,需浏览 15分钟

 ·

2022-06-20 11:37


Stream源码解析

Spring Cloud Stream(简称SCS)提供了一系列预先定义的注解来声明输入型和输出型Channel,业务系统基于这些Channel与消息中间件进行通信,而不是直接与具体的消息中间件进行通信。跟踪SCS的源码就会发现,Stream有很多外部依赖,最主要的就是Messaging和Integration两个项目,所以在讲解SCS源码前,有必要先介绍一下Messaging和Integration与SCS体系的关系。

SCS的目标是建立一套统一的基于注解的消息发送机制,屏蔽开发人员直接与底层消息系统进行细节交互,而Messaging模块正是Spring框架中用来做统一消息编程模型的,在Messaging中最关键的数据结构是Message,代码如下:


在Messaging模块中消息通道MessageChannel是一个接口类,用于发送Message消息,可以理解为Messaging模块中的标准接口,类似于J2EE中的Servlet接口,具体实现类可以实现具体消息通道。下面是MessageChannel的代码:


在Messaging模块中,消息通道的子接口SubscribableChannel继承了MessageHandler消息处理器:


由MessageHandler真正地消费/处理消息:


Integration基于Spring框架可以实现轻量级的消息传递,也是对Messaging的扩展实现,支持通过声明适配器与SCS集成。它实现了消息 过 滤 、 消 息 转 换 、 消 息 聚 合 和 消 息 分 割 等 功 能 , 提 供 了 对MessageChannel 和 MessageHandler 的 实 现 , 包 括 DirectChannel 、ExecutorChannel、PublishSubscribeChannel,以及MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter等。下面介绍Integration 中 的 两 种 消 息 分 发 器 :DirectChannel 和PublishSubscribeChannel。


从代码可知,DirectChannel内部的UnicastingDispatcher类型分发器会发到对应消息通道的MessageChannel中,从名字也可以看出来,UnicastingDispatcher是一个单播的分发器,只能选择一个消息通道。而PublishSubscribeChannel使用BroadcastingDispatcher作为广播消息分发器,会把消息分发给所有的MessageHandler。

SCS在Integration的集成上进行了封装,通过注解的方式和统一的API进行消息的发送和消费,底层消息中间件的实现细节由各个消息中间件的Binder完成,同时,通过与Spring Boot的ExternalizedConfiguration整合,SCS提供了BindingProperties等外部化配置类,这些具体的配置信息将绑定到具体的消息中间件的配置类中。

SCS的架构流程图

下面是SCS的架构流程图,我们会从几个层次分别讲解其中相关联的源码和它们之间的交互关系。


应用层

SCS为用户提供了三个绑定消息通道的默认实现。

● Sink:通过指定消费消息的目标来标识消息消费者。

● Source:与Sink相反,用于标识消息生产者。

● Processor:集成了Sink和Source的功能,用于标识消息生产者和消费者。


对 应 用 而 言 , 想 要 启 动 SCS 的 功 能 , 需 要 先 启 动 注 解 。

@EnableBinding注解是Stream框架运转的起点,通过这个注解可以实现动态注册BeanDefinition,它会将消息通道绑定到自己修饰的目标实例上,从而让这些实例具备与消息队列进行交互的能力。下面我们看源码:



BindingServiceConfiguration的 作 用 是 完 成BindingService、InputBindingLifecycle、OutputBindingLifecycle等重要Bean的初始化及相关配置文件加载。

● BindingBeansRegistrar的作用是注册声明通道的接口类的BeanDefinition,从而获取这些接口类的实例,并使用这些实例进行消息的发送和接收,具体代码实现如下:



registerBindingTargetBeanDefinitions方法会调用ReflectionUtils类完成扫描所有被注解@Input和@Output标注了的方法,然后注册BeanDefinition。下面是代码示例:




registerBindingTargetsQualifiedBeanDefinitions 是 在 注 册registerBindingTargetBeanDefinitions 时 使 用 的 工 厂 类BeanDefinition,这个工厂类用来生成registerBindingTargetBeanDefinition注册的Bean实例,如下所示:


Stream层

Stream 层 的 BindableProxyFactory 被 初 始 化 为 一 个rootBeanDefinition,并注册为一个FactoryBean,这样Spring容器就可 以 获 得
registerBindingTargetBeanDefinitions 方 法 中 所 注 册 的Bean实例(MessageChannel对象实例)。BindableProxyFactory可以说是SCS实现通道接口类声明及相关类型的核心类,代码如下:




afterPropertiesSet方法会处理所有被@Input和@Output注解的函数 , 并 将 生 成 函 数 返 回 类 型 实 例 存 储 在 BoundTargetHolder 中 ,getBindingTargetName方法会返回
SubscribableChannelBindingTargetFactory 实 例 , 它 会 在createOutput方法中返回一个DirectChannel实例,该实例会被存储起来供BindableProxyFactory使用。

名称为output的BeanDefinition将BindableProxyFactory设置成其实例工厂类,并将outputMessagefunction方法设置成其实例的工厂函数(BeanFactoryMethod)。当Spring容器创建该实例时,会调用BindableProxyFactory 的 outputMessagefunction 方 法 , 由 于BindableProxyFactory实现了Methodlnterceptor接口,所以就调用了其invoke方法。invoke方法会从BindableProxyFactory缓存的Channel实例中匹配符合的实例方法,并反射调用。

BindingService是Stream层获取绑定器和执行绑定任务的一个重要类,首先我们看BindingService的bindProducer方法,代码如下:


在 BindingService 实 现 中 , getBinder 方 法 最 终 会 调 用DefaultBinderFactory中的getBinder方法实现,我们可以看到,DefaultBinderFactory的作用就是获取具体的Binder实现并提供给相应的MessageChannel实例。DefaultBinderFactory的初始化依赖于BinderTypeRegistry获得的BinderType列表。DefaultBinderFactory的getBinder实现中会调用BinderConfiguration获取对应的Binder实例 , 通 过 跟 踪 BinderConfiguration 的 初 始 化 过 程 , 可 以 发 现BinderConfiguration 是 在
BinderFactoryConfiguration 执 行getBinderConfiguration方法时将bindingServiceProperties变量中的BinderProperties与BinderTypeRegistry中的BinderType结合,封装成BinderConfiguration对象。BinderProperties封装了Stream从application.yml文件中读取的关于Binder的配置信息,而BinderType则 是 具 体 Binder 的 实 现 类 信 息 。DefaultBinderFactory 的getBinderInstance实现如下:



这 里 的 getBinderInstance 方 法 中 会 生 成 一 个
ConfigurableApplicationContext 来 创 建 Binder 实 例 , 在 创 建ConfigurableApplicationContext实例时,它会将BinderConfiguration设置到SpringApplicationBuilder中。


ConfigurableApplicationContext调用getBinder方法时,会使用BinderConfiguration的属性和配置生成BinderConfiguration中设置的具体类型的Binder实现。如果你使用的Binder是RabbitMQ,那么对应 的 RabbitServiceAutoConfiguration 会 自 动 初 始 化 并 加 载RabbitMessageChannelBinder实例。

在 Stream 层 对 Binder 实 例 的 初 始 化 工 作 都 完 成 后 , 再 回 到BindingService 的 bindProducer 方 法 实 现 , 它 会 调 用
AbstractMessagChannlBinder 的 doBindProducer 方 法 , 关 键 代 码 如下:


从源码可知,ProvisioningProvider是一个接口,不同的Binder实 现 可 以 根 据 接 口 实 现 各 自 不 同 的 ProducerDestination 和ConsumerDestination,代码如下:


doBindProducer会调用
createProducerMessageHandler方法创建MessageHandler实例,MessageChannel会使用SendingHandler封装后的MessageHandler实例,当有output消息时,将消息发送给最终的Binder实例。

通过上面的步骤,基本上在Stream层就完成了对生产者的绑定操作,消费者的绑定就是将SubscribableChannel与具体的消息队列实现连接,doBindConsumer与doBindProducer流程类似。

首先通过ProvisioningProvider的
provisionConsumerDestination方法创建ConsumerDestination,然后调用createConsumerEndpoint方法创建MessageProducer实例,最后生成DefaultBinding实例,代码如下:



Message/Integrate/消息中间件Binder层

从@Output注解可以看到,Stream框架会使用MessageChannel发送消 息 。通 过 BindingService 的 doBindProducer 方 法 创 建 并 绑 定SendingHandler对象,然后调用handleMessageInternal方法,它会将消息再发送给delegate对象处理。下面是SendingHandler对象的handleMessageInternal方法的代码实现:


delegate是之前在BindingServer中抽象类
AbstractMessageChannelBinder执行的createProducerMessageHandler方法返回的生产者MessageHandler实例。对于RabbitMQ Binder来说,就是rmqpOutboundEndpoint对象,该实 例 将 最 终 调 用 其 handlerMessage 方 法 , 该 方 法 进 一 步 调 用RabbitTemplate的send方法。消息发送流程如下图所示。


消息的接收过程

消息的接收过程可以分为两个阶段:第一个阶段是从RabbitMQ到SubscribableChannel的过程。我们从@Input注解可以看到,Stream框架 会 使 用 SubscribableChannel 接 收 消 息 。第 二 个 阶 段 是 注 解@StreamListener告诉SubscribableChannel如何将消息发送给对应的Sink接收端对应的回调方法。

Spring的RabbitMQ使用InternalConsumer作为默认的消息消费方,当接收到对应消息后,会调用handleDelivery方法将RabbitMQ消息发送给BlockingQueueConsumer中的队列。下面是handleDelivery的源码实现。



AsyncMessageProcessingConsumer类是Runnable类型的,它会消费 阻 塞 队 列 , 并 将 消 息 传 给 AmqpInboundChannelAdapter 。

AmqpInboundChannelAdapter 实 例 是 在 BindingService 构 造createConsumerEndpoint时创建的consumerEndpoint,并将它与对应的Channel绑定。下面是AmqpInboundChannelAdapter的关键代码,即processMessage方法,它会调用MessagingTemplate对象的send方法将消息发送给SubscribableChannel模块。



下面就是消息处理的第二个阶段,就是将SubscribableChannel中的 消 息 发 送 给 指 定 的 方 法 , 主 要 靠 @StreamListener 注 解 实 现 。

@StreamListener是注释在消费方法上的注解,用来接收输入型通道的消 息 , Stream 定 义 了
StreamListenerAnnotationBeanPostProcessor类,用来处理项目中的@SteamListener注解。


StreamListenerAnnotationBeanPostProcessor实现了BeanPostProcessor接口,用来在Bean初始化之前和之后两个时间点对Bean实例进行处理。


postProcessAfterlnitialization是在Bean实例初始化之后被调用 的 方 法 , 它 会 遍 历 Bean 实 例 中 的 所 有 函 数 , 处 理 那 些 被@StreamListener注解修饰的函数。


afterSingletonsInstantiated方法会遍历mappedListenerMethods 对 应 的 所 有 Entry 对 象 , 为 每 一 个StreamListenerHandlerMethodMapping 创 建 一 个 MessageHandler 实例。然后根据条件生成DispatchingStreamListenerMessageHandler并注册给SubscribableChannel。

下 面 是
StreamListenerAnnotationBeanPostProcessor 的 代 码 实现:



当 SubscribableChannel 接 收 到 消 息 后 , 会 调 用
DispatchingStreamListenerMessageHandler类的handleRequestMessage方法,该方法会调用ConditionalStreamListenerHandler的handleMessage方法。

findMatchingHandlers方法根据
ConditionalStreamListenerHandler 的 Expression 实 例 来 判 断ConditionalStreamListenerHandler是否适合处理当前这个消息,最终消息经过InvocableHandlerMethod传递给对应的函数。SCS消费消息的整体流程如下图所示。


本文给大家讲解的内容是MOM异步通信,Stream源码解析

  1. 下篇文章给大家讲解的内容是MOM异步通信,Stream应用进阶

  2. 觉得文章不错的朋友可以转发此文关注小编;

  3. 感谢大家的支持!

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

浏览 26
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报