RabbitMQ消息确认

news2024/9/23 3:22:36

目录

1. 消息确认作用

2 开发示例

2.1 生产者确认

2.2 消费者确认


1. 消息确认作用

保证消息的可靠性主要依靠三种机制:一个是消息的持久化,一个是事务机制,一个就是消息的确认机制。

1)消息持久化

消息持久化是将消息写入本地文件,如果rabbitmq故障退出,在重启时会从本地文件系统读取队列数据。

2)事务机制

rabbitmq的事务机制提供了消息生产者和消息服务器(broker)之间的事务的开启,提交,回滚操作(如下图所示)。这套机制可以保证消息可靠性,但也有缺点:由于使用事务机制会导致消息生产者和broker(服务器)交互次数增加,造成性能的浪费,且事务机制是阻塞的,在发送一条消息后需要等待RabbitMQ回应,获取回应后才能发送下一条消息,因此事务机制并不提倡使用(RabbitMQ事务模式与非事务模式在性能上相差可达高达上百倍,具体数值因机器性能和网络环境而不同,但差异都会非常明显)

事务提交流程:

  • 客户端向服务器请求开启事务(tx.select)
  • 服务器端返回响应接收开启事务(tx.select-ok)
  • 推送消息
  • 客户端请求提交事务(tx.commit)
  • 服务器提交事务返回响应(tx.commit-ok)

3)消息确认

消息确认分为:发送者确认,接收方确认。 发送者确认分为:消息到达交换机确认,消息到达与交换机绑定的队列确认。

2 开发示例

用于示例开发基础代码:

git clone -b rabbitmqDemo git@gitee.com:heizifeng/rabbit-mqdemo.git

2.1 生产者确认

因为:每个RabbitTemplate实例只能注册一个ConfirmCallback,所以如果启动web容器并多次调用该方法进行消息发送,则会报异常。(测试用例可以通过,是因为每次测试执行完毕后容器就终止,下次运行时是新的容器)

增加RabbitTemplate的配置类,在配置类中指定消息确认回调方法:

package com.zking.rabbitmqdemo.provied.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2021-11-14 10:04
 */
@Configuration
@Slf4j
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setEncoding("utf-8");

        //实现消息发送到exchange后接收ack回调,publisher-confirms:true
        //如果队列是可持久化的,则在消息成功持久化之后生产者收到确认消息
        template.setConfirmCallback(((correlationData, ack, cause) -> {
            if(ack) {
                log.info("消息成功发送到exchange,id:{}", correlationData.getId());
            } else {
                /*
                 * 消息未被投放到对应的消费者队列,可能的原因:
                 * 1)发送时在未找到exchange,例如exchange参数书写错误
                 * 2)消息队列已达最大长度限制(声明队列时可配置队列的最大限制),此时
                 * 返回的cause为null。
                 */
                log.info("******************************************************");
                log.info("11消息发送失败: {}", cause);
            }
        }));

        //消息发送失败返回队列,publisher-returns:true
        template.setMandatory(true);

        //实现消息发送的exchange,但没有相应的队列于交换机绑定时的回调
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String id = message.getMessageProperties().getCorrelationId();
            log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", id, replyCode, replyText, exchange, routingKey);
        });

        return template;
    }
}

 编写用于发送消息的延迟队列(死信)

package com.zking.rabbitmqdemo.provied.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 *
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2021-11-12 10:04
 */
@Configuration
public class RabbitMQConfig {

    @Bean(name="directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("direct_exchange").durable(true).build();
    }

    @Bean(name="directQueue")
    public Queue directQueue() {
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl", 1000*60*20);
        args.put("x-max-length", 3);
        args.put("x-overflow","reject-publish");
        return QueueBuilder.durable("direct_queue").withArguments(args).build();
    }

    @Bean
    public Binding directBinding(
            @Qualifier("directQueue") Queue queue,
            @Qualifier("directExchange") Exchange exchange) {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("direct_exchange_routing_key")
                .noargs();
    }


