RocketMQ-(8-1)-EventBridge-EventBridge 核心概念

news2025/1/12 18:51:10

RocketMQ EventBridge 核心概念

理解EventBridge中的核心概念,能帮助我们更好的分析和使用EventBridge。本文重点介绍下EventBridge中包含的术语:

  • EventSource:事件源。用于管理发送到EventBridge的事件,所有发送到EventBridge中的事件都必须标注事件源名称信息,对应CloudEvent事件体中的source字段。
  • EventBus:事件总线。用于存储发送到EventBridge的事件。
  • EventRule:事件规则。当消费者需要订阅事件时,可以通过规则配置过滤和转换信息,将事件推送到指定的目标端。
  • FilterPattern:事件过滤模式,用于在规则中配置过滤出目标端需要的事件。
  • Transform:事件转换,将事件格式转换成目标端需要的数据格式。
  • EventTarget:事件目标端,即我们真正的事件消费者。

下面,我们具体展开:

EventSource​

事件源,代表事件发生的源头,用来描述一类事件,一般与微服务系统一一对应。比如:交易事件源、考勤事件源等等。事件源,是对事件一个大的分类,一个事件源下面,往往会包含多种事件类型(type),比如交易事件源下面,可能包含:下单事件、支付事件、退货事件等等。

另外,需要值得注意的是,事件源并不用来描述发生事件的实体,取而代之的是,在CloudEvent中,我们一般选用subject来表示产生这个事件的实体资源。事件源有点像市场经济大卖场中的大类分区,例如:生鲜区、日化日用区、家用电器区等等。在事件中心这个"大卖场",我们可以通过事件源快速的找到我们需要的事件。

EventBus​

事件总线是存储事件的地方,其下可以有多种实现,包括Local、RocketMQ、Kafka等。

事件生产者发送事件的时候,必须指定事件总线。事件总线是EventBridge的一等公民,其他所有资源都围绕事件总线形成逻辑上的隔离,即:事件源、事件规则必须都隶属于某一个事件总线下。不同事件总线下的事件源和事件规则可以重名,但是同一个事件总线下的事件源和规则必须不重名。

EventRule​

当消费者需要订阅事件时,可以通过事件规则配置过滤和转换信息,将事件推送到指定的目标端。所以,事件规则包含三部分:事件过滤+事件转换+事件目标。

img_1.png

FilterPattern​

通过事件过滤模式,我们可以对事件总线上的事件进行过滤,只将目标端需要的事件推送过去,以减少不必要的开通,同时减轻消费者 Target端的压力。目前EventBridge支持的事件过滤能力包括:

  • 指定值匹配
  • 前缀匹配
  • 后缀匹配
  • 除外匹配
  • 数值匹配
  • 数组匹配
  • 以及复杂的组合逻辑匹配

(详细介绍待见其他文章)

Transform​

生产者的事件可能会同时被多个消费者订阅,但不同消费者需要的数据格式往往不同。这个时候,需要我们将生产者的事件,转换成消费者 Target端需要的事件格式。目前EventBridge支持的事件转换能力包括:

  • 完整事件:不做转换,直接投递原生 CloudEvents;
  • 部分事件:通过 JsonPath 语法从 CloudEvents 中提取出需要投递到事件目标的内容;
  • 常量:事件只起到触发器的作用,投递内容为常量;
  • 模板转换器:通过定义模板,灵活地渲染投递出去的事件格式;

(详细介绍待见其他文章)

EventTarget​

事件目标端,也即我们的事件消费者。在EventBridge架构中,消费者只需要按照自己的业务领域模型设计,提供一个公共的API(这个API既可用来接收事件,同时也用来前台管控面操作),EventBridge就会按照API定义需要的数据格式,将事件安全、可靠的推送给 Target消费者。

RocketMQ EventBridge 概览

RocketMQ EventBridge 致力于帮助用户构建高可靠、低耦合、高性能的事件驱动架构。在事件驱动架构中,微服务不需要主动订阅外部消息,而是可以把所有触发微服务系统发生改变的入口统一到API,并只需要关注当前微服务自己的业务领域模型定义和设计API,无需通过大量的胶水代码去适配解析外部服务的消息。EventBridge 则会负责将外部服务产生的事件安全的、可靠的适配并投递到当前微服务设计的API。

那什么时候我们使用RocketMQ消息,什么时候使用EventBridge事件? 事件的含义是什么,和消息有什么区别?

消息与事件​

我们给事件做了如下定义:

事件是指过去已经发生的事,尤其是比较重要的事。

事件与消息的关系如下:

image

