java springBoot实现RabbitMq消息队列 生产者,消费者

news2025/1/21 17:58:59

1.RabbitMq的数据源配置文件

# 数据源配置
spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: root
        #消息发送和接收确认
        publisher-confirms: true
        publisher-returns: true
        listener:
            direct:
                acknowledge-mode: manual
            simple:
                acknowledge-mode: manual
                retry:
                    enabled: true #是否开启消费者重试
                    max-attempts: 5 #最大重试次数
                    initial-interval: 2000 #重试间隔时间(单位毫秒)

2.maven依赖

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.RabbitMq文件目录预览

4. RabbitMq的Action文件

package com.zq.cnz.mq.constant;

public enum Action {
	ACCEPT, // 处理成功
	RETRY, // 可以重试的错误
	REJECT, // 无需重试的错误
}

5.RabbitMq的QueueContent文件

package com.zq.cnz.mq.constant;

/**
 * @ClassName: QueueContent
 * @Description: 消息队列名称
 * @author 吴顺杰
 * @date 2023年11月15日
 *
 */
public class QueueContent {
	/**
	 * 测试消息队列
	 */
	public static final String TEST_MQ_QUEUE = "test_mq_queue";


	/**
	 * 测试消息队列交换机
	 */
	public static final String TEST_MQ_QUEUE_EXCHANGE = "test_mq_queue_exchange";

	/**
	 * 测试消息延迟消费队列
	 */
	public static final String TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE = "test_mq_queue_time_delay_exchange";

}

6.消息队列生产者MessageProvider方法

package com.zq.cnz.mq;

import com.alibaba.fastjson.JSONObject;
import com.zq.common.utils.IdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 消息队列生产
 */
@Component
public class MessageProvider implements RabbitTemplate.ConfirmCallback {
	static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

	/**
	 * RabbitMQ 模版消息实现类
	 */
	protected RabbitTemplate rabbitTemplate;

	public MessageProvider(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
		this.rabbitTemplate.setMandatory(true);
		this.rabbitTemplate.setConfirmCallback(this);
	}

	private String msgPojoStr;

	/**
	 * 推送消息至消息队列
	 *
	 * @param msg
	 * @param queueName
	 */
	public void sendMqMessage(String queueName,String msg) {
		try {
			JSONObject object = JSONObject.parseObject(msg);
			String msgId = IdUtils.fastUUID().toString();
			object.put("msgId", msgId);
			msg = object.toString();
			msgPojoStr = msg;
			logger.info("推送消息至" + queueName + "消息队列,消息内容" + msg);
			rabbitTemplate.convertAndSend(queueName, msg);
		} catch (AmqpException e) {
			e.printStackTrace();
			logger.error("推送消息至消息队列异常 ,msg=" + msg + ",queueName=" + queueName, e);
		}
	}

	/**
	 * 推送广播消息
	 *
	 * @param exchangeName
	 * @param msg
	 */
	public void sendFanoutMsg(String exchangeName, String msg) {
		try {
			JSONObject object = JSONObject.parseObject(msg);
			String msgId = IdUtils.fastUUID().toString();
			object.put("msgId", msgId);
			msg = object.toString();
			msgPojoStr = msg;
			logger.info("推送广播消息至交换机" + exchangeName + ",消息内容" + msg);
			rabbitTemplate.convertAndSend(exchangeName, "", msg);
		} catch (AmqpException e) {
			e.printStackTrace();
			logger.error("推送广播至交换机异常 ,msg=" + msg + ",exchangeName=" + exchangeName, e);
		}
	}

