Spring Cloud第二季--消息驱动Spring Cloud Stream

news2024/9/22 1:03:17

文章目录

  • 什么是Spring Cloud Stream
    • Stream 原理
  • 牛刀小试
    • 消息重复消费问题

什么是Spring Cloud Stream

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

比如有这样的一个场景,应用A 使用了RabbitMQ作为消息队列,而应用B使用了Kafka,各消息中间件构建的初衷不同,架构不同,肯定不能直接通信,更不用说发送消息、接收消息啦,但我们业务上又必须让二者通信,怎么办呢?这时候,就是Spring Cloud Stream大显身手的时候啦!

Spring Cloud Stream通过定义绑定器(binder)作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

在这里插入图片描述

Stream 原理

Stream中的消息通信方式遵循了发布-订阅模式
在这里插入图片描述

Source和Sink,简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

目前Spring Cloud Stream仅支持RabbitMQ、Kafka。

牛刀小试

第一步,新建cloud-stream-provider, 作为生产者,进行发消息。pom文件添加依赖:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

配置文件application.yml,output参数标识输出通道,发布的消息将通过该通道离开应用程序

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
      stream:
        binders: # 配置绑定的rabbitmq的服务信息;
          defaultRabbit: # 表示定义的名称,用于于binding整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: ******
                  port: 5672
                  username: ******
                  password: ******
        bindings: # 服务的整合处理
          output: # 这个名字是一个通道的名称
            destination: testExchange07 # 表示要使用的Exchange名称定义
            content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
            binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    prefer-ip-address: true     # 访问的路径变为IP地址

发送消息接口及实现类

/**
 * @Auther: songweichao
 * @Date: 2023-05-10 11:52
 * @Description:
 */
public interface MessageProvider {

    public String send() ;

}

@EnableBinding ,信道channel和exchange绑定在一起

/**
 * @Auther: songweichao
 * @Date: 2023-05-10 11:52
 * @Description:  
 */
@EnableBinding(Source.class)
public class MessageProviderImpl implements MessageProvider{

    @Resource
    private MessageChannel output;

    @Override
    public String send() {
    
        SimpleDateFormat sdf = new SimpleDateFormat();
        this.output.send(MessageBuilder.withPayload(sdf.format(new Date())).build());
        
        return "success";
    }
}

方法调用:

/**
 * @Auther: songweichao
 * @Date: 2023-05-10 12:14
 * @Description:
 */
@RestController
public class SendMessageController {

    @Resource
    private MessageProvider messageProvider;


    @GetMapping(value = "/send")
    public String sendMessage(){
        return messageProvider.send();
    }
}

第二步,新建cloud-stream-consumer7701,pom依赖同cloud-stream-provider,添加配置文件application.yml

input参数标识输入通道,通过该输入通道接收到的消息进入应用程序。

server:
  port: 7702

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: # 配置绑定的rabbitmq的服务信息;
          defaultRabbit: # 表示定义的名称,用于于binding整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: ******
                  port: 5672
                  username: ******
                  password: ******
        bindings: # 服务的整合处理
          input: # 这个名字是一个通道的名称
            destination: testExchange07 # 表示要使用的Exchange名称定义
            content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
            binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    prefer-ip-address: true     # 访问的路径变为IP地址

接收消息实现,@StreamListener ,监听队列,用于消费者队列的消息接收

/**
 * @Auther: songweichao
 * @Date: 2023-05-10 12:26
 * @Description:
 */
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String port;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){

        System.out.println("消费之7702接收消息------>"+message.getPayload()+"\t port:"+port);
    }
}

第三步,复制cloud-stream-consumer7701为cloud-stream-consumer7702,构建两个消费者,分别启动cloud-stream-provider和cloud-stream-consumer,调用接口测试:
在这里插入图片描述
控制台输出打印,7701和7702都收到了消息。
在这里插入图片描述
在这里插入图片描述

消息重复消费问题

如果7701和7702属于同一个微服务的集群部署,不应该都收到消息,一条消息只能由一个消费者消费,该怎么实现呢?

Spring Cloud Stream 通过分组的方式实现。不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

分别在7701和7702的配置中,添加group属性,为同一组,重启测试

