Springcloud-消息总线-Bus

news2024/12/24 21:11:47

1.消息总线在微服务中的应用

BUS- 消息总线-将消息变更发送给所有的服务节点。
在微服务架构的系统中,通常我们会使用消息代理来构建一个Topic,让所有
服务节点监听这个主题,当生产者向topic中发送变更时,这个主题产生的消息会被
所有实例消费,这就是消息总线的工作模式。也是我们熟悉的发布-订阅模型。
其实广义的消息总线不单指这种“发布-订阅”模型!也可以代指分布式服务间进行通信,
消息分发的单播模式,甚至有的公司既不使用HTTP也不用RPC来构建微服务。完全靠消息
总线来做服务调用。比如,银行老系统采用总线型架构,在不同服务结点之间做消息分发

SpringCloud中的Bus职责范围就相对小了很多,因为还有一个Stream组件代理了大部分的消息中间件通信服务,因此BUS在实际应用中大多是为了应对“消息广播”的场景。比如和config异同搭配使用推送配置信息。

总线式架构的完整流程
在这里插入图片描述
我们要关注一下白底红框那三个和BUS有关系的步骤
MQ/KAFKA:BUS是一个调用封装,它的背后还是需要依赖消息中间件来完成底层的消息
分发,实际项目中最常用的两个中间件分别是RabbitMQ和kafka。
BUS:作为对接上游应用和下游中间件系统的中间层,当接到刷新请求的时候,通知底层中间件向所有服务结点推送消息。
Refresh:类比config-center中可以通过actuator的Refresh请求刷新配置,那么对于总线式架构的ReFresh请求来说,有两个需要解决的问题:
谁来发起变更,服务结点还是由ConfigServer发起变更请求?
何时发起变更-时手工发起变更?还是每次Github改动完成后自动推送?

2.BUS简介

BUS实现:
加入我们所有的节点都订阅了topic(消息组件这个属性刷新这个topic)当你的属性发生变动的时候,只要发送一个广播消息,所有的节点都会消费消息,并且触发刷新动作。

BUS的标签:
BUS只是对消息进行了简单的封装,底层是依赖Stream(专业用来与消息中间件进行通信的组件)来广播消息。

在这里插入图片描述
BUS的两个场景:
配置变更通知;自定义消息广播;

3.BUS体系结构解析

BUS的三个角色:
消息的发布者,是一个中间件;
事件监听者,监听事件动态,各个监听消息的服务节点;
事件主体,配置变更就是事件

事件的架构:
在BUS配置刷新的事件类是RefreshRemoteApplicationEvent。在 BUS的规范下,所有事件都包含三个维度的信息:
**source:**这是一个必填信息,它可以是一个自定义并且能够被序列化反序列化的pojo对象,它包含了一个事件想要传达的信息;
Original Service 消息来源方,通常是事件发布方的机器ID,或者AppId等;
Destination Service 目标机器,Bus会根据Destination Service指定的过滤条件(比如服务名,端口等),只让指定的监听者响应事件;

消息发布者
我们所有的“事件”都是通过Bus来发布的,Bus默认提供了两个Endpoint作为消息发布者:
bus-env:在本地发布EnvironmentChangeRemoteApplicationEvent事件,表示一个远程环境变更事件。进一步查看这个事件的内容,我们发现其中包含了一个Map<String, String>属性,事件监听者接收到这个事件之后,会将事件中的Map添加到Spring环境变量中(由Spring Cloud的EnvironmentManager负责具体处理),从而达到修改环境变量的目的
bus-refresh:发布RefreshRemoteApplicationEvent事件,表示一个远程配置刷新事件,这个事件会触发@RefreshScope注解所修饰的Java类中属性的刷新(@RefreshScope修饰的类可以在运行期更改属性)
以上两个ENDpoint就是BUS通过、actuator服务对外提供出来的

消息监听者:
BUS中默认创建了两个消息监听器,分别对应上面两个消息发布的Endpoints。
在这里插入图片描述
在spring-cloud-context这个依赖中定义了大量的事件。

4.Bus的接入方式RabbitMQ & Kafka

Spring的组件一向是以一种插件式的方式提供功能,将组件自身和我们项目中的业务代码隔离,使得我们更换组件的成本可以降到最低。Spring Cloud Bus也不例外,它的底层消息组件非常容易替换,替换过程不需要对业务代码引入任何变更。Bus就像一道隔离了底层消息组件和业务应用的中间层,比如我们从RabbitMQ切换为Kafka的时候,只需要做两件事就好了:
在项目pom中替换依赖组件;
更改配置文件里的连接信息。

