SpringCloud消息驱动——Stream

news2024/11/16 5:35:28

Stream

本专栏学习内容来自尚硅谷周阳老师的视频

有兴趣的小伙伴可以点击视频地址观看

SpringCloud Stream是SpringCloud的消息驱动,之前的微服务学的好好的,为什么会突然冒出一个这么个东西来增加我们的学习量呢?

一听到消息,那肯定就想到了MQ、Kafka,在日常工作中可能不止用到一种MQ,这时候需要对所有的MQ进行系统的学习,当然也不是所有人都有经历去学习。这时候Stream就应运而生,可以理解为Stream对RabbitMQ、Kafka进行的封装,使用者只需要了解Stream即可。

概念

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。

通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。

所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka

Stream中的消息通信方式遵循了发布-订阅模式,使用了Topic主题进行广播

image-20230420104403564

编码API和常用注解

  • Middleware:中间件,目前只支持RabbitMQ和Kafka
  • Binder:应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的该表消息类型(Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
  • @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
  • @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
  • @StreamListener:监听队列,用于消费者的队列的消息接收
  • @EnableBinding:信道channel和exchange绑定在一起

前期准备

在学习这章节之前,至少RabbitMQ环境已经搭配好,并且小黄在这里建议学习之前最好去了解以下RabbitMQ,便于后面的理解。小黄学RabbitMQ

另外提前说明一下,在下面的案例中需要三个模块

  • cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801服务

引入相关依赖

<!--stream-rabbitmq-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka-client-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

编写配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 标识定义的名称,用于binding整合
          type: rabbit # 消息组件类型
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道名称
          destination: studyExchange # 标识要使用的交换机名称
          content-type: application/json #设置消息类型
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  rabbitmq:
    host: 124.220.80.180
    port: 5672
    username: xxx
    password: xxx

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

创建发送者接口及其实现类

public interface IMessageProvider {
    public String send();
}

@Slf4j
@EnableBinding(Source.class) // 可以理解为是一个消息的发送管道的定义
public class MessageProviderImpl implements IMessageProvider {

    //消息的发送管道
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String msg = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(msg).build());
        log.info("******发送消息:{}",msg);
        return null;
    }
}

编写调用代码

@RestController
public class MessageProviderController {
    @Resource
    IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

测试

发送三次请求,发现RabbitMQ上确收有收到消息 http://localhost:8801/sendMessage

image-20230420133805247

消息驱动之消费者

创建cloud-stream-rabbitmq-consumer8802服务

引入相关依赖

<!--stream-rabbitmq-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka-client-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

编写配置文件

与生产者基本一致,只不过把output改成input

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 标识定义的名称,用于binding整合
          type: rabbit # 消息组件类型
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道名称 #****************
          destination: studyExchange # 标识要使用的交换机名称
          content-type: application/json #设置消息类型
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  rabbitmq:
    host: 124.220.80.180
    port: 5672
    username: xxx
    password: xxx

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

创建消息消费者

@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message message)
    {
        log.info("消费者1号,------->接收到的消息:{} \t port: {}",message.getPayload(),serverPort);
    }
}

测试

通过测试发现消费者可以正常的接受到生产者的信息

image-20230420134644862

分组消费与持久化

为了演示问题场景,根据cloud-stream-rabbitmq-consumer8802克隆一个cloud-stream-rabbitmq-consumer8803服务

重复消费

启动8801、8802、8803服务,通过8801发送3条信息,发现8802、8803都消费了这三条信息,在微服务中,这种重复消费的情况是非常可怕的,可能会导致用户重复付费,所以需要避免

image-20230420135628684

image-20230420135635683

原因

这里大家最好还是先了解一下RabbitMQ

首先我们要探究为什么会出现重复消费的情况。上面讲过Stream是使用topic的形式分发消息,也就是说发到一个交换机里的消息会被所有的队列接受并消费。

在默认情况下,Stream会为每一个Input创建一个队列,所以8802、8803创建了两个队列,同时收到消息也非常合理

image-20230420140654612

原理

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。

不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

解决方案

根据原理,我们的解决方案是将两个Input放入同一个组中

修改配置文件

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:  
        defaultRabbit: 
          type: rabbit  
      bindings: 
        input:  
          destination: studyExchange 
          content-type: application/json  
          binder: defaultRabbit 
          group: myGroup # 为两个服务的分组

当分组完成后,再次启动服务发现交换机上只绑定了myGroup队列,再次测试发现不会出现重复消费的情况

