消息驱动Stream---基于SpringCloud

news2025/1/23 7:57:04

概要:实际开发中,服务与服务之间的通信经常会使用到消息中间件,而以往使用的一些消息中间件,比如RabbitMQ,该中间件和系统的耦合性非常高,如果我们要将RabbitMQ替换为Kafka,那么系统将会有较大的变动。此时,我们可以使用Spring Cloud Stream整合消息中间件降低系统和中间件的耦合性。

Spring Cloud Stream简介

什么是Spring Cloud Stream

概述Spring Cloud Stream是一个构建消息驱动微服务的框架

作用它是Spring Cloud对于消息中间件的进一步封装,通过使用Spring Cloud Stream,可忽略消息中间件之间的差异,有效降低开发人员对消息中间件的使用复杂度。 目前Spring Cloud Stream支持的消息中间件仅有RabbitMQKafka

Spring Cloud Stream 与消息中间件的交互

使用Stream构建的应用程序与消息中间件之间是通过绑定器Binder相关联的

Binder

特点Binder对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对Stream应用程序来说是透明的

用途对于每一个Stream的应用程序来说,Binder无需知晓消息中间件的通信细节,而是通过向应用程序暴露统一的通道(Channel)来进行通信。Binder是作为输入通道(inputs输出通道(outputs消息中间件之间的桥梁进行消息通信。

构建Stream工程

快速构建Stream工程

1.创建stream-hello项目

加入依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>

    </dependencies>

创建rabittmq包,并在rabittmq包中创建SinkReceiver类,用于接收RabbitMQ发送的消息 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)  //开启绑定通道的注解  Sink是Stream组件默认的输入通道接口
public class SinkReceiver {
    //声明日志
    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
    @StreamListener(Sink.INPUT)      //此注解声明此方法为监听方法
    private void receiver(String payload){
        logger.info("Receiver:"+payload);
    }
}
2.编写消息消费者类

@EnableBinding注解用来指定一个或多个定义了@Input@Output注解的接口,以此实现对消息通道(Channel)的绑定。

@StreamListener注解主要是修饰方法,用于将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名

3.项目测试

输入Rabbit MQ的可视化网址,账号密码默认为:guest

        第一个Spring Cloud Stream案例就完成了,但是我们会发现,此案例中并没有在配置文件 application.yml中进行任何属性设置,原因在于Spring Cloud Stream会为消息中间件RabbitMQ提供默认的自动化配置。当然我们也可以在Spring Boot支持的全局配置文件application.propertiesapplication.yml中修改相关配置。

Stream的发布-订阅模式

Stream的分布-订阅

特点:Spring Cloud Stream中的消息通信方式遵循的是发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享主题的方式进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方

Stream框架应用结构图

提供者发送消息到RabbitMQ等消息中间件,消费者通过订阅的方式从消息中间件获取消息。

搭建工程实现Stream的发布-订阅 

1.启动rabbitmq
2.创建提供者

依赖:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>

application.yml文件:

server:
  port: 8898
spring:
  application:
    name: stream-rabbitmq-provider
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: minestream

 创建StreamProvider

 


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding(Source.class)
@RestController
public class StreamProvider {
    @Autowired
    @Output(Source.OUTPUT)
    private MessageChannel channel;

    @GetMapping("/send")
    public void send(){
        channel.send(MessageBuilder.withPayload("hello world").build());
    }
}
3.创建消费者

依赖和上面提供者一样(复制即可)

application.yml:

server:
  port: 9898
spring:
  application:
    name: stream-rabbitmq-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input:
          destination: minestream

创建StreamConsumer 

 

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class StreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receiver(String payload){
        System.out.println("接收到了mq中发送过来的消息"+payload);
    }
}
 4.测试运行

访问http://localhost:8898/send 地址发送消息,在消费者项目stream-rabbitmq-consumer的控制台可以看到打印日志“接收到MQ消息:Hello World!”,说明消息已成功被接收。

Stream的消费组和消息分区

Stream消费组的实现

Spring Cloud Stream应用程序开发中,如果在同一主题上的应用需要启动多个实例时,为防止对消息的重复处理,我们可以通过spring.cloud.stream.bindings.input. group属性为应用指定一个组名,这样一个应用的多个实例在接收到消息时,只会有一个实例真正收到消息并进行处理。

添加消费组

现在因为消费者是个集群,需要再复制多一个消费者,更改端口,并且两个消费者的配置文件加上group: stream

测试运行

