RabbitMQ 开发指南

news2024/11/28 3:14:21

连接RabbitMQ

连接方式一:
在这里插入图片描述
也可以选择使用URI的方式来实现

连接方式二:
在这里插入图片描述
Connection接口被用来创建一个Channel,在创建之后,Channel可以用来发送或者接收消息。

Channel channel = conn.createChannel();

使用交换器和队列

声明一个交换器和队列

channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);

上面创建了一个持久化的、非自动删除的、绑定类型为direct的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称由RabbitMQ自动生成),这里的交换器和队列都没有设置特殊的参数。

上面声明的队列具备如下特性:只对当前应用中同一个Connection层面可用,同一个Connection的不同Channel可共用,并且也会在应用连接断开时自动删除。

如果要在应用中共享一个队列,可做如下声明:

channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);

这里的队列被声明为持久化的、非排他的、非自动删除的,而且也被分配给另一个确定的已知名称。

exchangeDeclare方法详解

exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

Exchange.DeclareOK exchangeDeclare(String exchange,String type,
boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments)
 throws Exception;
  • exchange: 交换器名称
  • type: 交换器类型,常见的有fanout、direct、topic
  • durable: 是否持久化,durable设置true表示持久化,持久化后可以将交换器存盘。
  • autoDelete:是否自动删除,表示一旦这个交换器上没有任何绑定(即没有任何队列与其相连),RabbitMQ会自动删除这个交换机。这对于临时的、生命周期与某些特定队列或应用程序密切相关的交换机非常有用。
  • internal :如果为true,表明是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如alternate-exchange。

此外,其他声明交换机的方法

  • exchangeDeclareNoWait: 表示在声明exchange时候,不需要服务器返回。这可能会导致一种情况,在声明完一个交换器之后(实际服务器还并未完成交换器的创建),客户端紧接着使用这个交换器,必然会发生异常。
  • exchangeDeclarePassive:可以用来检测交换机是否存在,如果存在则正常返回,如果不存在则抛出异常。

queueDeclare方法详解

queueDeclare方法,只有如下两个重载方法

1. Queue.DeclareOK queueDeclare() throws IOException;
2. Queue.DeclareOK queueDeclare(String queue,boolean durable,
boolean exclusive,boolean autoDelete,Map<String,Object> arguments)
throws IOException;

不带任何参数的queueDeclare方法默认创建一个由RabbitMQ 自动生成名称 的、排他的自动删除非持久化 的队列。

方法参数详解

  • queue:队列名称
  • durable:设置是否持久化,持久化的队列会存盘,在服务器重启的时候保证不丢失消息。
  • exclusive:排他队列
  • autoDelete:自动删除,当队列中没有任何活跃的消费者,RabbitMQ会在一段时间后自动删除该队列。当一个队列删除时,其上持久化的消息也会随之被删除。
  • arguments : 设置队列的其他参数

排他队列的特点和行为

  • 独占性:一旦某个连接声明了一个排他队列,任何其他连接都无法访问或者声明同名的队列。
  • 自动删除:当声明排他队列的连接关闭时,RabbitMQ会自动删除这个队列,即使队列被声明为持久化的。
    同一连接通道共享:虽然排他队列对其他连接不可见,但同一连接内的不同通道可以共享访问这个排他队列。
  • 排他队列常用于那些希望队列仅被当前线程或应用实例使用的场景。

注意:生产者和消费者都能够使用queueDeclare声明一个队列,如果消费者在同一个信道上订阅了另一个队列,就无法在声明队列了一个消费者只能订阅一个队列)。必须先取消订阅,然后将channel设置为“传输模式”,之后才能声明队列。

交换机持久化和队列持久化的区别

  • 交换机持久化主要关注的是交换机的配置和元数据的长期存储,确保重启后配置不变。
  • 队列持久化关注队列自身及其消息的长期存储,需要结合消息的持久化设置来防止消息丢失。

queueBind方法详解

queueBind方法如下

1. Queue.BindOK queueBind(String queue,String exchange,String routingKey) 
throws IOException;

2. Queue.BindOK queueBind(String queue,String exchange,String routingKey,
Map<String,Object> arguments) throws IOException;

3. void queueBindNoWait(String queue,String exchange,String routingKey,
Map<String,Object> arguments) throws IOException;

参数详解

  • queue:队列名称
  • exchange:交换机名称
  • routingKey:用来绑定队列和交换机的路由键
  • argument:定义绑定的一些参数

exchangeBind方法详解

