rabbitmq的消息应答

news2025/1/23 3:53:22
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

消息应答的方法 

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了

自动应答 

消息发送后立即被认为已经传送成功,这种模式需要在 高吞吐量和数据传输安全性方面做权
,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢
失了,当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终
使得内存耗尽,最终这些消费者线程被操作系统杀死, 所以这种模式仅适用在消费者可以高效并
以某种速率能够处理这些消息的情况下使用。默认消息采用的是自动应答.

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵multiple 的 true 和 false 代表不同意思

true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消费者1 

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker01 {
     private static final String QUEUE_NAME="hello";

     public static void main(String[] args) throws Exception {
         Channel channel = RabbitMqUtils.getChannel();
         System.out.println("C1 等待接收消息处理时间较短");
         DeliverCallback deliverCallback=(consumerTag, delivery)->{
             String receivedMessage = new String(delivery.getBody());
             System.out.println("接收到消息:"+receivedMessage);
             try {
                 // 睡眠1秒
                 Thread.sleep(1000*1);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
             // 消息标记tag,2.false代表只应答接收到的那个传递的消息,true为应答所有消息包括传递过来的消息
             channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
         };
         CancelCallback cancelCallback=(consumerTag)->{
         System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
         };
         System.out.println("C1 消费者启动等待消费......");
         boolean autoAck=false;
         channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
     }
}

 消费者2


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker02 {
    private static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2 等待接收消息处理时间较长");
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
            try {
                // 睡眠30秒
                Thread.sleep(1000*30);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // 消息标记tag,2.false代表只应答接收到的那个传递的消息,true为应答所有消息包括传递过来的消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

生产者 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
                //创建一个连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("43.139.59.28");
                factory.setUsername("guest");
                factory.setPassword("guest");
                //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
                try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
                /**
                * 生成一个队列
                * 1.队列名称
                * 2.队列里面的消息是否持久化 默认消息存储在内存中
                * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
                * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
                * 5.其他参数
                */
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                String message="hello world!!!!";
                /**
                * 发送一个消息
                * 1.发送到那个交换机
                * 2.路由的 key 是哪个
                * 3.其他的参数信息
                * 4.发送消息的消息体
                */
                    for (int i = 0; i < 10; i++) {
                        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                    }

                System.out.println("消息发送完毕");
           }
    }
}

结果

在发送者发送消息 ,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是
由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,
此时会看到消息被 C1 接收到了,说明消息  被重新入队,然后分配给能处理消息的 C1 处理了 

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息
未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者
可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/875233.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何保证数据库的数据和Redis的数据一致性

实际项目中有可能会使用Redis缓存数据&#xff0c;那么在更新数据的时候如何保证数据库中的数据和Redis缓存的数据一致&#xff0c;缓存同步策略的选择是一个很重要的问题。网上有各种说法&#xff0c;大概总结有以下几种&#xff0c;看看每种方案是否可行以及存在的问题和适用…

思维导图模板下载网站有哪些?这6个网站优质模板任你选!

信息时代&#xff0c;有效的信息组织和知识管理变得尤其重要。思维导图&#xff0c;作为一种强大的视觉化工具&#xff0c;可以帮助我们整理和理解复杂的信息&#xff0c;提高工作和学习效率。 好的思维导图模板能帮助我们拓展思路、提升美观度、更快地完成思维导图的绘制。今…

【小梦C嘎嘎——启航篇】string介绍以及日常使用的接口演示

【小梦C嘎嘎——启航篇】string 使用&#x1f60e; 前言&#x1f64c;C语言中的字符串标准库中的string类string 比较常使用的接口对上述函数和其他函数的测试代码演示&#xff1a; 总结撒花&#x1f49e; &#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右…

好用的安卓手机投屏到mac分享

工具推荐&#xff1a;scrcpy github地址&#xff1a;https://github.com/Genymobile/scrcpy/tree/master mac使用方式 安装环境&#xff0c;打开terminal&#xff0c;执行以下命令&#xff0c;没有brew的先安装brew brew install scrcpy brew install android-platform-too…

Mybatis 源码 ① :开篇

文章目录 一、前言二、项目搭建三、自动装配四、总结 一、前言 Mybatis 官网 以及 本系列文章地址&#xff1a; Mybatis 源码 ① &#xff1a;开篇Mybatis 源码 ② &#xff1a;流程分析Mybatis 源码 ③ &#xff1a;SqlSessionMybatis 源码 ④ &#xff1a;TypeHandlerMybat…

ubuntu1804系统ROS1和ROS2一键装机

备忘一下ROS1和ROS2的一键装机 原网址:小鱼的一键安装系列 指令: wget http://fishros.com/install -O fishros && . fishros如果想同时安装ROS1和ROS2, 运行两次. fishros, 工具会自动将ROS1和ROS2放在不同的文件目录下 安装完每次打开终端时, 会提示选择ROS1还是RO…

【Linux】应用层协议

【Linux】应用层协议 文章目录 【Linux】应用层协议1、协议作用1.1 应用层需求1.2 协议分类 2、HTTP & HTTPS2.1 HTTP/HTTPS 简介2.2 HTTP工作原理2.3 HTTPS工作原理2.4 区别 3、URL3.1 编码解码3.2 URI & URL 4、HTTP 消息结构4.1 HTTP请求方法4.2 HTTP请求头信息 5、…