消息包含Command消息和Event消息。Command消息是外部系统发送给本系统的一条操作命令(如上图左半部分);Event消息则是本系统收到Command操作请求,系统内部发生改变之后而产生了事件(如上图右半部分);

事件的四个特性​

1、已发生​

事件,一定是“已发生”的。 “已发生”同时意味着是不可变的。这个特性非常重要,在我们处理事件、分析事件的时候,这就意味着,我们绝对可以相信这些事件,只要是收到的事件,一定是系统真实发生过的行为。

Command,则代表一种操作请求,是否真的发生不可得知,比如:

* 把厨房的灯打开
* 去按下门铃
* 转给A账户10w

Event,则是明确已经发生的事情。比如

* 厨房灯被打开了
* 有人按了门铃
* A账户收到了10w

2、无期望​

事件是客观的描述一个事物的状态或属性值的变化,但对于如何处理事件本身并没有做任何期望。 相比之下,Command和Query则都是有期望的,他们希望系统做出改变或则返回结果,但是Event只是客观描述系统的一个变化。

举个例子: 交通信号灯,从绿灯变成黄灯,只是描述了一个客观事实,本身并没有客观期望。在不同国家地区,对这个事件赋予了不同的期望。 比如,在日本黄灯等于红灯,而在俄罗斯闯黄灯是被默许的。

与Command消息对比:

  • 事件:有点像"市场经济",商品被生产出来,摆放在商场的大橱窗里,消费者谁看着觉得好就买回去,如果一直没人买,商品可能就过期浪费了。
  • Command消息:则有点像"计划经济",按需生产,指定分配对象,也很少产生浪费。

3、天然有序且唯一​

同一个实体,不能同时发生A又发生B,必有先后关系;如果是,则这两个事件必属于不同的事件类型。

比如:针对同一个交通信号灯,不能既变成绿灯,又变成红灯,同一时刻,只能变成一种状态。 如果我们看到了两个内容一样的事件,那么一定是发生了两次,而且一次在前,一次在后。这对于我们处理数据最终一致性、以及系统行为分析(比如ABA场景)都很有价值:我们看到的,不光光是系统的一个最终结果,而是看到变成这个结果之前的,一系列中间过程。

4、具像化​

事件会尽可能的把“案发现场”完整的记录下来,因为事件不知道消费者会如何使用它,所以会做到尽量的详尽。包括:

什么时候发生的事件?
谁产生的?
是什么类型的事件?
事件的内容是什么?内容的结构是什么?
... ...

对比我们常见的消息,因为上下游一般是确定的,常常为了性能和传输效率,则会做到尽可能的精简,只要满足“计划经济”指定安排的消费者需求即可。

RocketMQ EventBridge 的典型应用场景​

场景1:事件通知​

微服务中,我们常常会遇到需要把一个微服务中生产的消息,通知给其他消费者。这里我们对比三种方式:

A:强依赖方式

生产者主动调用消费者的微服务,并适配消费者的API。这种设计无疑是非常糟糕的,生产者强依赖消费者,深度耦合。万一调用某个消费者出现异常且未做有效隔离,极容易导致整个微服务Hang起。有新的消费者进来,扩展性也极差。

B:半解耦方式

生产者将消息发送到消息服务,消费者订阅消息服务获取消息,并将消息解析成自己业务领域模型中需要的数据格式。这种方式做到了调用链路上的解耦,极大的降低了系统风险,但是对于消费者来说,依旧需要去理解和解析生产者的业务语义,将消息转换成自己业务领域内需要的格式。这种方式下,当消费者需要订阅多个生产者的数据的时候,需要用大量的胶水代码,为每一个生产者产生的消息做适配。另外,当上游生产者的消息格式发生变化时,也会存在风险和运维成本。

B:完全解耦方式

这种方式下,消费者不需要引入SDK订阅Broker,只需要按照自己的业务领域模型设计API,消息服务会将上游的事件,过滤并转换成API需要的事件格式。既没有调用链路上的依赖,也没有业务上的依赖。当上游生产者的事件数据格式发生变化时,消息服务会做兼容性校验,可以拒绝生产者发送事件或则进行告警。

image

场景2:系统间集成​

场景1主要面向一个产品内部,各个微服务之间的事件通信。场景2则是主要面向多个产品之间的事件通信。在一个企业中,我们常常会用到多款产品,而且很多产品可能并不是我们自己开发的,而是购买的外部SaaS服务。这个时候,如果我们希望事件在不同外部SaaS产品之间流转是比较困难的,因为这些外部SaaS产品不是我们自己开发的,无法轻易的修改其中的代码。EventBridge提供的事件中心能力,能够帮助收集各个产品产生的事件,并很好的组织管理起来,就像大卖场橱窗里的商品,精心摆放准备好,配备介绍说明书,供消费者挑选,同时提供送货上门服务。

