🧑💻作者名称:DaenCode
🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······
😎人生感悟:尝尽人生百味,方知世间冷暖。
📖所属专栏:图解RabbitMQ
专栏推荐
- 专门为Redis入门打造的专栏,包含Redis基础知识、基础命令、五大数据类型实战场景、key删除策略、内存淘汰机制、持久化机制、哨兵模式、主从复制、分布式锁等等内容。
链接>>>>>>>>>
《Redis从头学》 - SpringBoot实战相关专栏,包含SpringBoot过滤器、拦截器、AOP实现日志、整合Freemaker、整合Redis等等实战相关内容,多篇文章登入全站热榜、领域热榜、被技术社区收录。
链接>>>>>>
《SpringBoot实战》
文章目录
- 专栏推荐
- 🌟前言
- 🌟连接工具类
- 🌟简单工作模型
- 介绍
- 代码实现
- 🌟工作队列模型
- 介绍
- 代码实现
- 🌟发布订阅模型
- 介绍
- 代码实现
- 🌟路由模型
- 介绍
- 代码实现
- 🌟主题模型
- 介绍
- 代码实现
- 🌟总结
- 🌟写在最后
参考网站:https://www.rabbitmq.com/getstarted.html
🌟前言
在上一节学习了RabbitMQ中交换机的相关基础知识,本文来学习一下RabbitMQ中的五种队列模型的,对其有一个基本的认识。
🌟连接工具类
public class MQConnectionUtil {
public static Connection createConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.124.23");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
return factory.newConnection();
}
}
🌟简单工作模型
介绍
模型图:
流程:
- 生产者发送消息到队列。
- 如果队列存在则直接存入消息;若不存在,先进行队列的创建。
- 消费者监听队列。
- 处理完消息,通过ACK机制确认消息已经消费。
特点:
- 只有一个消费者,并且其中没有交换机参与。
代码实现
生产者:
public class Send {
private final static String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException {
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection=MQConnectionUtil.createConnection();
//创建信道
Channel channel = connection.createChannel()) {
/**
* 队列名称
* 持久化配置:mq重启后还在
* 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
* 自动删除: 当没有消费者的时候,自动删除掉,一般是false
* 其他参数
*
* 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
/**
* 参数说明:
* 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
* 路由健名称
* 配置信息
* 发送的消息数据:字节数组
*/
//发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者:
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//消费者一般不增加自动关闭
Connection connection=MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回调方法,下面两种都行
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
System.out.println("consumerTag消息标识="+consumerTag);
//可以获取交换机,路由健等
System.out.println("envelope元数据="+envelope);
System.out.println("properties配置信息="+properties);
System.out.println("body="+new String(body,"utf-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
// DeliverCallback deliverCallback = (consumerTag,delivery) -> {
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
🌟工作队列模型
介绍
模型图:
特点:
- 生产者将消息发送到队列,并由多个消费者进行消费。
两种消费策略:
1 . 轮训策略:将消息平均分配给多个消费者进行消费,不考虑消费者的处理能力;采用自动ACK消息机制。
2. 公平策略:消费者每次只能处理一个消息。一定时间内,能力强者消费的多,否则少;采用手动ACK消息机制。
代码实现
轮询策略:
//生产者
public class Send {
private final static String QUEUE_NAME="work_rr";
public static void main(String[] args) throws IOException, TimeoutException {
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection= MQConnectionUtil.createConnection();
//创建信道
Channel channel = connection.createChannel()) {
/**
* 队列名称
* 持久化配置:mq重启后还在
* 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
* 自动删除: 当没有消费者的时候,自动删除掉,一般是false
* 其他参数
*
* 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World!";
/**
* 参数说明:
* 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
* 路由健名称
* 配置信息
* 发送的消息数据:字节数组
*/
//发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
//消费者
public class Recv {
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
//消费者一般不增加自动关闭
Connection connection= MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回调方法,下面两种都行
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
System.out.println("consumerTag消息标识="+consumerTag);
//可以获取交换机,路由健等
System.out.println("envelope元数据="+envelope);
System.out.println("properties配置信息="+properties);
System.out.println("body="+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
// DeliverCallback deliverCallback = (consumerTag,delivery) -> {
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
//消费者2
public class Recv2 {
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception {
Connection connection= MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回调方法,下面两种都行
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
System.out.println("consumerTag消息标识="+consumerTag);
//可以获取交换机,路由健等
System.out.println("envelope元数据="+envelope);
System.out.println("properties配置信息="+properties);
System.out.println("body="+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
// DeliverCallback deliverCallback = (consumerTag,delivery) -> {
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
公平策略
通过channel.basicQos(1);确保每个消费者每次只能处理一个未确认的消息。
public class Send {
private final static String QUEUE_NAME="work_fair";
public static void main(String[] args) throws IOException, TimeoutException {
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = MQConnectionUtil.createConnection();
//创建信道
Channel channel = connection.createChannel()) {
/**
* 队列名称
* 持久化配置:mq重启后还在
* 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
* 自动删除: 当没有消费者的时候,自动删除掉,一般是false
* 其他参数
*
* 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World!";
/**
* 参数说明:
* 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
* 路由健名称
* 配置信息
* 发送的消息数据:字节数组
*/
//发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
//消费者1
public class Recv {
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] argv) throws Exception {
Connection connection= MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
//回调方法,下面两种都行
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
System.out.println("consumerTag消息标识="+consumerTag);
//可以获取交换机,路由健等
System.out.println("envelope元数据="+envelope);
System.out.println("properties配置信息="+properties);
System.out.println("body="+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
// DeliverCallback deliverCallback = (consumerTag,delivery) -> {
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
//消费者2
public class Recv2 {
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] argv) throws Exception {
Connection connection=MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
//回调方法,下面两种都行
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
System.out.println("consumerTag消息标识="+consumerTag);
//可以获取交换机,路由健等
System.out.println("envelope元数据="+envelope);
System.out.println("properties配置信息="+properties);
System.out.println("body="+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
// DeliverCallback deliverCallback = (consumerTag,delivery) -> {
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// };
//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
🌟发布订阅模型
介绍
模型图:
特点:
- 一条消息可以被多个消费者同时接收。
- 采用扇形(Fanout)交换机。
- 无需路由Key
- 类似于公众号的订阅。
代码实现
生产者:
public class Send {
private final static String EXCHANGE_NAME="exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = MQConnectionUtil.createConnection();
//创建信道
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message="daencode rabbitmq pub";
channel.basicPublish(EXCHANGE_NAME," ",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("广播消息已经发送!!!!");
}
}
}
消费者:两个消费者都是一样的代码,都需要绑定相同的扇形交换机。
public class Recv {
private final static String EXCHANGE_NAME="exchange_fanout";
public static void main(String[] argv) throws Exception {
Connection connection = MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不用指定routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
🌟路由模型
介绍
模型图
特点:
- 交换机类型采用直连交换机,特定的路由key由特定的消费者进行消费。
- 交换机根据特定的路由key与队列进行绑定。
代码实现
以记录不同日志级别为例,不同的消费者进行不同日志级别的记录。
生产者:
public class Send {
private final static String EXCHANGE_NAME="exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = MQConnectionUtil.createConnection();
//创建信道
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String debugLog="[debug]daencode rabbitmq direct";
String errorLog="[error]出现error错误";
channel.basicPublish(EXCHANGE_NAME,"errorRoutingKey",null,errorLog.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"debugRoutingKey",null,debugLog.getBytes(StandardCharsets.UTF_8));
System.out.println("消息已经发送!!!!");
}
}
}
消费者1:只记录ERROR级别日志。
public class Recv1 {
private final static String EXCHANGE_NAME="exchange_direct";
public static void main(String[] argv) throws Exception {
Connection connection = MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不用指定routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
消费者2:只记录Debug级别日志。
public class Recv2 {
private final static String EXCHANGE_NAME="exchange_direct";
public static void main(String[] argv) throws Exception {
Connection connection = MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不用指定routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
🌟主题模型
介绍
模型图
特点:
- 交换机类型采用主题交换机。
- 路由key根据通配符规则,限定消息消费规则。*匹配一个词,#匹配一个或者多个词。
- 交换机通过通配符路由KEY将消息绑定到不同的队列,以此实现不同的消费者进行消息消费。
- 同时满足路由模型和发布订阅模型。
代码实现
生产者:生产者通过路由KEY向交换机发送消息。
public class Send {
private final static String EXCHANGE_NAME="exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection= MQConnectionUtil.createConnection();
//创建信道
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String debugLog="[debug]daencode rabbitmq direct";
String errorLog="[error]出现error错误";
channel.basicPublish(EXCHANGE_NAME,"log.error",null,errorLog.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"log.debug",null,debugLog.getBytes(StandardCharsets.UTF_8));
System.out.println("广播消息已经发送!!!!");
}
}
}
消费者:
public class Recv1 {
private final static String EXCHANGE_NAME="exchange_topic";
public static void main(String[] argv) throws Exception {
Connection connection= MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不用指定routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"*.debug");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
消费者2:
public class Recv2 {
private final static String EXCHANGE_NAME="exchange_topic";
public static void main(String[] argv) throws Exception {
Connection connection= MQConnectionUtil.createConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不用指定routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"*.error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
🌟总结
模型 | 是否交换机参与 | 交换机类型 | 需要路由键 | 描述 |
---|---|---|---|---|
简单模型 | 否 | 无 | 否 | 消息直接发送到队列,最基本的消息传递模型。 |
工作模型 | 否 | 无 | 否 | 多个消费者共同处理一个队列中的消息。 |
发布订阅模型 | 是 | fanout | 否 | 将消息广播给所有绑定到交换机的队列,多个消费者同时订阅。 |
路由模型 | 是 | direct | 是 | 根据消息的路由键将消息发送到与之匹配的队列。 |
主题模型 | 是 | topic | 是 | 使用通配符进行灵活的路由,根据主题和通配符规则进行匹配。 |
🌟写在最后
有关于图解RabbitMQ五种队列模型介绍及代码实现到此就结束了。感谢大家的阅读,希望大家在评论区对此部分内容散发讨论,便于学到更多的知识。