消息队列(一)-- RabbitMQ入门(4)

news2024/10/2 16:27:45

RabbitMQ 其他知识点

幂等性

  • 消息重复消费
    消费者在消费MQ 中的消息时,MQ 已经把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故MQ 未收到确认消息,该消息会重新发给其他消费者,或网络重新连接后再次发给该消费者,但是实际上该消息已被消费过了,造成消费者重复消费同一条消息。

  • 解决思路
    MQ 消费者的幂等性的解决一般使用全局ID 或者写个唯一标识,如时间戳、UUID 或 MQ的ID等,每次消费消息时用ID 判断该消息是否已经消费过。
    业界主流的幂等性有两种操作:

    • 唯一ID + 指纹码机制
      指纹码:一些规则或时间戳加别的服务给的唯一信息码,它并不一定是我们系统生成的,基本都是由业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中。优势就是实现简单就一个拼接,然后查询判断是否重复。劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提升性能,但是不太推荐。
    • Redis 原子性
      利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。

优先级队列

曾经后端系统是使用 redis 来存储定时轮询,而 redis 只能用List 做一个简单的消息队列,并不能实现优先级的场景。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是VIP客户的订单,就给一个相对较高的优先级,否则默认优先级。

  • 控制台页面添加
    在这里插入图片描述
  • 队列声明代码中添加优先级
Map<String , Object> params = new HashMap<>();
params.put("x-max-priority", 10);//官方允许是0-255 之间 ,此处设置10 允许优先级范围为0-10 不要设置过大,浪费CPU与内存
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
for (int i = 0; i < 10; i++) {
    String message = "info"+(i+1);
    if((i+1) == 5){
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().priority(5).build();
        channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
    }else{
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }
}

先运行生产者发消息,然后再运行消费者,消息info5 因为设置了优先级,优先被消费了。
在这里插入图片描述

  • 实现队列优先级注意如下:
    队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才能消费,才有机会对消息进行排序。

惰性队列

RabbitMQ 从3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到响应的消息时才会被加载到内存中,它的一个重要设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机或者由于维护而关闭等)而致使长时间内不能消费消息造成堆积是,惰性队列就很有必要了。
默认情况下,当消费者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会消耗较长的时间,也会阻塞队列的操作,进而无法接受新的消息。

  • 两种模式
    队列具备两种模式:default 和 lazy 。默认的为 default。lazy模式为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候,在参数中设置。也可以通过 Policy 的方式设置。如果一个队列同时使用两种方式设置的话,那么 Policy 方式具备更高的优先级。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare("myqueue",false,false,false,args);

在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB。

RabbitMQ 集群

集群搭建步骤

在这里插入图片描述

  • 把原有装过RabbitMQ 的虚拟机克隆两份,分别修改IP。
    在这里插入图片描述
  • 修改3台机器的主机名称,改完需要重启机器
    vim /etc/hostname
    在这里插入图片描述
  • 配置各个节点的 hosts 文件,让各个节点都能互相识别对方
    vim /etc/hosts
    192.168.1.130 node1
    192.168.1.131 node2
    192.168.1.132 node3
    在这里插入图片描述
  • 以确保各个节点的 cookie 文件使用的是同一个值
    在 node1 上执行远程操作命令
    scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
    scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
  • 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和RabbitMQ 应用服务器
    rabbitmq-server -detached
  • 在节点2 执行
    rabbitmqctl stop_app
    (rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
    rabbitmqctl reset
    rabbitmqctl join_cluster rabbit@node1
    (需要节点1服务器防火墙放开4369和25672端口)
    rabbitmqctl start_app(只启动应用服务)
    在这里插入图片描述
    在这里插入图片描述
  • 在节点3执行
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster rabbit@node2
    rabbitmqctl start_app
  • 集群状态
    rabbitmqctl cluster_status
    在这里插入图片描述
  • 需要重新设置用户
    创建账号
    rabbitmqctl add_user admin 123456
    设置用户角色
    rabbitmqctl set_user_tags admin administrator
    设置用户权限
    rabbitmqctl set_permissions -p “/” admin “.*” “.*” “.*”
    然后可以访问管理界面查看(可以任意访问其中一个IP)
    在这里插入图片描述
  • 解除集群节点(node2 和 node3 机器分别执行)
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl start_app
    rabbitmqctl cluster_status
    rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)

镜像队列

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上,保证服务的可用性。

  • 启动三台集群节点
  • 随便找一个节点添加policy
    在这里插入图片描述
    在这里插入图片描述
    其中
    Name:mirrior-two 为名称,可随意起名
    Pattern:^mirrior 规则,以mirrior 前缀的交换机和队列进行备份
    ha-mode:exactly 指定模式
    ha-params:2 备两份
    ha-sync-mode:automatic 同步模式为自动
    在这里插入图片描述