image

RocketMQ EventBridge 是如何工作的?​

为了解决上述两个应用场景中提到的问题,EventBridge从5个方便入手:

第1. 确定事件标准: 因为事件不是给自己看的,而是给所有人看的。它没有明确的消费者,所有都是潜在的消费者。所以,我们需要规范化事件的定义,让所有人都能看得懂,一目了然。目前CNCF旗下的CloudEvent,以逐渐成为广泛的事实标准,因此,我们选取了CloudEvent 作为我们的EventBridge的事件标准。

第2. 建立事件中心: 事件中心里面有所有系统,注册上来的各种事件,这个就像我们上面说的市场经济大卖场,里面玲琅满目分类摆放了各种各样的事件,所有人即使不买,也都可以进来瞧一瞧,看一看,有哪些事件可能是我需要的,那就可以买回去。

第3. 定义事件格式: 事件格式用来描述事件的具体内容。这相当于市场经济的一个买卖契约。生产者发送的事件格式是什么,得确定下来,不能总是变;消费者以什么格式接收事件也得确定下来,不然整个市场就乱套了。

第4. 订阅"规则": 我们得给消费者一个,把投递事件到目标端的能力,并且投递前可以对事件进行过滤和转换,让它可以适配目标端API接收参数的格式,我们把这个过程叫做创建订阅规则。

第5. 事件总线: 最后我们还得有一个存储事件的地方,就是最图中最中间的事件总线。

image

RocketMQ EventBridge 快速开始

RocketMQ EventBridge 需要一个消息服务来存储事件,另外需要一个Runtime来订阅并推送事件。这里我们选择 Apache RocketMQ 作为我们的消息服务,选择 Apache RocketMQ Connect 作为我们的Runtime来订阅和推送事件。当然,您也可以选择其他消息服务代替,EventBridge并不对此做限制。未来EventBridge也计划基于OpenMessaging Connect API 实现自己的Runtime,以便更好的提供事件驱动服务。

系统要求:

  • 64位操作系统,推荐 Linux/Unix/macOS
  • 64位 JDK 1.8+

部署Apache RocketMQ​

Apache RocketMQ 是一个很棒的消息服务,我们默认选择它作为EventBus的默认存储。这里您可以根据这个手册快速部署: Apache RocketMQ Quick Start

部署Apache RocketMQ Connect​

我们使用Apache RocketMQ Connect作为我们的默认Runtime,来连接外部的上下游服务,您可以根据手册完成部署: RocketMQ Connect Quick Start 。在部署 Apache RocketMQ Connect 之前,您应该下载下面的插件,并将其放在rocketmq-connect中配置参数“pluginPaths”所定义的目录下:

  • rocketmq-connect-eventbridge-jar-with-dependencies.jar
  • rocketmq-connect-dingtalk-jar-with-dependencies.jar
  • connect-cloudevent-transform-jar-with-dependencies.jar
  • connect-filter-transform-jar-with-dependencies.jar
  • connect-eventbridge-transform-jar-with-dependencies.jar

部署RocketMQ EventBridge​

  • 获取 EventBridge

你可以从这里下载EventBridge的二进制包:rocketmq-eventbridge-xxx-bin-release.zip,下载完毕后进行解压缩,你会得到一个如下目录:

/rocketmq-eventbridge-xxx-bin-release/
|——bin
|   |——runserver.sh
|   |——eventbridge.sh
|——config
|   |——application.properties
|——jar
|   |——rocketmq-eventbridge.jar

  • 配置 EventBridge

运行前,我们需要配置EventBridge的运行环境,修改config/application.properties,参考如下:

# Mysql数据库的连接地址
spring.datasource.url=jdbc:mysql://xxxx:3306/xxxx?characterEncoding=utf8
spring.datasource.username=xxx
spring.datasource.password=xxxx

# RocketMQ nameserver的连接地址
rocketmq.namesrvAddr=xxxxx:9876

# RocketMQ的集群名称.
rocketmq.cluster.name=DefaultCluster

# RocketMQ Connect的连接地址
rocketmq.connect.endpoint=xxxxxx:8082

# log默认配置
log.path=~
log.level=INFO
app.name=rocketmq-eventbridge

  • 启动 EventBridge
sh bin/eventbridge.sh start 

log默认目录为~/rocketmq-eventbridge/rocketmq-eventbridge.log,可以修改上述log.path和app.name进行修改。可以通过日志来观察服务是否正常启动:  

  • 测试 EventBridge