RabbitMQ和Kafka两种消息组件如何接入Bus
接入RabbitMQ
RabbitMQ是实现了AMQP(Advanced Message Queue Protocal)的开源消息代理软件,也是平时项目中应用最广泛的消息分发组件之一。
接入RabbitMQ的方式很简单,我们只要在项目中引入以下依赖:

org.springframework.cloud
spring-cloud-starter-bus-amqp

点进去发现,它还依赖于spring-cloud-starter-stream-rabbit。
也就是说stream组件是被真正用来发送广播消息到RabbitMQ,
BUS只是帮我们封装了整个消息的发布和监听动作!
项目所需要的具体的配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

接入Kafka;
要使用kafka来实现消息代理,只需要把上一步中引入spring-cloud-starter-bus-amqp
依赖替换成spring-cloud-starter-bus-kafka依赖

org.springframework.cloud
spring-cloud-starter-bus-kafka

如果大家的Kafka和ZooKeeper都运行在本地,并且采用了默认配置,那么不需要做任何额外的配置,就可以直接使用。但是在生产环境中往往Kafka和ZooKeeper会部署在不同的环境,所以就需要做一些额外配置:
spring.cloud.stream.kafka.binder.brokers Kafka服务节点(默认localhost)
spring.cloud.stream.kafka.binder.defaultBrokerPort Kafka端口(默认9092)
spring.cloud.stream.kafka.binder.zkNodes ZooKeeper服务节点(默认localhost)
zspring.cloud.stream.kafka.binder.defaultZkPort ZooKeeper端口(默认2181)

5.部分关键源码:

内置事件的架构RefreshRemoteApplicationEvent
刷新事件的发送端-RefreshBusEndpoint

开端:RefreshRemoteApplicationEvent

public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {

   @SuppressWarnings("unused")
   private RefreshRemoteApplicationEvent() {
      // for serializers
   }

   public RefreshRemoteApplicationEvent(Object source, String originService,
         String destinationService) {
      super(source, originService, destinationService);
   }

查看find usage:有两个大类:RefreshBusEndpoint以及RefreshListener类。
一个是起点RefreshBusEndpoint,一个是终点RefreshListener。
关注起点:RefreshBusEndpoint

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

   public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
      super(context, id);
   }

   @WriteOperation
   public void busRefreshWithDestination(@Selector String destination) { // TODO:
                                                         // document
                                                         // destination
      publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
   }

   @WriteOperation
   public void busRefresh() {
      publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
   }

}

关注到主类的super方法,就是到了RemoteApplicationEvent类

protected RemoteApplicationEvent(Object source, String originService,
      String destinationService) {
   super(source);
   this.originService = originService;
   if (destinationService == null) {
      destinationService = "**";
   }
   // If the destinationService is not already a wildcard, match everything that
   // follows
   // if there at most two path elements, and last element is not a global wildcard
   // already
   if (!"**".equals(destinationService)) {
      if (StringUtils.countOccurrencesOf(destinationService, ":") <= 1
            && !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {
         // All instances of the destination unless specifically requested
         destinationService = destinationService + ":**";
      }
   }
   this.destinationService = destinationService;
   this.id = UUID.randomUUID().toString();
}

本人进行测试的接口是:
测试的接口是:localhost:60002/actuator/bus-refresh
在这里插入图片描述

研究了发现对于RemoteApplicationEvent就是确定destination!

在RefreshBusEndpoint中,将contex存放在ApplicationEventPublisher里。
这就是ApplicationEventPublisher,用来发布上下文消息的!

接下来到了AbstractApplicationContext中

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
   Assert.notNull(event, "Event must not be null");

   // Decorate event as an ApplicationEvent if necessary
   ApplicationEvent applicationEvent;
   if (event instanceof ApplicationEvent) {
      applicationEvent = (ApplicationEvent) event;
   }
   else {
      applicationEvent = new PayloadApplicationEvent<>(this, event);
      if (eventType == null) {
         eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
      }
   }

   // Multicast right now if possible - or lazily once the multicaster is initialized
   if (this.earlyApplicationEvents != null) {
      this.earlyApplicationEvents.add(applicationEvent);
   }
   else {
      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
   }

   // Publish event via parent context as well...
   if (this.parent != null) {
      if (this.parent instanceof AbstractApplicationContext) {
         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
      }
      else {
         this.parent.publishEvent(event);
      }
   }
}

整个过程是事件驱动,编程解耦!

6.如何实现自动推送?Git WebHook

问题:由谁来发起状态的变更请求?
如何通过GitHub的Webhook机制实现自动推送!

