RabbitMQ 总结二(MQ原理 通信方式 消息应答机制)

news2025/2/26 5:35:22

目录

MQ的构成

生产者

交换机

队列

消费者

通信方式

Producer -> Broker (包含Exchange)

Exchange -> Binding -> Queue -> Consumer

消息应答

为什么引入消息应答

消息自动重新入队

如何进行消息应答

案例Demo


MQ的构成

生产者 消费者 交换机和队列

生产者

产生数据发送消息的程序是生产者

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息
推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存
储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费
者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

通信方式

RabbitMQ是基于AMQP协议来实现的消息中间件。AMQP,类似于HTTP协议,也是一个应用层的协议,网络层使用TCP来通信。因此,RabbitMQ也是典型的C-S模型,准确地说是C-S-C模型,因为伴随着RabbitMQ的使用,总是会有Producer与Consumer两个Client和一个Broker Server。


Client要与Server进行通信,就必须先建立连接,RabbitMQ中有Connection与Channel两个概念,前者就是一个TCP连接,后者是在这个连接上的虚拟概念,负责逻辑上的数据传递,因此,为了节省资源,一般在一个客户端中建立一个Connection,每次使用时再分配一个Channel即可。一个Connection可以有多个Channel。

Producer -> Broker (包含Exchange)

Broker 在英文中作动词是安排,协商的意思,名词是掮客。可以理解为中转 分发的功能。

Exchange -> Binding -> Queue -> Consumer

Exchange
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发
消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout
(multicast)

Binding

exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保
存到 exchange 中的查询表中,用于 message 的分发依据

Queue

消息最终被送到这里等待 consumer 取走
 

消息应答

为什么引入消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

也可以配置消息的自动应答,不过这种模式仅适用在消费者可以高效地处理这些消息的情况下使用

如果消息手动应答的时候出现了某种状况导致应答消息没有被收到,那么broker不会删除该消息,而是会将其重新入队。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息
未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。

如何进行消息应答

案例Demo

本来是由 worker2 处理的消息,因为worker2内部发生了异常而导致没有手动消息应答,broker没有收到应答消息于是将消息重新入队,重新分发进行处理。


public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {

            boolean durable = true;
            channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
}

public class Work01 {
    private static final String ACK_QUEUE_NAME="ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息处理时间较短");
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        });
    }
}