默认采用的是轮询,即会将消息轮流发给多个消费者

image-20230420141103112

消息丢失

MQ对消息的持久化是存放在队列中,如果这时候服务宕机了,8801发送的消息还是存储在原来的队列中,在默认没有分组的情况下,每次重启服务生成的队列名是随机的,是无法获取到在宕机时发送到MQ上的消息,就造成了消息丢失。

解决方案

同样也是使用分组来解决,定义了一个group也就是定义了一个队列,重启服务时会重新去监听那个队列,消息就不会丢失了。

友情提示:生产环境中group名称一旦确认了,谨慎修改!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/452530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

u盘文件不见但还占用容量文件办法?

将U盘插入电脑的时候为什么会出现“U盘文件突然不见但还占用空间”的提示框呢?遇到这个问题时又该怎么处理呢?别慌&#xff0c;下面小编就来给大家演示一下子解决U盘文件突然不见但还占用空间这个问题的解决方法。 u盘文件不见但还占用容量文件办法&#xff1f; u盘文件不见但…

短视频平台-小说推文(最右)推广任务详情

最右推荐书单 https://nr6mwfrzw8.feishu.cn/sheets/shtcnVgsBY18qft FqBG9b8eYFnc?sheetpfiUaC 复制链接到飞书或浏览器打开 最右会员 1.1关键词 最右关键词审核时间周一~周日 上午:10点前提交&#xff0c;15:00点前可查下午:15点前提交&#xff0c;20:00点前可查注意: …

盘点几款还不错的企业网盘产品

企业网盘的出现&#xff0c;为企业提供文件安全管理&#xff0c;团队协作服务&#xff0c;解决了便捷性与安全性等问题&#xff0c;受到了企业的青睐。市面上的企业网盘工具也是五花八门&#xff0c;我们该如何选择适合自己团队的网盘工具呢&#xff1f; 本文盘点了几款还不错的…

2023年软件测试的前景?测试工程师技能提升,进阶自动化测试...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 2023年软件测试行…

中国社科院与美国杜兰大学金融管理硕士项目——努力撑起未来的一片天

牛顿说&#xff1a;如果说我看得比别人更远些&#xff0c;那是因为我站在巨人的肩膀上。人类文明浩如烟海&#xff0c;我们每个人都是吸收着前人的精神食粮长大。父母也是尽全力地给我们提供好的学习环境&#xff0c;让我们站在他们的肩头上&#xff0c;青出于蓝而胜于蓝。如今…

新互联网人必学-产品经理课无密为伊消得人憔悴

新互联网人必学-产品经理课 download&#xff1a;https://www.666xit.com/3832/ 产品经理&#xff1a;连接用户需求和产品设计的重要角色 随着移动互联网的迅猛发展&#xff0c;产品经理已成为越来越多IT公司中不可或缺的职位。作为一名产品经理&#xff0c;他所扮演的角色是…

你掌握了stream流的全部新特性吗?

我们知道很早之前java8对于之前的版本更新了许多 新的支持&#xff0c;比如lamda函数式接口的支持&#xff0c;支持更多函数式接口的使用&#xff0c;对链表&#xff0c;数组&#xff0c;队列&#xff0c;集合等实现了Collectio接口的数据结构提供了StreamSupport.stream()支持…

Windows下版本控制器(SVN)-验证是否安装成功+配置版本库+启动服务器端程序

文章目录 基础知识-Windows下版本控制器(SVN)3、Subversion 安装与配置3.1 验证是否安装成功。3.2 配置版本库3.3 启动服务器端程序 基础知识-Windows下版本控制器(SVN) 3、Subversion 安装与配置 TortoiseSVN安装与配置网上资料太多了&#xff0c;这里就不阐述了。 3.1 验证是…

LinkedHashMap如何实现LRU缓存淘汰策略?

本文目录 1.LRU是什么&#xff1f;2.如何使用LinkedHashMap实现LRU?3.LinkedHashMap源码分析3.1 LinkedHashMap简介3.2 继承体系3.3 内部数据存储结构3.4源码解析属性&#xff1a;构造方法&#xff1a;afterNodeInsertion(boolean evict)方法afterNodeAccess(Node e)方法after…

一种应用于车载系统的GPS接收机射频前端的设计