在这里插入图片描述

7701/7702实现了轮询分组,每次只有一个消费者;8801模块的发的消息只能被7701或7702其中一个接收到,这样避免了重复消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/509036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux系统函数的运用

函数 函数详解函数的作用函数的定义函数的返回值函数的作用范围函数传参函数递归函数库 函数详解 函数的作用 在编写shell脚本的时候&#xff0c;经常会发现在多个地方使用了同一段代码&#xff0c;如果只是一小段代码&#xff0c;一般也无关紧要&#xff0c;但是要在脚本中多…

如何禁止电脑运行游戏?

在休息的时候&#xff0c;很多人都喜欢使用电脑玩游戏来消磨时间&#xff0c;但是对于未成年人来说&#xff0c;很容易沉迷游戏&#xff0c;从而影响正常的学业和成长。那么如何才能禁止电脑运行游戏呢&#xff1f;下面我们就来了解一下。 除了将电脑游戏卸载之外&#xff0c;还…

markdown甘特图语法介绍

1. 介绍 甘特图&#xff08;Gantt chart&#xff09;又称为横道图、条状图(Bar chart)。其通过条状图来显示项目、进度和其他时间相关的系统进展的内在关系随着时间进展的情况。 2. 语法 1. 代码通用语法 在 markdown 的 代码模块 中编写甘特图&#xff0c;类型是mermaid&a…

华硕ROG|玩家国度魔霸新锐2023 Windows11原厂预装系统 工厂模式恢复安装带ASUSRecevory一键还原

华硕ROG|玩家国度魔霸新锐2023 Windows11原厂预装系统 工厂模式恢复安装带ASUSRecevory一键还原 文件地址&#xff1a;https://pan.baidu.com/s/1snKOsH3OMl3GZLqeAf-GLA?pwd8888 华硕工厂恢复系统 &#xff0c;安装结束后带隐藏分区以及机器所有驱动软件 需准备一个16G左右…

第三十二章 React路由组件的简单使用

1、NavLink的使用 一个特殊版本的 Link&#xff0c;当它与当前 URL 匹配时&#xff0c;为其渲染元素添加样式属性 <NavLink className"list-group-item" to"/home">Home</NavLink> <NavLink className"list-group-item" to&quo…

Python 密码破解指南:0~4

协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【OpenDocCN 饱和式翻译计划】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 收割 SB 的人会被 SB 们封神&#xff0c;试图唤醒 SB 的人是 SB 眼中的 SB。——SB 第三定律 零、简…

stream笔记

1、 创建流stream 1.1、 Stream 的操作三个步骤 1.2、 stream中间操作 1.2.1 、 limit、skip、distinct 1.2.2、 map and flatMap 1.2.3、 sort 自然排序和定制排序 1.3、 add and andAll difference: 1.4、 终止操作 1.4.1、 allmatch、anyMatch、noneMatch、max、min…

JavaScript通过js的方式来判断一个数奇偶性的代码

以下为通过js的方式来判断一个数奇偶性的程序代码和运行截图 目录 前言 一、通过js的方式来判断一个数奇偶性&#xff08;html部分&#xff09; 1.1 运行流程及思想 1.2 代码段 二、通过js的方式来判断一个数奇偶性&#xff08;js部分&#xff09; 2.1 运行流程及思想 2…

美国新的 AI 研究基金将专注于 6 个领域

美国白宫周四宣布&#xff0c;在 1.4 亿美元的联邦资金支持下&#xff0c;拜登政府将开设七个新的人工智能实验室。 国家科学基金会将在其他政府机构的支持下掌舵运营。这些研究所将专注于六个研究课题&#xff1a; 1. 值得信赖的人工智能&#xff0c;隶属于马里兰大学领导的…

JustFE团队前端代码规范,降本增效,敏感肌可用

背景 &#x1f30f; 近年来&#xff0c;接手各个前端的代码&#xff0c;看着前人屎山&#xff0c;深恶痛绝 为了避免自己或者团队&#xff0c;继续添粪&#xff0c;因此经验总结一番~ 规范化优点&#xff1a; 容易理解&#xff0c;维护性强容易编写&#xff0c;扩展性强精准定…