	/**
	 * 发送延时消息
	 *
	 * @param queueName
	 * @param msg
	 */
	public void sendTimeDelayMsg(String queueName, String exchangeName, String msg, Integer time) {
		try {
			JSONObject object = JSONObject.parseObject(msg);
			String msgId = IdUtils.fastUUID().toString();
			object.put("msgId", msgId);
			msg = object.toString();
			msgPojoStr = msg;
			logger.info("推送延时消息至" + exchangeName + "," + queueName + "消息队列,消息内容" + msg + ",延时时间" + time + "秒");
			rabbitTemplate.convertAndSend(exchangeName, queueName, msg, new MessagePostProcessor() {
				@Override
				public Message postProcessMessage(Message message) throws AmqpException {
					message.getMessageProperties().setHeader("x-delay", time * 1000);
					return message;
				}
			});
		} catch (AmqpException e) {
			e.printStackTrace();
			logger.error("推送消息至消息队列异常 ,msg=" + msg + ",exchangeName=" + exchangeName + ",queueName=" + queueName
					+ ",time=" + time, e);
		}
	}

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (ack) {
			logger.info(msgPojoStr + ":消息发送成功");
		} else {
			logger.warn(msgPojoStr + ":消息发送失败:" + cause);
		}
	}

}

7.消息队列消费者RabbitMqConfiguration文件配置

package com.zq.cnz.mq;

import com.zq.cnz.mq.constant.QueueContent;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfiguration {

	@Resource
	RabbitAdmin rabbitAdmin;


	// 创建初始化RabbitAdmin对象
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

	/**
	 * 测试消息队列
	 *
	 * @return
	 */
	@Bean
	public Queue TEST_QUEUE() {
		return new Queue(QueueContent.TEST_MQ_QUEUE);
	}

	/**
	 * 测试交换机
	 *
	 * @return
	 */
	@Bean
	FanoutExchange TEST_MQ_QUEUE_EXCHANGE() {
		return new FanoutExchange(QueueContent.TEST_MQ_QUEUE_EXCHANGE);
	}


	/**
	 * 测试延迟消费交换机
	 *
	 * @return
	 */
	@Bean
	public CustomExchange TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE() {
		Map<String, Object> args = new HashMap<>();
		args.put("x-delayed-type", "direct");
		return new CustomExchange(QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
	}

	/**
	 * 测试延迟消费交换机绑定延迟消费队列
	 *
	 * @return
	 */
	@Bean
	public Binding banTestQueue() {
		return BindingBuilder.bind(TEST_QUEUE()).to(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE()).with(QueueContent.TEST_MQ_QUEUE).noargs();
	}


	// 创建交换机和对列,跟上面的Bean的定义保持一致
	@Bean
	public void createExchangeQueue() {
		//测试消费队列
		rabbitAdmin.declareQueue(TEST_QUEUE());
		//测试消费交换机
		rabbitAdmin.declareExchange(TEST_MQ_QUEUE_EXCHANGE());
		//测试延迟消费交换机
		rabbitAdmin.declareExchange(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE());
	}

}

8.TestQueueConsumer 消息队列消费+延迟消费

package com.zq.cnz.mq.MessageConsumer;

import com.alibaba.druid.util.StringUtils;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zq.cnz.mq.MessageProvider;
import com.zq.cnz.mq.constant.Action;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import com.zq.common.utils.RedisUtils;
import com.zq.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 测试消息队列消费
 */
@Component
@RabbitListener(queues = QueueContent.TEST_MQ_QUEUE)
public class TestQueueConsumer {
    @Autowired
    private RedisUtils redisUtils;
    static final Logger logger = LoggerFactory.getLogger(TestQueueConsumer.class);

    @RabbitHandler
    public void handler(String msg, Channel channel, Message message) throws IOException {
        if (!StringUtils.isEmpty(msg)) {
            JSONObject jsonMsg = JSONObject.parseObject(msg);
//            logger.info("TestQueueConsumer:"+jsonMsg.toJSONString());
            Action action = Action.RETRY;
//			获取消息ID
            String msgId = jsonMsg.getString("msgId");
//			消费次数+1
            redisUtils.incr("MQ_MSGID:" + msgId, 1);
            redisUtils.expire("MQ_MSGID:" + msgId, 60);
            try {
                logger.info("测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
                action = Action.ACCEPT;
            } catch (Exception e) {
                logger.error("MQ_MSGID:" + msgId + ",站控权限请求关闭接口异常,msg=" + msg, e);
            } finally {
                // 通过finally块来保证Ack/Nack会且只会执行一次
                if (action == Action.ACCEPT) {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } else if (action == Action.RETRY) {
//					判断当前消息消费次数,已经消费3次则放弃消费
                    if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {
                        logger.error("MQ_MSGID:" + msgId + ",异步处理超出失败次数限制,msg=" + msg);
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    } else {
//						回归队列重新消费
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    }
                } else {
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                }
            }
        }
    }
}

9.TestExchangeConsumer 交换机广播模式 

package com.zq.cnz.mq.MessageConsumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.rabbitmq.client.Channel;
import com.zq.cnz.mq.constant.Action;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import com.zq.common.utils.RedisUtils;
import com.zq.common.utils.StringUtils;
import com.zq.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;

/**
 * 测试交换机消费
 */
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(), exchange = @Exchange(value = QueueContent.TEST_MQ_QUEUE_EXCHANGE, type = ExchangeTypes.FANOUT)))
public class TestExchangeConsumer {
    static final Logger logger = LoggerFactory.getLogger(TestExchangeConsumer.class);
    @Resource
    private RedisUtils redisUtils;

