RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第一天 基础
- 4 RabbitMQ 的工作模式
- 4.3 Routing 路由模式
- 4.3.1 模式说明
- 4.3.2 代码编写
- 4.3.3 小结
第一天 基础
4 RabbitMQ 的工作模式
4.3 Routing 路由模式
4.3.1 模式说明
先来看个问题
上次我们使用发布订阅 Pub / Sub 工作模式 完成了发送一条消息,可以被两个消费者 分别处理的例子
这里面有个不合理的地方,假设 日志信息只是等级为 info 的不是那么重要 的信息, 这种信息就没有必要存入数据库
现在的需求:同一条消息 由不同的消费者接收并去 做不同的事【比如说 info级别 的日志就打印控制台,error 级别的日志 再存入数据库】
【这就要用到 我们接下来要讲的 Routing 工作模式了】
【模式说明】
看看官网
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
4.3.2 代码编写
【生产者】
package com.dingjiaxiong.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* ClassName: Producer_Routing
* date: 2022/11/16 10:58
* 发送消息
*
* @author DingJiaxiong
*/
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("xxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
factory.setPort(5672); //端口【默认也是 5672】
factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
factory.setPassword("12345"); //密码【默认 guest】
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* 参数解释:
* 1. exchange:交换机名称
* 2. type:交换机类型
* 【枚举】
* DIRECT("direct"):定向
* FANOUT("fanout"):扇形【广播】【发送消息到每一个与之绑定队列】
* TOPIC("topic"):通配符的方式
* HEADERS("headers"):参数匹配
* 3. durable:是否持久化
* 4. autoDelete:自动删除
* 5. internal:内部使用【一般false】
* 6. arguments:参数
* */
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
//6. 创建队列【两个】
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//7. 绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* 参数解释:
* 1. queue:参数
* 2. exchange:交换机名称
* 3. routingKey:路由key【绑定规则】[交换机 类型若为fanout, 默认为空]
* */
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
//8. 发送消息
String body = "这是一条日志信息...级别为info";
channel.basicPublish(exchangeName,"info",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
直接运行
OK, 消息发送完成,看看管控台
交换机创建了,看看绑定关系
没问题, 看看队列情况
可以看到 只有队列2 中,即设置了路由key 为 info 的队列 才有消息
【消费者】
package com.dingjiaxiong.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* ClassName: Consumer_Routing1
* date: 2022/11/16 12:37
*
* @author DingJiaxiong
*/
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("43.138.50.253"); // 服务器IP【默认本机 localhost】
factory.setPort(5672); //端口【默认也是 5672】
factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
factory.setPassword("12345"); //密码【默认 guest】
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 接收消息【消费消息】
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数解释:
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
//回调方法,当收到消息后,会自动执行这个 方法
/*
* 1. consumerTag:消息的标识
* 2. envelope:获取一些 信息【交换机、路由key...】
* 3. properties: 配置信息
* 4. body: 数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:" + consumerTag);
// System.out.println("Exchange:" + envelope.getExchange());
// System.out.println("RoutingKey:" + envelope.getRoutingKey());
// System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
System.out.println("将日志 打印到了 控制台...");
}
};
channel.basicConsume(queue2Name,true,consumer);
// 不要关闭资源,让它一直监听
}
}
OK,q2 让它做打印到控制台的工作,再来一个消费者 2,专门做error日志的存储数据库的操作
OK, 直接运行
OK, 效果也很明显, 存储数据库操作 并没有运行,因为路由 key 对不上,
现在再来发送一条消息error 的
直接运行
发送完成,查看控制台
没毛病,这样就达到了我们 最上面的 需求
【这就是 Routing 工作模式】
4.3.3 小结
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。