    @Bean(name="topicExchange")
    public Exchange topicExchange() {

        return ExchangeBuilder
                .topicExchange("topic_exchange")
                .durable(true)
                .build();
    }

    @Bean(name="topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder.durable("topic_queue_q1").build();
    }

    @Bean(name="topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable("topic_queue_q2").build();
    }

    @Bean
    public Binding topicBindingQ1(
            @Qualifier("topicQueue1") Queue queue,
            @Qualifier("topicExchange") Exchange exchange)  {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("topic.queue.#")
                .noargs();
    }

    @Bean
    public Binding topicBindingQ2(
            @Qualifier("topicQueue2") Queue queue,
            @Qualifier("topicExchange") Exchange exchange) {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("topic.queue.#")
                .noargs();
    }

    //死信队列
    @Bean(name="dxlExchange")
    public Exchange dxlExchange() {
        return ExchangeBuilder.topicExchange("dxl_exchange").durable(true).build();
    }

    @Bean(name="dxlQueue")
    public Queue dxlQueue() {
        return QueueBuilder.durable("dxl_queue").build();
    }

    @Bean
    public Binding bindingDxl(
            @Qualifier("dxlQueue") Queue queue,
            @Qualifier("dxlExchange") Exchange exchange) {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("routing_dxl_key")
                .noargs();
    }

    @Bean(name="usualExchange")
    public Exchange usualExchange() {
        return ExchangeBuilder.directExchange("usual_direct_exchange").durable(true).build();
    }

    @Bean(name="usualQueue")
    public Queue usualQueue() {
        return QueueBuilder.durable("usual_queue")
                .withArgument("x-message-ttl", 1000*60*2)
                .withArgument("x-max-length", 5)
                .withArgument("x-dead-letter-exchange", "dxl_exchange")
                .withArgument("x-dead-letter-routing-key", "routing_dxl_key")
                .build();
    }

    @Bean
    public Binding bindingUsualQueue(
            @Qualifier("usualQueue") Queue queue,
            @Qualifier("usualExchange") Exchange exchange) {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("routing_usual_key")
                .noargs();
    }


}

编写测试Service 发送消息

package com.zking.rabbitmqdemo.provied.service;

/**
 * @author aq
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2021-11-12 10:11
 */
public interface ISendMsgService {

    void sendDirectMsg();

    void topicExchangeSend();

    void dxlExchangeSend();

    void confirmMessage();
}
package com.zking.rabbitmqdemo.provied.service;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

/**
 * @author aq
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2021-11-12 10:08
 */
@Service
public class SendMsgService implements ISendMsgService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendDirectMsg() {
        String msg = "rabbitmq direct exchange send msg "
                + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        rabbitTemplate.convertAndSend("direct_exchange", "direct_exchange_routing_key",msg);
    }

    @Override
    public void topicExchangeSend() {
        String msg = "rabbitmq topic exchange send msg "
                + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        rabbitTemplate.convertAndSend("topic_exchange", "topic.queue.msg", msg);
    }

    @Override
    public void dxlExchangeSend() {
        String msg = "rabbitmq usual exchange send msg "
                + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        rabbitTemplate.convertAndSend("usual_direct_exchange", "routing_usual_key", msg);
    }

    @Override
    public void confirmMessage() {
        String msg = "rabbitmq direct exchange send msg and confirm "
                + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(
                "direct.exchange",
                "direct.exchange.routing.key",
                msg,
                correlationData);
    }

}

编写测试接口

package com.zking.rabbitmqdemo.provied.service;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.*;