    @RabbitHandler
    public void handler(String msg, Channel channel, Message message) throws IOException {
        if (!StringUtils.isEmpty(msg)) {
//            logger.info("接收交换机生产者消息:{}", msg);
            Action action = Action.ACCEPT;
            // 请求参数
            JSONObject jsonMsg = JSONObject.parseObject(msg);
//			获取消息ID
            String msgId = jsonMsg.getString("msgId");

//			消费次数+1
            redisUtils.incr("MQ_MSGID:" + msgId, 1);
            redisUtils.expire("MQ_MSGID:" + msgId, 60);
            try {

                Integer CMD = jsonMsg.getInteger("cmd");
                if (CMD==1) {
                    logger.info("cmd1测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
                }else if(CMD==2){
                    logger.info("cmd2测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
                }
                action = Action.ACCEPT;
            } catch (Exception e) {
                action = Action.REJECT;
                e.printStackTrace();
            } finally {
                // 通过finally块来保证Ack/Nack会且只会执行一次
                if (action == Action.ACCEPT) {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

                } else if (action == Action.RETRY) {
//					判断当前消息消费次数,已经消费3次则放弃消费
                    if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {
                        logger.error("MQ_MSGID::" + msgId + ",换电失败消息队列消费了三次,msg=" + msg);
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    } else {
//						回归队列重新消费
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    }
                } else {
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                }

            }
        }
    }
}

运行项目 调用RabbitmqTestController生产RabbitMq消息体, TestExchangeConsumer和TestQueueConsumer自动消费

package com.zq.web.controller.tool;
import com.alibaba.fastjson.JSONObject;
import com.zq.cnz.mq.MessageProvider;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;


/**
 * 消息队列测试
 */
@RestController
@RequestMapping("/test/mq")
public class RabbitmqTestController {
    @Resource
    private MessageProvider messageProvider;

    /**
     * 查询储能站信息列表
     */
    @GetMapping("/putMq")
    public void putMq(){
        JSONObject obj=new JSONObject();
        obj.put("test","测试数据");
        //推送消息至消息队列
       messageProvider.sendMqMessage(QueueContent.TEST_MQ_QUEUE,obj.toString());
        obj.put("cmd",1);
        obj.put("test","这是广播消费");
        //推送广播消息
        messageProvider.sendFanoutMsg(QueueContent.TEST_MQ_QUEUE_EXCHANGE,obj.toString());
        //发送延时消息
        obj.put("cmd",2);
        obj.put("test","这是延迟消费");
        messageProvider.sendTimeDelayMsg(QueueContent.TEST_MQ_QUEUE,QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE,obj.toString(),2*60);

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1211508.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据分析师职业技能提升好考吗?含金量高不高

随着大数据时代的到来&#xff0c;大数据分析技能需求已经成为很多企业和机构的必备要求。大数据分析师证书成为当下的热门之一&#xff0c;那么大数据分析师证书需要具备哪些条件呢&#xff1f; 首先&#xff0c;报考大数据分析师证书需要具备以下方面的条件&#xff1a; …

如何让组织的KPI成为敏捷转型的推手而不是杀手 | IDCF

作者&#xff1a;IDCF学员 伍雪锋 某知名通讯公司首席敏捷教练&#xff0c;DevOps布道者。2020年到2021年小100人团队从0-1初步完成敏捷转型&#xff0c;专注传统制造业的IT转型&#xff0c;研发效能提升。 一、前言 在公司我们常常听见这么一个流传的故事&#xff0c;只要…

【Linux】非堵塞轮询

堵塞轮询&#xff1a; 堵塞轮询是我们最简单的一种等待方式也是最常应用的等待方式。 但是&#xff0c;一旦阻塞等待也就意味着我们当前在进行等待的时候&#xff0c;父进程什么都干不了。 非堵塞轮询&#xff1a; 其中非阻塞等待&#xff0c;是等待的一种模式&#xff0c; 在…

【蓝桥杯选拔赛真题21】C++行李运费 第十二届蓝桥杯青少年创意编程大赛C++编程选拔赛真题解析

C/C++行李运费 第十二届蓝桥杯青少年创意编程大赛C++选拔赛真题 一、题目要求 1、编程实现 乘坐飞机时,行李超出规定重量后,会对行李进行托运且收取托运费。 以下是某航空公司行李托运的收费标准:“行李重量在 20 公斤内(含 20)按照每公斤 10 元收取费用,超过 20 公斤的…

es安装方式

es安装方式 1.下载镜像的方式 分词器 kibana和es和容器互通的方式 docker network create es-net开始拉去镜像的方式 docker pull kibana:7.12.1运行镜像的方式 docker run -d \--name es \-e "ES_JAVA_OPTS-Xms512m -Xmx512m" \-e "discovery.typesingle-…

基于引力搜索算法优化概率神经网络PNN的分类预测 - 附代码

基于引力搜索算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于引力搜索算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于引力搜索优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

表单提交是

首先&#xff0c;确保你已经安装了Vue 3、Element UI和axios&#xff08;用于发送HTTP请求&#xff09;。你可以使用以下命令进行安装&#xff1a; bash复制代码 npm install vuenext element-ui axios --save <template> <el-form :model"form" :rules&q…

第四代智能井盖传感器,万宾科技助力城市安全

在迈向更为智能化、相互联系更为紧密的城市发展过程中&#xff0c;智能创新产品无疑扮演了一种重要的角色。智能井盖传感器作为新型科学技术产物&#xff0c;不仅解决传统井盖管理难的问题&#xff0c;也让城市变得更加安全美好&#xff0c;是城市生命线的一层重要保障。这些平…

第四代管网水位监测仪:高精度管网水位监测仪推荐

地下排水管网就像城市的“毛细血管”&#xff0c;承载着排放雨水、地表水和废水的重任&#xff0c;为城市地下生命线的正常运行保驾护航。然而在极端天气事件的侵袭下&#xff0c;排水系统可能面临巨大压力导致排水不畅。而且排水管网涉及范围较广&#xff0c;以前一般都是依靠…

突发!奥特曼宣布暂停ChatGPT Plus新用户注册!

大新闻&#xff01;就在刚刚&#xff01; OpenAI的CEO Sam Altman宣布暂停ChatGPT Plus 新用户注册&#xff01; Sam Altman对此解释道&#xff1a; 由于OpenAI开发日后ChatGPT使用量的激增超出了我们的承受能力&#xff0c;我们希望确保每个人都有良好的体验。 您仍然可以在a…

C语言之深入指针(二)(详细教程)

C语言之深入指针 文章目录 C语言之深入指针1. 传值调用和传址调用2. 数组名的理解3. 使用指针访问数组3. ⼀维数组传参的本质 1. 传值调用和传址调用 写一个函数用来交换a b的值 传值调用&#xff1a; #include <stdio.h> void Swap1(int x, int y) {int tmp 0;tmp x;…

ExoPlayer架构详解与源码分析(8)——Loader

系列文章目录 ExoPlayer架构详解与源码分析&#xff08;1&#xff09;——前言 ExoPlayer架构详解与源码分析&#xff08;2&#xff09;——Player ExoPlayer架构详解与源码分析&#xff08;3&#xff09;——Timeline ExoPlayer架构详解与源码分析&#xff08;4&#xff09;—…

推荐一个Node.js多版本管理的可视化工具

关于Node.js的开发者来说&#xff0c;在开发机器上管理多个不同版本的Node.js是一个常见痛点。之前在开发者安全大全专栏中&#xff0c;提到过解决方法&#xff1a;使用nvm&#xff0c;如果对于nvm还不了解的话&#xff0c;可以前往了解。 对于TJ来说&#xff0c;因为习惯敲命…

优雅写代码之《项目规范》-附加树状图生成

阿丹&#xff1a; 最近有一些小伙伴在跳槽之后接触到了新的项目小组&#xff0c;在讨论如何整理出漂亮的项目结构以及代码书写的时候&#xff0c;既然有小伙伴发问了&#xff0c;那当然就要一起学习&#xff0c;来&#xff01;开卷&#xff01;本文章只作为一个分享&#xff0c…

nvm下载安装以及配置

1. nvm下载 nvm各版本下载链接&#xff1a;Releases coreybutler/nvm-windows GitHub 建议下载安装版的&#xff0c;非安装版还需要额外配置环境变量。 2. nvm安装 注意&#xff1a;在安装 NVM for Windows 之前卸载任何现有版本的 Node.js&#xff08;否则你会遇到版本冲突…

性能测试 —— Jmeter接口处理不低于200次/秒-场景

需求&#xff1a;期望某个接口系统的处理能力不低于200次/秒&#xff0c;如何设计&#xff1f; ①这个场景是看服务器对某个接口的TPS值是否能大于等于200&#xff0c;就可以了&#xff1b; ②系统处理能力&#xff1a;说的就是我们性能测试中的TPS&#xff1b; ③只要设计一…

电子电机行业万界星空科技MES解决方案

现在电子电机行业规模越来越大&#xff0c;也伴随着生产和管理成本走向变高的现象。针对这个问题&#xff0c;mes系统就成为各电子电机制造业的最优选择。 电子机电行业MES涵盖了从原材料采购到最终产品交付的整个过程&#xff0c;包括生产计划、物料管理、生产过程监控、质量…

【漏洞复现】浙大恩特客户资源管理系统 fileupload.jsp 任意文件上传漏洞

文章目录 前言声明一、系统概述二、漏洞描述三、资产探测四、漏洞复现五、修复建议 前言 杭州恩软信息技术有限公司客户资源管理系统fileupload.jsp接口存在安全漏洞&#xff0c;攻击者可通过上传恶意脚本应用&#xff0c;获取服务器控制权限。 声明 请勿利用文章内的相关技术…

Docker之安装mysql主从复制

安装mysql主从复制 1、新建主服务器容器实例3307 docker run -p 3307:3306 --name mysql-master \ -v /mydata/mysql-master/log:/var/log/mysql \ -v /mydata/mysql-master/data:/var/lib/mysql \ -v /mydata/mysql-master/conf:/etc/mysql \ -e MYSQL_ROOT_PASSWORDroot \…

通过cpolar实现外网ssh远程连接linux

现在我有个想法&#xff0c;就是希望通过外网能够远程连接到我的开发板。这里我们就需要使用到一种技术&#xff0c;内网穿透。 内网穿透是一种将内部网络中的设备通过外网进行访问的技术。在linux系统中&#xff0c;实现内网穿透有多种方式&#xff0c;其中最常见的方法是使用…