创建一个队列,并发送一个消息
在这里插入图片描述
在这里插入图片描述
然后通过命令把节点1停止,发现自动又备份到节点3上。
在这里插入图片描述

在这里插入图片描述
通过连接节点2,依然能消费消息。
在这里插入图片描述

  • 就算整个集群只剩一台机器,依然能消费队列里的消息。

HAProxy 高可用负载均衡

HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
在这里插入图片描述

Federation Exchange

因为减少网络延迟,两个地方(比如北京和上海)都有MQ服务,但是需要数据同步,就需要Federation 插件解决这个问题。

  • 需要保证每台节点单独运行

  • 在每台机器上开启 Federation 相关插件
    rabbitmq-plugins enable rabbitmq_federation
    rabbitmq-plugins enable rabbitmq_federation_management

  • 原理图
    在这里插入图片描述
    先运行consumer 在 node2 创建 fed_exchange

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.131");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("fed_exchange", BuiltinExchangeType.DIRECT);
        channel.queueDeclare("node2_queue", true, false, false, null);
        channel.queueBind("node2_queue", "fed_exchange", "routeKey");
        //声明 接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  • 在 downstream(node2)配置 upstream(node1)
    在这里插入图片描述
  • 添加policy
    在这里插入图片描述
  • 查看状态
    在这里插入图片描述

Federation Queue

联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供负载的功能。一个联邦队列可以连接一个或多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

  • 原理图
    在这里插入图片描述
  • 添加 upstream(同上)
  • 添加policy
    在这里插入图片描述

Shovel

跟 Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker上。Shovel 可以翻译为“铲子”,是一种比较形象的比喻,这个“铲子”可以将消息从一方“铲到”另一方。Shovel 行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
在这里插入图片描述

  • 开启插件(需要的机器都开启)
    rabbitmq-plugins enable rabbitmq_shovel
    rabbitmq-plugins enable rabbitmq_shovel_management
    在这里插入图片描述
    在这里插入图片描述
  • 添加 shovel 源和目的地
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/787985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【高危】Atlassian Confluence 远程代码执行漏洞

漏洞描述 Confluence 是由 Atlassian 开发的知识管理与协同软件&#xff0c;通常在企业内部用作wiki系统。 Confluence 7.19.8到8.2.0之前的版本中存在远程代码执行漏洞&#xff0c;具有登录权限的攻击者无需用户交互即可在 Confluence 服务器中执行任意命令。 漏洞名称Atlass…

25、matlab里面的10中优化方法介绍——Opt_Golden法(matlab程序)

1.简述 基本思想 黄金分割法也称为 0.618 法&#xff0c;其基本思想是通过取试探点和进行函数值比较&#xff0c;使包含极小点的搜索区间不断缩短以逼近极小值点。适用于确定区间上的任何单谷函数求极小值的问题。 公式推导 设有定义在[ a , b ] [a,b][a,b]上的单谷函数 φ ( …

Jmeter(二十三):快速生成测试报告

一、jmeter配置 首先要保证jmeter命令是ok的,如果你在cmd中输入jmeter -v,有出现如下截图所示的信息,那就说明jmeter环境ok; 二、jmeter执行结合命令 生成HTML测试报告 1.完成脚本的调试、参数化、断言等操作。然后在聚合报告中指定日志文件存储路径,路径中最好不要包含有…

魏牌转型,别笑得太早

作者 | 魏启扬 来源 | 洞见新研社 魏牌似乎终于迎来了自己的“救世主”。 确定冲击高端智能新能源赛道&#xff0c;并且战略性放弃2000万辆的燃油车市场后&#xff0c;魏牌CEO陈思英将2023年定位为反击之年。 4月13日&#xff0c;上海车展前夕&#xff0c;魏牌推出”比‘500…

数分面试题-SQL常见面试题型1

目录标题 1、连续时间问题1.1 最近一周内的活跃天数1.2 每个用户一周内最大连续活跃天数1.3 计算截至当前&#xff0c;每个用户已经连续签到的天数 2、时间间隔问题举例3、sql窗口分析函数3.1 有一个日志登陆列表&#xff0c;获取用户在某个页面停留时长3.2 寻找至少连续出现3次…

大文件传输中的加密与安全措施

随着现代科技的不断发展&#xff0c;大文件传输已经成为了日常工作中不可或缺的一部分。但是&#xff0c;大文件传输中面临的安全问题也越来越凸显&#xff0c;因此加密与安全措施对于保护大文件传输的安全性至关重要。 一、密码学 密码学是加密与安全措施的基础&#xff0c;它…

ES6:Object.assign方法详解

ES6&#xff1a;Object.assign方法详解 1、前言2、语法3、基本用法3.1 目标对象和源对象无重名属性3.2 目标对象和源对象有重名属性3.3 有多个源对象3.4 其他情况3.4.1 只有一个参数时&#xff0c;Object.assign会直接返回该参数3.4.2 如果该参数不是对象&#xff0c;则会先转成…