Webhook?Git的一种机制,可以用于自动化的构建。
当每次提交代码到Git以后,会触发Webhook执行一段程序,来完成预定义的操作。比如说让钩子通知CI/CD系统从Github拉取最新代码开始执行构建过程或者执行其他操作!

Webhook三步走:
设置encrypt.key;
将上一步中的key添加到Github仓库设置中;
设置Webhook url;
设置encrypt.key,类似属性加解密方式,只需要在application.yml中设置一个key就好!
encrypt:
key: yourKey

自动推送需要注意的问题
无法测试:改动只要一提交就被推送到所有机器,假如不小心修改错了属性,那所有服务器就要团灭了
定点推送:尽管Bus支持在URL中添加目标范围,定向推送到指定机器,但毕竟URL在Webhook里面是写死的,不方便我们根据实际情况做定点推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1878121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【产品经理】订单处理9-台账库存管理

在订单处理过程中&#xff0c;台账库存的具体设计怎么做&#xff1f; 在订单处理过程中&#xff0c;分配仓库成功后要扣除仓库库存并计算商品缺货情况&#xff0c;仓库库存就是台账库存。 1&#xff0c;台账库存是针对某个仓库的库存&#xff0c;且台账库存只计算此商品SKU的库…

随州职业技术学院2024年成人高等继续教育招生简章

随州职业技术学院&#xff0c;这所历史悠久、声誉卓著的学府&#xff0c;如今正以其独特的魅力与实力&#xff0c;向广大成人学习者敞开怀抱&#xff0c;宣布启动2024年成人高等继续教育的招生工作。 在这片知识的沃土上&#xff0c;学院以其严谨的教学态度&#xff0c;为无数…

动手学深度学习(Pytorch版)代码实践 -卷积神经网络-20填充与步幅

20填充与步幅 import torch from torch import nn# 此函数初始化卷积层权重&#xff0c;并对输入和输出提高和缩减相应的维数 def comp_conv2d(conv2d, X):# 这里的&#xff08;1&#xff0c;1&#xff09;表示批量大小和通道数都是1#将输入张量 X 的形状调整为 (1, 1, height,…

最佳学习率和Batch Size缩放中的激增现象

前言 《Surge Phenomenon in Optimal Learning Rate and Batch Size Scaling》原文地址GitHub项目地址Some-Paper-CN。本项目是译者在学习长时间序列预测、CV、NLP和机器学习过程中精读的一些论文&#xff0c;并对其进行了中文翻译。还有部分最佳示例教程。如果有帮助到大家&a…

llm学习-1(包含如何使用github的codespace):

本文学习参考&#xff1a;datawhalechina/llm-universe: 本项目是一个面向小白开发者的大模型应用开发教程&#xff0c;在线阅读地址&#xff1a;https://datawhalechina.github.io/llm-universe/ 一些可使用的大模型地址&#xff1a; Claude 使用地址 PaLM 官方地址 Gemini…

24年hvv前夕,微步也要收费了,情报共享会在今年结束么?

一个人走的很快&#xff0c;但一群人才能走的更远。吉祥同学学安全https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247483727&idx1&sndb05d8c1115a4539716eddd9fde4e5c9&scene21#wechat_redirect这个星球&#x1f517;里面已经沉淀了&#xff1a; 《Ja…

机械拆装-基于Unity-总体设计

前言 在工业设计和制造领域&#xff0c;零部件的拆装技术是一个重要的应用场景&#xff0c;比如我们在工程训练课程中经历的摩托车发动机拆装课程&#xff0c;是机械类学生的必修课程。虚拟拆装系统模拟和仿真了模型的拆装过程&#xff0c;虽然SolidWorks等机械设计软件能够解决…

apipost的安装和测试添加接口能否正常使用

1.进入官网&#xff0c;点击免费使用&#xff08;我是windows 64位&#xff0c;选合适自己的配置&#xff09; 2.开始安装 选仅为我安装——下一步 选择自己的安装目录——点安装 等待 运行——完成 3.apipost一些基本操作——实现添加内容 &#xff08;1&#xff09;新建接口…

【05】从0到1构建AI生成思维导图应用 -- 前端交互实现

【05】从0到1构建AI生成思维导图应用 – 前端交互实现 大家好&#xff01;最近自己做了一个完全免费的AI生成思维导图的网站&#xff0c;支持下载&#xff0c;编辑和对接微信公众号&#xff0c;可以在这里体验&#xff1a;https://lt2mind.zeabur.app/ 上一章&#xff1a;http…

5_Clark变换Simulink仿真详细步骤