/**
 *
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2021-11-12 10:12
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SendMsgServiceTest {

    @Autowired
    private ISendMsgService sendMsgService;

    @Test
    public void sendDirectMsg() {

        sendMsgService.sendDirectMsg();
    }

    @Test
    public void topicExchangeSend() {
        sendMsgService.topicExchangeSend();
    }

    @Test
    public void testDxlExchange() {
        sendMsgService.dxlExchangeSend();
    }
}

2.2 消费者确认

1)直接使用@RabbitListener,@RabbitHandler注解,通过配置文件配置监听容器:

application.properties


server.port=8084

#rabbitMQç¸å³éç½®
spring.rabbitmq.host=192.168.164.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost

#启用消息确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

#连接模式为channel
spring.rabbitmq.cache.connection.mode=channel
spring.rabbitmq.cache.channel.size=50

#每个队列的消费者数量
spring.rabbitmq.listener.direct.consumers-per-queue=2

#侦听器调用者线程的最小数量
spring.rabbitmq.listener.simple.concurrency=2
spring.rabbitmq.listener.simple.max-concurrency=100

#每次用队列中取出1个消息,在有多个消息消费者,且消息者处理能力不均时,可以
#起到均衡各消息消费者的处理功能的功能
spring.rabbitmq.listener.direct.prefetch=1

/**
 * @author Administrator
 * @create 2020-02-2422:30
 */
@Component
@Slf4j
public class ReceiverConfirmDemo implements ChannelAwareMessageListener {

    /**
     * 指定监听的队列.
     * 该注解可以放在方法上,也可以放在类上,如果放在类上则需要
     * 在一个方法上设置@RabbitHandler(isDefault=true),否则会报如下异常:
     * “Listener method ‘no match’ threw exception”。
     * 因此建议注解始终使用在方法上。
     */
    @RabbitListener(queues = "direct.queue")
    //指定该方法为消息处理器
    @RabbitHandler
    @Override
    public void onMessage(Message message, Channel channel) throws IOException {

        log.info("消息内容: {}",new String(message.getBody()));

        /**
         * 模拟业务处理方法.
         * 对应业务方法中出现的异常需要区分对待,例如以下情况:
         * 1)网络异常等可能恢复的异常,可以设置消息重新返回到队列,以便于重新处理
         * 2)对应业务数据等不可恢复的异常,则可以进行补偿操作,或放入死信队列进行人工干预
         */
        try {
            log.info("正在处理 ....");
            //延迟5秒
            TimeUnit.SECONDS.sleep(5);
            
            long deliveryTag = message.getMessageProperties().getDeliveryTag();

            //模拟在业务处理是发生了网络异常,如:在连接数据库保存数据时网络发生了抖动
            //此类异常是可以恢复的,需要要消息重新返回队列,以便于下次处理
            if(deliveryTag % 2 == 0) {
                throw new ConnectException("模拟消息消费者发生网络异常");
            }

            //模拟发生不可恢复异常,此种情况消息重新入队没有意义
            if(deliveryTag % 3 == 0) {
                throw new ClassCastException("模拟消息消费者发生不可恢复异常");
            }

        } catch (SocketException se) {

            log.info("SocketException: {}", se.getMessage());

            //拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,false则不会重新入队
            //如果配置了死信队列则消息会被投递到死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

            //不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,
            //与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。
            // nack后的消息也会被自己消费到
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            //是否恢复消息到队列,参数是是否requeue,true则重新入队列,
            // 并且尽可能的将之前recover的消息投递给其他消费者消费,
            //而不是自己再次消费。false则消息会重新被投递给自己
            //channel.basicRecover(true);
            return;
        } catch (Exception e) {
            //此处处理无法恢复的异常,可记录日志或将消息发送到指定的队列以便于后续的处理
            log.info("Exception: {}", e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }

        log.info("处理完毕, 发送ack确认 .... ");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

编写完成消息消费者后,将通过消息生产者发送消息,查看消息消费的消费。

2)使用SimpleMessageListenerContainer配置监听容器

/**
 * @author Administrator
 * @create 2020-03-0119:27
 */
@Configuration
@Slf4j
public class MessageListenerContainer {

    @Autowired
    private ReceiverConfirmDemo receiverConfirmDemo;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames("direct.queue");

        //后置处理器,接收到的消息都添加了Header请求头
        /*container.setAfterReceivePostProcessors(message -> {
            message.getMessageProperties().getHeaders().put("desc",10);
            return message;
        });*/

        container.setMessageListener(receiverConfirmDemo);

        return container;
    }

}

在该类中注入的receiverConfirmDemo即为上面已经编写完成的ReceiverConfirmDemo消息处理器。

运行结果

