第十一章 Stream消息驱动

news2025/1/10 0:58:27

Stream消息驱动

gitee:springcloud_study: springcloud:服务集群、注册中心、配置中心(热更新)、服务网关(校验、路由、负载均衡)、分布式缓存、分布式搜索、消息队列(异步通信)、数据库集群、分布式日志、系统监控链路追踪。

1. 消息驱动概述

作用:屏蔽底层消息中间件的差异,降低切换成本,统—消息的编程模型。底层不管是什么中间件如kafka、rabbitmq,Stream可以解决不同中间件的通信。 官网:Spring Cloud Stream

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
​
应用程序通过 inputs 或者 outputsj来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
​
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
​
但是Stream只支持kafka、rabbitmq。

img

设计思想 标准的MQ:

1.生产者/消费者之间靠消息媒介传递信息内容:Message
2.消息必须走特定的通道:消息通道MessageChannel
3.消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

Cloud Stream:

Stream利用Binder来绑定中间件的输入流和输出流。如果系统使用到了两个中间件(kafka、rabbitmq):这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的人—大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

Stream中的消息通信方式遵循了发布-订阅模式:

Topic在Rabbitmq中是Exchange、在kafka中是Topic。

Spring Cloud Stream标准流程套路

img

Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener:监听队列。用于消费者的队列的消息接收
@EnableBinding:指信道channel和exchange绑定在一起

Binder:很方便的连接中间件,屏蔽差异 Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。 Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2. 消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801:作为生产者进行发消息模块

  1. pom文件

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.yaml

server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        output: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
    register-with-eureka: true
    fetch-registry: true
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: send-8801.com
    prefer-ip-address: true
  1. 主启动类

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}
  1. service层

public interface IMessageProvider {
    String send();
}
@EnableBinding(Source.class)  //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
​
    @Resource
    private MessageChannel output;  //消息发送管道
​
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("********serial:"+serial);
        return null;
    }
}
  1. controller层

@RestController
public class SendMessageController {
​
    @Resource
    private IMessageProvider messageProvider;
​
    @GetMapping(value = "/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

测试:

3. 消息驱动之消费者

创建cloud-stream-rabbitmq-consumer8802,作为消息接收模块

  1. pom文件

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.yml

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
    register-with-eureka: true
    fetch-registry: true
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: receive-8802.com
    prefer-ip-address: true
  1. controller层

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
​
    @Value("${server.port}")
    private String serverPort;
​
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号,------>接收到的消息:"+message.getPayload()+"\t port:"+serverPort);
    }
}
  1. 主启动类

@SpringBootApplication
@EnableEurekaClient
public class ConsumerMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerMQMain8802.class,args);
    }
}

测试:

启动loccalhost:8801/sendMessage就可以了,消费者就是一个监听器,有message就消费。

4. 分组消费与持久化

根据cloud-stream-rabbitmq-consumer8802创建8803项目,运行暴露问题:


消息重复消费和消息持久化问题,需要进行分组操作。注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

解决重复消费方法:加入同一个组(下图是不同分组的情况)

cloud-stream-rabbitmq-consumer8802和8803设置不同分组yicaiA/B

server:
  port: 8803
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
          group: yicaiB
server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
          group: yicaiA

cloud-stream-rabbitmq-consumer8802和8803设置同一个组yicaiA

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
          group: yicaiA

测试:

持久化 加上group就算实现类持久化。所谓的持久化就是如果没有分组,一个服务发送消息,其他服务由于没有分组,如果其他哪些服务断开,又继续重启,这样就会导致以前那些消息丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1341893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最优化方法Python计算:无约束优化应用——逻辑回归模型

S型函数 sigmoid ( x ) 1 1 e − x \text{sigmoid}(x)\frac{1}{1e^{-x}} sigmoid(x)1e−x1​将全体实数 R \text{R} R映射到 ( 0 , 1 ) (0,1) (0,1)&#xff0c;称为逻辑函数。其图像为 该函数连续、有界、单调、可微&#xff0c;性质量好。拟合函数为 F ( w ; x ) sigmoi…

