RabbitMQ手动应答与持久化

news2025/1/16 4:00:06

1.SleepUtil线程睡眠工具类

package com.hong.utils;

/**
 * @Description: 线程睡眠工具类
 * @Author: hong
 * @Date: 2023-12-16 23:10
 * @Version: 1.0
 **/
public class SleepUtil {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2.消息生产者

package com.hong.rabbitmq3;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @Description: 消息手动应答时不丢失,放回队列重新消费
 * @Author: hong
 * @Date: 2023-12-16 22:33
 * @Version: 1.0
 **/
public class Task3 {

    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("消息发送完成------" + message);
        }
    }
}

3.两个消费者

模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)

3.1.处理时间短

package com.hong.rabbitmq3;

import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description: 消息手动应答时不丢失,放回队列重新消费
 * @Author: hong
 * @Date: 2023-12-16 23:05
 * @Version: 1.0
 **/
public class Worker3 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("worker3等待接收消息,处理速度快");

        DeliverCallback deliverCallback = (comsumerTag, message) -> {
            SleepUtil.sleep(1);
            System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 第一个参数:消息标识
             * 第二个参数是否批量:true批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
        //手动应答false
       channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

3.2.处理时间长

package com.hong.rabbitmq3;

import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description: 消息手动应答时不丢失, 放回队列重新消费
 * @Author: hong
 * @Date: 2023-12-16 23:05
 * @Version: 1.0
 **/
public class Worker4 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("worker4等待接收消息,处理速度慢");

        DeliverCallback deliverCallback = (comsumerTag, message) -> {
            SleepUtil.sleep(20);
            System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 第一个参数:消息标识
             * 第二个参数是否批量:true批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
        
        //手动应答false
       channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

4.结果

启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
在这里插入图片描述
在这里插入图片描述
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
在这里插入图片描述

5.持久化

5.1.队列持久化

package com.hong.rabbitmq4;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @Description: 队列持久化
 * @Author: hong
 * @Date: 2023-12-17 22:52
 * @Version: 1.0
 **/
public class Task4 {
    public static final String TASK_QUEUE_NAME = "persist_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        //true持久化
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("消息发送完成------" + message);
        }
    }
}

5.2.消息持久化

package com.hong.rabbitmq4;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

/**
 * @Description: 队列持久化与消息持久化
 * @Author: hong
 * @Date: 2023-12-17 22:52
 * @Version: 1.0
 **/
