实战:Spring Cloud Stream消息驱动框架整合rabbitMq

news2024/12/22 9:06:14

文章目录

    • 前言
    • Spring Cloud Stream简析
    • Spring Cloud Stream与rabbitmq整合
      • 1、添加pom依赖
      • 2、application.yml增加mq配置
      • 3、定义输入输出信道
      • 4、使用输入输出信道收发消息
      • 5、模拟正常消息消费
      • 6、模拟异常消息

前言

相信很多同学都开发过WEB服务,在WEB服务的开发中一般是通过缓存、队列、读写分离、削峰填谷、限流降级等手段来提高服务性能和保证服务的正常投用。对于削峰填谷就不得不用到我们的MQ消息中间件,比如适用于大数据的kafka,性能较高支持事务活跃度高的rabbitmq等等,MQ的选用和整合已经是JAVA WEB开发中不可或缺对的一部分。当然,作为号称JAVA微服务框架全家桶的Spring Cloud也提供了良好的适配中间件的功能。今天我们就来整合一下微服务全家桶Spring Cloud提供的消息驱动——Spring Cloud Stream。

Spring Cloud Stream简析

Spring Cloud Stream是用于构建微服务具有消息驱动能力的框架,应用程序通过inputs、outputs通道与binder进行交互,binder与消息中间件进行通信。

binder的作用是将消息中间件进行粘合,相当于对第三方中间件进行封装整合,让开发人员不用关心底层消息中间件如何运行。
在这里插入图片描述

inputs是消息输入通道,类似于消息中间件的consumer消费者;outputs是消息输出通道,类似于消息中间件的producer生产者。应用程序收发消息不再直接调用消息中间件的接口或者逻辑代码,直接使用Spring Cloud Stream 的OUTPUT与INPUT通道进行处理。

可以通过binder绑定选用各种消息中间件,用binding进行中间件的相关参数配置,让应用程序达到灵活配置和切换消息中间件的目的。

Spring Cloud Stream与rabbitmq整合

本次整合直接与rabbitmq整合,如果是使用kafka的同学,可以直接移植配置修改对应粘接mq即可。

本次整合加入了消费重试机制、死信队列,并提供死信队列消费监听方法,可直接移植到生产环境。

1、添加pom依赖

引入spring-cloud-starter-stream-rabbit 需要从Spring Cloud中引入,注意dependencyManagement的配置。

<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR10</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

2、application.yml增加mq配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  cloud:
    stream:
      binders: #stream框架粘接的mq
        myRabbit: #自定义个人mq名称
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: #stream绑定信道
        output_channel: #自定义发送信道名称
          destination: assExchange #目的地 交换机/主题
          content-type: application/json
          binder: myRabbit #粘接到的mq
          group: assGroup
        input_channel: #自定义接收信道
          destination: assExchange #目的地 交换机/主题
          content-type: application/json
          binder: myRabbit #粘接到的mq
          group: assGroup
          consumer:
            maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
            backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
            backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
            backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
      rabbit: #stream mq配置
        bindings:
          input_channel:
            consumer:
              concurrency: 1 #消费者数量
              max-concurrency: 5 #最大消费者数量
              durable-subscription: true  #持久化队列
              recovery-interval: 3000  #3s 重连
              acknowledge-mode: MANUAL  #手动
              requeue-rejected: false #是否重新放入队列
              auto-bind-dlq: true #开启死信队列
              requeueRejected: true #异常放入死信
              

3、定义输入输出信道

/**
 * MqChannel
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 15:46
 */
public interface MqChannel {
  
    /**
     * 消息目的地 RabbitMQ中为交换机名称
     */
    String destination = "assExchange";
    /**
     * 输出信道
     */
    String OUTPUT_CHANNEL = "output_channel";
    /**
     * 输入信道
     */
    String INPUT_CHANNEL = "input_channel";
    /**
     * 死信队列
     */
    String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";

    @Output(MqChannel.OUTPUT_CHANNEL)
    MessageChannel output();

    @Input(MqChannel.INPUT_CHANNEL)
    SubscribableChannel input();

}

4、使用输入输出信道收发消息

TestMQService

/**
 * TestMQService
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 15:47
 */
public interface TestMQService {

    /**
     * 发送消息
     */
    void send(String str);
}

TestMQServiceImpl

/**
 * TestMQServiceImpl
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 15:49
 */
@Service
@Slf4j
@EnableBinding(MqChannel.class)
public class TestMQServiceImpl implements TestMQService {

    @Resource
    private MqChannel mqChannel;

    @Override
    public void send(String str) {
        mqChannel.output().send(MessageBuilder.withPayload("测试=========="+str).build());
    }


