Rabbit MQ整合springBoot

news2025/1/16 19:02:34

  • 一、pom依赖
  • 二、消费端
    • 2.1、application.properties 配置文件
    • 2.2、消费端核心组件
  • 三、生产端
    • 3.1、application.properties 配置文件
    • 2.2、生产者 MQ消息发送组件
    • 四、测试
    • 1、生产端控制台
    • 2、消费端控制台

一、pom依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--spring整合MQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

二、消费端

2.1、application.properties 配置文件

server.port=8002
#上下文路径
server.servlet.context-path=/
spring.application.name=rabbit_consumer

# MQ配置
spring.rabbitmq.addresses=192.168.220.3:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
# 虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时 15秒
spring.rabbitmq.connection-timeout=15000
# 设置消费端消费成功消息后手动签收消息,默认auto自动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=6
# 最大消费线程数,并发数
spring.rabbitmq.listener.simple.max-concurrency=11
# prefetch为限制一次传送给消费者的消息数
spring.rabbitmq.listener.simple.prefetch=1


# 自定义属性配置 MQ
spring.rabbitmq.listener.test.exchange=test_topic_exchange
spring.rabbitmq.listener.test.exchange.type=topic
spring.rabbitmq.listener.test.queue=test_topic1
spring.rabbitmq.listener.test.key=test_topic1.*

2.2、消费端核心组件

package com.xiao.component;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class RabbitMQReceived {

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "${spring.rabbitmq.listener.test.exchange}",
                     type = "${spring.rabbitmq.listener.test.exchange.type}",
                     durable = "true",ignoreDeclarationExceptions = "true"),
                     value = @Queue(value = "${spring.rabbitmq.listener.test.queue}",durable = "true"),
                      key = "${spring.rabbitmq.listener.test.key}"/*,admins = "root"*/

    ))

    /**
     * 监听消息
     * @param message   消息
     * @param channel   通道
     */
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {
        System.err.println("=====================================");
        System.err.println("消费端 RabbitMQReceived 消费消息:" + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //由于消费端配置手动消费消息后签收机制 spring.rabbitmq.listener.simple.acknowledge-mode=manual
//        channel.basicAck(deliveryTag,false);
        System.err.println("消费端 RabbitMQReceived ack:yes deliveryTag:" + deliveryTag);
    }
}

三、生产端

3.1、application.properties 配置文件

server.port=8001
#上下文路径
server.servlet.context-path=/
spring.application.name=rabbit_produce

# MQ配置
spring.rabbitmq.addresses=192.168.220.3:5672
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
# 虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时 15秒
spring.rabbitmq.connection-timeout=15000
# 开启produce发送给broker的消息确认模式,可靠性投递
spring.rabbitmq.publisher-confirms=true
#spring.rabbitmq.publisher-confirm-type=true  #有点问题
# 针对于broker未接收的消息return机制,需要结合mandatory一起使用
#spring.rabbitmq.template.mandatory=true
#spring.rabbitmq.publisher-returns=true

2.2、生产者 MQ消息发送组件

package com.xiao.component;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;

@Component
public class RabbitMQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //生产者发送消息到broker确认回调接口
    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData   消息的唯一标识
         * @param ack broke         broker是否签收成功
         * @param cause             失败异常信息
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String formatStr = String.format("生产端 confirmCallback 相关数据:%s," +
                            "broker签收情况 ack=%s,异常信息:%s" ,
                    correlationData.toString(),ack,cause);
            System.err.println(formatStr);
            /*System.out.println("生产端 confirmCallback 相关数据:" + correlationData);
            System.out.println("生产端 confirmCallback broker签收情况:" + ack);
            System.out.println("生产端 confirmCallback 异常信息:" + cause);*/
        }
    };

    /**
     * 发送消息
     * @param message       消息
     * @param properties    消息对应的属性,如时间
     */
    public void send(Object message, Map<String,Object> properties) {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        //消息发送完后置处理器
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                System.err.println("生产端 RabbitMQSender send后置处理:" + message);
                return message;
            }

            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message, Correlation correlation) {
                System.err.println("生产端 RabbitMQSender send后置处理:" + message+" 消息标识:" + correlation);
                return message;
            }
        };
        //消息唯一属性
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("test_topic_exchange",//exchange,
                "test_topic1.xiao",// routingKey,
                msg, //message,
                messagePostProcessor,
                correlationData);
    }
}

四、测试

package com.xiao;

import com.xiao.component.RabbitMQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class SendMessageTest {
    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Test
    public void send() throws InterruptedException {
        Map<String,Object> properties = new HashMap<>(2);
        properties.put("userName","xiao");

        rabbitMQSender.send("hello world!",properties);
        Thread.sleep(5000);//10秒
    }
}