依次启动stream-rabbitmq-provider,stream-rabbitmq-consumer,stream-rabbitmq-consumer2,在RabbitMQ控制台的 Queues可以看到两个队列合并为一个:minestream.stream

在浏览器访问http://localhost:8898/send

 

但再次访问http://localhost:8898/send时消费者2收到消息,而消费者1没有收到消息

说明:消息是以轮询的方式进行接收的

Stream的消息分区 

什么是Stream的消息分区?

        在消费组中我们可以保证消息不会被重复消费,但是在同组下有多个实例的时候,我们无法确定每次处理消息的是不是被同一消费者消费,这就要使用到Stream的消息分区,消息分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理。

1.改造提供者

修改提供者stream-rabbitmq-provider项目的application.yml配置文件

2.改造消费者

修改消费者stream-rabbitmq-consumer项目的application.yml配置文件

修改消费者stream-rabbitmq-consumer2项目的application.yml配置文件

 

3.启动测试

多次访问http://localhost:8898/send,发现在9899端口的控制台打印了多次日志信息“接收到MQ消息:Hello World!”,而9898端口的控制台没有,说明只有指定的分区可以接收到消息,这就是消费分区的作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2044067.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

垃圾收集器G1ZGC详解

G1收集器(-XX:UseG1GC) G1 (Garbage-First)是一款面向服务器的垃圾收集器,主要针对配备多颗处理器及大容量内存的机器. 以极高概率满足GC停顿时间要求的同时,还具备高吞吐量性能特征. G1将Java堆划分为多个大小相等的独立区域&#xff08;Region&#xff09;&#xff0c;JVM目…

Python教程(十五):IO 编程

目录 专栏列表引言基础概念什么是IO&#xff1f; 同步IO vs 异步IO同步IO&#xff08;Synchronous IO&#xff09;异步IO&#xff08;Asynchronous IO&#xff09; Python中的IO标准IO标准输入和输出 文件IO文件操作的上下文管理器打开文件读取文件 高级文件操作读写二进制文件…

go注册到eureka微服务

// 注册到 Eureka&#xff0c;goeureka会自动30秒发送一次心跳 package mainimport ("fmt""github.com/SimonWang00/goeureka""github.com/gin-gonic/gin""github.com/robfig/cron/v3""time""wbGo/configs" )typ…

【C++小白到大牛】红黑树那些事儿

目录 前言&#xff1a; 一、红黑树的概念 二、红黑树的性质 三、红黑树结点的定义 四、红黑树的插入 情况一&#xff1a;u存在且为红 情况二&#xff1a;u不存在/u存在且为黑 小总结&#xff1a; 原码&#xff1a; 五、红黑树的检验 六、性能比较 前言&#xff1a; …

Linux知识复习第4期

web服务器的基本用法 目录 1、安装 2、启动 3、默认发布目录 1、安装 yum install nginx -y # nginx安装 yum install httpd -y # apache安装 2、启动 systemctl enable --now httpd systemctl enable --now nginx 3、默认发布目录 /usr/www/html/ # …

Python OpenCV 影像处理:影像轮廓

► 前言 上篇介绍使用OpenCV Python对于图像上的二值化操作&#xff0c;二值化主要用途包括图像分割、物体侦测、文字识别等。这种转换可以帮助检测图像中的物体或特定特征&#xff0c;并提取有用的信息&#xff0c;本篇基于二值化操作进行近一步的操作&#xff0c;透过影像梯…

一六七、Linux安装go并部署go项目

Linux 下安装 Golang 获取Golang下载地址 标准官网&#xff1a;https://go.dev/国内镜像官网&#xff1a;https://golang.google.cn/ 安装 1. 进入终端&#xff0c;登入root su - root2. 来到应用安装目录 cd /usr/local3. 使用 wget 下载 如果没有安装 wget 可通过软件…

《向量数据库指南》——Dopple LAbs:展望未来:构建多模态交互的尖端体验

Dopple LAbs:展望未来:构建多模态交互的尖端体验 在快速迭代的科技领域,Dopple LAbs正以其前瞻性的视野和创新精神,引领着人机交互的新篇章。Sam及其团队近期通过一系列技术突破,显著增强了其服务的沉浸感和互动性,为用户带来了前所未有的视听盛宴。以下,我们将深入探讨…

智慧农业大数据助力智慧农业建设

