Spring Cloud Stream消息驱动项目集成

news2024/12/27 12:03:38

📝 学技术、更要掌握学习的方法,一起学习,让进步发生
👩🏻 作者:一只IT攻城狮 ,关注我,不迷路 。
💐学习建议:1、养成习惯,学习java的任何一个技术,都可以先去官网先看看,更准确、更专业。
💐学习建议:2、然后记住每个技术最关键的特性(通常一句话或者几个字),从主线入手,由浅入深学习。
❤️ 《SpringCloud入门实战系列》解锁SpringCloud主流组件入门应用及关键特性。带你了解SpringCloud主流组件,是如何一战解决微服务诸多难题的。

文章目录

  • 一、项目准备
  • 二、Stream消息驱动之生产者
    • 1、pom 添加依赖
    • 2、配置bootstrap.yml
    • 3、启动类
    • 4、生产者业务类
  • 三、Stream消息驱动之消费者
    • 1、pom 添加依赖
    • 2、配置bootstrap.yml
    • 3、消费者业务类
  • 四、测试
  • 五、如何解决消息的重复消费?消息丢失?

一、项目准备

Spring Cloud Stream消息驱动基础知识传送门: SpringCloud入门实战(十一)- SpringCloud Stream 消息驱动概述

我们知道Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream可以帮我们屏蔽底层消息中间件的差异,降低切换维护成本,统一消息的编程模型(声明和绑定频道),解决微服务系统中的一些问题。目前支持RabbitMQ 和 Kafka 。

了解了Stream消息驱动的原理和设计思想后,接下来本案例以RabbitMQ为例,模拟项目中如何使用Stream消息驱动。新建三个模块:

cloud-stream-provider:作为生产者进行发消息模块。
cloud-stream-consumer1:作为消息接收模块1
cloud-stream-consumer2:作为消息接收模块2

二、Stream消息驱动之生产者

1、pom 添加依赖

三个工程pom均添加相关依赖:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

完整pom文件:

 <parent>
    <artifactId>springcloud-feign</artifactId>
    <groupId>org.qytest.springcloud</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>

  <artifactId>cloud-stream-provider</artifactId>

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-config-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-openfeign</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
      <version>3.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--junit-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
    </dependency>
    <!-- log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>
    <!-- devtools热部署 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
  </dependencies>

2、配置bootstrap.yml

cloud-stream-provider消息生产者 -》bootstrap.yml -》8001工程:

server:
  port: 8001

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置需要绑定的rabbitMq的服务信息
        defaultRabbit: ## 表示定义的名称,用于binding的整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings: ## 服务的整合处理
        output:  # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input
          destination: qyExchange # 表示要是用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置为text/plain
          ## 设置要绑定的消息服务的具体设置
          binder: defaultRabbit

eureka:
  client:
    register-with-eureka: true #向注册中心注册自己
    fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    instance-id: provider8001 #主机名称修改
    prefer-ip-address: true #访问路径可以显示ip

注意:生产者配置的是output

3、启动类

三个工程启动类均正常开启最服务发现注册的支持即可,其他无特别:

@SpringBootApplication
@EnableEurekaClient
public class StreamProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamProviderApplication.class, args);
    }
}

4、生产者业务类

cloud-stream-provider工程业务类处理,模拟发送消息的业务:

1)SendMessageController.java:

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

2)IMessageProviderService接口:

public interface IMessageProviderService {
     String send() ;
}

3)MessageProviderServiceImpl接口实现类:

@Slf4j
@EnableBinding(Source.class) //定义消息的推送管道,即:源  
//将@EnableBinding注释应用于应用程序的配置类之一。@EnableBinding注释本身使用@Configuration进行元注释
/*此处不再需要引入 spring 注解 @Service,这里的业务实现类是与RabbitMQ配合的,使用的 SpringCloud Stream 的注解*/
public class MessageProviderServiceImpl implements IMessageProviderService {

    @Resource
    //在Spring Cloud Stream 1.0中,唯一支持的可绑定组件是Spring消息传递MessageChannel及其扩展名SubscribableChannel和PollableChannel
    private MessageChannel output; //消息发送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("*****serial:" + serial);
        return "RabbitMQ 消息发送方:" + serial;
    }
}

三、Stream消息驱动之消费者

cloud-stream-consumer1消息接收模块1-》bootstrap.yml -》8002工程:

1、pom 添加依赖

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2、配置bootstrap.yml

server:
  port: 8002

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:  # 在此处配置要绑定的 rabbitmq 的服务信息
        defaultRabbit:  # 表示定义的名称,用于 binding 整合
          type: rabbit  # 消息组件类型
          environment:  # 设置 rabbitmq 的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input
          destination: qyExchange  # 表示要使用的 Exchange 名称定义
          content-type: application/json  #设置消息类型,本次为json,文本则设置 text/plain
          binder: defaultRabbit   # 设置要绑定的消息服务的具体设置

