springCould中的Stream-从小白开始【12】

news2025/1/12 19:09:26

🥚今日鸡汤🥚

        见过一些人,他们朝九晚五😭,有时也要加班,却能把生活过得很😎有趣。他们有自己的爱好,不怕独处。他们有自己的坚持,哪怕没人在乎。🤦‍♂️

                                                               开心一点😁

                                                               认真一点🤔

                                                               努力一点🫡

目录

😶‍🌫️1.为什么引入Stream

🥚2.什么是Stream 

🧇3.Steam设计思想 

🥓4.案例说明 

🧂5.重复消费 


1.为什么引入Stream🥚🥚🥚

  • 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

1.1无感知的使用消息中间件

Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。

1.2中间件和服务的高度解耦

Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

2.什么是Stream 🥚🥚🥚

  • 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架
  • 应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。
  • 通过我们配置binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互

所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

3.Steam设计思想🥚🥚🥚 

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

4.案例说明 🥚🥚🥚

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

4.1消息驱动-生产者

1.加pom

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-actuator</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot </groupId>
           <artifactId>spring-boot-starter-test</artifactId>
       </dependency>
       <!--基础依赖-->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
       </dependency>
       <!--eureka客户端-->
       <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
       </dependency>
       <!--消息驱动-->
       <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
       </dependency>
   </dependencies>

2.改yml

  • 注意:小张的Rabbitmq是在Linux上的,所以配置如下:
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: 192.168.20.129
    port: 5672
    username: root
    password: 123456
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

3.主启动类

@SpringBootApplication
public class StreamMqMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMqMain8801.class);

    }
}

4.业务类

  • 1.创建接口
  • 2.创建接口实现类
  • @EnableBinding:Spring Cloud Stream中用来启用消息传递功能的注释。
  • 它用于将应用程序绑定到消息传递系统(例如,Apache Kafka, RabbitMQ),并声明用于发送和接收消息的输入和输出通道。
  • 通过使用@EnableBinding,您可以定义应用程序所需的通道和消息处理程序
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {

    @Autowired
    private MessageChannel output; //消息发送管道

    @Override
    public String send() {
        String serial= UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("======serial:"+serial);
        return null;
    }
}

5.测试

  • 1.浏览器192.168.20.129:15672访问RabbitMQ
  • 2.localhost:8801/sendMessage访问

4.2消息驱动-消费者

1.建模块

  • 1.在父工程下创建模块cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • 2.注意jdk和maven版本号

2.加pom

  • 1.springboot依赖
  • 2.通用依赖
  • 3.eureka客户端依赖
  • 4.消息驱动rabbitmq

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot </groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--基础依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--eureka客户端-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--消息驱动-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

3.添yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: 192.168.20.129
    port: 5672
    username: root
    password: 123456
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

4.主启动类

@SpringBootApplication
public class StreamMqMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMqMain8802.class);
    }
}

5.业务类

  • 1.@StreamListener注解是Spring Cloud Stream框架提供的一个注解,用于定义一个消息监听器
  • 2.通过使用@StreamListener注解,可以将一个方法标记为消息的消费者并指定该方法要监听的消息通道
  • 3.当有消息到达指定的通道时,该方法会被自动触发执行,从而处理这个消息。
  • 4.@StreamListener注解通常与@EnableBinding注解一起使用,用于指定所要绑定的消息通道。
  • 5.@EnableBinding注解用于绑定消息通道与应用程序中的输入输出接口,@StreamListener注解则用于标记一个方法作为消息的消费者。
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1---接受消息:"+message.getPayload()+",port:"+serverPort);

    }

}

6.测试

  • 1.使用8801生产者发送消息
  • 2.使用8802消费者接受消息

5.重复消费 🥚🥚🥚

问题描述:

  • 1.根据8802,重新创建cloud-stream-rabbitmq-consumer8803,作为消息接收模块
  • 2.8801生产者发送消息
  • 3.8802,8803都可以接收到

 如果一个订单同时被两个服务获取到,就会造成数据错误

注意:在Stream中处于同一个group中的多个消费者竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

5.1自定义分组

在消费者端添加group配置:分为xzA,xzB

5.2轮询分组 

8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

8802,8803的group配置相同名称,重新启动 ,使用8801发送两条消息,8802接受一条,8803接收一条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1376433.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Web】CTFSHOW PHP文件包含刷题记录(全)

温故知新。 目录 web78 web79 web80 web81 web82 web83 web84 web85 web86 web87 web88 web78 伪协议base64编码直接读出文件内容就行 ?filephp://filter/convert.base64-encode/resourceflag.php web79 一眼data伪协议包含php脚本 ?filedata://text/plain,<…

rust语言介绍篇

Rust出现就是为了解决C面临的所有问题。Rust是一门系统编程语言 [1]&#xff0c;专注于安全 [2]&#xff0c;尤其是并发安全&#xff0c;支持函数式和命令式以及泛型等编程范式的多范式语言。Rust在语法上和C类似 [3]&#xff0c;设计者想要在保证性能的同时提供更好的内存安全…

【财务数据分析经验分享】如何进行三大报表的年度解读

很快就要到年底了&#xff0c;大家又要开始进行年度经营数据分析了。今天我就用一个例子来演示财务数据分析三张报表的年度分析。 为了更便捷的从年度来分析三大报表&#xff0c;我分别以同样的基本思路对三大报表开发出三张年度分析报表&#xff1a; 1、 按年度来进行筛选分…

windows搭建银河麒麟v10虚拟机