思维导图在线制作,10款好用的思维导图在线制作网站推荐!

思维导图的强大作用在于它以直观、易理解的图形方式展现信息&#xff0c;让复杂的内容变得简单明了&#xff0c;极大地提升了我们的学习和工作效率。与传统的手绘思维导图相比&#xff0c;在线思维导图制作工具更具有灵活性和实用性&#xff0c;它们提供了丰富的功能&#xff0…

00|Java中常见错误或不清楚

00. 多变量声明并初始化 同时声明同类型的多变量 String a "Hello", c "hello"; int x 5, y 5;01. 变量类型 01.0 浮点类型 默认是double类型&#xff0c;如果需要指定float类型&#xff0c;可以float f 1.0F; 01.1 类型装换 如果将大的类型转为…

立秋至 | 共建智慧城,秋日硕果时

一缕缕阳光洒向大地 一股股热浪迎面拂来 一声声虫鸣清脆悦耳 一片片黄叶轻声而落 一份份清凉沁入心间 一个个硕果接踵而至 跟随我们一起来回顾下 往期铭控小伙伴们 在助力建设智慧城市 做了哪些努力呢 都做了哪些项目呢 得到了多少客户的认可呢 Part 1 智慧消防 消防…

关于网络入侵检测领域使用Spark/Flink等计算框架做分布式

关于网络入侵检测领域使用Spark/Flink等计算框架做分布式 0、引言1 基于LightGBM的网络入侵检测研究2 基于互信息法的智能化运维系统入侵检测Spark实现3 基于Spark的车联网分布式组合深度学习入侵检测方法4 基于Flink的分布式在线集成学习框架研究5 基于Flink的分布式并行逻辑回…

【AHB】初识 AHB 总线

AHB 与 APB、ASB同属于 AMBA 总线架构规范&#xff0c;该总线规范由 ARM 公司提出。 目录 一、AHB 总线 二、AHB 总线组成 三、AHB 主从通信过程 一、AHB 总线 AHB&#xff08;Advanced High Performance Bus&#xff09;,意为高级高性能总线&#xff0c;能将微控制器&…

excel 下载方法封装

1.首先需要拿到后端返回的URL下载地址 2.写个下载方法 // url 接口返回的下载地址。例如&#xff1a;https://cancer-research.oss-cn-beijing.aliyuncs.com/yuance-platform-permission/校内共享数据导入模板.xlsx // name 文件名称 例如&#xff1a; 校内共享数据导入模板 /…

Pytorch安装教程:最新保姆级教程

目录 概述 重要的事情说三遍&#xff1a;不需要装cuda、不需要装cuda、不需要装cuda 1.查看自己NVIDIA版本 2.创建一个conda 环境 3.安装pytorch 本文意在帮助即将步入深入学习领域的学子 在这之前首先你需要安装好anaconda&#xff0c;不懂的可以下面这篇文章 最新Anaco…

探索人工智能 | 智能推荐系统 未来没有人比计算机更懂你

前言 智能推荐系统&#xff08;Recommendation Systems&#xff09;利用机器学习和数据挖掘技术&#xff0c;根据用户的兴趣和行为&#xff0c;提供个性化推荐的产品、内容或服务。 文章目录 前言核心机器学习为什么说机器学习是智能推荐系统的基础呢&#xff1f; 数据挖掘数据…

布置Zabbix监控

一、在 Web 页面中添加 agent 主机 1.1打开Zabbix的Web页面 2.2在 Web 页面中添加 agent 主机 二、在 Web 页面创建自定义监控项模板 2.1创建模版

3.1 Qt样式选择器

本期内容 3.1 样式选择器 3.1.1 Universal Selector (通用选择器) 3.1.2 Type Selector (类型选择器) 3.1.3 Property Selector (属性选择器) 3.1.4 Class Selector (类选择器) 3.1.5 ID Selector (ID选择器) 3.1.6 Descendant Selector (后裔选择器) 3.1.7 Chil…

励志长篇小说《周兴和》书连载之十四 守诚信身负巨债

守诚信身负巨债 天色阴霾。看样子又要下雨了吧&#xff1f; 周兴和强打精神&#xff0c;送走了来谈业务的几个客人后&#xff0c;一下就瘫倒在了藤椅上&#xff0c;一动也不想动——几个月来没日没夜的工作&#xff0c;十几天以来在高原的奔波&#xff0c;他太疲惫了&#xff…

【C与C++的相互调用方法】

C与C的相互调用方法 C与C为什么相互调用的方式不同C中调用CC中调用C致谢 C与C为什么相互调用的方式不同 C 和 C 之间的相互调用方式存在区别&#xff0c;主要是由于 C 和 C 语言本身的设计和特性不同。 函数调用和参数传递方式不同&#xff1a;C 和 C 在函数调用和参数传递方面…

【力扣每日一题】88. 合并两个有序数组 双指针 辅助数组 8.13打卡

文章目录 题目思路代码 题目 88. 合并两个有序数组 难度&#xff1a; 简单 描述&#xff1a; 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&am…