    /**
     * 接收消息监听
     * @param message 消息体
     * @param channel 信道
     * @param tag 标签
     * @param death
     * @author senfel
     * @date 2023/6/5 9:25
     * @return void
     */
    @StreamListener(MqChannel.INPUT_CHANNEL)
    public void process(String message,
                        @Header(AmqpHeaders.CHANNEL) Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("message : "+message);
        if(message.contains("9")){
            // 参数1为消息的tag  参数2为是否多条处理 参数3为是否重发
            //channel.basicNack(tag,false,false);
            System.err.println("--------------消费者消费异常--------------------------------------");
            System.err.println(message);
            throw new RuntimeException("抛出异常");
        }else{
            System.err.println("--------------消费者--------------------------------------");
            System.err.println(message);
            channel.basicAck(tag,false);
        }

    }



    /**
     * 死信监听
     * @param message 消息体
     * @param channel 信道
     * @param tag 标签
     * @param death
     * @author senfel
     * @date 2023/6/5 14:30
     * @return void
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(MqChannel.INPUT_CHANNEL_DLQ)
                    , exchange = @Exchange(MqChannel.destination)
            ),
            concurrency = "1-5"
    )
    public void processByDlq(String message,
                             @Header(AmqpHeaders.CHANNEL) Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

        log.info("message : "+message);
        System.err.println("---------------死信消费者------------------------------------");
        System.err.println(message);
    }
}

controller

/**
 * @author senfel
 * @version 1.0
 * @date 2023/6/2 17:27
 */
@RestController
public class TestController{
    @Resource
    private TestMQService testMQService;