1. 智慧农业概述 智慧农业作为现代农业发展的重要方向&#xff0c;融合了互联网、大数据、云计算、物联网等现代信息技术&#xff0c;旨在提高农业生产效率&#xff0c;实现精准化管理和产品溯源。通过智慧农业的实施&#xff0c;可以解决传统农业面临的信息不对称、融资困难等…

打工人上班适合用的蓝牙耳机推荐?几款开放式耳机推荐

日常工作的话&#xff0c;我还是比较推荐开放式蓝牙耳机的&#xff0c;它特别适合那些需要在长时间工作中保持专注和舒适度的环境&#xff0c;那开放式耳机其实还有一些主要的优点&#xff1a; 减少耳朵疲劳&#xff1a;由于开放式耳机不需要紧密贴合耳朵&#xff0c;因此可以…

复习之 java 锁

裁员在家&#xff0c;没有面试机会&#xff0c;整理整理面试知识点吧&#xff01; 不得不知道的java 锁 Java 中&#xff0c;提供了两种方式来实现同步互斥访问&#xff08;也就是锁&#xff09;&#xff1a;synchronized 和 Lock 多线程编程中&#xff0c;有可能会出现多个线…

使用静态住宅代理解锁YouTube营销的新维度

YouTube作为众多跨境商家的重要营销推广阵地&#xff0c;YouTube的运营数据与店铺的开单息息相关。那么如何做好YouTube营销来增加产品的知名度呢&#xff1f;如何高效运营YouTube矩阵并防止账号间的关联呢&#xff1f;下文介绍的静态住宅代理就能在YouTube营销上助你一臂之力。…

HTML知识点二——表单

表单&#xff1a; 基本语法&#xff1a; <form method"post" action"xxx"><p>名字&#xff1a;<input name"name" type"text"></p><p>密码&#xff1a;<input name"pass" type"pass…

音频进阶学习二——模数和数模转换中的采样、量化和编码

文章目录 前言一、频率连续时间信号的频率数字信号的频率 二、模数转换过程A/D转换三步 三、采样确定采样频率数模转换中的插值 四、量化量化过程量化误差 五、编码总结 前言 所有软件的运行都得益于硬件上的突破&#xff0c;数字信号是从40年前就开始高速发展的领域。得益于硬…

【机器学习】深度强化学习–RL的基本概念、经典场景以及算法分类

引言 深度强化学习&#xff08;Deep Reinforcement Learning, DRL&#xff09;是机器学习的一个分支&#xff0c;它结合了深度学习&#xff08;Deep Learning&#xff09;和强化学习&#xff08;Reinforcement Learning, RL&#xff09;的技术 文章目录 引言一、深度强化学习–…

为什么 React 的函数组件每次渲染执行两次

1.这是 React18 才新增的特性。 2.仅在开发模式("development")下&#xff0c;且使用了严格模式("Strict Mode")下会触发。 生产环境("production")模式下和原来一样&#xff0c;仅执行一次。 在 React 中&#xff0c;当你看到某些代码执行了多…

整合Rocketmq实现审批流消息推送

文章目录 Docker 部署 RocketMQ拉取 RocketMQ 镜像创建容器共享网络 部署NameServer创建目录并授予权限拷贝启动脚本启动容器NameServer 部署Broker Proxy创建挂载文件夹并授权创建broker.cnf文件拷贝启动脚本启动容器Broker 部署RocketMQ控制台&#xff08;rocketmq-dashboar…

12、springboot3 vue3开发平台-前端-记住我功能实现

文章目录 1. 前端用户信息保存2. 登录页面添加3. 后端实现 1. 前端用户信息保存 使用pinia持久化保存用户名密码 src/stores/remember-me.js // 定义 store import { defineStore } from "pinia" import {reactive} from vueexport const useRememberMeStore defi…

求职Leetcode算法题(7)

1.搜索旋转排序数组 这道题要求时间复杂度为o&#xff08;log n&#xff09;&#xff0c;那么第一时间想到的就是二分法&#xff0c;二分法有个前提条件是在有序数组下&#xff0c;我们发现在这个数组中存在两部分是有序的&#xff0c;所以我们只需要对前半部分和后半部分分别…

element ——tree组件懒加载数据、自定义label、修改高亮样式、回显点击状态

需求 整体宽高占一屏&#xff0c;超出滚动条tree组件点击懒加载每一级数据&#xff0c;一共三级三级节点前加icon&#xff0c;标识是否已学习点击高亮显示背景图横向超出省略显示或者横向滚动条纵向超出纵向滚动条修改其字体和间距☆☆☆☆☆从别的页面跳入回显三级点击状态 …