一种应用于车载系统的GPS接收机射频前端的设计 GPS&#xff08;GLOBLE POSITIONING SYSTEM&#xff09;是一种可以定时和测距的空间交会定点导航系统&#xff0c;它可以向全球用户提供连续、实时、高精度的三维位置、三维速度和实践信息。GPS提供两种服务&#xff1a;标准定位…

科研小技巧 | 用ArcGIS绘制研究区地图

目录 01 地图的导入 02 设置十段线小图框 03 设置研究区示意图 04 添加细节04添加细节 05 添加省份名称 06 对研究区额外上色 论文用图对准确性和美观度有一定要求&#xff0c;而ArcGIS具有强大的地图制作功能&#xff0c;可以利用该软件快速制作研究区地图。 01 地图的导…

实力认证 | 睿士主机取证溯源系统再获国产化兼容性认证

睿士主机取证溯源系统喜获鲲鹏技术认证 近日&#xff0c;中睿天下自主研发的睿士主机取证溯源系统与华为技术有限公司旗下鲲鹏&#xff08;Kunpeng&#xff09;920处理器完成兼容性测试认证&#xff0c;并获得鲲鹏技术认证书。这表明睿士主机取证溯源系统可为鲲鹏920处理器主机…

操作系统原理 —— 进程有哪几种状态?状态之间如何切换?(七)

进程的五种状态 首先我们一起来看一下进程在哪些情况下&#xff0c;会有不同的状态表示。 创建态、就绪态 当我们刚开始运行程序的时候&#xff0c;操作系统把可执行文件加载到内存的时候&#xff0c;进程正在被创建的时候&#xff0c;它的状态是创建态&#xff0c;在这个阶…

【两个月算法速成】day02

目录 977. 有序数组的平方 题目链接 思路&#xff1a; 代码 &#xff1a; 209. 长度最小的子数组 题目链接 思路 代码 59. 螺旋矩阵 II 题目链接 思路 代码 总结 977. 有序数组的平方 题目链接 ​​​​​​力扣 思路&#xff1a; 双指针法 因为数组是非递减的…

Linux中的YUM源仓库

这里写目录标题 一 、YUM仓库源的介绍和相关信息1.1yum相关介绍1.2 Linux系统各家厂商用的安装源1.3 yum下载方式 二 、 yum 仓库源的三种搭建2.1yum 配置本地源2.2创建ftp源2.3 配置http源2.4 配置yum在线源 一 、YUM仓库源的介绍和相关信息 1.1yum相关介绍 yum是一个专门为…

关于Netty使用中黏包拆包带来报错问题及解决

文章目录 问题现象解决总结 问题现象 业务场景&#xff1a;雷达作为客户端&#xff0c;平台作为服务端&#xff0c;采用TCP/IP协议的socket连接&#xff0c;数据包采用字节的二进制数据传输平台与雷达的通信和数据解析&#xff0c;在我接手时&#xff0c;已经开发完成&#xf…

2023年Q1天猫空调品牌销量排行榜

如今&#xff0c;空调的普及水平较高&#xff0c;空调行业进入存量换新为主的发展阶段。 根据鲸参谋数据分析平台的相关数据显示&#xff0c;2023年Q1在天猫平台上&#xff0c;空调的销量将近100万件&#xff0c;销售额将近30亿&#xff0c;同时&#xff0c;空调产品的产品均价…

浅谈CRM系统:优化企业管理,提高客户满意度!

一、什么是 CRM&#xff1f; CRM 是 Customer Relationship Management&#xff0c;即客户关系管理的缩写。它是一套用于帮助企业与客户建立和维护良好关系的系统。在 CRM 系统中&#xff0c;将客户的信息集成在一起&#xff0c;包括其历史交易记录、活动记录、沟通记录以及个…

深入探讨Linux驱动开发:Linux设备树

文章目录 一、设备树介绍二、设备树框架1.设备树框架2.节点基本格式3.节点部分属性简介 总结 一、设备树介绍 设备树&#xff08;Device Tree&#xff0c;简称 DT&#xff09;是一种在嵌入式系统中描述硬件设备的一种数据结构和编程语言。它用于将硬件设备的配置信息以树形结构…

介绍与评测Intel HLE与RTM技术

HLE&#xff08;即Hardware Lock Elision&#xff0c;硬件锁省略&#xff09;以及RTM&#xff08;即Restricted Transactional Memory&#xff0c;受限的事务性存储器&#xff09;是Intel在x86微架构中所引入的两条指令集系统&#xff0c;它们均属于TSX&#xff08;Transaction…