当服务启动后,我们就可以通过下面的Demo用例来测试和验证EventBridge。

Demo​

  • 创建事件总线
POST /bus/createEventBus HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"description":"a demo bus."
}

  • 创建事件源
POST /source/createEventSource HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
"eventBusName":"demo-bus",
"eventSourceName":"demo-source",
"description":"A demo source."
}

  • 创建事件规则
POST /rule/createEventRule HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
  "eventBusName":"demo-bus",
  "eventRuleName":"demo-rule",
  "description":"A demo rule.",
  "filterPattern":"{}"
}

  • 创建事件目标

创建一个投递到云上EventBridge的事件目标:

POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
    "eventBusName":"demo-bus",
    "eventRuleName":"demo-rule",
    "eventTargets":[
            {
            "eventTargetName":"eventbridge-target",
            "className":"acs.eventbridge",
                "config":{
                "RegionId":"cn-hangzhou",
                "AliyunEventBus":"rocketmq-eventbridge"
                }
            }
        ]
}

创建一个投递到钉钉机器人推送通知的事件目标:

POST /target/createEventTargets HTTP/1.1
Host: demo.eventbridge.com
Content-Type: application/json; charset=utf-8
{
    "eventBusName":"demo-bus",
    "eventRuleName":"demo-rule",
    "eventTargets":[
        {
            "eventTargetName":"dingtalk-target",
            "className":"acs.dingtalk",
            "config":{
            "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
            "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
            "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
            }
        }
    ]
}

  • 发送事件到EventBus

    最后,我们通过API发送一条事件,并验证Target端是否按预期收到对应的事件。