public class Work02 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(" 出错了,消息没有应答,此时消息会重新入队交给 其他worker处理");
            int i = 1/0;
            System.out.println("接收到消息:" + message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/151359.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【学习笔记之Linux】工具之yum

yum是Linux的软件包管理器。   什么是软件包?在Linux中安装软件,可以通过下载程序源码,然后编译得到可执行程序。但是这样非常麻烦,于是就有人把常用的软件编译好之后做成软件包,然后把软件包放在一个服务器上。   …

redis常见面试题

redis常见面试题 redis集群转载于:https://blog.csdn.net/sun_lm/article/details/123467103 redis的几个数据结构的应用场景借鉴于:https://blog.csdn.net/weixin_51299478/article/details/125204374 1. redis的作用 redis的作用主要就是两个&…

数据结构——串

串又称字符串,是由零个或多个字符组成的有限序列,是一种特殊的线性表。由串中若干个连续字符组成的子序列称为子串。 利用字符数组或字符指针表示串: char str1[] { a,b,c,d,\0 }; char str2[] "abcdef"; char* str3 str1; 上…

Java设计模式之单例模式

这一篇,我们来介绍下设计模式最简单的一个模式,单例模式。 二、释义以及实战 2.1 单例模式的定义 单例模式,英文:Singleton Pattern,英文解释:Ensure a class has only instance,and provide a global point of acce…

黑马2022新版SSM框架教程(SpringMVC_day02)

SpringMVC_day02 文章目录SpringMVC_day021,SSM整合1.1 流程分析1.2 整合配置步骤1:创建Maven的web项目步骤2:添加依赖步骤3:创建项目包结构步骤4:创建SpringConfig配置类步骤5:创建JdbcConfig配置类步骤6:创建MybatisConfig配置类步骤7:创建jdbc.proper…

Vue(十二)

1. TodoList案例自定义事件 //App.vue <template><div id"root"><div class"todo-container"><div class"todo-wrap"><!-- addTodo添加自定义事件 --><MyHeader addTodo"addTodo"/><MyList …

Spring AOP详解

1.什么是 Spring AOP&#xff1f; AOP&#xff08;Aspect Oriented Programming&#xff09;&#xff1a;⾯向切⾯编程&#xff0c;它是⼀种思想&#xff0c;它是对某⼀类事情的 集中处理。⽐如⽤户登录权限的效验&#xff0c;没学 AOP 之前&#xff0c;我们所有需要判断⽤户登…

YACC移进规约冲突案例分析(二)output中状态机转移步骤详解

案例 calc.y %union {int ival;const char *sval; } %token <ival> NUM %nterm <ival> exp %token <sval> STR %nterm <sval> useless %left - %left * %% exp:exp exp | exp - exp | exp * exp | exp / exp | NUM ; useless: STR; %%编译 $ biso…

恭喜龙蜥获得中国开源云联盟2022年度中国“最佳开源实践案例”和“杰出开源贡献者”奖项

近日&#xff0c;由工信部中国电子技术标准化研究院主办的 2022 木兰峰会在北京圆满举办&#xff0c;峰会上正式公布了中国开源云联盟(China Open Source Cloud League&#xff0c;简称“COSCL”) 2022 年度评选名单&#xff0c;龙蜥社区荣获中国“最佳开源实践案例”和“杰出开…

仪器设备使用

NI DcpowerSwitchDigitalDMMFgenScope名称直流电源&#xff08;SMU&#xff09;继电器PPMU数字万用表信号发生器示波器版本PXI-4147PXI-2567PXI-6571PXI-4070PXI-4463PXI-5160 1.Scope 示波器是一种电子测量仪器&#xff0c;可以在无干扰的情况下监控输入信号&#xff0c;随后…

Go结构体(struct)

文章目录Struct定义struct构造struct实例struct的值和指针在与函数共用时&#xff1a;匿名字段和嵌套struct嵌套struct的名称冲突问题Struct 是一个值类型的 定义struct type identifier struct {field1 type1field2 type2… } // 或者 type T struct { a, b int }理论上&am…

JAVA多线程初阶(1)

目录JAVA多线程(1)1.Thread类创建与使用1.1 继承Thread类1.2 实现并发关于sleep()1.3 Runnable创建线程1.4 匿名内部类创建线程1.5 lamda表达式创建线程2.多线程提高效率3.Thread类属性和方法3.1 Thread(String name)3.2 isDaemon()3.3 isAlive()3.3 线程的重要方法3.4 中断线程…

数据结构:图

文章目录图内存中存储图数据结构邻接矩阵存储方法用邻接矩阵&#xff08;Adjacency Matrix&#xff09;来表示一个图的缺点&#xff1a;浪费空间优点邻接表存储方法&#xff08;Adjacency List&#xff09;广度优先算法Breadth-First-Search&#xff08;BFS&#xff09;深度优先…

Android——GT库-日志工具

GT库在创造出来初期&#xff0c;里面的日志工具就一直存在的&#xff0c;经历了很久的迭代变更&#xff0c;当目前的最新版本&#xff0c;日志工具已经创造出更高级的调试日志方式了&#xff0c;接下来咋们来看看GT库中的日志工具具体使用方法吧。 使用GT库里的&#xff0c;当然…

web表单设计器的优点体现在哪?

在数字化管理越来越规范的当下&#xff0c;拥有一款优质高效的低代码开发平台&#xff0c;确实能给企业提质增效带来更大的帮助。很多客户朋友会问道&#xff1a;web表单设计器都有哪些特点&#xff1f;为什么能在企业的现代化办公管理中起到巨大的作用&#xff1f;今天&#x…

Linux终端远程工具xshell,xftp,mobasterm

目录 软件介绍 1.xshell 第一步&#xff1a; 第二步&#xff1a; 第三步&#xff1a; 第四步&#xff1a; 第5步&#xff1a; 2.xftp 第一步&#xff1a; 第二部&#xff1a; 第三步&#xff1a; 3.mobasterm 全能终端神器——MobaXterm 第一步&#xff1a; 第二步&a…

C1083无法打开包括文件: “atlbase.h”: No such file or directory

在打开别人的项目的过程中遇到了“atlbase.h”无法打开的问题&#xff0c;在此记录一下。1.下载ATL生成工具与缓解只下载ATL生成工具后面还会报错&#xff0c;直接下载下载ATL生成工具与缓解一步到位。下载的入口在&#xff1a;工具--->获取工具与功能。需要注意的是&#x…

Guitar Pro2023Win/Mac中文吉他/贝斯打谱识谱软件

Guitar Pro 是一款曲谱阅读器。以 GTP 结尾的曲谱文件都必须用 Guitar Pro 才能打开。Guitar Pro 凭借着其便利的制谱和读曲谱环境&#xff0c;在各大谱库论坛里都占据着一席之地&#xff0c;喜欢吉他的朋友一定略有耳闻。早几年该作者将它移植到了移动平台&#xff0c;现在你也…

7-2国王游戏

题目&#xff1a; 恰逢 H 国国庆&#xff0c;国王邀请 n 位大臣来玩一个有奖游戏。 首先&#xff0c;他让每个大臣在左、右手上面分别写下一个整数&#xff0c;国王自己也在左、右手上各写一个整数。 然后&#xff0c;让这 n 位大臣排成一排&#xff0c;国王站在队伍的最前面。…

应用层——Web和HTTP

目录 1. HTTP概况 1.1 Web页面简介 1.2 URL-统一资源定位器 1.3 HTTP协议 2. HTTP连接的两种类型 2.1 HTTP非持久性连接(Non-persistent HTTP) 2.2 HTTP持久性连接(Persistent HTTP) 2.2.1 无流水(pipelining)的持久性连接 2.2.2 带有流水机制的持久性连接 3. HT…