RocketMQ EventBridge 核心概念
理解EventBridge中的核心概念,能帮助我们更好的分析和使用EventBridge。本文重点介绍下EventBridge中包含的术语:
- EventSource:事件源。用于管理发送到EventBridge的事件,所有发送到EventBridge中的事件都必须标注事件源名称信息,对应CloudEvent事件体中的source字段。
- EventBus:事件总线。用于存储发送到EventBridge的事件。
- EventRule:事件规则。当消费者需要订阅事件时,可以通过规则配置过滤和转换信息,将事件推送到指定的目标端。
- FilterPattern:事件过滤模式,用于在规则中配置过滤出目标端需要的事件。
- Transform:事件转换,将事件格式转换成目标端需要的数据格式。
- EventTarget:事件目标端,即我们真正的事件消费者。
下面,我们具体展开:
EventSource
事件源,代表事件发生的源头,用来描述一类事件,一般与微服务系统一一对应。比如:交易事件源、考勤事件源等等。事件源,是对事件一个大的分类,一个事件源下面,往往会包含多种事件类型(type),比如交易事件源下面,可能包含:下单事件、支付事件、退货事件等等。
另外,需要值得注意的是,事件源并不用来描述发生事件的实体,取而代之的是,在CloudEvent中,我们一般选用subject来表示产生这个事件的实体资源。事件源有点像市场经济大卖场中的大类分区,例如:生鲜区、日化日用区、家用电器区等等。在事件中心这个"大卖场",我们可以通过事件源快速的找到我们需要的事件。
EventBus
事件总线是存储事件的地方,其下可以有多种实现,包括Local、RocketMQ、Kafka等。
事件生产者发送事件的时候,必须指定事件总线。事件总线是EventBridge的一等公民,其他所有资源都围绕事件总线形成逻辑上的隔离,即:事件源、事件规则必须都隶属于某一个事件总线下。不同事件总线下的事件源和事件规则可以重名,但是同一个事件总线下的事件源和规则必须不重名。
EventRule
当消费者需要订阅事件时,可以通过事件规则配置过滤和转换信息,将事件推送到指定的目标端。所以,事件规则包含三部分:事件过滤+事件转换+事件目标。
FilterPattern
通过事件过滤模式,我们可以对事件总线上的事件进行过滤,只将目标端需要的事件推送过去,以减少不必要的开通,同时减轻消费者 Target端的压力。目前EventBridge支持的事件过滤能力包括:
- 指定值匹配
- 前缀匹配
- 后缀匹配
- 除外匹配
- 数值匹配
- 数组匹配
- 以及复杂的组合逻辑匹配
(详细介绍待见其他文章)
Transform
生产者的事件可能会同时被多个消费者订阅,但不同消费者需要的数据格式往往不同。这个时候,需要我们将生产者的事件,转换成消费者 Target端需要的事件格式。目前EventBridge支持的事件转换能力包括:
- 完整事件:不做转换,直接投递原生 CloudEvents;
- 部分事件:通过 JsonPath 语法从 CloudEvents 中提取出需要投递到事件目标的内容;
- 常量:事件只起到触发器的作用,投递内容为常量;
- 模板转换器:通过定义模板,灵活地渲染投递出去的事件格式;
(详细介绍待见其他文章)
EventTarget
事件目标端,也即我们的事件消费者。在EventBridge架构中,消费者只需要按照自己的业务领域模型设计,提供一个公共的API(这个API既可用来接收事件,同时也用来前台管控面操作),EventBridge就会按照API定义需要的数据格式,将事件安全、可靠的推送给 Target消费者。
RocketMQ EventBridge 概览
RocketMQ EventBridge 致力于帮助用户构建高可靠、低耦合、高性能的事件驱动架构。在事件驱动架构中,微服务不需要主动订阅外部消息,而是可以把所有触发微服务系统发生改变的入口统一到API,并只需要关注当前微服务自己的业务领域模型定义和设计API,无需通过大量的胶水代码去适配解析外部服务的消息。EventBridge 则会负责将外部服务产生的事件安全的、可靠的适配并投递到当前微服务设计的API。
那什么时候我们使用RocketMQ消息,什么时候使用EventBridge事件? 事件的含义是什么,和消息有什么区别?
消息与事件
我们给事件做了如下定义:
事件是指过去已经发生的事,尤其是比较重要的事。
事件与消息的关系如下:
消息包含Command消息和Event消息。Command消息是外部系统发送给本系统的一条操作命令(如上图左半部分);Event消息则是本系统收到Command操作请求,系统内部发生改变之后而产生了事件(如上图右半部分);
事件的四个特性
1、已发生
事件,一定是“已发生”的。 “已发生”同时意味着是不可变的。这个特性非常重要,在我们处理事件、分析事件的时候,这就意味着,我们绝对可以相信这些事件,只要是收到的事件,一定是系统真实发生过的行为。
Command,则代表一种操作请求,是否真的发生不可得知,比如:
* 把厨房的灯打开
* 去按下门铃
* 转给A账户10w
Event,则是明确已经发生的事情。比如
* 厨房灯被打开了
* 有人按了门铃
* A账户收到了10w
2、无期望
事件是客观的描述一个事物的状态或属性值的变化,但对于如何处理事件本身并没有做任何期望。 相比之下,Command和Query则都是有期望的,他们希望系统做出改变或则返回结果,但是Event只是客观描述系统的一个变化。
举个例子: 交通信号灯,从绿灯变成黄灯,只是描述了一个客观事实,本身并没有客观期望。在不同国家地区,对这个事件赋予了不同的期望。 比如,在日本黄灯等于红灯,而在俄罗斯闯黄灯是被默许的。
与Command消息对比:
- 事件:有点像"市场经济",商品被生产出来,摆放在商场的大橱窗里,消费者谁看着觉得好就买回去,如果一直没人买,商品可能就过期浪费了。
- Command消息:则有点像"计划经济",按需生产,指定分配对象,也很少产生浪费。
3、天然有序且唯一
同一个实体,不能同时发生A又发生B,必有先后关系;如果是,则这两个事件必属于不同的事件类型。
比如:针对同一个交通信号灯,不能既变成绿灯,又变成红灯,同一时刻,只能变成一种状态。 如果我们看到了两个内容一样的事件,那么一定是发生了两次,而且一次在前,一次在后。这对于我们处理数据最终一致性、以及系统行为分析(比如ABA场景)都很有价值:我们看到的,不光光是系统的一个最终结果,而是看到变成这个结果之前的,一系列中间过程。
4、具像化
事件会尽可能的把“案发现场”完整的记录下来,因为事件不知道消费者会如何使用它,所以会做到尽量的详尽。包括:
什么时候发生的事件?
谁产生的?
是什么类型的事件?
事件的内容是什么?内容的结构是什么?
... ...
对比我们常见的消息,因为上下游一般是确定的,常常为了性能和传输效率,则会做到尽可能的精简,只要满足“计划经济”指定安排的消费者需求即可。
RocketMQ EventBridge 的典型应用场景
场景1:事件通知
微服务中,我们常常会遇到需要把一个微服务中生产的消息,通知给其他消费者。这里我们对比三种方式:
A:强依赖方式
生产者主动调用消费者的微服务,并适配消费者的API。这种设计无疑是非常糟糕的,生产者强依赖消费者,深度耦合。万一调用某个消费者出现异常且未做有效隔离,极容易导致整个微服务Hang起。有新的消费者进来,扩展性也极差。
B:半解耦方式
生产者将消息发送到消息服务,消费者订阅消息服务获取消息,并将消息解析成自己业务领域模型中需要的数据格式。这种方式做到了调用链路上的解耦,极大的降低了系统风险,但是对于消费者来说,依旧需要去理解和解析生产者的业务语义,将消息转换成自己业务领域内需要的格式。这种方式下,当消费者需要订阅多个生产者的数据的时候,需要用大量的胶水代码,为每一个生产者产生的消息做适配。另外,当上游生产者的消息格式发生变化时,也会存在风险和运维成本。
B:完全解耦方式
这种方式下,消费者不需要引入SDK订阅Broker,只需要按照自己的业务领域模型设计API,消息服务会将上游的事件,过滤并转换成API需要的事件格式。既没有调用链路上的依赖,也没有业务上的依赖。当上游生产者的事件数据格式发生变化时,消息服务会做兼容性校验,可以拒绝生产者发送事件或则进行告警。
场景2:系统间集成
场景1主要面向一个产品内部,各个微服务之间的事件通信。场景2则是主要面向多个产品之间的事件通信。在一个企业中,我们常常会用到多款产品,而且很多产品可能并不是我们自己开发的,而是购买的外部SaaS服务。这个时候,如果我们希望事件在不同外部SaaS产品之间流转是比较困难的,因为这些外部SaaS产品不是我们自己开发的,无法轻易的修改其中的代码。EventBridge提供的事件中心能力,能够帮助收集各个产品产生的事件,并很好的组织管理起来,就像大卖场橱窗里的商品,精心摆放准备好,配备介绍说明书,供消费者挑选,同时提供送货上门服务。
RocketMQ EventBridge 是如何工作的?
为了解决上述两个应用场景中提到的问题,EventBridge从5个方便入手:
第1. 确定事件标准: 因为事件不是给自己看的,而是给所有人看的。它没有明确的消费者,所有都是潜在的消费者。所以,我们需要规范化事件的定义,让所有人都能看得懂,一目了然。目前CNCF旗下的CloudEvent,以逐渐成为广泛的事实标准,因此,我们选取了CloudEvent 作为我们的EventBridge的事件标准。
第2. 建立事件中心: 事件中心里面有所有系统,注册上来的各种事件,这个就像我们上面说的市场经济大卖场,里面玲琅满目分类摆放了各种各样的事件,所有人即使不买,也都可以进来瞧一瞧,看一看,有哪些事件可能是我需要的,那就可以买回去。
第3. 定义事件格式: 事件格式用来描述事件的具体内容。这相当于市场经济的一个买卖契约。生产者发送的事件格式是什么,得确定下来,不能总是变;消费者以什么格式接收事件也得确定下来,不然整个市场就乱套了。
第4. 订阅"规则": 我们得给消费者一个,把投递事件到目标端的能力,并且投递前可以对事件进行过滤和转换,让它可以适配目标端API接收参数的格式,我们把这个过程叫做创建订阅规则。
第5. 事件总线: 最后我们还得有一个存储事件的地方,就是最图中最中间的事件总线。
RocketMQ EventBridge 快速开始
RocketMQ EventBridge 需要一个消息服务来存储事件,另外需要一个Runtime来订阅并推送事件。这里我们选择 Apache RocketMQ 作为我们的消息服务,选择 Apache RocketMQ Connect 作为我们的Runtime来订阅和推送事件。当然,您也可以选择其他消息服务代替,EventBridge并不对此做限制。未来EventBridge也计划基于OpenMessaging Connect API 实现自己的Runtime,以便更好的提供事件驱动服务。
系统要求:
- 64位操作系统,推荐 Linux/Unix/macOS
- 64位 JDK 1.8+
部署Apache RocketMQ
Apache RocketMQ 是一个很棒的消息服务,我们默认选择它作为EventBus的默认存储。这里您可以根据这个手册快速部署: Apache RocketMQ Quick Start
部署Apache RocketMQ Connect
我们使用Apache RocketMQ Connect作为我们的默认Runtime,来连接外部的上下游服务,您可以根据手册完成部署: RocketMQ Connect Quick Start 。在部署 Apache RocketMQ Connect 之前,您应该下载下面的插件,并将其放在rocketmq-connect中配置参数“pluginPaths”所定义的目录下:
- rocketmq-connect-eventbridge-jar-with-dependencies.jar
- rocketmq-connect-dingtalk-jar-with-dependencies.jar
- connect-cloudevent-transform-jar-with-dependencies.jar
- connect-filter-transform-jar-with-dependencies.jar
- connect-eventbridge-transform-jar-with-dependencies.jar
部署RocketMQ EventBridge
- 获取 EventBridge
你可以从这里下载EventBridge的二进制包:rocketmq-eventbridge-xxx-bin-release.zip,下载完毕后进行解压缩,你会得到一个如下目录:
/rocketmq-eventbridge-xxx-bin-release/
|——bin
| |——runserver.sh
| |——eventbridge.sh
|——config
| |——application.properties
|——jar
| |——rocketmq-eventbridge.jar
- 配置 EventBridge
运行前,我们需要配置EventBridge的运行环境,修改config/application.properties,参考如下:
# Mysql数据库的连接地址
spring.datasource.url=jdbc:mysql://xxxx:3306/xxxx?characterEncoding=utf8
spring.datasource.username=xxx
spring.datasource.password=xxxx
# RocketMQ nameserver的连接地址
rocketmq.namesrvAddr=xxxxx:9876
# RocketMQ的集群名称.
rocketmq.cluster.name=DefaultCluster
# RocketMQ Connect的连接地址
rocketmq.connect.endpoint=xxxxxx:8082
# log默认配置
log.path=~
log.level=INFO
app.name=rocketmq-eventbridge
- 启动 EventBridge
sh bin/eventbridge.sh start
log默认目录为~/rocketmq-eventbridge/rocketmq-eventbridge.log,可以修改上述log.path和app.name进行修改。可以通过日志来观察服务是否正常启动:
- 测试 EventBridge
当服务启动后,我们就可以通过下面的Demo用例来测试和验证EventBridge。
Demo
- 创建事件总线
POST /bus/createEventBus HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"description":"a demo bus."
}
- 创建事件源
POST /source/createEventSource HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventSourceName":"demo-source",
"description":"A demo source."
}
- 创建事件规则
POST /rule/createEventRule HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventRuleName":"demo-rule",
"description":"A demo rule.",
"filterPattern":"{}"
}
- 创建事件目标
创建一个投递到云上EventBridge的事件目标:
POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventRuleName":"demo-rule",
"eventTargets":[
{
"eventTargetName":"eventbridge-target",
"className":"acs.eventbridge",
"config":{
"RegionId":"cn-hangzhou",
"AliyunEventBus":"rocketmq-eventbridge"
}
}
]
}
创建一个投递到钉钉机器人推送通知的事件目标:
POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventRuleName":"demo-rule",
"eventTargets":[
{
"eventTargetName":"dingtalk-target",
"className":"acs.dingtalk",
"config":{
"WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
"SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
"Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
}
}
]
}
-
发送事件到EventBus
最后,我们通过API发送一条事件,并验证Target端是否按预期收到对应的事件。
POST /putEvents HTTP/1.1
Host: demo.eventbridge.com
Content-Type:"application/cloudevents+json; charset=UTF-8"
{
"specversion" : "1.0",
"type" : "com.github.pull_request.opened",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"datacontenttype" : "application/json",
"data" : {
"body":"demo"
},
"aliyuneventbusname":"demo-bus"
}