RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第一天 基础
- 4 RabbitMQ 的工作模式
- 4.4 Topic 通配符模式
- 4.4.1 模式说明
- 4.4.2 代码编写
- 4.4.3 小结
- 4.5 工作模式总结
第一天 基础
4 RabbitMQ 的工作模式
4.4 Topic 通配符模式
4.4.1 模式说明
看看文档
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型中的 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc
或者 item.insert,item.* 只能匹配 item.insert
没毛病,很清晰
4.4.2 代码编写
来个小需求:
之前我们已经可以“定向” 实现将error 级别的信息存入数据库了,
现在我希望 将订单相关的日志【不管什么级别】 都走这个 方式,存入数据库。
【生产者】
package com.dingjiaxiong.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* ClassName: Producer_Topics
* date: 2022/11/16 10:58
* 发送消息
*
* @author DingJiaxiong
*/
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("xxxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
factory.setPort(5672); //端口【默认也是 5672】
factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
factory.setPassword("12345"); //密码【默认 guest】
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* 参数解释:
* 1. exchange:交换机名称
* 2. type:交换机类型
* 【枚举】
* DIRECT("direct"):定向
* FANOUT("fanout"):扇形【广播】【发送消息到每一个与之绑定队列】
* TOPIC("topic"):通配符的方式
* HEADERS("headers"):参数匹配
* 3. durable:是否持久化
* 4. autoDelete:自动删除
* 5. internal:内部使用【一般false】
* 6. arguments:参数
* */
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
//6. 创建队列【两个】
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//7. 绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* 参数解释:
* 1. queue:参数
* 2. exchange:交换机名称
* 3. routingKey:路由key【绑定规则】[交换机 类型若为fanout, 默认为空]
* */
//routing key 系统的名称.日志的级别
// 需求:所有error 级别的日志存入数据库,所有order 系统的日志存入数据库
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
//8. 发送消息
String body = "这是一条日志信息...【order 系统的】";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
OK,直接运行
OK, 查看管控台
交换机创建成功,查看绑定关系
OK,没问题
查看队列
可以看到 两个 队列中都收到了 消息
【消费者】
package com.dingjiaxiong.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* ClassName: Consumer_Topic1
* date: 2022/11/16 12:37
*
* @author DingJiaxiong
*/
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("xxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
factory.setPort(5672); //端口【默认也是 5672】
factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
factory.setPassword("12345"); //密码【默认 guest】
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
// 接收消息【消费消息】
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数解释:
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
//回调方法,当收到消息后,会自动执行这个 方法
/*
* 1. consumerTag:消息的标识
* 2. envelope:获取一些 信息【交换机、路由key...】
* 3. properties: 配置信息
* 4. body: 数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:" + consumerTag);
// System.out.println("Exchange:" + envelope.getExchange());
// System.out.println("RoutingKey:" + envelope.getRoutingKey());
// System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
System.out.println("将日志 存入数据库...");
}
};
channel.basicConsume(queue1Name,true,consumer);
// 不要关闭资源,让它一直监听
}
}
OK,消费者 2
OK
直接运行
OK,没毛病,order.info 既打印到 了控制台,也存入了 数据库
现在来条goods 日志
直接发送
再来个goods.error
没毛病。【这就是 Topic 工作模式】
4.4.3 小结
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
4.5 工作模式总结
- 简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
- 工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
- 发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
- 路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
- 通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。