exchangeBind方法如下

 1. Exchange.BindOK exchangeBind(String destination,String source,String routingKey)
 throws IOException;

 2. Exchange.BindOK exchangeBind(String destination,String source,String routingKey,
Map<String,Object> arguments) throws IOException;

 3. Exchange.BindOK exchangeBindNoWait(String destination,String source,
String routingKey,Map<String,Object> arguments) throws IOException;

绑定以后,消息从source交换机转发到destination交换机。某种程度上可以将destination交换机看作一个队列。
在这里插入图片描述

发送消息

如果要发送一个消息,可以使用Channel类的basicPublish方法。

示例:

byte[] messageBodyBytes = "Hello,World!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

对于basicPublish而言,有几个重载方法

1. void basicPublish(String exchange,String routingKey,
BasicProperties props,byte[] body) throws IOException;

2.void basicPublish(String exchange,String routingKey,boolean mandatory,
BasicProperties props,byte[] body) throws IOException;

3. void basicPublish(String exchange,String routingKey,boolean mandatory,
boolean immediate,BasicProperties props,byte[] body) throws IOException;
  • exchange :交换机的名称,指明消息需要发送到哪个交换机中,如果设置为空,则消息会被发送到RabbitMQ默认的交换机中。
  • routingKey:路由键,交换机根据路由键将消息存储到相应的队列中。
  • props:消息的基本属性集,包括contentType、deliveryMode等
  • byte[] body: 真正要发送的消息内容

消费消息

消费模式分为两种:Push模式和Pull模式。推模式采用Basic.Consume进行消费,拉模式采用Basic.Get进行消费。

推模式

当调用Consumer相关API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区分,关键消费代码如下所示

在这里插入图片描述
对于消费者来说,显示的设置autoAck为false,接收消息之后进行显示ack操作,这个设置是非常必要的。可以防止消息不必要的丢失。

核心方法

String basicConsume(String queue,boolean autoack,String consumerTag,
boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) 
throws IOException;
  • queue: 队列的名称
  • autoAck: 设置是否自动确认
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置true表示不能将一个Connection中生产者发送的消息发送给这个Connection中的消费者。
  • exclusive:设置是否排他,确保该队列仅对创建他的消费者可见。
  • arguments:设置消费者的其他参数
  • callback:设置消费者的回调函数。

每个Channel都拥有自己独立的线程,最常用的做法是一个Channel对应一个消费者,也就意味着消费者彼此之间没有关联,也可以在Channel中维持多个消费者,但是,如果Channel中一个消费者一直在运行,那其他消费者的callback会被耽搁。
在这里插入图片描述

拉模式

通过channel.basicGet方法可以单条获取消息,其返回值是GetResponse。核心方法如下:

GetResponse basicGet(String queue,boolean autoAck) throws IOException;

如果autoAck为false,那么同样需要调用channel.basicAck来确认消息已经被成功接受。

GetResponse response = channel.basicGet(QueueName,false);

System.out.println(new String(response.getBody()));

channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

注意:Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅为止,接收模式期间,RabbitMQ会不断将消息推送给消费者,推送消息的个数受到Basic.QoS的限制。如果只想从队列里获取单条消息而不是持续订阅,建议使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里代替Basic.Consume,这样做会严重影响MQ性能。
在这里插入图片描述

消费端的确认和拒绝

消息确认

