应用系统集成-企业集成模式(EIP)
无论是系统间集成或是系统内部组件之间通讯,消息(信息流)都是系统设计最重要的因素。EIP将详细的讲述了从消息的角度进行集成设计考虑方方面面,是系统设计重要的参考资料。
企业集成模式(EIP)
Enterprise Integration Patterns(EIP) 对企业集成进行了分析, 并详细介绍了设计、构建和部署消息系统的模式。
图1 消息和集成映射图
EIP中将消息集成结构表示为三个核心组件:端点、通道、消息和三个关键逻辑:多步传递、路由和转换。 从集成抽象的角度来说, 这些概念不仅仅使用于消息系统,也适用于任何系统集成方式的抽象概念。 后面为保持和EIP名词一致,依然使用以上名字,但需要意识到这些同样适用于其他集成模式。
EIP中将消息集成结构表示为以下核心组件:
- 端点:位于应用程序端,负责将应用请求转换为集成请求,与消息系统进行通讯。
端点和客户端的区别:客户端实现的是具体中间件的通讯协议,而端点层在客户端和业务层之间,屏蔽客户端的差异性,提供统一的业务访问接口,将业务层的集成请求转换为具体中间件的通讯请求。
- 通道:消息传递应用程序通过消息通道传输数据,是连接发送方和接收方的虚拟管道。
- 消息:消息是通道上传输的原子数据包。
- 多步传递:在消息从发送者传递到接收者的过程中, 可能经过附加的操作,如转换、校验、增加附加信息等。
- 路由:消息中间件的核心功能,负责建立通道网络拓扑并将消息定向到最终接收者。
- 转换:不同的应用程序概念模型可能不一致,发送者以一种方式格式化消息,但接收者希望它以另一种方式格式化。
图2 EIP 消息系统模式全景图
消息
消息的设计即系统流转的数据流的设计。
消息的目的
系统中流转消息的目的可以分为:
- 命令消息:消息中包含发送者期望接收者执行的行为,某一个函数或方法。
命令消息通常代表着期望得到命令执行的结果或者异常回复。对于传输响应消息的通道可以使用发送消息通道专门对应的响应通道,或则是共享的响应通道。
响应消息涉及两个设计点: 1)响应通道的指定,可以在请求体中包含返回地址,或则以规范预定的方式确定响应。2)响应和请求的关系: 所有响应消息都应该包含可用于确定请求的标识符。
- 文档消息:消息中包含发送者期望告诉接收者的内容,包括特定的模型结构和具体数据。
- 事件消息:发送者将数据传递给接收者,只是通知接收者关于发送者的某种变化,但没有指定接收者必须用它做什么。
消息内容加工
信封包装
信封包装器将应用程序数据包装在与消息传递基础架构兼容的结构中。消息到达目的地时解包。在设计某种特定的消息总线时,需要设计总线消息的通用结构,具体的应用业务数据结构包装在总线结构中。如RocketMq消息属性包括:主体、消息类型、消息队列、消息ID、过滤标签等。
消息规范化器
在企业和企业的集成中, 某一个应用可能接收来自不同业务伙伴的消息,有不同的格式、含义。应引入消息规范器,将消息转换为通用的消息格式再传输至业务处理系统。
图3 消息规范化器
消息格式规范带来的思考是,在进行系统集成(消息集成)时,需要在不同的业务层次上设计不同级别的通用消息格式,要求每个应用程序以通用格式生成和使用消息。相当于这个系统集成生态中的业务消息协议栈。类似于TCP/IP网络模型中的协议栈。
其他内容加工模式
模式名称 | 内容 |
内容丰富器 | 使用专门的转换器Content Enricher来访问外部数据源,以增加缺少信息的消息。 |
内容过滤器 | 使用内容过滤器从邮件中删除不重要的数据项,只留下重要的项目。 |
声明检查 | 在不牺牲信息内容的情况下减少跨系统发送的消息的数据量,消息数据存储在持久存储中,并将声明检查传递给后续组件。这些组件可以使用Claim Check来检索存储的信息。 |
特定消息场景
海量消息
有时应用程序想要传输一个非常大的数据结构,这种数据结构可能不适用于单个消息。在这种情况下,将数据分解成更易于管理的块并将它们作为消息序列发送。这些块必须作为一个序列发送,而不仅仅是一堆消息,以便接收者可以重建原始数据结构
图4 消息序列
消息过期
消息内容可能是时间敏感的,因此如果在截止日期之前没有收到消息,则应该忽略并丢弃它。过期消息的处理可以由消费者确认忽略,或由消息系统处理:丢弃或路由到死信通道。
延迟\定期消息
消息的发送可能是基于时间延期像消费者发送。比如定时关闭未支付的订单,可以再创建订单的时候同时发送一条延迟的关闭未支付订单的消息。 当订单收到该消息时根据订单状态是否支付决定是否关闭订单。
RocketMQ的延迟消息原理:
RocketMQ为消息通道定义延迟消息存储的队列,消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h按照级别一次类推。1-18级别的默认值的延迟值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”。当发送者指定消息的延迟级别后,RocketMQ将消息放入对应级别的存储队列,通过内置的时间服务检查各自延迟队列中的消息是否达到发送时间,达到后则进行发送。
周期消息
周期性的消息是指消息按照特定的时间周期重复发送。可以建立专门负责周期投递的消费者,在接收到消息后根据消息的周期投递序列将消息再次发送到延期队列中。
消息通道
功能性设计
- 消息的发送和接收者之间的关系是一对一(点对点)还是一对多的(发布-订阅模式)。
- 消息通道的应用范围:是特定交易目的性质的,比如一条专门下订单的通道,还是通用性的消息总线型通道,如通用的消息通道, 包括下订单、发货、付款等所有消息都发送在该通道上,接收者接收所有消息,处理自己感兴趣的消息或忽略与自己无关的消息。
- 消息通道是单向或者是双向的。应用场景是发送者发送后不管的消息,还是需要接收消息消费者的响应。
消息通道的架构属性
可靠性:
如何处理消息系统崩溃或关闭的场景。
- 消息系统,消息中间件集群化设计,并支持持久化。
- 考虑到应用对消息中间件的强依赖, 强高可用要求场景下可以考虑在应用的端点层提供消息持久化,保证即使在短时间的消息系统故障期也能进行消息发送。
图5 发送端和接收端都支持消息持久化。
容错性/健壮性
死信队列
当消息传递系统确定它不能或不应该传递消息时,它可以选择将消息移动到死信通道。 如多次重试后未能成功发送到消费者或未收到消费者的响应。
无效的消息通道
当需要监控接收方接收消息的正确性是可以设置接收方无法处理的消息的特殊通道。接收方将不正确的消息移至该通道。
特定类型通道/受限传输通道
为每种数据类型使用单独的数据类型通道,以便特定通道上的所有数据属于同一类型。消息集成时可以考虑针对通道增加类型或其他的准入限制,以指明通道目的,并对通道进行保护。
通道清理
需要考虑当消费异常情况下消息得不到有效的消费,造成消息积压的情况下如何处理, 可以引入专门的通道清理器,按照清理规则对消息积压通道进行处理。
可扩展性/兼容性
要为无法改造接入消息系统的应用提供消息集成能力,可以考虑增加一个额外的通道适配层负责按照应用协议同应用进行通讯,并适配接入到消息系统。
同理, 如果涉及多个消息中间件的组网融合,可以引入消息适配或者桥接模式在多个消息系统间传递消息。经过桥接的消息网络可以起到企业消息总线的作用。
图6 通道适配和消息桥接
消息路由
基于内容的分发器
根据消息内容将每条消息路由到正确的收件人。如下图展示的RocketMq基于Tag将交易主题上的消息根据标签分发到不同系统消费者。
图7 Rocket MQ基于Tag的消息过滤功能
其他模式
模式 | 描述 |
消息过滤器 | 使用一种特殊的消息路由器,即消息过滤器,根据一组标准从通道中消除不需要的消息。 |
动态路由器 | 在基于内容过滤器的基础上,将分发规则可动态配置 |
收件人名单 | 为每个收件人定义一个渠道。然后使用收件人列表检查传入的邮件,确定所需收件人的列表,并将邮件转发到与列表中收件人关联的所有通道。 |
拆分器 | 使用拆分器将复合消息分解为一系列单独的消息,每个消息都包含与一个项目相关的数据。 |
聚合器 | 使用有聚合器收集和存储单个消息,直到收到一组完整的相关消息,并将单个聚合消息发布到输出通道以供进一步处理。 |
排序器 | 使用排序器来收集和重新排序消息,以便它们可以按指定顺序发布到输出通道。 |
组合消息处理器 | 使用组合消息处理器来处理组合消息。组合消息处理器将消息拆分,将子消息路由到适当的目的地,并将响应重新聚合回单个消息。 |
分散-聚集 | 当一条消息需要发送给多个收件人时,使用Scatter-Gather将消息广播给多个收件人并将响应重新聚合回单个消息。 |
路由单 | 将路由单附加到每条消息,指定处理步骤的顺序。用一个特殊的消息路由器包装每个组件,该路由器读取路由单并将消息路由到列表中的下一个组件 |
流程管理器 | 使用一个中央处理单元,一个Process Manager,来维护序列的状态,并根据中间结果确定下一个处理步骤—动态确定路由单 |
消息代理 | 将消息的目的地与发送者分离并保持对消息流的集中控制。将路由实现逻辑提取,集中到消息代理期的角色上。 |
消息端点
事务性客户端
使客户端与消息系统的会话具有事务性,以便客户端可以指定事务边界。事务性客户端具体表现为消息系统提供事务控制的能力来由本地事务控制消息事务。比如由于本地事务的失败,可以回滚消息的发送。
RabbitMQ提供了发送端和接收端的事务控制,RocketMQ提供发送端的事务控制。
图8 RocketMQ事务消息处理流程
RocketMQ事务消息处理流程:
- 生产者发送Half 消息到RocketMQ服务器。
- RocketMQ 将消息持久化后,返回ACK给客户端。
- 生产者执行本地事务
- 根据本地事务执行的结果,向RocketMQ服务器发送Commit或Rollback消息
- 如RocketMQ没有收到发送者发送的Commit或Rollback消息,将向发送者发送事务状态检查请求。
- 发送者收到事务状态检查请求后检查本地事务的执行结果,
- 根据本地事务的执行结果向RocketMQ服务器发送Commit或Rollback消息
- 当RocketMQ服务器接收到Commit消息则真正将消息发送到消费者,如果接收到Rollback消息,则不发送消息到消费者。
RabbitMQ接收端开启事务消息后,在接收端手工确认的情况下, 必须进行commit,才能移除消息。
// 手动确认
channel.basicAck(deliveryTag, true);
// 提交事务
channel.txCommit();
本地事务控制消息事务 OR 消息事务控制本地事务
上面表述了本地事务可以控制消息事务来完成事务消息的目的。另外一种类似做法是将消息的发送做完本地事务的最后一个步骤,根据消息的发送结果来确定本地事务是否回滚。这种做法在本地事务仅仅和一次消息发送(或远程调用)时可用,但当本地涉及多个消息的发送时则某一个消息的事务将无法控制到其他消息发送的事务。
比如以下处理流程:
Begin Local Transaction A
Send Message m1;
Send Message m2;
Commit Local Transaction A;
消息发送m2的发送失败,可以出发本地事务A的回滚,但无法改变消息m1消息成功发送的事实。
其他消息端点模式
模式 | 描述 |
消息网关 | 这是一个包装特定于消息传递的方法调用并将使用于特定领域域的方法公开给应用程序的类。 |
消息映射器 | 创建一个单独的消息传递映射器,其中包含消息传递基础结构和域对象之间的映射逻辑。类似ORM框架映射的概念 |
轮询消费者 | 应用程序应该使用轮询消费者,当它想要接收消息时显式地进行调用。 |
事件驱动的消费者 | 应用程序应该使用事件驱动的消费者,即在通道上传递消息时自动传递消息。 |
竞争消费者 | 在单个通道上创建多个竞争消费者,以便消费者可以同时处理多个消息。如RocketMQ的消费组。 |
消息调度程序 | 这是从消费者的角度来看,消费者在通道上创建一个消息调度程序,它将使用来自通道的消息并将它们分发给执行者。类似IO多路复用概念 |
选择性消费者 | 消费者成为选择性消费者,它过滤由其通道传递的消息,以便它只接收符合其条件的消息。 |
持久订阅 | 持久订阅者使消息传递系统在订阅者断开连接时保存发布的消息。Apache RocketMQ 统一管理消息的存储时长,无论消息是否被消费,而RabbitMQ可以指定持久化队列和消息持久化来保证持久订阅 |
集成涉及的架构风格
管道和过滤器
架构风格或则说是架构结构,表示了系统间的组件及其相关关系。管道和过滤器是一种常用的架构风格。任何对数据\消息\请求的业务处理过程中需要提供一种可扩展的增加额外处理逻辑的场景下都适用该架构风格,如MVC和Spring中的各类过滤器。处理复杂业务逻辑的场景设计可以考虑使用过道和过滤器风格来保障扩展性。
图9 消息系统中管道和过滤器风格的示意图。
管道和过滤器架构风格的核心组件包括:输入和输出、管道和过滤器。上图展示了输入的消息在管道上经过解密、认证、De-Dup过滤器处理后输出消息。实现该架构风格除了管道和过滤器外,还需要一个管道组装器负责将所需的过滤器组装到特定的处理管道上。
六边形架构风格
不同应用的集成就是不同应用的适配过程,如消息格式的适配,集成技术协议的适配等。可以考虑使用六边形架构风格。六边架构风格或者微内核风格,体现的是系统架构最重要的任务:识别系统的稳定点和不稳定点。
事件驱动架构(EDA)
事件驱动架构在具体实现中是指组成应用的组件之间通过事件机制完成业务功能,EDA中各组件以异步方式响应事件。
响应式编程是以异步编程为核心理念编程模型,一个真正的响应式应用是指整个业务流程执行中各个层都使用响应式编程组件来完成,处理流程中的每一步都是异步的。常见的响应式编程框架有:Spring WebFlux(以Reactor框架基础的响应式web框架),Project Reactor,RxJava,Akka。
背压问题:当上游请求过多,下游服务来不及响应,导致 Buffer 溢出的这样一个问题。在响应式编程,由于线程不阻塞,遇到 IO 就会把当前参数和要做的事情缓存起来,这样无疑增大了很多吞吐量,同时内存占用也大了起来,如果不限制的话,很可能发生OutOfMemory异常。
响应式编程目前主要适用于对性能要求特别高的系统,在业务系统开发中并不是主流模型。