public class Task4 {
    public static final String TASK_QUEUE_NAME = "persist_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        //队列持久化   true持久化
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        while (scanner.hasNext()){
            String message = scanner.next();
            //消息持久化  MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println("消息发送完成------" + message);
        }
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1318570.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解Java关键字volatile

前置知识-了解以下CPU结构 如下图所示,每个CPU都会有自己的一二级缓存,其中一级缓存分为数据缓存和指令缓存,这些缓存的数据都是从内存中读取的,而且每次都会加载一个cache line,关于cache line的大小可以使用命令cat…

DOS 系统(命令行)

文章目录 DOS 系统DOS 常用命令DOS 高级命令DOS 批处理命令DOS 应用场景 DOS 系统 操作系统的发展史(DOS/Windows篇) DOS操作系统的历史 DOS(Disk Operating System) 是 磁盘操作系统 的缩写,是一种早期的个人计算机操…

Mybatis的插件运⾏原理,如何编写⼀个插件?

🚀 作者主页: 有来技术 🔥 开源项目: youlai-mall 🍃 vue3-element-admin 🍃 youlai-boot 🌺 仓库主页: Gitee 💫 Github 💫 GitCode 💖 欢迎点赞…

Linux:TCP 序列号简介

文章目录 1. 前言2. 什么是 TCP 序列号?3. TCP 序号 的 初始值设置 和 后续变化过程3.1 三次握手 连接建立 期间 客户端 和 服务端 序号 的 变化过程3.1.1 客户端 socket 初始序号 的 建立3.1.2 服务端 socket 初始序号 的 建立3.1.3 客户端 socket 接收 服务端 SAC…

动态规划优化技巧

一、斐波那契系列 1、滚动数组优化空间复杂度 2、dp数组初始化/处理边界优化 力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台

《Global illumination with radiance regression functions》

总结一下最近看的这篇结合神经网络的全局光照论文 这是一篇2013年TOG的论文。 介绍 论文的主要思想是利用了神经网络的非线性特性去拟合全局光照中的间接光照部分,采用了基础的2层MLP去训练,最终能实现一些点光源、glossy材质的光照渲染。为了更好的理…

【POI的如何做大文件的写入】

🔓POI如何做大文件的写入 🏆文件和POI之间的区别是什么?🏆POI对于当今的社会发展有何重要性?🏆POI大文件的写入🎖️使用XSSF写入文件🎖️使用SXSSFWorkbook写入文件🎖️对…

《ThreadLocal使用与学习总结:2023-12-15》由浅入深全面解析ThreadLocal

由浅入深全面解析ThreadLocal 目录 由浅入深全面解析ThreadLocal简介基本使用ThreadLocal与synchronized的区别ThreadLocal现在的设计(JDK1.8)ThreadLocal核心方法源码分析ThreadLocalMap源码分析弱引用与内存泄露(内存泄漏和弱引用没有直接关…

代码随想录算法训练营第十四天 | 二叉树理论基础、递归遍历 、迭代遍历、统一迭代

今天学习内容:二叉树理论基础、递归遍历 、迭代遍历、统一迭代 讲解:代码随想录 二叉树题目思维导图如下,来自代码随想录。 1.二叉树理论基础 1.1二叉树种类 满二叉树 完全二叉树 二叉搜索树 平衡二叉搜索树 C中map、set、multimap&…

[Verilog] Verilog 操作符与表达式

主页: 元存储博客 文章目录 前言1. 操作符2. 操作数3 表达式总结 前言 1. 操作符 图片来源: https://www.runoob.com/ Verilog语言中使用的操作符包括: 算术操作符:加法()、减法(-)、乘法(*)、除法(/)、取模(%)、自增()、自减(–…

Vue中插槽的使用

目录 一、默认插槽 (1)概念 (2)代码展示 (3)后备内容 二、具名插槽 (1)概念 (2)代码展示 三、作用域插槽 (1)概念 &#xff0…

【经典LeetCode算法题目专栏分类】【第2期】组合与排列问题系列

《博主简介》 小伙伴们好,我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源,可关注公-仲-hao:【阿旭算法与机器学习】,共同学习交流~ 👍感谢小伙伴们点赞、关注! 组合总和1 class So…

【计算机组成原理】存储系统基本概念与基本组成

📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…

FRP内网映射家用服务器至公网访问

兄弟们,服务器到货了,后续与大家分享内容就用它了。我预装的操作系统是Centos8,首先要解决的是远程访问的问题。 【特别注意】下述的端口,记得在阿里云安全组配置中放开端口入规则!! 1. FRP服务器配置 之前我有购买…

UDP多人聊天室

讲解的是TCP和UDP两种通信方式它们都有着自己的优点和缺点 这两种通讯方式不通的地方就是TCP是一对一通信 UDP是一对多的通信方式 UDP通信 主要的方向是一对多通信方式 UDP通信就是一下子可以通信多个对象,这就是UDP对比TCP的优势,UDP它的原理呢 就是…

Spring之容器:IOC(1)

学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…

破译模式:模式识别在计算机视觉中的作用

一、介绍 在当代数字领域,计算机视觉中的模式识别是关键的基石,推动着众多技术进步和应用。本文探讨了计算机视觉中模式识别的本质、方法、应用、挑战和未来趋势。通过使机器能够识别和解释视觉数据中的模式,模式识别不仅推动了计算机视觉领域…

什么是回调函数

需求 A,B两个小组开发一个功能。B小组开发制作油条模块:make_youtiao。A小组需要调用B小组开发的模块,然后执行后续的操作:sell()如下图: 上面的方式A小组必须等待B小组开发的模块make_youtiao执行完成后才能执行sell()。 上图代…

JVM-2-对象

对象创建 当Java虚拟机遇到一条字节码new指令时,首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引用,并且检查这个符号引用代表的类是否已被加载、解析和初始化过。如果没有,那必须先执行相应的类加载过程。 为对象分配空间…

linux性能优化-cpu使用率

文章目录 1.CPU使用率2.节拍率的概念2.1.查看系统节拍率2.2.用户节拍率2.3.CPU使用率公式 3.怎么查看CPU使用率3.1.top显示系统总体CPU使用情况3.2.pidstat分析每个进程CPU使用情况 4.CPU使用率过高怎么办4.1.perf命令详解 1.CPU使用率 用什么指标来描述系统的CPU性能呢?不是…