消息队列
- 消息队列的模型
- 为什么要用消息队列
- 分布式消息队列
- 应用场景
- 分布式消息队列选型
- RabbitMQ入门实战
-
- single
- Work
- Fanout
- Direct
- Topic
- 核心特性
-
- 消息过期机制
- 消息确认机制
- 死信队列
消息队列的模型
生产者:Producer,发送消息的人(客户端)
消费者:Consumer,接受消息的人(客户端)
消息:Message,生产者要传输给消费者的数据
消息队列:Queue,存放消息的队列
为什么要用消息队列
- 异步处理:
生产者发送完消息之后,可以继续去忙别的,消费者想什么时候消费都可以,不会产生阻塞。
- 削峰填谷:
AI处理请求的速率是有限的,如果遇到流量突刺的情况,大量的请求都会失败,并且也会给系统造成很大的压力。使用消息队列,则先可以将大量的请求放入到消息队列,然后让AI服务以自己的速率执行请求,从而保护系统的稳定性。
分布式消息队列
优点:
- 数据持久化:它可以把消息集中存储到硬盘里,服务器重启就不会丢失
- 可扩展性:可以根据需求,随时增加(或减少)节点,继续保持稳定的服务
- 应用解耦:可以连接各个不同语言、框架开发的系统,让这些系统能够灵活传输读取数据
- 发布订阅:通过订阅消息队列,来得到其他系统的变更通知
应用场景
- 耗时的场景(异步)
- 高并发场景(异步、削峰填谷)
- 分布式系统协作(尤其是跨团队、跨业务协作,应用解耦)
- 强稳定性的场景(比如金融业务,持久化、可靠性、削峰填谷)
分布式消息队列选型
RabbitMQ入门实战
RabbitMQ官网
single
一个生产者生产消息,一个消费者消费消息
SingleProducer
public class SingleProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" SingleProducer Sent '" + message + "'");
}
}
}
SingleConsumer
public class SingleConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" SingleConsumer Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
Work
一个生产者生产消息,可以有多个消费者消费。
消费者的消费模式:
- 平均消费:各个消费者均分消息
- 能者多劳:做一个,取一个
WorkerProducer
public class WorkerProducer {
private static final String TASK_QUEUE_NAME = "worker_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// durable: 队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String message = sc.nextLine();
// MessageProperties.PERSISTENT_TEXT_PLAIN: 消息持久化
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out