Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

news2024/10/3 22:21:19

rabbitmq的消息确认机制

确认消息是否发送给交换机

配置
server:
  port: 11111
spring:
  rabbitmq:
    port: 5672
    host: 192.168.201.81
    username: admin
    password: 123
    publisher-confirm-type: correlated
编码RabbitTemplate.ConfirmCallback

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true);

配置类

rabbitTemplate.setConfirmCallback(ConfirmCallback confirmCallback);

CorrelationData:

1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑。

@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    log.debug("Rabbitmq配置启动成功,RabbitTemplate:{}设置完成",rabbitTemplate);
    rabbitTemplate.setMessageConverter(messageConverter());
    rabbitTemplate.setConfirmCallback(new RabbitConfirmCallbackImpl());
    return rabbitTemplate;
}

/**
 * 确保消息是否发送到交换机
 */
class RabbitConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         log.warn("****Exchange callback-检验是否发送成功********");
         log.warn("correlationData->相关数据:{}",correlationData);
         log.warn("ack->Exchange响应:{}",ack);
         log.warn("cause->错误原因:{}",cause);
 }
}
测试发送

测试向交换机发送数据,测试交换机是否成功收到。

假设给一个错误的Exchange
@Service
public class MqServiceImpl implements IMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
         //错误的Exchange名称,实际名称为:ssc_sc_routing_exchange
        final String EXCHANGE = "ssc_sc_routing_exchangex";
        final String ROUTING_KEY = "ssc_sc_routing_key";

        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                msg
        );
    }
}

image-20231218164926379

如果Exchange正确
@Override
public void sendMessage(String msg) {
    final String EXCHANGE = "ssc_sc_routing_exchange";
    final String ROUTING_KEY = "ssc_sc_routing_key";
    rabbitTemplate.convertAndSend(
            EXCHANGE,
            ROUTING_KEY,
            msg
    );
}

image-20231218164425398

确认消息是否从交换机发送到队列RabbitTemplate.ReturnsCallback

设置ResturnsCallback

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

配置文件
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true     #检查是否绑定到队列中
配置
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitConfirmReturnCallbackImpl());

class RabbitConfirmReturnCallbackImpl implements RabbitTemplate.ReturnsCallback{
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
       log.warn("message:{}",returnedMessage.getMessage());
       log.warn("exchange:{}",returnedMessage.getExchange());
       log.warn("replyCode:{}",returnedMessage.getReplyCode());
       log.warn("replyText:{}",returnedMessage.getReplyText());
       log.warn("routingKey:{}",returnedMessage.getRoutingKey());
    }
}
测试

修改routingkey的值,让交换机不能路由到指定Queue。

package com.wnhz.ssc.cloud.mq.service.impl;

import com.wnhz.ssc.cloud.mq.service.IMqService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqServiceImpl implements IMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
        final String EXCHANGE = "ssc_sc_routing_exchange";
        //修改routingkey,给一个错误的值,正确值为: ssc_sc_routing_key
        final String ROUTING_KEY = "ssc_sc_routing_keyx";
        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                msg
        );
    }
}

image-20231218165907610

返回message:

message:(
Body:'"hello confirm call back"'
    MessageProperties
    [
        headers={
                __TypeId__=java.lang.String
                 },
        contentType=application/json,
        contentEncoding=UTF-8,
        contentLength=0,
        receivedDeliveryMode=PERSISTENT,
        priority=0,
        deliveryTag=0
   ]
)

消费确认信息

消费监听模式
  • Simple模式

    image-20231218155921090

    Simple模式即SMLC。simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。

  • Direct模式

    image-20231218160106458

    压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。

Message对象结构

Message对象的结构,

消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

image-20231218173406699

image-20231218173218649

消息确认方式
  1. AcknowledgeMode.AUTO:自动确认。
  2. AcknowledgeMode.NONE:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

direct模式:

image-20231218173612719

simple模式:

image-20231218173833493
消费端监听发送
@RabbitListener(queues = "data_confirm_queue")
@Override
public void receiveBookFromMq(Message message, Channel channel, Book book) {

    log.debug("message:{}", message);
    log.debug("message.getMessageProperties().getHeaders()===>{}",
            message.getMessageProperties().getHeaders());
    log.debug("[order消费者:]接收到消息: {}", book);

    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("消息队列确认: {},{}",
                message.getMessageProperties().getConsumerQueue(), "接收到回调方法");
    } catch (IOException e) {
        e.printStackTrace();
    }
}
手动确认方式
  1. Basic.Ack 命令:用于确认当前消息。
  2. Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
  3. Basic.Reject 命令:用于拒绝当前消息