POST /putEvents HTTP/1.1
Host: demo.eventbridge.com
Content-Type:"application/cloudevents+json; charset=UTF-8"
{
  "specversion" : "1.0",
  "type" : "com.github.pull_request.opened",
  "source" : "https://github.com/cloudevents/spec/pull",
  "subject" : "123",
  "id" : "A234-1234-1234",
  "time" : "2018-04-05T17:31:00Z",
  "datacontenttype" : "application/json",
  "data" : {
    "body":"demo"
  },
  "aliyuneventbusname":"demo-bus"
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/955839.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ES】elasticsearch8.3.3

这里仅实践操作并根据实际问题进行记录笔记。 运行 ES8 我们需要在自己的电脑上安装好 Docker Desktop。接着我们运行如下的命令:出现两个异常,一个是需要使用winpty因为我使用win的docker desktop,另外一个问题是docker启动elasticsearchE…

408考研-数据结构算法-顺序表

数组 如何创建数组 我们以 Java 中创建数组为例,创建语法如下 dataType[] arrName new dataType[size];dataType: 也就是我们数组中元素的数据类型arrName:即数组名size:即数组所能容纳的元素数量new: Java 语言中的关键词 假设我们要创建一个由 10 个元素的数…

基于SSM的在线挂号系统java医院预约管理 jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,Java EE JSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。 一、项目介绍 基于SSM的在线挂号系统 系统有2权限:前台…

基于SSM的服装商城系统【附源码】

【项目特色】 抽奖功能优惠劵功能物流信息功能 简介 基于SSM的服装商城系统 开发语言:Java数据库:MySQL技术:Spring、Mybaits、SpringMVC工具:IDEA/Ecilpse、Navicat、Maven 前台功能:   注册、登录、退出、商品查询、商品列…

Oracle查询语句中做日期加减运算

在Oracle中,可以使用日期函数来实现日期的加减。 若想在日期上加上一定的天数,可以使用"INTERVAL"关键字。例如,如果要将一个日期加上3天,可以使用以下代码: SELECT SYSDATE INTERVAL 3 DAY FROM DUAL; …

【征稿信息】第四届先进材料和智能制造国际学术会议(ICAMIM2023)

第四届先进材料和智能制造国际学术会议(ICAMIM2023) 2023 4th International Conference on Advanced Materials and Intelligent Manufacturing 2023年广州市“国际学术会议之都”建设项目— 第四届先进材料和智能制造国际学术会议(ICAMIM2023)将于202…

徐涛政治导论:著作串连表格(重点掌握)

考试第一个选择题常考下面的表格(很重要)

使用vue-pdf出现的卡顿,空白,报错,浏览器崩溃解决办法

如果想直接知道解决办法,请翻到最下面 今天,接到了一个新的需求,我们公司的PDF展示卡住了,导致浏览器直接奔溃。我也刚来公司不久,就去看看是怎么发生的,公司前同事用的vue-pdf,刚开始以为是文…

基于鹈鹕算法优化的BP神经网络(预测应用) - 附代码

基于鹈鹕算法优化的BP神经网络(预测应用) - 附代码 文章目录 基于鹈鹕算法优化的BP神经网络(预测应用) - 附代码1.数据介绍2.鹈鹕优化BP神经网络2.1 BP神经网络参数设置2.2 鹈鹕算法应用 4.测试结果:5.Matlab代码 摘要…

QT(8.31)加载资源文件,信号与槽机制

作业: 实现登录界面,设置账号为admin,密码为123456,登陆成功则退出当前界面,切换到其他界面,密码错误或者账号不匹配则清空账号密码输入框中的内容,并输出登录失败,点击取消则退出当…

AIGC爆火,拓世法宝平台上线,打造属于你的专属数字人!

在数字科技的风潮下,短视频已经成为人们日常生活中不可或缺的一部分。中国互联网络信息中心于8月28日发布的第52次《中国互联网络发展状况统计报告》报告显示,截至2023年6月,中国短视频用户已达10.26亿人。在这里面,80后、90后和0…

视觉SLAM与激光SLAM简单对比分析

总述 本文旨在梳理目前较为前沿的SLAM技术,包括激光和视觉,主要从精度和实时性两个方面对算法进系评价。 对于激光SLAM了解不深,后期需要补充相关算法的核心思想与算法框架。有问题请大佬们随时留言,我再改正。 0.1 视觉SLAM算…

vue使用qrcodejs2生成二维码

目录 概要 构建展示的vue组件qrcode.vue 组件的使用 概要 项目中用到需要展示二维码的样式&#xff0c;想到了qrcode 例如&#xff1a; 前提&#xff1a;安装包 npm install qrcodejs2 --save 构建展示的vue组件qrcode.vue <template><div style"width: …

使用spring自带的发布订阅来实现发布订阅

背景 公司的项目以前代码里面有存在使用spring自带发布订阅的代码&#xff0c;因此稍微学习一下如何使用&#xff0c;并了解一下这种实现方式的优缺点。 优点 实现方便&#xff0c;代码方面基本只需要定义消息体和消费者&#xff0c;适用于小型应用程序。不依赖外部中间件&a…

学习高级数据结构:探索平衡树与图的高级算法

文章目录 1. 平衡树&#xff1a;维护数据的平衡与高效性1.1 AVL 树&#xff1a;严格的平衡1.2 红黑树&#xff1a;近似平衡 2. 图的高级算法&#xff1a;建模复杂关系与优化2.1 最小生成树&#xff1a;寻找最优连接方式2.2 拓扑排序&#xff1a;解决依赖关系 拓展思考 &#x1…

el-select下拉多选框 el-select 设置默认值不可删除功能

Element3.0vue3.0 el-select下拉多选框 el-select 设置默认值不可删除功能 Element-UI是一款广泛使用的Vue.js组件库&#xff0c;其中El-Select下拉多选框组件在实际项目开发中经常被使用。然而&#xff0c;在Element 3.0版本中&#xff0c;El-Select下拉多选框默认值可被删除&…

降本56%!纵腾集团搭载OceanBase Cloud开启降本增效新篇章

近日&#xff0c;跨境电商物流领跑企业福建纵腾网络有限公司&#xff08;以下简称“纵腾集团”&#xff09;正式商用原生分布式数据库 OceanBase&#xff0c;为其下专业物流服务“云途物流”提供云数据库支撑服务。目前&#xff0c;已有两大关键业务系统全部接入 OceanBase Clo…

基于JAVAEE技术的ssm校园车辆管理系统源码和论文

基于JAVAEE技术的ssm校园车辆管理系统源码和论文105 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 1.选题背景和意义 背景&#xff1a; 随着第二次工业革命后&#xff0c;内燃机的发明与完善&#xff0c;解…

Charles信任证书后依然无法抓包的解决方案

前提 1、Charles安装证书 2、Charles设置SSL代理 3、查看Android安装Charles证书的方法 4、查看Android安装的Charles证书 问题 Charles拦截时&#xff0c;报“SSL handshake with client failed: An unknown issue occurred processing the certificate (certificate_unknow…

Ubuntu20.04安装ROS

Ubuntu20.04安装ROS Excerpt ubuntu安装方式有两种&#xff0c;一种是安装ubuntu系统&#xff0c;另一种是在windows下安装虚拟机&#xff0c;在虚拟机里安装ubuntu。下面为双系统安装ubuntu&#xff08;用虚拟机装ubuntu会很卡&#xff0c;bug很多&#xff0c;除非电脑配置极好…