[ 容器 ] Harbor 私有仓库的部署与管理

目录 一、什么是Harbor二、Harbor的特性三、Harbor的构成四、Harbor 部署五、关于 Harbor.cfg 配置文件中有两类参数&#xff1a;所需参数和可选参数六、维护管理Harbor 一、什么是Harbor Harbor 是 VMware 公司开源的企业级 Docker Registry 项目&#xff0c;其目标是帮助用户…

centos7 基础设置

CentOS 7 是一种基于 Linux 操作系统的发行版&#xff0c;它是来自于 Red Hat Enterprise Linux&#xff08;RHEL&#xff09;源代码的重构版本。 CentOS 7 是由社区开发和维护的免费操作系统&#xff0c;被广泛应用于服务器环境和企业级应用。 CentOS 7 提供了稳定、安全且可…

OPTEE之静态代码分析实战三——optee_examples

ATF(TF-A)/OPTEE之静态代码分析汇总 一、optee_examples源码下载及分析 前文分别对optee_os和optee_client进行了静态代码分析实战,本次对optee_examples实施soanrlint静态代码分析,先到官方网站下载源码。官方网站位于github,网址optee_examples。 各发布版本如下…

windows下nginx更改配置unknown directive踩坑填坑

windows下nginx更改配置踩坑填坑 windows下nginx大坑&#xff1a;首先笔者建议了使用路径下cmd的方式启动服务&#xff0c;由于笔者更改了nginx配置文件&#xff0c;重新加载启动&#xff08;命令nginx -s reload&#xff09;nginx后一直报错&#xff0c;采用双击启动&#xff…

国际化(i18n)

国际化(i18n) 概述 i18n&#xff08;其来源是英文单词 internationalization的首末字符i和n&#xff0c;18为中间的字符数&#xff09;是“国际化”的简称。在信息技术领域&#xff0c;国际化与本地化&#xff08;英文&#xff1a;internationalization and localization&…

STM32 互补PWM 带死区 HAL

1、设置PWM波频率100KHz&#xff0c;占空比50%&#xff0c;死区时间1us 2、 while 循环之前启动PWM HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_1); //启动TIM1_CH1 PWM输出 HAL_TIMEx_PWMN_Start(&htim1,TIM_CHANNEL_1);//启动TIM1_CH1N PWM输出 3、死区计算 DT_time…

基于Jquery EasyUI JSZip FileSaver的简单使用

一、前言 在前端的项目开发中 &#xff0c;下载文件压缩包是很重要的一个环节&#xff0c;那么怎么下载多个文件并压缩成ZIP下载呢&#xff1f; 二、使用步骤 1、引用库 <script type"text/javascript" src"~/Scripts/comm/jszip.min.js" ></…

draw.io画图时,用一个箭头(线段)连结一个矩形和直线时,发现,无论怎么调节,都无法使其无缝连接。

问题描述&#xff1a;draw.io画图时&#xff0c;用一个箭头&#xff08;线段&#xff09;连结一个矩形和直线时&#xff0c;发现&#xff0c;无论怎么调节&#xff0c;都无法使其无缝连接。要么少一段&#xff0c;如图1所示。要么多一段&#xff0c;如图2所示。 图1&#xff0c…

国标GB28181监控设备接入EasyCVR如何正确获取RTMP与RTSP视频流

安防视频监控平台EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力&#xff0c;比…

数据结构之优先级队列(堆)

文章目录 1.优先级队列概念 &#x1f4ae;2.优先级队列的模拟实现&#x1f4ae;3.常用接口PrinrityQueue介绍&#x1f4ae;4.堆的应用&#x1f4ae; 1.优先级队列概念 &#x1f4ae; 优先级队列 &#xff1a;是不同于先进先出队列的另一种队列。每次从队列中取出的是具有最高优…

Linux —— 进程地址空间

目录 一&#xff0c;虚拟地址 二&#xff0c;进程地址空间 一&#xff0c;虚拟地址 #include<stdio.h> #include <unistd.h> #include <stdlib.h> int g_val 0; int main() {pid_t id fork();if(id<0) …

聊聊STM32的基本定时器

STM32 的基本定时器&#xff08;Basic Timer&#xff09;是一种简单的定时器模块&#xff0c;用于生成基于时钟频率的定时中断。它可以用于实现各种定时和计时功能&#xff0c;例如延时、频率测量、PWM 生成等。 基本定时器通常由一个 16 位的自由运行计数器和一个预分频器组成…

VMware InstallBuilder Crack

VMware InstallBuilder Crack VMware InstallBuilder是一种开发工具&#xff0c;用于构建桌面和服务器软件的跨平台安装程序。使用InstallBuilder&#xff0c;您可以从单个项目文件和构建环境中快速创建Linux、Windows、Mac OS X、Solaris和其他平台的动态专业安装程序。除了安…