Stream源标识符解析

Spring Cloud Stream(全称SCS)提供更多了一连串事先表述的注释来新闻稿输入型和输入型Channel,业务控制系统如前所述那些Channel与最新NSS展开通讯,而并非间接与具体内容的最新NSS展开通讯。追踪SCS的源标识符就会辨认出,Stream有许多内部倚赖,最主要的是Messaging和Integration三个工程项目,因此在传授SCS源标识符前,有必要性先如是说呵呵Messaging和Integration与SCS管理体系的亲密关系。

SCS的最终目标是创建两套标准化的如前所述注释的最新消息推送监督机制,过滤开发者间接与下层最新消息控制系统展开技术细节可视化,而Messaging组件便是Spring架构中用以做标准化最新消息程式设计数学模型的,在Messaging中最关键性的计算机程序是Message,标识符如下表所示:

在Messaging组件中最新消息地下通道MessageChannel是两个USB类,用作推送Message最新消息,能认知为Messaging组件中的标准USB,近似于J2EE中的ServletUSB,具体内容同时实现类能同时实现具体内容最新消息地下通道。上面是MessageChannel的标识符:

在Messaging组件中,最新消息地下通道的子USBSubscribableChannel承继了MessageHandler最新消息CPU:

由MessageHandler真正地消费/处理最新消息:

Integration如前所述Spring架构能同时实现轻量级的最新消息传递,也是对Messaging的扩展同时实现,支持通过新闻稿适配器与SCS集成。它同时实现了最新消息 过 滤 、 消 息 转 换 、 消 息 聚 合 和 消 息 分 割 等 功 能 , 提 供 了 对MessageChannel 和 MessageHandler 的 实 现 , 包 括 DirectChannel 、ExecutorChannel、PublishSubscribeChannel,以及MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter等。上面如是说Integration 中 的 两 种 消 息 分 发 器 : DirectChannel 和PublishSubscribeChannel。

从标识符可知,DirectChannel内部的UnicastingDispatcher类型分发器会发到对应最新消息地下通道的MessageChannel中,从名字也能看出来,UnicastingDispatcher是两个单播的分发器,只能选择两个最新消息地下通道。而PublishSubscribeChannel使用BroadcastingDispatcher作为广播最新消息分发器,会把最新消息分发给所有的MessageHandler。

SCS在Integration的集成上展开了封装,通过注释的方式和标准化的API展开最新消息的推送和消费,下层最新NSS的同时实现技术细节由各个最新NSS的Binder完成,同时,通过与Spring Boot的ExternalizedConfiguration整合,SCS提供更多了BindingProperties等内部化配置类,那些具体内容的配置信息将绑定到具体内容的最新NSS的配置类中。

SCS的架构流程图

上面是SCS的架构流程图,我们会从几个层次分别传授其中相关联的源标识符和它们之间的可视化亲密关系。

应用层

SCS为用户提供更多了三个绑定最新消息地下通道的默认同时实现。

● Sink:通过指定消费最新消息的最终目标来标识最新消息消费者。

● Source:与Sink相反,用作标识最新消息生产者。

● Processor:集成了Sink和Source的功能,用作标识最新消息生产者和消费者。

对 应 用 而 言 , 想 要 启 动 SCS 的 功 能 , 需 要 先 启 动 注 解 。

@EnableBinding注释是Stream架构运转的起点,通过这个注释能同时实现动态注册BeanDefinition,它会将最新消息地下通道绑定到自己修饰的最终目标实例上,从而让那些实例具备与最新消息队列展开可视化的能力。上面我们看源标识符:

●BindingServiceConfiguration的 作 用 是 完 成BindingService、InputBindingLifecycle、OutputBindingLifecycle等重要Bean的初始化及相关配置文件加载。

● BindingBeansRegistrar的作用是注册新闻稿地下通道的USB类的BeanDefinition,从而获取那些USB类的实例,并使用那些实例展开最新消息的推送和接收,具体内容标识符同时实现如下表所示:

registerBindingTargetBeanDefinitions方法会调用ReflectionUtils类完成扫描所有被注释@Input和@Output标注了的方法,然后注册BeanDefinition。上面是标识符示例:

registerBindingTargetsQualifiedBeanDefinitions 是 在 注 册registerBindingTargetBeanDefinitions 时 使 用 的 工 厂 类BeanDefinition,这个工厂类用以生成registerBindingTargetBeanDefinition注册的Bean实例,如下表所示所示:

Stream层

Stream 层 的 BindableProxyFactory 被 初 始 化 为 一 个rootBeanDefinition,并注册为两个FactoryBean,这样Spring容器就可 以 获 得registerBindingTargetBeanDefinitions 方 法 中 所 注 册 的Bean实例(MessageChannel对象实例)。BindableProxyFactory能说是SCS同时实现地下通道USB类新闻稿及相关类型的核心类,标识符如下表所示:

afterPropertiesSet方法会处理所有被@Input和@Output注释的函数 , 并 将 生 成 函 数 返 回 类 型 实 例 存 储 在 BoundTargetHolder 中 ,getBindingTargetName方法会返回SubscribableChannelBindingTargetFactory 实 例 , 它 会 在createOutput方法中返回两个DirectChannel实例,该实例会被存储起来供BindableProxyFactory使用。

名称为output的BeanDefinition将BindableProxyFactory设置成其实例工厂类,并将outputMessagefunction方法设置成其实例的工厂函数(BeanFactoryMethod)。当Spring容器创建该实例时,会调用BindableProxyFactory 的 outputMessagefunction 方 法 , 由 于BindableProxyFactory同时实现了MethodlnterceptorUSB,因此就调用了其invoke方法。invoke方法会从BindableProxyFactory缓存的Channel实例中匹配符合的实例方法,并反射调用。

BindingService是Stream层获取绑定器和执行绑定任务的两个重要类,首先我们看BindingService的bindProducer方法,标识符如下表所示:

在 BindingService 实 现 中 , getBinder 方 法 最 终 会 调 用DefaultBinderFactory中的getBinder方法同时实现,我们能看到,DefaultBinderFactory的作用是获取具体内容的Binder同时实现并提供更多给相应的MessageChannel实例。DefaultBinderFactory的初始化倚赖于BinderTypeRegistry获得的BinderType列表。DefaultBinderFactory的getBinder同时实现中会调用BinderConfiguration获取对应的Binder实例 , 通 过 跟 踪 BinderConfiguration 的 初 始 化 过 程 , 可 以 发 现BinderConfiguration 是 在BinderFactoryConfiguration 执 行getBinderConfiguration方法时将bindingServiceProperties变量中的BinderProperties与BinderTypeRegistry中的BinderType结合,封装成BinderConfiguration对象。BinderProperties封装了Stream从application.yml文件中读取的关于Binder的配置信息,而BinderType则 是 具 体 Binder 的 实 现 类 信 息 。 DefaultBinderFactory 的getBinderInstance同时实现如下表所示:

这 里 的 getBinderInstance 方 法 中 会 生 成 一 个ConfigurableApplicationContext 来 创 建 Binder 实 例 , 在 创 建ConfigurableApplicationContext实例时,它会将BinderConfiguration设置到SpringApplicationBuilder中。

ConfigurableApplicationContext调用getBinder方法时,会使用BinderConfiguration的属性和配置生成BinderConfiguration中设置的具体内容类型的Binder同时实现。如果你使用的Binder是RabbitMQ,那么对应 的RabbitServiceAutoConfiguration 会 自 动 初 始 化 并 加 载RabbitMessageChannelBinder实例。

在 Stream 层 对 Binder 实 例 的 初 始 化 工 作 都 完 成 后 , 再 回 到BindingService 的 bindProducer 方 法 实 现 , 它 会 调 用AbstractMessagChannlBinder 的 doBindProducer 方 法 , 关 键 代 码 如下表所示:

从源标识符可知,ProvisioningProvider是两个USB,不同的Binder实 现 可 以 根 据 接 口 实 现 各 自 不 同 的 ProducerDestination 和ConsumerDestination,标识符如下表所示:

doBindProducer会调用createProducerMessageHandler方法创建MessageHandler实例,MessageChannel会使用SendingHandler封装后的MessageHandler实例,当有output最新消息时,将最新消息推送给最终的Binder实例。

通过上面的步骤,基本上在Stream层就完成了对生产者的绑定操作,消费者的绑定是将SubscribableChannel与具体内容的最新消息队列同时实现连接,doBindConsumer与doBindProducer流程类似。