RabbitMQ为了保证消息从队列可靠的到达消费者,提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示的回复确认信号后才从内存(或者磁盘)中移除消息(即使消息配置了持久化,在被ack以后仍然要被删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息设置为确认,然后从内存中删除,而不管消费者是否真正消费这些消息。

把消息确认设置为false,消费者就有足够的时间处理消息,不用担心处理过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显示调用Basic.Ack命令为止。

当autoAck参数置为false,对于MQ服务端而言,队列中的消息分成了两个部分:

  1. 等待投递给消费者的消息
  2. 已经投递给消费者,但是还没有收到消费者确认信号的消息:如果一直没有收到确认信号,消费此消息的消费者已经断开连接,则MQ会重新安排该消息进入队列,等待投递给下一个消费者。

MQ不会为未确认的消息设置过期时间,他判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。如此设计,是因为RabbitMQ允许消费者消费一条消息的时间可以很久很久。

消息拒绝

Channel类中的basicReject方法定义如下

void basicReject(long deliveryTag,boolean requeue) throws IOException;

其中,deliveryTag可以看作消息的编号,如果requeue为true,则RabbitMQ会重新将这条消息存入队列,以便发送给下一个订阅的消费者。

如果想要批量拒绝消息,可以使用Basic.Nack这个命令。

void basicNack(long deliveryTag,boolean multiple,boolean requeue) throws IOException;

multiple参数为false表示只拒绝编号为deliveryTag的这条消息,如果设置为true,表示拒绝编号deliveryTag之前所有未被消费者确认的消息。

关闭连接

在应用程序使用完毕之后,需要关闭连接,释放资源

channel.close();
connection.close();

AMQP协议中connection和channel采用同样的方式来管理网络失败、内部错误和显示关闭连接。connection和channel所具备的生命周期如下:

  • Open:开启状态,代表当前对象可以使用
  • Closing:正在关闭状态,当前对象被显示通知调用关闭方法,这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成
  • Closed:已经关闭,当前对象已经接收到所有内部对象以完成关闭动作的通知,并且其也关闭了自身。

与关闭操作相关的方法

  • addShutdownListener(ShutdownListener listener): 当connection或者channel状态转变为closed的时候调用ShutdownListener,而如果将一个ShutdownListener注册到一个已经处于Closed状态的对象(特指Connection或者Channel对象),会立刻调用ShutdownListener。
  • removeShutdownListener(ShutdownListener listener)
  • getCloseReason: 获取connection或者channel关闭原因
  • isOpen:检测当前对象是否开启
  • close(int closeCode,String closeMessage):显示通知连接执行关闭操作。

代码清单
在这里插入图片描述
当触发ShutdownListener时候,可以获取到ShutdownSingalException,这个信号包含了关闭的原因。

ShutdownSingalException 提供多个方法来分析关闭原因,isHardError方法可以知道是Connection错误还是Channel错误;getReason可以获取Cause相关的信息。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1844631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React 中的服务器渲染组件

在前后分离架构以前&#xff0c;所有的 Html 业务都是后端渲染&#xff0c;返回前前端显示&#xff0c;后端渲染把前后端逻辑耦合在一起&#xff0c;增大系统的复杂度&#xff0c;不易于扩展。React 中的 Server组件&#xff0c;准确的说是服务器进行渲染&#xff0c;无论是什么…

物联网边缘网关在物联网应用中有哪些优势?天拓四方

随着物联网技术的快速发展&#xff0c;越来越多的设备接入网络&#xff0c;数据交互日益频繁&#xff0c;对数据处理和传输的要求也越来越高。在这样的背景下&#xff0c;物联网边缘网关应运而生&#xff0c;以其低延迟、减少带宽消耗、提高数据质量和安全性等优势&#xff0c;…

小林图解系统-三、操作系统结构

Linux 内核 vs Windows 内核 内核 作为应用连接硬件设备的桥梁&#xff0c;保证应用程序只需要关心与内核交互&#xff0c;不需要关心硬件的细节 内核具备四个基本能力&#xff1a; 管理进程、线程&#xff0c;决定哪个进程、线程使用CPU&#xff0c;也就是进程调度的能力&a…

ArcGIS查找相同图斑、删除重复图斑

​ 点击下方全系列课程学习 点击学习—>ArcGIS全系列实战视频教程——9个单一课程组合系列直播回放 点击学习——>遥感影像综合处理4大遥感软件ArcGISENVIErdaseCognition 这次是上次 今天分享一下&#xff0c;很重要却被大家忽略的两个工具 这两个工具不仅可以找出属性…

【总线】AXI4第二课时:深入AXI4总线的基础事务

大家好,欢迎来到今天的总线学习时间!如果你对电子设计、特别是FPGA和SoC设计感兴趣&#xff0c;那你绝对不能错过我们今天的主角——AXI4总线。作为ARM公司AMBA总线家族中的佼佼者&#xff0c;AXI4以其高性能和高度可扩展性&#xff0c;成为了现代电子系统中不可或缺的通信桥梁…

PHP转Go系列 | 条件循环的使用姿势

大家好&#xff0c;我是码农先森。 条件 在 PHP 语言中条件控制语句&#xff0c;主要有 if、elseif、else 和 switch 语句 // if、elseif、else 语句 $word "a"; if ($word "a") {echo "a"; } elseif ($word "b") {echo "b&…

【论文笔记】LoRA LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS

题目&#xff1a;LoRA: LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS 来源: ICLR 2022 模型名称: LoRA 论文链接: https://arxiv.org/abs/2106.09685 项目链接: https://github.com/microsoft/LoRA 文章目录 摘要引言问题定义现有方法的问题方法将 LORA 应用于 Transformer 实…

无源编缆测尺助力料场实现自动化堆取料作业

随着工业4.0时代的到来&#xff0c;智能化、无人化成为现代工业发展的重要趋势。在港口码头、钢铁冶金、焦化等高耗能行业中&#xff0c;如何实现物料的精准测量与无人化操作&#xff0c;成为企业提高生产效率、降低人工成本的关键。武汉市微深节能科技有限公司凭借其先进的分段…

【面试干货】抽象类与接口的区别

【面试干货】抽象类与接口的区别 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java编程中&#xff0c;抽象类和接口是两个非常重要的概念&#xff0c;它们都为代码的可扩展性和复用性提供了基础。但是&#xff0c;它们之间也有一些明显…

class的流光效果

效果图&#xff1a; 代码示例 <!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title&g…

React路由笔记(函数组件,自用)

配置 npm i react-router-dom基本使用 目录结构 在src中创建page文件夹放置各页面组件&#xff0c;router中放置路由 1、router中配置路由 在/router/index.js中&#xff0c;使用createBrowserRouter配置路由。 import { createBrowserRouter } from "react-router…

[Python人工智能] 四十六.PyTorch入门 (1)环境搭建、神经网络普及和Torch基础知识

从本专栏开始,作者正式研究Python深度学习、神经网络及人工智能相关知识。前文讲解合如何利用keras和tensorflow构建基于注意力机制的CNN-BiLSTM-ATT-CRF模型,并实现中文实体识别研究。这篇文章将介绍PyTorch入门知识。前面我们的Python人工智能主要以TensorFlow和Keras为主,…

Flutter第十四弹 抽屉菜单效果

目标&#xff1a; 1.怎么构建抽屉菜单效果&#xff1f; 2.抽屉菜单怎么定制&#xff1f; 一、抽屉菜单 侧滑抽屉菜单效果 1.1 抽屉菜单入口 Flutter 的脚手架Scaffold&#xff0c;默认提供了抽屉菜单效果入口。 主页面采用一个简单的页面&#xff0c;侧滑菜单首先使用一个I…

三星与SK海力士:以混合键合技术引领3D DRAM革新之路

在高速缓存内存&#xff08;HBM&#xff09;领域持续领跑的三星与SK海力士&#xff0c;正以混合键合技术为突破口&#xff0c;开启3D DRAM技术的新纪元。这一战略转型不仅预示着存储技术的深度革新&#xff0c;更体现了两大半导体巨头在提高集成度、优化性能与成本上的不懈追求…

【计算机网络篇】数据链路层(11)在数据链路层扩展以太网

文章目录 &#x1f354;使用网桥在数据链路层扩展以太网&#x1f95a;网桥的主要结构和基本工作原理&#x1f388;网桥的主要结构&#x1f50e;网桥转发帧的例子&#x1f50e;网桥丢弃帧的例子&#x1f50e;网桥转发广播帧的例子 &#x1f95a;透明网桥&#x1f50e;透明网桥的…

图论——代码随想录打卡

1 DFS深度搜索算法 深度优先搜索算法是从一个方向去进行搜索&#xff0c;直到遇到走不下去的终点&#xff0c;再进行回溯更换方向&#xff0c;重新进行搜索。因此有回溯也就意味着存在递归&#xff1a; void dfs(参数&#xff09;{处理节点dfs(图&#xff0c;选择的节点)回溯…

Hi3861 OpenHarmony嵌入式应用入门--0.96寸液晶屏 iic驱动ssd1306

使用iic驱动ssd1306&#xff0c;代码来源hihope\hispark_pegasus\demo\12_ssd1306 本样例提供了一个HarmonyOS IoT硬件接口的SSD1306 OLED屏驱动库&#xff0c;其功能如下&#xff1a; 内置了128*64 bit的内存缓冲区&#xff0c;支持全屏刷新;优化了屏幕刷新速率&#xff0c;…

AI发展核心要素之一(算力)

背景&#xff1a; 当今时代&#xff0c;云计算、人工智能、视频会议、短视频和各种社交媒体等行业蓬勃兴起&#xff0c;而ChatGPT-OpenAI的一次又一次的版本更新和迭代更是将我们带入了AI时代的新纪元。在2023年底的华为全联接大会上&#xff0c;孟晚舟就曾在演讲中表示:“算力…

云计算【第一阶段(17)】账号和权限管理

目录 一、用户账号和组账号概述 1.1、用户账号的三种角色 1.2、组账号的两个角色 二、用户账号文件 2.1、/etc/passwd 2.2、/etc/shadow 2.3、chage 命令 三、组账号文件 3.1、/etc/group 3.2、/etc/gshadow 四、添加组账户 4.1、添加删除组成员 4.2、删除组账号 …

北航数据结构与程序设计查找与排序编程题

查找与排序编程题 单词查找&#xff08;查找——基本题&#xff09;排座位&#xff08;简&#xff09;a 单词查找&#xff08;查找——基本题&#xff09; 【问题描述】 从标准输入中读入一个英文单词及查找方式&#xff0c;在一个给定的英文常用单词字典文件dictionary3000.…