eureka:
  client:
    register-with-eureka: true #向注册中心注册自己
    fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    instance-id: consumer8002 #主机名称修改
    prefer-ip-address: true #访问路径可以显示ip

注意:消费者配置的是input

3、消费者业务类

建一个ReceiveMessageListenerController.java模拟消费消息即可:

/**
*增加订阅监听器
**/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value(value = "${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)//使用@StreamListener进行自动内容类型处理
    //@StreamListener注释提供了一种更简单的处理入站邮件的模型,特别是在处理涉及内容类型管理和类型强制的用例时。
    public void input(Message<String> message) {
        System.out.println("消费者 1 号,----->接收到的消息:" + message.getPayload() + "\t port:" + serverPort);
    }
}

cloud-stream-consumer2消息接收模块2-》bootstrap.yml -》8003工程操作同cloud-stream-consumer1,不再赘述。

四、测试

浏览器多次调用http://localhost:8001/sendMessage模拟生产者发送消息,观察后台日志及RabbitMQ查看实时速度:

在这里插入图片描述

在这里插入图片描述

测试结果,模拟生产及消费消息成功,但是通过观察可知,出现了重复消费的现象!

五、如何解决消息的重复消费?消息丢失?

当集群方式进行消息消费时,就会存在消息的重复消费问题。通过分组解决,只要是一个组的消费者,就处于竞争关系,一次只能有一个去消费,这就可以解决重复消费的问题了。而且分组(group)还解决了持久化的问题。

修改 8002(group1)、8003(group2) 的 yml 配置文件,添加group分组,再次进行测试,发现消息轮询被两个消费者消费:

在这里插入图片描述
在这里插入图片描述
未设置消息分组的微服务在服务宕机重启后并不会获取并消费消息,发生消息丢失故障。而设置了分组的,服务重启后会获取并消费新消息。

在实际的生产过程中,一定要配置消息分组(group),以免造成服务宕机造成的消息丢失的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/716693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spark(9):RDD的序列化

目录 0. 相关文章链接 1. 闭包检查 2. 序列化方法和属性 3. Kryo 序列化框架 4. 核心点总结 0. 相关文章链接 Spark文章汇总 1. 闭包检查 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中&…

C# 泛型List排序的实现

本文主要介绍了C# 泛型List排序的实现&#xff0c;分享给大家&#xff0c;具体如下&#xff1a; 代码 ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 5…

Github下载Failed to connect to github.com port 443: Timed out

方法一&#xff1a; 使用ssh下载的方式 方法二 使用github加速网站 https://gitclone.com ,用命令行&#xff1a; git clone https://http://gitclone.com/http://github.com/xxx/yyy 参考链接 github克隆失败提示“443&#xff1a;Timed out”怎么解决&#xff1f; - 知乎

形式化验证,Complete Formal Verification of TriCore2 and Other Processors(五)

目录 一、Article:文献出处&#xff08;方便再次搜索&#xff09; &#xff08;1&#xff09;作者 &#xff08;2&#xff09;文献题目 &#xff08;3&#xff09;文献时间 &#xff08;4&#xff09;引用 二、Data:文献数据&#xff08;总结归纳&#xff0c;方便理解&am…

Linux 学习记录45(C++篇)