一、Clark变换的计算过程 根据投影定理: Iα由Ia、Ib、Ic共同投影决定&#xff0c;根据几何原理&#xff0c;IαIa-cos(60&#xff09;*Ib-cos(60&#xff09;*Ic,即是IαIa-0.5*Ib-0.5*Ic Iβ由Ib、Ic共同投影决定&#xff0c;根据几何原理&#xff0c;Iβsin&#xff08;60&a…

下一代的JDK - GraalVM

GraalVM是最近几年Java相关的新技术领域不多的亮点之一&#xff0c; 被称之为革命性的下一代JDK&#xff0c;那么它究竟有什么神奇之处&#xff0c;又为当前的Java开发带来了一些什么样的改变呢&#xff0c;让我们来详细了解下 下一代的JDK 官网对GraalVM的介绍是 “GraalVM 是…

【Python机器学习】模型评估与改进——交叉验证

交叉验证是一种评估泛化性能的统计学方法&#xff0c;它比单次划分训练集和测试集的方法更稳定、前面。在交叉验证中&#xff0c;数据被多次划分&#xff0c;并且需要训练多个模型。最常用的交叉验证是k折交叉验证&#xff0c;其中k是由用户指定的数字&#xff0c;通常取5或10,…

MySQL高级-InnoDB引擎-事务日志- redo log(事务持久性的保证)

文章目录 1、redo log1.1、重做日志缓冲&#xff08;redo log buffer&#xff09;1.2、重做日志文件&#xff08;redo log file&#xff09; 2、如果没有redo log&#xff0c;可能会存在什么问题的&#xff1f;2.2、我们一起来分析一下。 2.2、那么&#xff0c;如何解决上述的问…

240629_昇思学习打卡-Day11-Vision Transformer中的self-Attention

240629_昇思学习打卡-Day11-Transformer中的self-Attention 根据昇思课程顺序来看呢&#xff0c;今儿应该看Vision Transformer图像分类这里了&#xff0c;但是大概看了一下官方api&#xff0c;发现我还是太笨了&#xff0c;看不太明白。正巧昨天学SSD的时候不是参考了太阳花的…

Databend db-archiver 数据归档压测报告

Databend db-archiver 数据归档压测报告 背景准备工作Create target databend table启动 small warehouse准备北京区阿里云 ECSdb-archiver 的配置文件准备一亿条源表数据开始压测 背景 本次压测目标为使用 db-archiver 从 MySQL 归档数据到 Databend Cloud&#xff0c; 归档的…

qt 开发笔记 动态链接库应用

1.概要 1.1 需求 库有两种&#xff0c;动态库和静态库&#xff0c;这里说的是动态库&#xff1b;动态库的加载方式有两种&#xff0c;一直是静态的一种是动态的&#xff0c;这里的静态加载是指静态加载动态&#xff0c;是一种加载动态库的方式。也有一种动态加载的方式&#…

衣服、帽子、鞋子相关深度学习数据集大合集(1)

最近收集了一大波关于衣物深度学习数据集&#xff0c;主要有衣服、帽子、鞋子、短裤、短袖、T恤等。 1、运动裤、短裤图片数据集 数据格式&#xff1a;图片 是否标注&#xff1a;已标注 标注格式&#xff1a;yolov8 图片数量&#xff1a;915张 查看地址&#xff1a;https…

# Sharding-JDBC从入门到精通(2)- Sharding-JDBC 介绍

Sharding-JDBC从入门到精通&#xff08;2&#xff09;- Sharding-JDBC 介绍 一、概述-分库分表所带来的问题 1、分库分表带来的问题 分库分表能有效的缓解了单机和单库带来的性能瓶颈和压力&#xff0c;突破网络 IO、硬件资源、连接数的瓶颈&#xff0c;同时也带来了一些问题…

容器进程

一、容器进程和宿主机进程的关系 容器在进程空间上和宿主机是隔离的&#xff0c;每创建一个容器&#xff0c;该容器都有一个独属的进程空间简称PID NameSpace。但是容器本质也是一个进程&#xff0c;自然是由其父进程创建的&#xff0c;这个可以使用ps aux命令验证。 | 容器视…

Thinger.io 支持多协议、插件化100%开源 IoT 企业级物联网平台

项目源码&#xff0c;文末联系小编 Thinger.io 是一个开源插件化物联网平台&#xff0c;提供了设备原型、扩展和设备连接管理所需的一切工具。我们的目标是使物联网的使用民主化&#xff0c;使其可供全世界使用&#xff0c;并简化大型物联网项目的开发。 01 Thinger.io 物联网平…