需要用到&#xff1a; 已将安装包放置云盘 自取 VMware Workstation Pro16 https://cloud.189.cn/t/vYZNjqbQ7zUr (访问码:a2pd) 银河麒麟v10镜像 https://cloud.189.cn/t/j6ZNfmnYfYRr (访问码:1icf) 也可以去官网下载&#xff1a;https://www.kylinos.cn 1.安装VM 无…

解密!神奇代码消除 Vue 中 Mac 电脑左滑右滑页面跳转

想知道如何让Mac电脑左滑右滑不再意外跳转页面吗&#xff1f;本文将揭示一个独家秘籍&#xff0c;通过简单的一行代码&#xff0c;让你的用户体验飞速提升&#xff01;别错过这个让你的Vue表格组件更顺畅的宝贵技巧&#xff01; 最近&#xff0c;我在使用 Vue 开发表格组件时遇…

动态pv策略和组件

pv和pvc&#xff0c;存储卷&#xff1a; 存储卷&#xff1a; emptyDir 容器内部&#xff0c;随着pod销毁&#xff0c;emptyDir也会消失 不能做数据持久化 hostPath&#xff1a;持久化存储数据 可以和节点上的目录做挂载。pod被销毁了数据还在 NFS&#xff1a;一台机器&am…

【AI视野·今日NLP 自然语言处理论文速览 第七十四期】Wed, 10 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Wed, 10 Jan 2024 Totally 38 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers Model Editing Can Hurt General Abilities of Large Language Models Authors Jia Chen Gu, Hao Xiang Xu, J…

轮询定时器 清除 + vue2.0

需求? Gin Vue Element UI框架中, 我的大屏可视化项目, 大屏页面, 里边写了多个轮询定时器. 离开页面需要清理掉, 要不然切换路由还会在后台运行, 页面是自动缓存状态, 也不存在销毁一说了 所以通过路由router配置中, 页面路由监听中, 进行监听路由变化, 但是也没生效 …

支付宝异步验签踩的坑

最近公司要做支付宝小程序 我作为服务端就要给小程序配置下单啊&#xff0c;异步回调同步支付状态等功能 就不可避免的使用到了支付宝异步验签 首先背景是我是PHP语言&#xff0c;然后验签方式是RSA2 一开始写原生验签方法&#xff0c;验签失败&#xff0c;后面又搞sdk 验签…

性能测试很简单-JMeter性能测试实践

最近破费买了一台服务器&#xff0c;准备搭建自己的网站&#xff0c;顺便将自己开发的一些测试小工具部署到服务器上&#xff0c;虽然机器配置一般&#xff0c;还是决定对服务器进行压测一番&#xff0c;看一下服务器性能如何。本次压测选择的工具是JMeter&#xff0c;这个工具…

vue Element Plus Cascader级联选择器点击标签选中复选框

element-plus原功能 element-plus的Cascader级联选择器点击标签时是不会选中复选框的&#xff0c;我们想要实现点击标签时也能选中复选框这个效果&#xff0c;那么就要用到一些原生的方法 实现效果 mounted() {// Cascader 级联选择器: 点击文本就让它自动点击前面的input就可…

在Windows服务器上部署项目【虚拟机版】

一. jdk的安装 1、直接双击jdk应用程序&#xff0c;然后下一步下一步即可。 2、安装完成后&#xff0c;在此电脑➡右键➡属性➡高级系统变量。 3、配置环境变量 新建JAVA_HOMEC:\Program Files\Java\jdk1.8.0_144 编辑pathpath%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin; 4、测试&am…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -投票创建页面实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

[足式机器人]Part2 Dr. CAN学习笔记-Advanced控制理论 Ch04-6 线性控制器设计Linear Controller Design

本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记-Advanced控制理论 Ch04-6 线性控制器设计Linear Controller Design

C++核心编程——类和对象(一)

本专栏记录C学习过程包括C基础以及数据结构和算法&#xff0c;其中第一部分计划时间一个月&#xff0c;主要跟着黑马视频教程&#xff0c;学习路线如下&#xff0c;不定时更新&#xff0c;欢迎关注。 当前章节处于&#xff1a; ---------第1阶段-C基础入门 ---------第2阶段实战…

运动耳机哪个最好?2024年无线运动耳机品牌排行榜

​挑选一款适合运动的耳机是每位运动爱好者必备的决策。这些耳机不仅为你提供优质音乐&#xff0c;更在设计上考虑了运动场景&#xff0c;确保在高强度运动中稳固又舒适的佩戴感。今天&#xff0c;我将向你介绍几款在运动时表现卓越的运动耳机&#xff0c;助你全情投入每一次锻…

Unity组件开发--长连接webSocket

1.下载安装UnityWebSocket 插件 https://gitee.com/cambright/UnityWebSocket/ 引入unity项目&#xff1a; 2.定义消息体结构&#xff1a;ExternalMessage和包结构Package&#xff1a; using ProtoBuf; using System; using System.Collections; using System.Collections.Ge…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -我参与的投票列表实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

LeetCode刷题--- 按摩师

个人主页&#xff1a;元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏 力扣递归算法题 http://t.csdnimg.cn/yUl2I 【C】 ​​​​​​http://t.csdnimg.cn/6AbpV 数据结构与算法 ​​​http://t.csdnimg.cn/hKh2l 前言&#xff1a;这个专栏主要讲述动…

监督学习 - 支持向量回归(Support Vector Regression,SVR)

什么是机器学习 支持向量回归&#xff08;Support Vector Regression&#xff0c;SVR&#xff09;是一种基于支持向量机&#xff08;Support Vector Machine&#xff0c;SVM&#xff09;的回归算法&#xff0c;用于解决回归问题。与传统的回归算法不同&#xff0c;SVR的目标是…