 如果又问题那么 如果是网络异常 客恢复数据那么会回到原有队列 ,如果是不可处理异常那么可以用数据库保存,人工介入处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/132459.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

卷径计算详解(卷径通过卷绕的膜长和膜厚进行计算)

有关卷绕+张力控制可以参看专栏的系列文章,文章链接如下: 变频器简单张力控制(线缆收放卷应用)_RXXW_Dor的博客-CSDN博客_收放卷应用张力控制的开闭环算法,可以查看专栏的其它文章,链接地址如下:PLC张力控制(开环闭环算法分析)_RXXW_Dor的博客-CSDN博客。https://blo…

双向链表的双向冒泡排序、红白蓝砾石排序、算法设计4-5

&#xff08;PS&#xff1a;直接拿的友友zy的&#xff09; 一个不知名大学生&#xff0c;江湖人称菜狗 original author: jacky Li Email : 3435673055qq.com Time of completion&#xff1a;2023.1.1 Last edited: 2023.1.1 目录 &#xff08;PS&#xff1a;直接拿的友友zy的…

添加USB wifi驱动到RK3568

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、USB接口的wifi芯片二、使用步骤wireless tools 移植wireless tools 工具测试wpa_supplicant 移植openssl 移植libnl 库移植WIFI 联网测试总结前言 在日常开…

CTF-Web渗透(入门|笔记|工具)

php各种漏洞绕过 传送门&#xff1a;https://cloud.tencent.com/developer/article/2127498 php伪协议 详细博客讲解&#xff1a; https://blog.csdn.net/cosmoslin/article/details/120695429 http://hummer.vin/2022/05/10/PHP%E4%BC%AA%E5%8D%8F%E8%AE%AE/ https://ww…

Codeforces Round #833 (Div. 2)

题目链接 A. The Ultimate Square 题意&#xff1a; 给你一个n&#xff0c;表示有n块砖&#xff0c;第i块砖是1*(i/2)&#xff0c;这里是上取整&#xff0c;问你最大能组合成的正方形的边长是多少 思路&#xff1a; 观察样例就会发现是n/2上取整&#xff0c;下面看代码&…

快速了解网络原理

作者&#xff1a;~小明学编程 文章专栏&#xff1a;JavaEE 格言&#xff1a;热爱编程的&#xff0c;终将被编程所厚爱。 目录 局域网和广域网 局域网 局域网组建的方式 广域网 网络通信基础 IP地址 端口号 协议 什么是协议 协议分层 分层模型 OSI七层模型 TCP/IP…

Python解题 - CSDN周赛第18期 - 又见背包

卧床一周&#xff0c;一觉醒来&#xff0c;恍如隔世&#xff0c;做什么事都提不起兴趣&#xff0c;也不知道这算不算后遗症。 本期的题目还是比较简单的&#xff0c;也有几道做过的题。最后一道照搬过来的背包题也是比较经典的01背包了&#xff0c;整体感觉没有什么值得说的&am…

linux常用命令(四)- 文件备份解压缩

查看压缩文件信息 - zipinfo zipinfo命令用于列出压缩文件信息。 语法 zipinfo [-12hsvz][压缩文件]-1 只列出文件名称。-2 此参数的效果和指定"-1"参数类似&#xff0c;但可搭配"-h",“-t"和”-z"参数使用。-h 只列出压缩文件的文件名称。-s…

c++11 标准模板(STL)(std::deque)(四)

定义于头文件 <deque> std::deque 元素访问 访问指定的元素&#xff0c;同时进行越界检查 std::deque<T,Allocator>::at reference at( size_type pos ); const_reference at( size_type pos ) const; 返回位于指定位置 pos 的元素的引用&#xff0c;有边…

如何在PVE(Proxmox)中安装OpenWrt软路由?

出处&#xff1a; https://www.928wang.cn/archives/1763.html https://blog.itwk.cc/post/pve_install_openwrt.html 工具准备 WinSCP或者XFTPOpenWrt镜像(自行寻找)安装好PVE的主机一台 安装教程 镜像上传 将下载好的OpenWrt img镜像上传到 PVE主机中(这里使用XFTP工具) 选…

MySQL中的DDL、DML、DCL、DQL

SQL分类 DDL(Data Definition Language)数据定义语言 用来定义数据库对象&#xff1a;数据库&#xff0c;表&#xff0c;列等。关键字&#xff1a;create, drop,alter 等 DML(Data Manipulation Language)数据操作语言 用来对数据库中表的数据进行增删改。关键字&#xff1a;i…

vue-element-表格 Excel 【导入】功能 (2023元旦快乐~~~)

一、页面表格导入功能 我们借鉴vue-element-admin文件来学习表格导入功能,如果你有vue-element-admin的完整文件&#xff0c;可以去这里找 or 用我这里的代码 1. 整体复制到你要用到的页面 <template><div class"app-container"><upload-excel-com…

unreal engine 纹理动态运动的实现

先用ps涉及一张图,发光的地方为白色 下图实际上边缘是相连的白色 split_line.jpgue新建材质 基础色vector3 随便选择一个偏灰的颜色 自发光 TextureCoordirate ->Panner->图片rgb->*发光常量 * 20自发光 预览效果 通过修改纹理协调器的V垂直平铺控制条纹数量 image.pn…

mybatis插件

Configuration组成 Mapper映射器 3个部分组成&#xff1a; MappedStatement 保存一个节点(select | insert | update | delete) &#xff0c;包括我们配置的sql&#xff0c;&#xff0c;sql的id&#xff0c;&#xff0c;缓存信息&#xff0c;&#xff0c;resultMap,parameterT…

Redis高并发锁(三)分布式锁

在很多情况下&#xff0c;你的数据库不支持事务&#xff0c;分布式部署也使得你无法去使用JVM锁&#xff0c;那么这种时候&#xff0c;你可以考虑用分布式锁 文章目录分布式锁1. 实现方式2. 特征3. 操作4. 代码改造5. 测试优化1. 递归改成循环2. 防止死锁3. 防误删4. LUA脚本 保…

Arduino code for RS-365PW 16120

Pictures These pictures are from Baidu Search. Picture 1: Installment Picture 2: Appearance Picture 3: Encoder of Motor Picture 4: Pins location and number Physical Specification Brand: Mabuchi Motor (万宝至电机)Type: RS-365PW 16120 Body length&#xff1…

学生抢课接口(高并发入门)

目录 使用Mysql 常规测试 张三测试 流程总结 redis优化 修改代码 测试 使用分布式锁 总结 使用Mysql 常规测试 原始代码: Override Transactional public ResponseResult selectCourse(SelectParmas selectParmas) {if (Objects.isNull(selectParmas)){return new …

【python游戏】新的一年快来变身兔兔战士打败獾守护兔兔吧~

前言 大家早好、午好、晚好吖 ❤ ~ 一只快乐兔&#xff0c; 来到快乐山&#xff0c;喝了快乐泉&#xff0c; 又到快乐殿&#xff0c;吃了快乐莲&#xff0c;遇到快乐仙&#xff0c; 听了快乐言&#xff1a;快乐很简单&#xff0c;快乐在身边&#xff0c;快乐无极限&#xff…

C++中STL的vector扩容机制

目录前言发生扩容扩容机制size()和capacity()reserve()和resize()前言 前阵子面试的时候&#xff0c;被问到往vector中插入一个数据可能会发生什么&#xff1f; 我答:可能会扩容; 为啥vector支持变长&#xff1f; 我答:它实在堆上动态申请内存&#xff0c;因此有自己的一套扩容…

Redis集群系列十 —— 集群伸缩之收缩

集群收缩原理 集群收缩就是让其中一些节点安全下线。 所谓的安全下线指的是让一个节点下线之前&#xff0c;把其负责的所有 slots 迁移到别的节点上&#xff0c;否则该节点下线后其负责的 slots 就没法继续提供服务了。 收缩流程如下&#xff1a; 需求 前面扩容完成后&…