优化营商环境:打造政策精准服务平台,提高惠政策落实落地时效性

近年来&#xff0c;各级政府部门及产业园区不断加强对于惠企政策的宣传和落实&#xff0c;努力打造优质的营商环境&#xff0c;加大助企纾困力度&#xff0c;以推动经济高质量发展。为了更好地实现这一目标&#xff0c;搭建惠企政策精准服务平台成为了一个非常重要的举措。 搭建…

Apache Zeppelin系列教程第四篇——Interpreter原理分析

Interpreter 其实就是整个项目的核心&#xff0c;代码运行都是在里面进行执行的&#xff0c;首先来看下Interpreter的抽象类 以jdbc-Interpreter为例&#xff0c;可以参考下测试代码(这个代码里面可以直接测试jdbc的sql执行过程和数据返回过程) 参数和介绍可以参考官方文档&am…

CH9329双头线使用说明

1.介绍说明 CH9329双头线是集成了CH9329CH340芯片的成品线&#xff0c;主要作用是使用主控电脑发送串口指令数据来控制被控电脑的键盘以及鼠标的功能。主控电脑和被控电脑可以是同一台电脑&#xff0c;就是能够自己控制自己。支持自定义修改USB的硬件信息&#xff0c;如PID、V…

FTP-HTTP-HTTPS的学习总结

FTP协议的学习 一&#xff0c;学习的要点 ftp的掌握总体架构、了解状态机 请求响应的格式、常用操作码及响应的含义 PORT与PASV的区别、断点续传 上传、下载文件的基本流程 1&#xff0c;FTP的架构主要有两种形式 UserPI&#xff08;用户解释器&#xff09;和ServerPI&…

『面试篇』之网络知识串联(一):DNS域名解析、TCP的建立与关闭

趁着马上就要面试的这个机会&#xff0c;将网络相关的知识进行一个串联复习。前端网络会在面试中遇到的问题其实并不是很多&#xff0c;核心的内容主要是TCP建立的的三次握手、TCP断开的四次挥手、DNS解析的过程、以及前端跨域等。话不多说&#xff0c;我们直接进入正题。 一、…

【STL十八】算法——不修改序列的操作(for_each、count、find、equal、search)

【STL十八】算法——不修改序列的操作&#xff08;for_each、count、find、equal、search&#xff09; 一、简介二、头文件三、分类四、不修改序列的操作1、for_each2、count、count_if3、find、find_if4、euqal5、search 前言&#xff1a;在前面我们讲解容器和函数对象时&…

X.25,帧中继(FR),ATM三种分组交换系统

X.25、帧中继&#xff08;FR&#xff09;、ATM 是流行的三种分组交换系统&#xff0c;它们具有不同的特点。 两个术语&#xff1a; DTE(Data Terminal Equipment) &#xff0c;数据终端设备&#xff0c;如我们的个人电脑、手机。 DCE(Data Circuit Equipment) &#xff0c;数据…

【STM32】基础知识 第十二课 GPIO

【STM32】基础知识 第十二课 GPIO 概述GPIO 简介GPIO 模式GPIO 特点GPIO 配置GPIO 操作施密特触发器案例 概述 本文小白我将来介绍通用输入与输出, GPIO (General-Purpose Input/Output) 在单片机中的应用, 以及如何配合和食用 GPIO 来实现各种功能. GPIO 简介 GPIO 是单片机…

十三、共享内存

文章目录 一、什么是共享内存&#xff08;一&#xff09;共享内存的定义&#xff08;二&#xff09;共享内存的原理&#xff08;三&#xff09;共享内存的理解 二、为什么要有共享内存三、共享内存怎么进行&#xff08;一&#xff09; 共享内存的数据结构&#xff08;二&#x…

【Redis系列】Redis布隆过滤之8亿大数据集实战

序言 即便平凡的日子仿佛毫无波澜&#xff0c;但在某个特定的时刻&#xff0c;执着的努力便会显现出它的价值和意义。 文章标记颜色说明&#xff1a; 黄色&#xff1a;重要标题红色&#xff1a;用来标记结论绿色&#xff1a;用来标记一级重要蓝色&#xff1a;用来标记二级重要 …