Linux 学习记录45(C篇) 本文目录 Linux 学习记录45(C篇)一、纯虚函数和抽象类1. 纯虚函数2. 抽象类 二、C中的异常处理1. 抛出异常2. 处理/捕获异常 三、模板(template)1. 模板函数(1. 模板函数的定义和调用(2. 模板函数需要显性调用的时机 2. 模板类3. 模板函数和模板类实现的…

【运维】GitLab相关配置优化等

默认 Git 设置 http post 的缓存为 1MB&#xff0c;使用命令将git的缓存设为500M&#xff0c;重新配置一下postBuffer值 git config --global http.postBuffer 524288000 解决方法2&#xff1a;直接修改config参数&#xff0c; windows: ./git/config中&#xff0c;加入以下…

构建无忧:探索 Linux 项目自动化构建神器-make/Makefile

目录 一.make/Makefile的介绍1.理解make/Makefile二.make/Makefile的使用1.基本使用2.PHONY3.特殊符号拓展 一.make/Makefile的介绍 1.理解make/Makefile 编写Makefile是Linux开发中一项重要的技能&#xff0c;熟练的运用这个工具能提高编译效率&#xff0c;帮助你完成大型工…

【Java系列】Java虚拟机—类加载器介绍

什么是Java虚拟机 Java虚拟机&#xff08;Java Virtual Machine&#xff0c;JVM&#xff09;是一个能够执行 Java 字节码的虚拟计算机。它是 Java 技术的核心部分&#xff0c;是 Java 应用程序运行的基础。 Java 程序在编译后会生成字节码&#xff08;bytecode&#xff09;&am…

【动手学习深度学习--逐行代码解析合集】07多层感知机的简洁实现

【动手学习深度学习】逐行代码解析合集 07多层感知机的简洁实现 视频链接&#xff1a;动手学习深度学习–softmax回归简洁实现 课程主页&#xff1a;https://courses.d2l.ai/zh-v2/ 教材&#xff1a;https://zh-v2.d2l.ai/ 1、代码 import torch from torch import nn from d…

uni-app crypto-js DES 加解密 ,支持app , h5,小程序

crypto-js DES 加解密 &#xff0c;支持app,h5&#xff0c;小程序 第一步 npm install crypto-js 可以直接下载示例运行&#xff0c;看控制台打印 下载地址 https://ext.dcloud.net.cn/plugin?id13351 crypto-js DES 加解密 - DCloud 插件市场

科技富豪抑郁了

原美团二当家王慧文据说抑郁了 什么能解决抑郁问题&#xff1f; 趣讲大白话&#xff1a;科技富豪也抑郁 【趣讲信息科技216期】 **************************** 王富豪创立光年之外AI公司2个月就休息了 知识解决不了抑郁问题 抑郁是现代社会一个常见的症状 是压力所带来的综合症…

牛客网Verilog刷题——VL39

牛客网Verilog刷题——VL39 题目答案 题目 设计一个自动贩售机&#xff0c;输入货币有两种&#xff0c;为0.5/1元&#xff0c;饮料价格是1.5/2.5元&#xff0c;要求进行找零&#xff0c;找零只会支付0.5元。 1、投入的货币会自动经过边沿检测并输出一个在时钟上升沿到1&#x…

HTML-表格、表单标签

目录 表格标签 表单标签 表单项标签 表格标签 场景&#xff1a;在网页中以表格&#xff08;行、列&#xff09;形式整齐展示数据&#xff0c;如班级表标签 标签描述属性/备注<table>定义表格整体&#xff0c;可以包裹多个<tr>border:规定表格边框的宽度width&am…

盖雅劳动力管理云完成多方信创适配,打造信创产业生态

为响应国产化和信创战略需求&#xff0c;盖雅工场积极推动产品适配国产操作系统、国产数据库、国产硬件设备和国产处理器&#xff0c;不断拓展公司信创产业链技术升级。 近日&#xff0c;盖雅工场顺利完成多方信创适配&#xff0c;成功与 麒麟Kylin、鲲鹏Kunpeng、达梦数据库…

一键ai绘画怎么使用你清楚吗?

在当代科技的浪潮中&#xff0c;人工智能绘画生成器犹如一位神奇的画笔&#xff0c;以其特别的创造力&#xff0c;将数字代码转化成令人惊叹的艺术杰作。它就像是一位天才魔术师&#xff0c;能从虚无中诞生出栩栩如生的图像&#xff0c;给人们带来触动和美感。 看着这些ai绘画…

VOC数据集介绍以及读取(目标检测object detection)

VOC&#xff08;Visual Object Classes&#xff09;数据集是一个广泛使用的计算机视觉数据集&#xff0c;主要用于目标检测、图像分割和图像分类等任务。VOC数据集最初由英国牛津大学的计算机视觉小组创建&#xff0c;并在PASCAL VOC挑战赛中使用。 VOC数据集包含各种不同类别…

今晚打老虎:用katalon解决接口/自动化测试拦路虎--参数化

#全局变量 右侧菜单栏中打开profile&#xff0c;点击default&#xff0c;打开之后&#xff0c;在default页面点击add添加全局变量 如果你想学习接口自动化测试&#xff0c;我这边给你推荐一套视频&#xff0c;这个视频可以说是B站播放全网第一的接口自动化测试教程&#xff0c…

ux-grid实现表格排序

需求说明&#xff1a; 1、第一行不参与排序 2、实现带%排序 3、实现null值排序 4、实现值相等不排序 5、实现含有占位符‘–‘排序放到最后 表格属性说明文档 效果图如下&#xff1a; 代码如下&#xff1a; <template><div><ux-gridhighlightCurrentRow:data&…

JavaScript 使用canvas绘制随机生成图形验证码

文章目录 HTML 结构准备CSS 样式准备JavaScript 逻辑部分首先做个准备&#xff1a;声明一个空数组用来随机生成验证码封装一个为canvas标签渲染的函数&#xff0c;用来随机生成验证码还需要封装一个用来生成随机颜色的函数获取到canvas标签为其绑定点击事件为按钮绑定判断点击事…