1、生产端控制台

生产端 RabbitMQSender send后置处理:(Body:'[B@3a6045c6(byte[535])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=535, deliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 消息标识:CorrelationData [id=8c78e89d-80f3-4f3d-ba8b-13e863c6295c]
2023-07-21 20:05:37.611  INFO 4536 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.220.3:5672]
2023-07-21 20:05:37.653  INFO 4536 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#6f38a289:0/SimpleConnection@6215366a [delegate=amqp://root@192.168.220.3:5672/, localPort= 4712]
生产端 confirmCallback 相关数据:CorrelationData [id=8c78e89d-80f3-4f3d-ba8b-13e863c6295c],broker签收情况 ack=true,异常信息:null

2、消费端控制台

=====================================
消费端 RabbitMQReceived 消费消息:hello world!
消费端 RabbitMQReceived ack:yes deliveryTag:1

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/794012.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【lesson4】linux权限

文章目录 权限权限是什么&#xff1f;对人权限对角色和文件权限权限修改改属性改人 权限 权限分为两种对人权限和对角色和文件的权限 权限是什么&#xff1f; 在脑海中我们对权限有一定的理解那么权限的定义到底是什么我们却说不出来&#xff0c;接下来我们来举个例子介绍一…

黑客和网络安全学习资源,限时免费领取,点这里!

统计数据显示&#xff0c;目前我国网安人才缺口达140万之多… 不管你是网络安全爱好者还是有一定工作经验的从业人员 不管你是刚毕业的行业小白还是想跳槽的专业人员 都需要这份超级超级全面的资料 几乎打败了市面上90%的自学资料 并覆盖了整个网络安全学习范畴 来 收藏它&…

MySQL基础(三)用户权限管理

目录 前言 一、概述 二、用户权限类型 1.CREATE 2.DROP 三、用户赋权 例子 四、权限删除 例子 五、用户删除 例子 总结 前言 关于MySQL的权限简单的理解就是MySQL允许你做你权利以内的事情&#xff0c;不可以越界。MySQL服务器通过权限表来控制用户对数据库的访问&…

[SSM]Spring中的JabcTemplate

目录 十三、JdbcTemplate 13.1环境准备 13.2新增 13.3修改 13.4删除 13.5查询 13.6查询一个值 13.7批量添加 13.8批量修改 13.9批量删除 13.10使用回调函数 13.11使用德鲁伊连接池 十三、JdbcTemplate JdbcTemplate是Spring提供的一个JDBC模板类&#xff0c;是对JDBC…

如何使用一个数据库构建一个消耗大量IOPS的应用程序

​我很喜欢关于社交媒体和数据库的创作主意。所以&#xff0c;让我们以一个新的方向来探索&#xff1a;看看Twitch.tv或任何具有即时通讯功能的平台。如果你刚开始接触数据库&#xff0c;可以阅读之前的那篇文章&#xff1a;社交媒体中的“点赞”“喜欢”是如何存储在数据库中的…

ubuntu开机自启动

ubuntu开机自启动 1、建一个test.sh脚本&#xff0c;并写入 #!/bin/sh gnome-terminal -x bash -c ‘cd /home/文件路径/;python3 main.py’ exit 0 2、:wq!保存 3、创建rc-local.service文件&#xff08;sudo vim /etc/systemd/system/rc-local.service&#xff09;&#xf…

Python post请求发送的是Form Data的类型

