🍁博客主页:👉不会压弯的小飞侠
✨欢迎关注:👉点赞👍收藏⭐留言✒
✨系列专栏:👉Linux专栏
🔥欢迎大佬指正,一起学习!一起加油!
目录
- 🍁模式说明
- 🍁路由模式完成消息传递
🍁模式说明
-
使用同一绑定绑定多个队列是完全合法的 .key。在我们的示例中,我们可以在 X 和 Q1 之间添加一个绑定 绑定键黑色。在这种情况下,直接交换将表现良好 像扇出一样,会将消息广播到所有匹配 队列。路由密钥为黑色的消息将同时传递到 Q1 和 Q2。
-
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
-
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列
。
-
图解:
• P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
• X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
• C1:消费者,其所在队列指定了需要routing key 为 error 的消息
• C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息 -
路由模式特点:
• 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
• 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
• Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
🍁路由模式完成消息传递
- 编写生产者发送消息
- 编写消息生产者 Producter
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String message = "该消息会同时进入队列一和队列二!!!";
// 发送消息
channel.basicPublish(exchangeName,"warning",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
-
测试
-
编写消费者接收消息
- 编写消息消费者Consumer1
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
- 编写消费者接收消息
- 编写消息消费者Consumer2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
- 测试
- 启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。