localsec.dll缺少怎么办?localsec.dll文件下载,一键修复

看到很多小伙伴都在问&#xff0c;自己的电脑&#xff0c;在启动游戏或某些软件的时候&#xff0c;Windows会弹出系统错误提示框&#xff0c;称一个名为“localsec.dll的文件丢失了&#xff0c;造成错误&#xff0c;可尝试重新安装程序&#xff0c;以解决问题”。 根据错误提示…

如何使用Redis实现抢红包功能

【内容摘要】 在这篇文章中&#xff0c;我们将探讨如何使用Redis来设计和实现一个抢红包的业务场景。从业务场景、需求分析、技术选型、代码实现&#xff0c;痛点问题等进行多维分析和思考。 【业务场景】 下面引入一个实际的使用案例&#xff0c;如微信群中常用的红包功能。…

计算机网络复习2

物理层 文章目录 物理层通讯基础奈奎斯特定理香农定理编码与调制交换传输介质&#xff08;了解&#xff09;物理层设备 通讯基础 数据信号码元信源信道信宿单工通道&#xff1a;只有一个方向半双工通道&#xff1a;不能同时发送和接收全双工通道 奈奎斯特定理 规定&#xff…

裕泰微YT8521SH PHY芯片在uboot下的代码适配(二)

裕泰微YT8521SH PHY芯片在uboot下的代码适配&#xff08;一&#xff09; 文章目录 代码搜索移植步骤具体代码备注 本文主要是介绍uboot下的适配代码来源和具体修改。 代码搜索 https://github.com/starfive-tech/u-boot/blob/JH7110_VisionFive2_devel/drivers/net/phy/motorc…

Linux中的gcc\g++使用

文章目录 gcc\g的使用预处理编译汇编链接函数库gcc选项 gcc\g的使用 这里我们需要知道gcc和g实际上是对应的c语言和c编译器&#xff0c;而其他的Java&#xff08;半解释型&#xff09;&#xff0c;PHP&#xff0c;Python等语言实际上是解释型语言&#xff0c;因此我们经常能听…

Ps:混合颜色带 - 基础篇

混合颜色带 Blend If是“图层样式”对话框中的一个高级功能&#xff0c;允许根据下方图层或当前图层的色彩信息来混合图层&#xff0c;无需进行复杂的选区或蒙版操作。 混合颜色带是基于亮度&#xff08;灰色&#xff09;或颜色通道的特定范围来显示或隐藏图层的特定区域。 “当…

Android : 使用GestureDetector 进行手势识别—简单应用

示例图&#xff1a; GestureDetector 介绍&#xff1a; GestureDetector 是 Android 开发中用于识别和处理手势的一个类。它允许开发者检测用户在触摸屏上的各种手势&#xff0c;如滑动、长按、双击等。通过使用 GestureDetector&#xff0c;您可以轻松地为应用程序添加手势识…

GLTF 编辑器实现逼真3D动物毛发效果

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 要实现逼真的3D动物毛发效果&#xff0c;可以采用以下技术和方法&…

AI数字人克隆系统源代码克隆系统开发--本地源码部署

随着人工智能技术的不断发展&#xff0c;AI数字人克隆系统逐渐成为现实。这一系统通过克隆人的外貌和行为模式&#xff0c;可以创建具有自我认知、学习和情感的数字化人类。而为了更好地开发AI数字人克隆系统&#xff0c;本地源码部署是一项关键步骤。 在开始介绍本地源码部署…

低功耗蓝牙模块:促进智慧城市发展的关键技术

在科技快速发展的时代&#xff0c;智慧城市的概念正引领着城市管理的革新。为实现城市更高效、可持续和智能化的管理&#xff0c;低功耗蓝牙模块成为推动智慧城市发展的关键技术之一。本文将探讨低功耗蓝牙模块在智慧城市中的作用&#xff0c;以及其在城市基础设施、公共服务等…

6、LLaVA

