目录
MQ的构成
生产者
交换机
队列
消费者
通信方式
Producer -> Broker (包含Exchange)
Exchange -> Binding -> Queue -> Consumer
消息应答
为什么引入消息应答
消息自动重新入队
如何进行消息应答
案例Demo
MQ的构成
生产者 消费者 交换机和队列
生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息
推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存
储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费
者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
通信方式
RabbitMQ是基于AMQP协议来实现的消息中间件。AMQP,类似于HTTP协议,也是一个应用层的协议,网络层使用TCP来通信。因此,RabbitMQ也是典型的C-S模型,准确地说是C-S-C模型,因为伴随着RabbitMQ的使用,总是会有Producer与Consumer两个Client和一个Broker Server。
Client要与Server进行通信,就必须先建立连接,RabbitMQ中有Connection与Channel两个概念,前者就是一个TCP连接,后者是在这个连接上的虚拟概念,负责逻辑上的数据传递,因此,为了节省资源,一般在一个客户端中建立一个Connection,每次使用时再分配一个Channel即可。一个Connection可以有多个Channel。
Producer -> Broker (包含Exchange)
Broker 在英文中作动词是安排,协商的意思,名词是掮客。可以理解为中转 分发的功能。
Exchange -> Binding -> Queue -> Consumer
Exchange
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发
消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout
(multicast)
Binding
exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保
存到 exchange 中的查询表中,用于 message 的分发依据
Queue
消息最终被送到这里等待 consumer 取走
消息应答
为什么引入消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
也可以配置消息的自动应答,不过这种模式仅适用在消费者可以高效地处理这些消息的情况下使用
如果消息手动应答的时候出现了某种状况导致应答消息没有被收到,那么broker不会删除该消息,而是会将其重新入队。
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息
未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。
如何进行消息应答
案例Demo
本来是由 worker2 处理的消息,因为worker2内部发生了异常而导致没有手动消息应答,broker没有收到应答消息于是将消息重新入队,重新分发进行处理。
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
public class Work01 {
private static final String ACK_QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息处理时间较短");
//消息消费的时候如何处理消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
});
}
}
public class Work02 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(" 出错了,消息没有应答,此时消息会重新入队交给 其他worker处理");
int i = 1/0;
System.out.println("接收到消息:" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
});
}
}