rabbitMq举例

news2025/1/23 7:22:18

新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!

生产者

代码举例

public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}

/**
	 * 发送消息
	 * @param exchangeName	exchangeName
	 * @param routingKey	routingKey
	 * @param msg	mq message
	 * @return msgId
	 */
	public String sendMsg(final String exchangeName,final String routingKey,final String msg) {
		final CorrelationData correlationDataInfo = new CorrelationData();
		final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,
				String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));
		correlationDataInfo.setId(msgId);

		//send over callback log
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
			if(correlationData!=null) {
				final String id = correlationData.getId();
				if (ack) {
					log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));
				} else {
					log.error("消息投递失败,消息Id[{}] [{}]", id, cause);
				}
			}else {
				log.error("消息投递失败,correlationData为null!");
			}
		});
		rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
		

分析代码

这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:

1. 方法签名

java

public String sendMsg(final String exchangeName, final String routingKey, final String msg)

  • 该方法接收三个参数:
    • exchangeName:消息发送的交换机名称。
    • routingKey:消息的路由键,用来确定消息将被路由到哪个队列。
    • msg:待发送的消息内容。

2. CorrelationData 创建与消息 ID 设置

java

final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);

  • CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。
  • 生成 msgIdmsgId 是通过拼接一个前缀 "yl:dps"、交换机名 exchangeName、路由键 routingKey,以及通过 IdUtil.getSnowflake(1, 3).nextId() 生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。
  • setId(msgId):将生成的 msgId 设置到 correlationDataInfo 对象中。

3. 设置回调函数

java

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });

  • setConfirmCallback:通过 rabbitTemplate.setConfirmCallback() 方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。
  • 参数说明
    • correlationData:携带消息相关信息的对象(这里是 correlationDataInfo),包含了消息的 ID 等。
    • ack:布尔值,表示消息是否成功投递(true 表示成功,false 表示失败)。
    • cause:如果 ack 为 falsecause 会包含失败的原因。
  • 回调内容
    • 如果 correlationData 不为空:
      • 如果 ack 为 true,则表示消息投递成功,记录日志。
      • 如果 ack 为 false,则表示消息投递失败,记录失败日志,并包含失败原因。
    • 如果 correlationData 为 null,记录错误日志。

4. 发送消息

 

java

rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

  • convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将 correlationDataInfo(包含 msgId)附带发送,以便在回调中进行确认和记录。
  • rabbitTemplate 是 RabbitTemplate 对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。

5. 返回消息 ID

return msgId;

  • 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。

总结:

这段代码的主要作用是:

  • 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
  • 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
  • 调用 rabbitTemplate.convertAndSend() 方法向指定的交换机发送消息,同时附带 CorrelationData(包含消息 ID)。
  • 返回生成的消息 ID,以便后续追踪或处理。

关键点:

  • 消息确认机制:使用 ConfirmCallback 确保消息是否成功投递,并且通过日志记录投递状态。
  • 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
  • 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。

消费者

代码举例


    @RabbitHandler
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),
                    exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),
                    key = "${rabbitmq.yl.x.routing.key}"))
    public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("MQ-收到【x】状态变更消息:{} ", payload);

        final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));

        if (NumberUtil.isNumber(objDeliveryTag)) {
            final long deliveryTag= Long.parseLong(objDeliveryTag);
            log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);
            channel.basicAck(deliveryTag,true);
        }

        if(CharSequenceUtil.isNotBlank(payload)) {}

        }

在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:

方法签名解析

public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException

1. public void leaseStateChange
  • public: 表明这个方法是公开的,可以被外部调用。
  • void: 表明这个方法没有返回值。
  • leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
  • @Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload 是一个 String 类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。

    • 注解的作用: @Payload 使得 Spring 能够自动将消息的主体部分注入到方法参数 payload 中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给 payload 参数。

    • 示例: 假设接收到的消息体是 "{"state": "active", "leaseId": "12345"}"payload 将会是该字符串。

3. @Headers Map<String, Object> headers
  • @Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。

    • 注解的作用: @Headers 会将消息头部的内容绑定到 headers 参数,这个参数是一个 Map<String, Object> 类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。

    • 示例: 如果消息头包含如下信息:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}

      那么 headers 将会是一个 Map,其内容是:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}
4. Channel channel
  • Channel: 这是 RabbitMQ 的核心概念之一。Channel 代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。

    • 作用: 在 Spring AMQP 中,Channel 通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用 Channel 来手动确认消息,或者控制消息是否成功消费。

    • 示例: 如果在消息处理过程中出现异常,消费者可能需要通过 channel.basicNack() 方法来拒绝该消息并可能重新入队。

5. throws IOException
  • throws IOException: 表明这个方法可能会抛出 IOException 异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。

Spring AMQP 消费者代码示例

假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.io.IOException;
import java.util.Map;

@Component
public class LeaseStateChangeListener {