简介 LLaVA官网 LLaVA使用Vicuna(LLaMA-2)作为LLM f ϕ ( ⋅ ) f_\phi() fϕ​(⋅)&#xff0c;使用预训练的CLIP图像编码器 ViT-L/14 g ( X v ) g(X_v) g(Xv​)。 输入图像 X v X_v Xv​&#xff0c;首先获取feature Z v g ( X v ) Z_vg(X_v) Zv​g(Xv​)。考虑到最后一…

ROS-ID®活性氧/活性氮检测试剂盒

自由基和其他活性种在许多心理和病理生理过程中发挥着重要作用。自由基一旦在细胞内产生&#xff0c;就会破坏多种细胞成分&#xff0c;包括蛋白质、脂质和DNA。然而&#xff0c;在较低浓度下&#xff0c;自由基可以作为细胞信号传导中的第二信使。 Enzo Life Sciences的ROS-I…

svn外网打不开url地址怎么解决

在家或者出差在外经常会有连接到公司内部SVN服务器的需求&#xff0c; 但是 svn外网打不开url地址怎么解决&#xff1f;如何才能连接到公司内部SVN服务器&#xff1f;今天小编教你一招&#xff0c;在本地SVN服务的内网IP端口用快解析软件添加映射&#xff0c;一步就可以提供让公…

PC9095高性能可调限流OVP过压过流保护 软启动 抗浪涌 集成功率FET开关

特点 •输入电压范围&#xff1a; •PC9095A、PC9095KA:2.5伏~13.5伏 •PC9095B&#xff0c;PC9095KB:2.5伏~10伏 •PC9095C&#xff0c;PC9095KC:2.5伏~5.5伏 •28V绝对最大额定电压VOUT •带外部电阻器的可调限流器 •集成功率FET开关&#xff0c;53mΩRds&#xff08…

【Vue2+3入门到实战】(14)路由入门之单页应用程序、路由 、 VueRouter的基本使用 详细示例

目录 一、学习目标1.路由入门 二、单页应用程序介绍1.概念2.具体示例3.单页应用 VS 多页面应用4.总结 三、路由介绍1.思考2.路由的介绍3.总结 四、路由的基本使用1.目标2.作用3.说明4.官网5.VueRouter的使用&#xff08;52&#xff09;6.代码示例7.两个核心步骤8.总结 五、组件…

关于MybatisPlus自动转化驼峰命名规则配置mapUnderscoreToCamelCase的个人测试和总结

关于MybatisPlus自动转化驼峰命名规则配置mapUnderscoreToCamelCase的个人测试和总结 测试一&#xff1a;没有添加 自动转化的配置&#xff0c;且domain中的属性名称和数据库的字段名称一致测试二&#xff1a;没有添加自动转化配置i&#xff0c;domain属性名userPassword和数据…

泽攸科技PECVD设备助力开发新型石墨烯生物传感器

近日&#xff0c;松山湖材料实验室许智团队与清华大学符汪洋合作在纳米领域头部期刊《Small》上发表了一项引人注目的研究成果&#xff0c;题为“Ultrasensitive biochemical sensing platform enabled by directly grown graphene on insulator”&#xff08;硅晶圆上直接生长…

计算机网络——应用层与网络安全(六)

前言&#xff1a; 前几章我们已经对TCP/IP协议的下四层已经有了一个简单的认识与了解&#xff0c;下面让我们对它的最顶层&#xff0c;应用层进行一个简单的学习与认识&#xff0c;由于计算机网络多样的连接形式、不均匀的终端分布&#xff0c;以及网络的开放性和互联性等特征&…

使用NTC负温度系数热敏电阻控制温度

鱼缸原来的加热棒使用的是NTC负温度系数的热敏电阻测温&#xff0c;负温度系数是指随着温度的升高&#xff0c;电阻是不断按照指数形式减小的&#xff0c;在22度的情况下实测电阻是10K多&#xff0c;可以断定使用了10K&#xff08;25度下是10K&#xff09;的电阻&#xff0c;为…