1、RabbitMQ routing路由模式
1、 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
2、 根据业务功能定义路由字符串
3、 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
4、 业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
2、消息怎么路由?
消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为一下三种:
1、 fanout:如果交换器收到消息,将会广播到所有绑定的队列上
2、 direct:如果路由键完全匹配,消息就被投递到相应的队列
3、 topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符
3、RabbitMQ publish/subscribe发布订阅(共享资源)
1、 每个消费者监听自己的队列;
2、 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
4、能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?
不能。第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用 HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常见问题;第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务疲于处理;第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而RabbitMQ 目前无法处理(该问题主要是说 Mnesia)。
5、RabbitMQ有那些基本概念?
1、 Broker:简单来说就是消息队列服务器实体
2、 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
3、 Queue:消息队列载体,每个消息都会被投入到一个或多个队列
4、 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
5、 Routing Key:路由关键字,exchange根据这个关键字进行消息投递
6、 VHost:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
7、 Producer:消息生产者,就是投递消息的程序
8、 Consumer:消息消费者,就是接受消息的程序
9、 Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。
6、什么情况下会出现 blackholed 问题?
blackholed 问题是指,向 exchange 投递了 message ,而由于各种原因导致该message 丢失,但发送者却不知道。可导致 blackholed 的情况:1.向未绑定 queue 的exchange 发送 message;2.exchange 以 binding_key key_A 绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 却是 key_B。
7、什么是消费者Consumer?
消费消息,也就是接收消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
8、消息如何分发?
l>round-robin 轮 询分发)
若该队列至少有一个消费者订阅,消息将以循环的方式发送给消费者。每条消息只 会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过 路由可实现多消费的功能
2>Fair dispatch (公平分发) 使用basicQos(prefetchcount = 1)方法,来限制RabbitMQ只发不超过1条的消息 给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
注:使用公平分发,必须关闭自动应答,改为手动应答
9、Basic.Reject 的用法是什么?
该信令可用于 consumer 对收到的 message 进行 reject 。若在该信令中设置
requeue=true,则当 RabbitMQ server 收到该拒绝信令后,会将该 message 重新发送到下一个处于 consume 状态的 consumer 处(理论上仍可能将该消息发送给当前consumer)。若设置 requeue=false ,则 RabbitMQ server 在收到拒绝信令后,将直接将该message 从 queue 中移除。
另外一种移除 queue 中 message 的小技巧是,consumer 回复 Basic.Ack 但不对获取到的message 做任何处理。而 Basic.Nack 是对 Basic.Reject 的扩展,以支持一次拒绝多条 message 的能力。
10、什么是Binding绑定?
通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,这样RabbitMq就知道如何正确路由消息到队列了。
11、如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,怎么办?
消息积压处理办法:临时紧急扩容:
1、 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
2、 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
3、 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
4、 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
5、 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
6、 MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq消息队列块满了:
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
12、为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有durable 属性,同时 message 具有 persistent 属性才行?
binding 关系可以表示为 exchange – binding – queue 。从文档中我们知道,若要求投递的 message 能够不丢失,要求 message 本身设置 persistent 属性,要求 exchange和 queue 都设置 durable 属性。
其实这问题可以这么想,若 exchange 或 queue 未设置durable 属性,则在其 crash 之后就会无法恢复,那么即使 message 设置了 persistent 属性,仍然存在 message 虽然能恢复但却无处容身的问题;同理,若 message 本身未设置persistent 属性,则 message 的持久化更无从谈起。
13、如何自动删除长时间没有消费的消息?
// 通过队列属性设置消息过期时间
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl",6000);
// 对每条消息设置过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("10000") // TTL
14、RabbitMQ 概念里的 channel、exchange 和 queue 这些东东是逻辑概念,还是对应着进程实体?这些东东分别起什么作用?
queue 具有自己的 erlang 进程;exchange 内部实现为保存 binding 关系的查找表;channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。
一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。channel 号为 0 的 channel 用于处理所有对于当前 connection 全局有效的帧,而 1-65535 号 channel 用于处理和特定 channel 相关的帧。AMQP 协议给出的 channel ,其中每一个 channel 运行在一个独立的线程上,多线程共享同一个 socket。
15、消费者获取消息的方式?
推
拉
16、使用RabbitMQ有什么好处?
1、 解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
2、 异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
3、 削峰,并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
17、mq的缺点
(1) 系统可用性降低
系统引入的外部依赖越多,越容易挂掉,本来你就是A系统调用BCD三个系统的 接口就好了,人ABCD四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ 挂了咋整? MQ挂了,整套系统崩溃了,你不就完了么。
(2) 系统复杂性提高
硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况? 怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已
(3) 一致性问题
A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要 是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,咋整? 你这数据就不一致了。
18、死信队列?
DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
19、消息传输保证层级?
At most once:最多一次。消息可能会丢失,单不会重复传输。
At least once:最少一次。消息觉不会丢失,但可能会重复传输。
Exactly once: 恰好一次,每条消息肯定仅传输一次。
20、生产者消息如何运转?
1、 Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。
2、 Producer声明一个交换器并设置好相关属性。
3、 Producer声明一个队列并设置好相关属性。
4、 Producer通过路由键将交换器和队列绑定起来。
5、 Producer发送消息到Broker,其中包含路由键、交换器等信息。
6、 相应的交换器根据接收到的路由键查找匹配的队列。
7、 如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
8、 关闭信道。
9、 管理连接。
21、vhost 是什么?起什么作用?
vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到vhost 范围的用户控制。
当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
22、AMQP协议3层?
1、 Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
2、 Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
3、 TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
23、RabbitMQ topic 主题模式(路由模式的一种)
1、 星号井号代表通配符
2、 星号代表多个单词,井号代表一个单词
3、 路由功能添加模糊匹配
4、 消息产生者产生消息,把消息交给交换机
5、 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式
24、RabbitMQ基本概念
1、 Broker: 简单来说就是消息队列服务器实体
2、 Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
3、 Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
4、 Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
5、 Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
6、 VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
7、 Producer: 消息生产者,就是投递消息的程序
8、 Consumer: 消息消费者,就是接受消息的程序
9、 Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。
25、消息如何被优先消费?
生产者
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-max-priority",10);
消费者
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用
26、在单 node 系统和多 node 构成的 cluster 系统中声明 queue、exchange ,以及进行 binding 会有什么不同?
当你在单 node 上声明 queue 时,只要该 node 上相关元数据进行了变更,你就会得到 Queue.Declare-ok 回应;而在 cluster 上声明 queue ,则要求 cluster 上的全部node 都要进行元数据成功更新,才会得到 Queue.Declare-ok 回应。
另外,若 node 类型为 RAM node 则变更的数据仅保存在内存中,若类型为 disk node 则还要变更保存在磁盘上的数据。
27、RabbitMQ消息是如何路由的?
消息路由必须有三部分:交换器、路由、绑定。
生产者把消息发布到交换器上,绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。
消息发布到交换器时,消息将拥有一个 路由键(routing key) , 在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入"黑洞"。
常用的交换器主要分为以下三种:
direct :如果路由键完全匹配,消息就会被投递到相应的队列;每个AMQP的实现都必须有一个direct交换器,包含一个空白字符串名称的默认交换器。声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由键:channel -> basic_public($msg, ‘’, ‘queue-name’)
fanout : 如果交换器收到消息,将会广播到所有绑定的队列上;
topic :可以使来自不同源头的消息能够到达同一个队列。使用topic交换器时,可以使用通配符,比如:“*” 匹配特定位置的任意文本,“.” 把路由键分为了几个标识符, “#” 匹配所有规则等。
特别注意:发往topic交换器的消息不能随意的设置选择键(routing_key),必须是有"."隔开的一系列的标识符组成。
28、交换器4种类型?
主要有以下4种。
fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
direct:把消息路由到BindingKey和RoutingKey完全匹配的队列中。
topic:
匹配规则:
RoutingKey 为一个 点号’.': 分隔的字符串。 比如: java.xiaoka.show
BindingKey和RoutingKey一样也是点号“.“分隔的字符串。
BindingKey可使用 _ 和 # 用于做模糊匹配,_匹配一个单词,#匹配多个或者0个
headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。
29、交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理?
mandatory :true 返回消息给生产者。
mandatory: false 直接丢弃。
30、RabbitMQ队列结构?
通常由以下两部分组成:
rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack) 等。
backing_queue:是消息存储的具体形式和引擎,并向 rabbit amqqueue process 提供相关的接口以供调用。
31、集群中的节点类型?
内存节点:ram,将变更写入内存。
磁盘节点:disc,磁盘写入操作。
RabbitMQ要求最少有一个磁盘节点。
32、集群节点类型有几种?
内存节点:保存状态到内存,但持久化的队列和消息还是会保存到磁盘;
磁盘节点:保存状态到内存和磁盘,一个集群中至少需要一个磁盘节点
33、Consumer Cancellation Notification 机制用于什么场景?
用于保证当镜像 queue 中 master 挂掉时,连接到 slave 上的 consumer 可以收到自身 consume 被取消的通知,进而可以重新执行 consume 动作从新选出的 master 出获得消息。若不采用该机制,连接到 slave 上的 consumer 将不会感知 master 挂掉这个事情,导致后续无法再收到新 master 广播出来的 message 。另外,因为在镜像 queue 模式下,存在将 message 进行 requeue 的可能,所以实现 consumer 的逻辑时需要能够正确处理出现重复 message 的情况。
34、消息如何保证幂等性?
生产者方面:可以对每条消息生成一个msgID,以控制消息重复投递
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
porperties.messageId(String.valueOF(UUID.randomUUID()))
消费者方面:消息体中必须携带一个业务ID,如银行流水号,消费者可以根据业务ID去重,避免重复消费
35、事务机制?
RabbitMQ 客户端中与事务机制相关的方法有三个:
channel.txSelect 用于将当前的信道设置成事务模式。
channel 、txCommit 用于提交事务 。
channel 、txRollback 用于事务回滚,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,通过txRollback来回滚。
36、如何避免消息重复投递或重复消费?
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
这个问题针对业务场景来答分以下几点:
1、 拿到这个消息做数据库的insert操作。然后给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2、 拿到这个消息做Redis的set的操作,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
3、 如果上面两种情况还不行。准备一个第三方介质,来做消费记录。以Redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入Redis。那消费者开始消费前,先去Redis中查询有没消费记录即可。
37、routing_key 和 binding_key 的最大长度是多少?
255 字节。
38、RabbitMQ消息确认过程?
消费者收到的每一条消息都必须进行确认(自动确认和自行确认)
消费者在声明队列时,可以置顶autoAck参数,当autoAck = false时,RabbitMQ会等待消费者显式发送回 ack 信号后才从内存(和磁盘,如果是持久化消息的话)中删除消息,否则RabbitMQ会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要使 autoAck = false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
当autoAck = false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息 重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ不会为 ack消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
39、RabbitMQ如何实现延时队列?
利用TTL(队列的消息存活时间或者消息存活时间),加上死信交换机
// 设置属性,消息10秒钟过期
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration(“10000”) // TTL
// 指定队列的死信交换机
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put(“x-dead-letter-exchange”,“DLX_EXCHANGE”);
40、什么是RabbitMQ?为什么使用RabbitMQ?
答:RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的,消息中间件;
(1) 在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
(2) 拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
(3) 实现消费者和生产者之间的解耦。
(4) 对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定 量的限流,利于数据库的操作。
(5) 可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。
41、RabbitMQ有什么优缺点?
答:优点:解耦、异步、削峰;
缺点:降低了系统的稳定性:本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;
增加了系统的复杂性:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
42、如何保证RabbitMQ的高可用?
答:没有哪个项目会只用一搭建一台RabbitMQ服务器提供服务,风险太大;
43、如何保证RabbitMQ不被重复消费?
答:先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;
但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;
比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;
44、如何保证RabbitMQ消息的可靠传输?
答:消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;
生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;
rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
消息队列丢数据:消息持久化。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。
这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?
这里顺便说一下吧,其实也很容易,就下面两步
将queue的持久化标识durable设置为true,则代表是一个持久的队列
发送消息的时候将deliveryMode=2
这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据
消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
45、如何保证RabbitMQ消息的顺序性?
答:单线程消费保证消息的顺序性;对消息进行编号,消费者处理消息是根据编号处理消息;
拆分多个queue,每个queue —个consumer,就是多一些queue而已, 确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个 consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
46、 使用rabbitmq的场景
(1) 服务间异步通信
(2) 顺序消费
(3) 定时任务
(4) 流量削峰
(5) 解耦(为面向服务的架构(SOA)提供基本的最终一致性实现)
47、 如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费 了消息?
发送方稣模式
将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会 被指派一个唯一的ID。
—旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道 会发送一个确认给生产者(包含消息唯一ID)。
如果RabbitMQ发生内部错误从皿导致消息丢失,会发送一条nack
(notacknowledged,未确认)消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消 息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处 理确认消息。
接收方稣机制
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操 作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需 要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长 的时间来处理消息。保证数据的最终一致性;
以下是常见的源豚清况
(1) 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会 认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复 消费的隐患,需要去重)
(2) 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为 该消费者繁忙,将不会给该消费者分发更多的消息。
48、消息基于什么传输?
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能菰 颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚 拟连接,且每条TCP连接上的信道数量没有限制。
49、 如何确保消息不丢失?
消息持久化,当然前提是队列必须持久化
RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的 —个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消 息提交到日志文件后才发送响应。一旦消费者从持久队列中消费了一条持久化消 息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消 息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑 定),并重新发布持久化日志文件中的消息到合适的队列。
50、 RabbitMQ 的集群
镜像集群模式
你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每 次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同 步。
好处在于,你任何一个机器宕机了,没事儿,别的机器都可以用。
坏处,第一,这个性能开销也太大了吧,消息同步所有机器,导致网络带宽压力和 消耗很重!
第二,这么玩儿,就没有扩展性可言了,如果某个queue负载很重,你加机器, 新増的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue
51、 RabbitMQ的工作模式
Simple模式(即最简单的收发模式)
-
消息产生消息,将消息放入队列
-
消息的消费者(consumer)监听消息队列,如果队列中有消息就消费掉,消息被拿走 后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了, 造成消息的丢失,这里可以设置成手动的ack但如果设置成手动ack,处理完后要 及时发送ack消息给队列,否则会造成内存溢岀)。
-work工作模式(资源的竞争)
- 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一 个队列,消息被消费。C1C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消 息(隐患:高并发情况下默认会产生某一个消息被多个消费者共同使用,可以设置一 个并关(syncronize)保证一条消息只能被一个消费者使用)。
52、publish/subscribe发布订阅(共享资源)
2、生产者将消息发给broker,由交换机将消息賴发到绑定此交换机的每个队 列,每个绑定交换机的队列都将接收到消息。
53. 设计MQ思路
比如说这个消息队列系统,可以从以下几个角度来考虑一下:
首先这个mq得支持可伸缩性吧,就是需要的时候快速扩容,就可以増加吞 吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下kafka的设寸 理念,broker -> topic -> partition,母个 partition 放一个机器,就 存一部分数据。如果现在资源不够了,简单啊,给topic増加 partition,然后做数据迁移,増加机器,不就可以存放更多数据,提供更 高的吞吐量了?
其次你得考虑一下这个mq的数据要不要落地磁盘吧?那肯定要了,落磁盘 才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka的思路。
其次你考虑一下你的mq的可用性啊?这个事儿,具体参考之前可用性那个 环节讲解的kafka的高可用保障机制。多副本-〉leader & follower -> broker挂了重新选举leader即可对外服务。
能不能支持数据0丢失啊?
通过offset commit来保证数据的不丢失,以及数据的不会重复消费。
kafka自己记录了每次消费的offset数值,下次继续消费的时候,接看上次的
offset进行;肖斐即可,可以通过手动的提交offset,保证数据的不会丢失。
54. rabbitmq怎么避免消息丢失
rabbitMQ消息可能丢失的情况:
L发送方发出消息但没有进入队列。
将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都 会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者肖息被写入磁盘后
(可持久化的消息),信道会发送一个确认给生产者(包念肖息唯一ID )。
- 接收者接到消息,但处理过程出现错误。
当消费者获取消息后,会向RabbitMQ发送回执ACK ,告知消息我被接收。不 过这种回执ACK分两种情况:
自动ACK :消息一旦被接收,消费者自动发送ACK
如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
手动ACK :消息接收后,不会发送ACK ,需要手动调用
-如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK ,否则接收消息 后就自动ACK , RabbitMQ就会把消息从队列中删除。如果此时f肖费者宕机,那 么消,諷丢失了。
- 队列或者列机。
消息持久化,当然前提是队列和交换机必须持久化
55.rabbitmq怎么实现延迟消息队列
使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL 和死信Exchange ,通过这两者的组合来实现上述需求。如果队列设置了,消息也 设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡 的时间有可能不一样(不同的队列设置)O
延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个 队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
生产者输出消息到Queuel,并且这个消息是设置有有效时间的,比如3分钟。 消息会在Queuel中等待3分钟,如果没有消费者收掉的话,它就是被转发到 Queue2 , Queue2有消费者,收到,处理延迟任务。
56、死信队列和延迟队列的使用?
1、 死信消息:消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false 消息过期了 队列达到最大的长度
2、 过期消息:在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。
3、 队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
4、 单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒
5、 延时队列:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
场景演示:需求:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单。
分析:
1、 上面这个情况,我们就适合使用延时队列来实现,那么延时队列如何创建
2、 延时队列可以由 过期消息+死信队列 来时间
3、 过期消息通过队列中设置 x-message-ttl 参数实现
4、 死信队列通过在队列申明时,给队列设置 x-dead-letter-exchange 参数,然后另外申明一个队列绑定x-dead-letter-exchange对应的交换器。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//
// 声明一个接收被删除的消息的交换机和队列
String EXCHANGE_DEAD_NAME = "exchange.dead";
String QUEUE_DEAD_NAME = "queue_dead";
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
//
String EXCHANGE_NAME = "exchange.fanout";
String QUEUE_NAME = "queue_name";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map<String, Object> arguments = new HashMap<String, Object>();
// 统一设置队列中的所有消息的过期时间
arguments.put("x-message-ttl", 30000);
// 设置超过多少毫秒没有消费者来访问队列,就删除队列的时间
arguments.put("x-expires", 20000);
// 设置队列的最新的N条消息,如果超过N条,前面的消息将从队列中移除掉
arguments.put("x-max-length", 4);
// 设置队列的内容的最大空间,超过该阈值就删除之前的消息
arguments.put("x-max-length-bytes", 1024);
// 将删除的消息推送到指定的交换机,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同时设置
arguments.put("x-dead-letter-exchange", "exchange.dead");
// 将删除的消息推送到指定的交换机对应的路由键
arguments.put("x-dead-letter-routing-key", "routingkey.dead");
// 设置消息的优先级,优先级大的优先被消费
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
//
for(int i = 1; i <= 5; i++) {
// expiration: 设置单条消息的过期时间
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i).expiration( i * 1000 + "");
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
channel.close();
connection.close();
57、RabbitMQ的集群模式有几种?
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
1、 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式
2、 普通模式:以两个节点(rabbit01,rabbit02)为例来进行说明,对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01,rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer,所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么等到rabbit01节点恢复,然后才可被消费。如果没有消息持久化,就会产生消息丢失的现象。
3、 镜像模式:把需要的队列做成镜像队列,存在与多个节点属于RabibitMQ的HA方案,该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取,该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉,所以在对可靠性要求比较高的场合中适用
58、无法被路由的消息去了哪里?
mandatory:true 返回消息给生产者。
mandatory:false 直接丢弃。
59、向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行consume 动作会发生什么?
都会收到 Channel.Close 信令告之不存在(内含原因 404 NOT_FOUND)。
60、除了RabbitMQ还有什么消息队列方案
来源
RabbitMQ常见面试题及答案 90道(2021版)
常见Rabbitmq面试题及答案总结
RabbitMQ常用面试题有哪些