    // 监听指定队列的消息
    @RabbitListener(queues = "leaseStateQueue")
    public void leaseStateChange(@Payload String payload, 
                                 @Headers Map<String, Object> headers, 
                                 Channel channel) throws IOException {
        try {
            // 处理消息体
            System.out.println("Received message: " + payload);

            // 获取消息头部信息
            String correlationId = (String) headers.get("correlationId");
            String messageType = (String) headers.get("messageType");
            System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);

            // 模拟处理业务逻辑
            processLeaseStateChange(payload);

            // 确认消息已成功消费
            channel.basicAck(headers.hashCode(), false);  // 手动确认消息
        } catch (Exception e) {
            // 异常处理,拒绝消息并重新入队
            System.err.println("Error processing message: " + e.getMessage());
            channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队
        }
    }

    private void processLeaseStateChange(String payload) {
        // 假设这里是处理租赁状态更新的业务逻辑
        // 比如将消息解析为对象,进行租赁状态更新
        System.out.println("Processing lease state change for payload: " + payload);
    }
}

解析

  • @RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听 leaseStateQueue 队列。当有消息到达这个队列时,这个方法会被调用。

  • 消息体 (payload): 这个方法会接收到一个消息体,@Payload 注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数 payload 上。

  • 消息头 (headers): 使用 @Headers 注解将消息的头部信息绑定到 headers 参数上,Map<String, Object> 类型。你可以从中获取如 correlationIdmessageType 等附加信息。

  • Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用 channel.basicAck() 来确认消息,表示该消息已经被成功消费。如果处理失败,调用 channel.basicNack() 拒绝该消息,并可以选择是否重新入队。

总结

  • 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
  • 通过 @Payload 获取消息体内容,使用 @Headers 获取消息头信息。
  • 使用 Channel 来确认消息的处理状态。
  • 使用 @RabbitListener 注解自动监听队列,并处理消费的消息。

这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。

消息怎么知道发给哪一个队列

先看队列与交换机怎么绑定的

先创建队列,然后绑定到交换机

RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2262515.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于 uniapp 开发 android 播放 webrtc 流

一、播放rtsp协议流 如果 webrtc 流以 rtsp 协议返回&#xff0c;流地址如&#xff1a;rtsp://127.0.0.1:5115/session.mpg&#xff0c;uniapp的 <video> 编译到android上直接就能播放&#xff0c;但通常会有2-3秒的延迟。 二、播放webrtc协议流 如果 webrtc 流以 webrt…

Meta重磅发布Llama 3.3 70B:开源AI模型的新里程碑

在人工智能领域&#xff0c;Meta的最新动作再次引起了全球的关注。今天&#xff0c;我们见证了Meta发布的Llama 3.3 70B模型&#xff0c;这是一个开源的人工智能模型&#xff0c;它不仅令人印象深刻&#xff0c;而且在性能上达到了一个新的高度。 一&#xff0c;技术突破&#…

游戏AI实现-寻路算法(DFS)

​深度优先搜索算法&#xff08;英语&#xff1a;Depth-First-Search&#xff0c;缩写为DFS&#xff09;是一种用于遍历或搜索树或图的算法。 寻路地图搭建&#xff1a; 游戏AI实现-寻路地图搭建-CSDN博客 算法过程&#xff1a;遍历方向为从竖直向上沿顺时针方向 1.首先将开…

概率论得学习和整理30: 用EXCEL 描述泊松分布 poisson distribution

目录 1 泊松分布的基本内容 1.1 泊松分布的关键点 1.1.1 属于离散分布 1.1.2 泊松分布的特点&#xff1a;每个子区间内概率相等 &#xff0c; λ就是平均概率 1.2 核心参数 1.3 pmf公式 1.4 期望和方差 2 例1&#xff1a;用EXCEL计算泊松分布的概率 3 比较λ不同值时…

八、测试-性能测试

文章目录 前言一、性能测试介绍1. 简介2. 流程3. 指标4. 测试方案5. 性能评估6. 常见性能问题及解决对策 二、测试工具1. Jmeter简介2. Jmeter常见测试框架 三、Jmeter录制脚本1. 基本2. 增强3. 脚本参数化4. 断言5. 关联6. JDBC请求 四、分布式测试五、性能测试报告 前言 性能…

Open-Source Test Automation Tools for Windows Desktop Apps 2022

Do you have a Windows desktop application that needs to be tested to verify if all the different features work seamlessly and according to documentation? We suggest you use test automation—or at least try it in combination with manual testing. Test auto…

【小沐学GIS】基于C++绘制三维数字地球Earth(OpenGL、glfw、glut、QT)第三期

&#x1f37a;三维数字地球系列相关文章如下&#x1f37a;&#xff1a;1【小沐学GIS】基于C绘制三维数字地球Earth&#xff08;456:OpenGL、glfw、glut&#xff09;第一期2【小沐学GIS】基于C绘制三维数字地球Earth&#xff08;456:OpenGL、glfw、glut&#xff09;第二期3【小沐…