常规的Form Data 大部分的Form Data 可以直接都是可以通过正常的post请求进行提交的 import requestsheaders {自己设置的请求头键: 自己设置的请求头键,Content-Type: 网页接受的数据类型 }form_data {对应的键1&#xff1a;对应的值1,对应的键2&#xff1a;对应的值2, }r…

【C++】C++11右值引用|新增默认成员函数|可变参数模版|lambda表达式

文章目录 1. 右值引用和移动语义1.1 左值引用和右值引用1.2 左值引用和右值引用的比较1.3右值引用的使用场景和意义1.4 左值引用和右值引用的深入使用场景分析1.5 完美转发1.5.1 万能引用1.5.2 完美转发 2. 新的类功能2.1 默认成员函数2.2 类成员变量初始化2.3 强制生成默认函数…

(链表) 剑指 Offer 25. 合并两个排序的链表 ——【Leetcode每日一题】

❓剑指 Offer 25. 合并两个排序的链表 难度&#xff1a;简单 输入两个递增排序的链表&#xff0c;合并这两个链表并使新链表中的节点仍然是递增排序的。 示例1&#xff1a; 输入&#xff1a;1->2->4, 1->3->4 输出&#xff1a;1->1->2->3->4->4 …

畅捷通TPlus DownloadProxy.aspx 存在任意文件读取漏洞 附POC

文章目录 畅捷通TPlus DownloadProxy.aspx 存在任意文件读取漏洞 附POC1. 畅捷通TPlus DownloadProxy.aspx 简介2.漏洞描述3.影响版本4.fofa查询语句5.漏洞复现6.POC&EXP7.整改意见8.往期回顾 畅捷通TPlus DownloadProxy.aspx 存在任意文件读取漏洞 附POC 免责声明&#x…

Unity Profiler或UPR连接WebGL应用出错

问题 在使用Unity Build出WebGL应用进行性能测试的时候&#xff0c;勾选上了 Development Build和Autoconnect Profiler&#xff0c;分别使用Profiler和UPR进行测试 现象 使用Profiler测试时&#xff0c;就收到几帧&#xff0c;然后就没了 使用UPR进行测试时&#xff0c;在…

javascript 7种继承-- 寄生组合式继承(6)

文章目录 概要继承的进化史技术名词解释寄生组合式继承案列分析源代码解析效果图调用父类构造函数次数正常数据也不会混乱 小结 概要 这阵子在整理JS的7种继承方式&#xff0c;发现很多文章跟视频&#xff0c;讲解后都不能让自己理解清晰&#xff0c;索性自己记录一下&#xf…

RNN架构解析——LSTM模型

目录 LSTMLSTM内部结构图 Bi-LSTM实现 优点和缺点 LSTM LSTM内部结构图 Bi-LSTM 实现 优点和缺点

SpringMVC 有趣的文件

文章目录 SpringMVC 文件上传--文件下载-ResponseEntity<T>文件下载-ResponseEntity<T>案例演示代码应用小结完成测试(页面方式) SpringMVC 文件上传基本介绍应用实例需求分析/图解代码实现完成测试( 页面方式) SpringMVC 文件上传–文件下载-ResponseEntity 文件…

13.3 【Linux】主机的细部权限规划:ACL 的使用

13.3.1 什么是 ACL 与如何支持启动 ACL ACL 是 Access Control List 的缩写&#xff0c;主要的目的是在提供传统的 owner,group,others 的read,write,execute 权限之外的细部权限设置。ACL 可以针对单一使用者&#xff0c;单一文件或目录来进行 r,w,x 的权限规范&#xff0c;对…

三层架构与MVC模式

MVC模式 MVC模式是软件工程中常见的一种软件架构模式&#xff0c;该模式把软件系统&#xff08;项目&#xff09;分为了三个基本部分&#xff1a;模型(Model)、视图(View)、控制器(Controller)。 视图(View) 负责界面的显示&#xff0c;以及与用户的交互功能&#xff0c;例如表…

【解析excel】利用easyexcel解析excel

【解析excel】利用easyexcel解析excel POM监听类工具类测试类部分测试结果备注其他 EasyExcel Java解析、生成Excel比较有名的框架有Apache poi、jxl。但他们都存在一个严重的问题就是非常的耗内存&#xff0c;poi有一套SAX模式的API可以一定程度的解决一些内存溢出的问题&…

【代码随想录 | Leetcode | 第十一天】字符串 | 反转字符串 | 反转字符串 II | 替换空格 | 反转字符串中的单词 | 左旋转字符串

前言 欢迎来到小K的Leetcode|代码随想录|专题化专栏&#xff0c;今天将为大家带来字符串~反转字符串 | 反转字符串 II | 替换空格 | 反转字符串中的单词 | 左旋转字符串的分享✨ 目录 前言344. 反转字符串541. 反转字符串 II剑指 Offer 05. 替换空格151. 反转字符串中的单词剑…

Linux:centos7:zabbix4.0(安装,监控》Linux》Windows》网络设备)

环境 centos7&#xff08;zabbix服务器&#xff09;内网ip&#xff1a;192.168.254.11 外网ip&#xff1a;192.168.0.188&#xff08;去网络yum源下载&#xff09; centos7&#xff08;被监控端&#xff09;内网ip&#xff1a;192.168.254.33win10&#xff08;被监控端&…

怎么学习Java安全性和加密相关知识?

学习Java安全性和加密相关知识是非常重要的&#xff0c;特别是在开发涉及敏感数据的应用程序时。以下是学习Java安全性和加密的一些建议&#xff1a; 基础知识&#xff1a; 首先&#xff0c;了解计算机网络安全的基本概念&#xff0c;包括加密、解密、哈希算法、数字签名等。…