RabbitMQ系列(25)--RabbitMQ搭建镜像队列

news2025/1/12 15:49:56

 前言:如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失,虽然可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,这样可以保证消息不丢失,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用,而通过引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。


验证过程如下(没开启消息持久化,有兴趣的同学可以看看):

我这里准备了3台虚拟机来跑RabbitMQ服务分别为

node1:192.168.194.128

node2:192.168.194.129

node3:192.168.194.130

(1)执行以下代码,在node1节点里生成队列

package com.ken;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * 生产者
 */
public class Producer {

    //队列名称(用于指定往哪个队列发送消息)
    public static final String QUEUE_NAME = "my_queue";

    //进行发送操作
    public static void main(String[] args) throws Exception{

        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.128");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false ,false,false,null);
        //发消息
        String message = "Hello World";
        /**
         * 用信道对消息进行发布(消息持久化)
         * 第一个参数:发送到哪个交换机
         * 第二个参数:路由的Key值是哪个,本次是队列名
         * 第三个参数:其他参数信息
         * 第四个参数:发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        System.out.println("消息发送成功!");
    }

}

效果图:

(2)进页面查看效果,可以得知my_queue这个队列只在node1节点创建了,没在其他节点同步创建

(3) 在node1节点执行关闭RabbitMQ服务的命令来模拟节点宕机

rabbitmqctl stop_app

效果图:

(4)用其他节点的可视化页面来查看集群信息,由图可知node1节点没在运行

(5)查看队列,可以看到my_queue队列的状态为停止

(6)执行以下代码,在node2节点里生成消费者,尝试消费消息

package com.ken;

import com.rabbitmq.client.*;

/**
 * 消费者
 */
public class Consumer {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //进行接收操作
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.129");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };

        /**
         * 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数
         *
         *  @FunctionalInterface
         *  public interface CancelCallback {
         *      void handle (String consumerTag) throws IOException;
         *  }
         *
         */
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费消息");
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

效果图:(报错信息提示node1节点上的my_queue队列已经关闭了)

(7)把node1节点上的RabbitMQ服务重新启动起来

rabbitctl start_app

效果图:

(8)查看消费者的日志,可以发现消费者并没有消费消息 

(9)查看队列里消息的情况,可以看到消息丢失了(在未设置消息持久化的情况下)


搭建镜像队列

1、启动node1、node2、node3三台集群节点

2、随便找一个节点添加policy(策略)

(1)进入node1节点的可视化界面

(2)进入添加策略的界面 

(3)给策略取一个名字 

 (4)给策略加上匹配规则,通过正则表达式匹配队列,若交换机或者队列的名字满足以mirror开头这个条件,则那条队列使用该策略

例:

(5)为策略选择模式为ha-mode(ha-mode表示是备机模式)点击HA mode即可

(6)为ha-mode指定获取参数方式为exactly(exactly表示指定参数)

(7)点击HA params,就会往自定义参数里填入ha-params,这里用于指定策略作用的节点的数量

 (8)为ha-params指定策略作用的节点的数量为2(包含被镜像的队列,镜像和被镜像的队列数总共为2)

(9)点击HA sync mode,就会往自定义参数里填入ha-sync-mode,这里用于指定同步的模式

(10)为ha-sync-mode指定同步模式为自动同步模式

(11)最后点击Add/update policy添加策略即可

(12)往上滑动查看策略添加情况

(13)执行代码在node1节点上创建名字以mirror开头的队列

代码如下

package com.ken;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * 生产者
 */
public class Producer {

    //队列名称(用于指定往哪个队列发送消息)
    public static final String QUEUE_NAME = "mirror_queue";

    //进行发送操作
    public static void main(String[] args) throws Exception{

        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.128");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false ,false,false,null);
        //发消息
        String message = "Hello World";
        /**
         * 用信道对消息进行发布(消息持久化)
         * 第一个参数:发送到哪个交换机
         * 第二个参数:路由的Key值是哪个,本次是队列名
         * 第三个参数:其他参数信息
         * 第四个参数:发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        System.out.println("消息发送成功!");
    }

}

效果图:

(14)进入Queues查看队列的情况,可以发现刚刚创建的mirror_queue队列上有+1,这证明镜像队列创建成功

(15)进入mirror_queue队列查看详情,可以发现镜像队列在node2节点上

3、测试镜像队列是否正常运行

(1)关闭node1节点,模拟node1节点宕机

rabbitmqctl stop_app

效果图: 

从node2的可视化页面可以看到node1节点停机了


注意:

从Queues进入mirror_queue队列查看详情,可以发现当node1节点停掉后node2自动替代了node1节点的位置,node3作为镜像队列的节点,由此可见我们策略里写的ha-params:2这一参数是生效的,使得节点的个数总是保持2个,这样就算我们整个集群只剩下一台机器,在节点不断替代的情况下,消费者始终能消费队列里面的消息


(2)执行以下代码,在node2节点里生成消费者,尝试消费mirror_queue队列的消息,发现node2可以消费mirror_queue队列的消息并且消费成功,这证明mirror_queue队列成功镜像到node2节点上

package com.ken;

import com.rabbitmq.client.*;

/**
 * 消费者
 */
public class Consumer {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "mirror_queue";

    //进行接收操作
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP,用于连接RabbitMQ的队列
        factory.setHost("192.168.194.129");
        //设置连接RabbitMQ的用户名
        factory.setUsername("admin");
        //设置连接RabbitMQ的密码
        factory.setPassword("123456");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };

        /**
         * 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数
         *
         *  @FunctionalInterface
         *  public interface CancelCallback {
         *      void handle (String consumerTag) throws IOException;
         *  }
         *
         */
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费消息");
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/732835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql 常用命令综合简单运用

目录 第一大题创建数据库创建用户表及约束字段修改位置修改字段数据类型修改字段名字添加字段修改表名字删除字段修改表的存储引擎 第二大题创建表及外键和其他约束删除外键约束和查找外键名 第三大题创建数据库创建用户同时授权一些功能修改用户的密码更新权限列表查看用户的权…

pytorch线性模型 学习前要学习的基础知识

跟着刘二大人学pytorch,补全一下我的基础缺失 1.numpy基础 import numpy as np from PIL import Image anp.array([1,2,3]) #生成一维数组 print(a) bnp.arange(1,4)#创建等差数组,默认等差是1,数组为1,2,3&#xff0…

spring 详解三 IOC(spring实例化及后处理器)

Spring实例化基本流程 Spring在容器初始化的时候,读取XMl配置,将其封装成BeanDefinition(Bean定义)对象,描述所有bean的信息 BeanDefinition会注册存储到beanDefinitionMap集合中 Spring框架遍历beanDefinitionMap,使用反射创建Be…

pycharm如何给一串中文快捷加引号(方法二)

点击上方“Python爬虫与数据挖掘”,进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 商人重利轻别离,前月浮梁买茶去。 大家好,我是皮皮。 一、前言 前几天在Python白银群【此类生物】问了一个Pycharm基础的问题&a…

SpringBoot配置动态定时任务

1.配置ScheduledTask 主要是实现SchedulingConfigurer,动态传入cron。 package com.hzl.boot.config;import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Propert…

使用promise函数封装post请求,封装aes加解密方法,并进行请求头aes加密,封装sm2国密加解密,进行请求体数据加密,响应数据解密。

export default {async post(url, params { header:{}, data:{} }, showLoading true){if(showLoading){uni.showLoading({title:"加载中",mask:true})}let options{header:{...params.header},url:globalParams.basepathurl.url,data:{...params.data}}//渠道 ae…

Devops系列五(CI篇之pipeline libraray)jenkins将gitlab helm yaml和argocd 串联,自动部署到K8S

一、说在前面的话 本文是CI篇的上文,因为上一篇已经作了总体设计,就不再赘述,有需要的请看前文。 我们将演示,使用CI工具–jenkins,怎么和CD工具–argocd串联,重点是在Jenkins该怎么做。准备工作和argocd等…

C++常用库函数 5.输入和输出函数

函数名&#xff1a;fclose 函数原型&#xff1a;int fclose(FILE *stream)&#xff1b; 参数&#xff1a;streamFILE 结构的指针。 所需头文件&#xff1a;<cstdio> 返回值&#xff1a;如果该流成功关闭&#xff0c;fclose 返回0。如果出错&#xff0c;则返回 EOF。 功…

AI在金融领域的应用

AI金融领域 信贷业务 个人信贷单笔数额小、数量大&#xff0c;需要大量的人力和时间投入&#xff0c;信贷审核的数据也呈现出分散化、碎片化的特点。同时传统金融机构和互联网金融公司的风控环节中&#xff0c;普遍存在信息不对称、成本高、时效性差、效率低等问题&#xff0c…

动态规划之343 整数拆分(第6道)

题目&#xff1a; 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例&#xff1a; 解法&#xff1a; 其实可以从1开始遍历 j &#xff0c;然后有两种…

Mysql之视图,索引,备份与恢复

目录 一&#xff0c;视图 1.视图是什么&#xff1f; 2.视图的重要性&#xff1f; 3.那些地方使用视图&#xff1f; 4.基本语法 二&#xff0c;索引 1.索引是什么&#xff1f; 2.索引的重要性&#xff1f; 3.索引的种类&#xff1a; 4.那些地方使用索引&#xff1f; 5.…

Gateway服务集成Nacos2021.0.4错误解决

问题 gateway服务集成nacos&#xff0c;启动后报错&#xff1a; Caused by: com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information:; 版本&#xff1a; jdk:1.8 spring-b…

10_Linux中断

目录 Linux中断API函数 中断号 上半部与下半部 软中断 tasklet 工作队列 设备树中断信息节点 获取中断号 修改设备树文件 按键中断驱动程序编写 编写测试APP Linux中断API函数 先来回顾一下裸机实验里面中断的处理方法: 1.使能中断,初始化相应的寄存器。 2.注册中…

windows配置启动若依前后端项目

一、后端 1、环境准备 JDK8、Redis、Mysql、Maven【并配置镜像源】 以上工具全部使用msi/exe安装&#xff0c;并勾选添加到环境变量&#xff0c;如果没有添加到环境变量可以参考其他博主关于每种怎么配置的情况 mysql新增一个目录名为ry-vue的空数据库 2、前往若依官网下载…

MAC M1上docker rocketmq简单环境搭建和代码

工作了这么多年&#xff0c;rocketmq还没有用过&#xff0c;由于现在的工作中涉及到了&#xff0c;周六吃完午饭就开始搞&#xff0c;结果到现在3点钟才把环境弄好&#xff0c;测试代码搞起。 整个流程分成两步 安装简单的rocket环境起springboot项目测试 参考文章&#xff…

C++STL库常用库函数总结

文章目录 1.vector, 变长数组&#xff0c;倍增的思想 size() 返回元素个数empty() 返回是否为空clear() 清空front()/back() 访问第一个元素/最后一个元素push_back()/pop_back() 插入/弹出最后一个元素begin()/end() 开始元素迭代器/结尾元素迭代器[]支持…

【kafka面试题2】如何保证kafka消息的顺序性

【kafka面试题】如何保证kafka消息的顺序性 一、整体策略 如何保证kafka消息的顺序性呢&#xff0c;其实整体的策略就是&#xff1a;我们让需要有序的消息发送到同一个分区Partition。 为什么说让有序的消息发送到同一个分区Partition就行呢&#xff0c;&#xff0c;下面我们…

tidb之旅——生成列

作者&#xff1a; 有猫万事足 原文来源&#xff1a; https://tidb.net/blog/15d0fbf6 新的问题 之前弄好了TiDB集群&#xff0c;也弄好了dm集群&#xff0c;把写入流量整个切入了TiDB集群运行起来了。但是有个别比较大的日志表&#xff0c;OLAP查询的表现还是不太行。正好7…

Node中的模块引擎EJS

1.安装EJS 2.导入EJS const ejsrequire("ejs") 3.使用ejs渲染 let outer"法外狂徒" let resultejs.render(我是<%outer %>,{outer:outer}) let str我是<%outer %> let resultejs.render(str,{outer:outer}) 说明&#xff1a; 在模板中&#xf…

SpringBoot前后端分离项目,打包、部署到服务器详细图文流程

文章目录 实施步骤一、修改配置文件地址1.修改MySQL配置2.修改Redis配置3.修改日志路径和字符集配置 二、将源码压缩并上传服务器1.上传前端文件2.上传后端文件&#xff08;同上&#xff09; 三、前端项目打包1.安装依赖2.项目打包 四、后端项目打包1.项目打包&#xff08;jar包…