【论文笔记】Editing Models with Task Arithmetic

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Editing Models with Task…

c++ [eigen库配置和使用]

实验环境 eigen 3.40 下载链接 https://gitlab.com/libeigen/eigen/-/archive/3.4.0/eigen-3.4.0.zip Visual Studio 2022配置 解压eigen后&#xff0c;在项目中配置包含目录 代码示例 加头文件 #include <Eigen/Dense> using namespace Eigen;矩阵运算 MatrixXd …

SpringBoot完整技术汇总

SpringBoot 注意&#xff1a;SpringBoot技术示例中的项目均已上传至Gitee&#xff0c;均可通过此处自行下载 SpringBoot是由Pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化Spring应用的初始搭建以及开发过程 Spring程序与SpringBoot程序对比如下 Spring程序配置…

windows 使用python共享网络给另外一个网卡

# -*- coding: utf-8 -*- import subprocessdef open_share(to_shared_adapter, from_shared_adapter):"""打开以太网的网络共享:return: None"""powershell_script f"""# Register the HNetCfg library (once)# regsvr32 hnetc…

redis集群 服务器更换ip,怎么办,怎么更换redis集群的ip

redis集群 服务器更换ip&#xff0c;怎么办&#xff0c;怎么更换redis集群的ip 1、安装redis三主三从集群2、正常状态的redis集群3、更改redis集群服务器的ip 重启服务器 集群会down4、更改redis集群服务器的ip 重启服务器 集群down的原因5、更改redis集群服务器的ip后&#xf…

Linux入门攻坚——42、Nginx及web站点架构模式

对于lvs集群&#xff0c;是一个四层路由的集群&#xff0c;Director无需启用对端口的监控&#xff0c;直接将报文转发给后端业务服务器RealServer。 使用Nginx也可以实现集群功能&#xff0c;Nginx实现反向代理&#xff0c;实现的是七层上的转发&#xff0c;要求Nginx本身就是…

Git merge 和 rebase的区别(附图)

在 Git 中&#xff0c;merge 和 rebase 是两种用于整合分支变化的方法。虽然它们都可以将一个分支的更改引入到另一个分支中&#xff0c;但它们的工作方式和结果是不同的。以下是对这两者的详细解释&#xff1a; Git Merge 功能&#xff1a;合并分支&#xff0c;将两个分支的…

密码编码学与网络安全(第五版)答案

通过如下代码分别统计一个字符的频率和三个字符的频率&#xff0c;"8"——"e"&#xff0c;“&#xff1b;48”——“the”&#xff0c;英文字母的相对使用频率&#xff0c;猜测频率比较高的依此为&#xff09;&#xff0c;t,*,5&#xff0c;分别对应s,o,n,…

我在广州学 Mysql 系列之 数据类型和运算符详解

ℹ️大家好&#xff0c;我是&#x1f606;练小杰&#xff0c;今天主要学习 Mysql的数据类型以及运算符操作~~ 上周五学习了“Mysql 系列之 数据“表”的基本操作”~ 想要了解更多&#x1f236;️MYSQL 数据库的命令行总结&#xff01;&#xff01;&#xff01; “我是你的敌人,…

SpringBoot中基于JWt的授权与续期方案

一、 SpringBoot中Token登录授权、续期和终止的方案RedisToken SpringBoot项目写登录注册之类的方案 使用Cookie或Session的话&#xff0c;它是有状态的&#xff0c;不符合分布式技术架构使用Security或者Shiro框架实现起来比较复杂&#xff0c;一般项目无需用那么复杂使用JW…

小程序快速实现大模型聊天机器人

需求分析&#xff1a; 基于大模型&#xff0c;打造一个聊天机器人&#xff1b;使用开放API快速搭建&#xff0c;例如&#xff1a;讯飞星火&#xff1b;先实现UI展示&#xff0c;在接入API。 最终实现效果如下&#xff1a; 一.聊天机器人UI部分 1. 创建微信小程序&#xff0c…

【OSS】php使用oss存储

阿里云oss官方文档&#xff1a;文档 1、前期工作 创建阿里云账号&#xff0c;登录创建bucket&#xff0c;注意修改权限&#xff0c;要不然可能读取不到 申请accessKeyId和accessKeySecret accessKey 2、项目中安装OSS扩展 composer require aliyuncs/oss-sdk-php3、基础使…

Elasticsearch02-安装7.x

零、文章目录 Elasticsearch02-安装7.x 1、Windows安装Elasticsearch &#xff08;1&#xff09;JDK安装 Elasticsearch是基于java开发的&#xff0c;所以需要安装JDK。我们安装的Elasticsearch版本是7.15&#xff0c;对应JDK至少1.8版本以上。也可以不安装jdk&#xff0c;…