    @GetMapping("/test")
    public String testMq(String str){
        testMQService.send(str);
        return str;
    }
}

5、模拟正常消息消费

在这里插入图片描述在这里插入图片描述

6、模拟异常消息

异常消息重试满足3次投递后进入死信消费
在这里插入图片描述
在这里插入图片描述
加粗样式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/612005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web端3D模型轻量化工具如何实现建筑行业“数字化”建设?

随着数字化技术的飞速发展&#xff0c;建筑行业也在不断寻找新的技术手段来提供高产能和建筑质量。其中&#xff0c;Web端3D模型轻量化工具HOOPS Communicator SDK在建筑行业中的应用不断地得到了市场的广泛注意和应用。本文将深入探讨HOOPS Communicator在建筑行业中的应用及其…

转动的车轮-第14届蓝桥杯国赛Scratch真题初中级组第2题

[导读]&#xff1a;超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成&#xff0c;后续会不定期解读蓝桥杯真题&#xff0c;这是Scratch蓝桥杯真题解析第144讲。 转动的车轮&#xff0c;本题是2023年5月28日上午举行的第14届蓝桥杯国赛Scratch图形化编程初中级组真题第2题…

Python数据攻略-DataFrame的数据操作

大家好&#xff0c;我是Mr数据杨&#xff0c;今天我们就来谈谈Python中的数据访问和修改。 首先&#xff0c;你们一定都听过《三国演义》吧&#xff0c;里面的人物和事情其实就像我们Python中的数据。比如曹操就像我们的数据元素&#xff0c;他的性格特点、军事才能等就像我们…

5年开发经验,看完这份37W字Java高性能架构,终于拿到架构师薪资

其实现在很多的开发人员并不能解决从架构的角度全方位地了解在Java编程过程中各阶段会出现的典型问题&#xff0c;更没办法深入到底层原理了解问题出现的原因&#xff01; 且随着当下面试越来越深入到底层&#xff0c;如果大家对于底层的原理不了解的话&#xff0c;是很难做出…

JDK11+mybatis-plus+shardingsphere分库分表

1、引入jar dynamic-datasource-spring-boot-starter&#xff1a;2.5.6 sharding-jdbc-spring-boot-starter&#xff1a;4.1.1 <dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId>&…

微调样本质量胜于数量 LIMA: Less Is More for Alignment

1、总体介绍 大型语言模型的训练分为两个阶段&#xff1a;&#xff08;1&#xff09;从原始文本中进行无监督的预训练&#xff0c;以学习通用的表征&#xff1b;&#xff08;2&#xff09;大规模的指令学习和强化学习&#xff0c;以更好地适应最终任务和用户的偏好。 作者通过…

作为网络安全工程师,都有哪些公司可以选?

招聘平台 首选内推 其次是公司自有招聘平台 再是第三方平台&#xff1a;boos直聘、前程无忧、拉钩、猎聘、牛客、牛聘 乙方 启明星辰 商标&#xff1a;云众可信&#xff0c;云子可信 投资&#xff1a;网御星云&#xff0c;恒安嘉新 拳头产品&#xff1a;Secin 社区、天清…

企企通×天能股份SRM一期项目成功上线,持续深化企业采购数字化

近期&#xff0c;企企通凭借在赋能客户数字化转型方面的优秀实践与丰富的解决方案&#xff0c;荣获天能电池集团股份有限公司&#xff08;以下简称“天能股份”&#xff09;颁发的“2022年度数字化优秀供应商奖”&#xff0c;同时&#xff0c;企企通SRM项目还获得天能股份采购管…

vue-cli4打包优化

项目开始时webpack配置 vue-cli3以后&#xff0c;我们修改webpack配置&#xff0c;需要自己在项目根路径下创建vue.config.js文件。 一、 配置 proxy 跨域 使用vue-cli发开项目&#xff0c;在本地开发环境中&#xff0c;如果遇到跨域的问题。可以通过配置proxy的方式&#xff…

uniapp(二) 之 uniapp 搭建与组件库的引用

小扩展&#xff1a; rpx&#xff08;responsive pixel&#xff09;:可以根据屏幕宽度自适应。规定屏幕宽度为750rpx。如果iphon6上&#xff0c;屏幕宽度为375px,共有750个像素&#xff0c;则750rpx 375培训 750物理像素&#xff0c;1rpx 0.5px 1物理像素。 页面跳转&#xff…

你知道TikTok的推荐算法吗?TikTok数据分析平台哪家好?

作为当下最受欢迎的社交媒体&#xff0c;TikTok这几年的成绩大家也是有目共睹了&#xff0c;超10亿的月活加上大量活跃的年轻人&#xff0c;让无数企业和品牌为之心动。入局的人越来越多&#xff0c;想要在众多竞争者中脱颖而出&#xff0c;入局前需要了解TikTok底层逻辑和推荐…

Treadlocal源码实例详解

我们都知道treadlocal维护变量时候&#xff0c;可以为每个线程维护一个独立的副本&#xff0c;改变的是自己线程的数据。 ThreadLocal公用方法有四个&#xff1a;get&#xff0c;set&#xff0c;remove&#xff0c;intiValue 既然threadLocalMap是局部变量&#xff0c;所以他存…

内网安全:内网渗透.(拿到内网主机最高权限 vulntarget 靶场 1)

内网安全&#xff1a;内网渗透.&#xff08;拿到内网主机最高权限&#xff09; 内网穿透又被称为NAT穿透&#xff0c;内网端口映射外网&#xff0c;在处于使用了NAT设备的私有TCP/IP网络中的主机之间建立连接的问题。通过映射端口&#xff0c;让外网的电脑找到处于内网的电脑。…

中国人民大学与加拿大女王大学金融硕士——人生选对方向很重要

有人说&#xff0c;人生最重要的不是财富、不是荣誉&#xff0c;而是选择一条正确的道路。选择正确的方向&#xff0c;对一个人的成长和事业的成功与否&#xff0c;起着决定作用。有了方向&#xff0c;你前进的每一步都跟接近幸福。在职计划读研的你有了解过中国人民大学与加拿…

Linux - 第23节 - Linux高级IO(一)

1.IO的基本概念 IO的概念&#xff1a; I/O&#xff08;input/output&#xff09;也就是输入和输出&#xff0c;在著名的冯诺依曼体系结构当中&#xff0c;将数据从输入设备拷贝到内存就叫做输入&#xff0c;将数据从内存拷贝到输出设备就叫做输出。 • 对文件进行的读写操作本质…

SpringBoot注解详解,建议收藏!

一、简介 基于 SpringBoot 平台开发的项目数不胜数&#xff0c;与常规的基于Spring开发的项目最大的不同之处&#xff0c;SpringBoot 里面提供了大量的注解用于快速开发&#xff0c;而且非常简单&#xff0c;基本可以做到开箱即用! 那 SpringBoot 为开发者提供了多少注解呢?…

《大数据技术与应用》课程实验报告|week12|实验8|Pig——高级编程环境|验证评估函数

目录 一、实验内容 二、实验目的 三、实验设备 四、实验步骤 步骤一 步骤二 步骤三 步骤四 步骤五 步骤六 步骤七 步骤八 步骤九 步骤十 步骤十一 步骤十二 步骤十三 步骤十四 步骤十五 步骤十六 五、实验结果 六、实验小结 一、实验内容 验证19.5节中的…

亚马逊云科技携手木卫四,为汽车行业智能安全赋能

木卫四&#xff08;北京&#xff09;科技有限公司在汽车网络安全领域拥有独特专业知识&#xff0c;其融合人工智能算法的安全检测引擎可以不依赖车辆中安装的代理软件&#xff0c;只需几周即可快速部署实施&#xff0c;是汽车网络安全领域的技术领先者。 在亚马逊云科技初创团…

chatgpt赋能python:Python同一行多个语句:如何提高你的编程效率?

Python同一行多个语句&#xff1a;如何提高你的编程效率&#xff1f; Python是一种优雅的编程语言&#xff0c;拥有简洁易懂的语法&#xff0c;可以帮助你快速编写可以在各种领域使用的高级代码。其中&#xff0c;Python同一行多个语句&#xff0c;是一种可以大大提高编程效率…

Springboot +spring security,基于内存模型实现授权

一.简介 1.1概念 所谓授权&#xff0c;举个例子&#xff1a;某个用户想要访问某个资源(接口、页面、功能等)&#xff0c;我们应该先去检查该用户是否具备对应的权限&#xff0c;如果具备就允许访问&#xff0c;如果不具备&#xff0c;则不允许访问。也就是说&#xff0c;授权…