首先通过ProvisioningProvider的provisionConsumerDestination方法创建ConsumerDestination,然后调用createConsumerEndpoint方法创建MessageProducer实例,最后生成DefaultBinding实例,标识符如下表所示:

Message/Integrate/最新NSSBinder层

从@Output注释能看到,Stream架构会使用MessageChannel推送消 息 。 通 过 BindingService 的 doBindProducer 方 法 创 建 并 绑 定SendingHandler对象,然后调用handleMessageInternal方法,它会将最新消息再推送给delegate对象处理。上面是SendingHandler对象的handleMessageInternal方法的标识符同时实现:

delegate是之前在BindingServer中抽象类AbstractMessageChannelBinder执行的createProducerMessageHandler方法返回的生产者MessageHandler实例。对于RabbitMQ Binder来说,是rmqpOutboundEndpoint对象,该实 例 将 最 终 调 用 其 handlerMessage 方 法 , 该 方 法 进 一 步 调 用RabbitTemplate的send方法。最新消息推送流程如下表所示图所示。

最新消息的接收过程

最新消息的接收过程能分为三个阶段:第两个阶段是从RabbitMQ到SubscribableChannel的过程。我们从@Input注释能看到,Stream架构 会 使 用 SubscribableChannel 接 收 消 息 。 第 二 个 阶 段 是 注 解@StreamListener告诉SubscribableChannel如何将最新消息推送给对应的Sink接收端对应的回调方法。

Spring的RabbitMQ使用InternalConsumer作为默认的最新消息消费方,当接收到对应最新消息后,会调用handleDelivery方法将RabbitMQ最新消息推送给BlockingQueueConsumer中的队列。上面是handleDelivery的源标识符同时实现。

AsyncMessageProcessingConsumer类是Runnable类型的,它会消费 阻 塞 队 列 , 并 将 消 息 传 给 AmqpInboundChannelAdapter 。

AmqpInboundChannelAdapter 实 例 是 在 BindingService 构 造createConsumerEndpoint时创建的consumerEndpoint,并将它与对应的Channel绑定。上面是AmqpInboundChannelAdapter的关键性标识符,即processMessage方法,它会调用MessagingTemplate对象的send方法将最新消息推送给SubscribableChannel组件。

上面是最新消息处理的第二个阶段,是将SubscribableChannel中的 消 息 发 送 给 指 定 的 方 法 , 主 要 靠 @StreamListener 注 解 实 现 。

@StreamListener是注释在消费方法上的注释,用以接收输入型地下通道的消 息 , Stream 定 义 了StreamListenerAnnotationBeanPostProcessor类,用以处理工程项目中的@SteamListener注释。

StreamListenerAnnotationBeanPostProcessor同时实现了BeanPostProcessorUSB,用以在Bean初始化之前和之后三个时间点对Bean实例展开处理。

postProcessAfterlnitialization是在Bean实例初始化之后被调用 的 方 法 , 它 会 遍 历 Bean 实 例 中 的 所 有 函 数 , 处 理 那 些 被@StreamListener注释修饰的函数。

afterSingletonsInstantiated方法会遍历mappedListenerMethods 对 应 的 所 有 Entry 对 象 , 为 每 一 个StreamListenerHandlerMethodMapping 创 建 一 个 MessageHandler 实例。然后根据条件生成DispatchingStreamListenerMessageHandler并注册给SubscribableChannel。

下 面 是StreamListenerAnnotationBeanPostProcessor 的 代 码 同时实现:

当 SubscribableChannel 接 收 到 消 息 后 , 会 调 用DispatchingStreamListenerMessageHandler类的handleRequestMessage方法,该方法会调用ConditionalStreamListenerHandler的handleMessage方法。

findMatchingHandlers方法根据ConditionalStreamListenerHandler 的 Expression 实 例 来 判 断ConditionalStreamListenerHandler是否适合处理当前这个最新消息,最终最新消息经过InvocableHandlerMethod传递给对应的函数。SCS消费最新消息的整体流程如下表所示图所示。

本文给大家传授的内容是MOM异步通讯,Stream源标识符解析

 

1.本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2.分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3.不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4.本站提供的源码、模板、插件等其他资源,都不包含技术服务请大家谅解!
5.如有链接无法下载或失效,请联系管理员处理!
6.本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!