channel.basicAck(long deliveryTag,boolean multiple)

basicAck 方法用于确认当前消息。

public void basicAck(long deliveryTag, boolean multiple) throws IOException {
    this.delegate.basicAck(deliveryTag, multiple);
}
  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

  • multiple:为了减少网络流量,手动确认可以被批处理。

    • true: 代表批量应答 channel 上未应答的消息,比当前tag小的未应答的也一并应答(如5,6,7未应答)。

      image-20231218175620309
    • false: 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

      image-20231218175738313

basicNack

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现

public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
    this.delegate.basicNack(deliveryTag, multiple, requeue);
}
basicReject(long deliveryTag, boolean requeue)

basicNack 方法用于否定当前消息。basicReject 方法用于明确拒绝当前的消息而不是确认。

public void basicReject(long deliveryTag, boolean requeue) throws IOException {
    this.delegate.basicReject(deliveryTag, requeue);
}

消息遗弃或入队,一般建议消息丢弃重新发。

  • requeue: true :重回队列,false :丢弃,我们在nack方法中必须设置 false,否则重发没有意义。
出现异常的解决方案
package com.wnhz.mq.order.service.impl;

import com.rabbitmq.client.Channel;
import com.wnhz.domain.Book;
import com.wnhz.mq.order.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {

    private void buildException(){
        throw  new RuntimeException("[消费者:] 消费出现异常......");
    }

    @RabbitListener(queues = "data_confirm_queue")
    @Override
    public void receiveBookFromMq(Message message, Channel channel, Book book) {
        try {
            //制造异常测试
            buildException();
            log.debug("message:{}", message);
            log.debug("message.getMessageProperties().getHeaders()===>{}",
                    message.getMessageProperties().getHeaders());
            log.debug("[order消费者:]接收到消息: {}", book);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

            log.debug("消息队列确认: {},{}",
                    message.getMessageProperties().getConsumerQueue(), "接收到回调方法");
        } catch (Exception e) {
          log.debug("消费异常: {}",e.getMessage());
            try {
                log.debug("尝试丢弃:{}消息.....................",book);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1457516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode3011. 判断一个数组是否可以变为有序

Every day a Leetcode 题目来源:3011. 判断一个数组是否可以变为有序 解法1:分组循环 排序 适用场景:按照题目要求,数组会被分割成若干组,每一组的判断/处理逻辑是相同的。 核心思想: 外层循环负责遍…

2024年TIOBE编程语言排行榜

1. 2024年TIOBE编程语言排行榜,我为C打Call,你呢? https://www.tiobe.com/tiobe-index/

大模型量化技术原理-LLM.int8()、GPTQ

近年来,随着Transformer、MOE架构的提出,使得深度学习模型轻松突破上万亿规模参数,从而导致模型变得越来越大,因此,我们需要一些大模型压缩技术来降低模型部署的成本,并提升模型的推理性能。 模型压缩主要分…

【Java中23种设计模式-单例模式2--懒汉式2线程安全】

加油,新时代打工人! 简单粗暴,学习Java设计模式。 23种设计模式定义介绍 Java中23种设计模式-单例模式 Java中23种设计模式-单例模式2–懒汉式线程不安全 package mode;/*** author wenhao* date 2024/02/19 09:38* description 单例模式…

三防平板丨三防工业平板电脑丨三防平板电脑有哪些优势?

三防平板电脑通常使用特殊材料和制造工艺来达到防水、防尘、防摔的目的,这样可以在极端条件下使用,并保证设备的稳定性和可靠性。因此,三防平板电脑适用于各种恶劣环境,如户外野营、物流、工业制造等应用场景。那么相比于普通消费…

Python Selenium 爬虫淘宝案例

爬虫专栏:http://t.csdnimg.cn/WfCSx 前言 在前一章中,我们已经成功尝试分析 Ajax 来抓取相关数据,但是并不是所有页面都可以通过分析 Ajax 来完成抓取。比如,淘宝,它的整个页面数据确实也是通过 Ajax 获取的&#x…

Radware Alteon负载均衡-基于域名的七层负载均衡

Radware Alteon作为一款高性能的负载均衡器,其基于域名的七层负载均衡功能为众多企业提供了灵活、高效的解决方案。 该案例实现如下需求:客户端访问服务器,当访问域名为www.iisstart.com时,默认访问Server1,当配置七层…

数据结构-最短路径(Dijkstra算法与Floyd算法)

介绍 对于网图来说,最短路径是指两顶点之间经过的边上权值之和最少的路径,其路径上第一个点记为源点,最后一个为终点。 计算最短路径有两个经典算法,即迪杰斯特拉(Dijkstra)算法与弗洛伊德(Fl…

蓝桥杯嵌入式STM32G431RBT6知识点(主观题部分)

目录 1 前置准备 1.1 Keil 1.1.1 编译器版本及微库 1.1.2 添加官方提供的LCD及I2C文件 1.2 CubeMX 1.2.1 时钟树 1.2.2 其他 1.2.3 明确CubeMX路径,放置芯片包 2 GPIO 2.1 实验1:LED1-LED8循环亮灭 ​编辑 2.2 实验2&#xff1a…

C#使用MiniExcel导入导出数据到Excel/CSV文件

MiniExcel简介 简单、高效避免OOM的.NET处理Excel查、写、填充数据工具。 目前主流框架大多需要将数据全载入到内存方便操作,但这会导致内存消耗问题,MiniExcel 尝试以 Stream 角度写底层算法逻辑,能让原本1000多MB占用降低到几MB&#xff…

提取游戏音频文件.bnk

提取游戏音频文件.bnk 什么是.bnk准备Wwise-Unpacker工具使用Wwise-Unpacker工具总结 什么是.bnk .bnk其实是一种对音频的加密方式,一个.bnk文件中通常包含了多个语音文件,一般可以使用Wwise-Unpacker来解码.bnk格式文件 准备Wwise-Unpacker工具 Wwis…

视频基础学习一——色立体、三原色以及像素

文章目录 前言一、什么是颜色1.色立体特征2.色立体模型 二、三原色和色立体1.三原色(RGB)2.RGB颜色叠加 三、像素和三原色总结 前言 本文的目的是为了梳理音视频基础相关的知识,有很多做流媒体、音视频相关的研发对于音视频的根本原理是不清楚的。博主也是查阅了相…

Linux 内存top命令详解

通过top命令可以监控当前机器的内存实时使用情况,该命令的参数解释如下: 第一行 15:30:14 —— 当前系统时间 up 1167 days, 5:02 —— 系统已经运行的时长,格式为时:分 1 users ——当前有1个用户登录系统 load average: 0.00, 0.01, 0.05…

Code Composer Studio (CCS) - 全局搜索功能

Code Composer Studio [CCS] - 全局搜索功能 1. Ctrl H,全局搜索功能References 1. Ctrl H,全局搜索功能 References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

小程序列表下拉刷新和加载更多

配置 在小程序的app.json中,检查window项目中是否已经加入了"enablePullDownRefresh": true,这个用来开启下拉刷新 "window": {"backgroundTextStyle": "light","navigationBarBackgroundColor": &q…

winform实现最小化至系统托盘

NotifyIcon类介绍 NotifyIcon 是 .NET中的一个类,它用于在系统托盘中显示图标。这个类在 System.Windows.Forms 命名空间下。 使用 NotifyIcon 类,你可以在系统托盘中创建一个图标,当用户点击或右键点击这个图标时,可以触发一些事…

GA-kmedoid 遗传算法优化K-medoids聚类

遗传算法优化K-medoids聚类是一种结合了遗传算法和K-medoids聚类算法的优化方法。遗传算法是一种基于自然选择和遗传机制的随机优化算法,它通过模拟生物进化过程中的遗传、交叉、变异等操作来寻找问题的最优解。而K-medoids聚类算法是一种基于划分的聚类方法&#x…

微服务-微服务Nacos配置中心

1.1 配置中心架构 1.2 Config Client源码分析 配置中心核心接口ConfigService public class ConfigServerDemo {public static void main(String[] args) throws NacosException, InterruptedException {String serverAddr "localhost";String dataId "naco…

2024 年 7 项值得学习的高收入技能

曾梦想执剑走天涯,我是程序猿【AK】 目录 简述概要7项高收入技能6 个职业目标示例1. 晋升领导职务2.成为思想领袖3.致力于个人发展4.转向新的职业道路5.体验职业稳定性6.制定职业目标 简述概要 2023年已过,2024年已来,陆陆续续开始了复工&am…

面试经典150题 -- 链表 (总结)

总的地址 : 面试经典 150 题 - 学习计划 - 力扣(LeetCode)全球极客挚爱的技术成长平台 c链表总结 : 链表总结 -- 《数据结构》-- c/c-CSDN博客 141 . 环形链表 详细题解参考 : 141 . 环形链表-CSDN博客 这里给出慢双指针的代码 : /*** Defini…