二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。
在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消
费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式 称为 ”发布/订阅”
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消 息,第二个程序是消费者。其中我们会启动三个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,第三个假装存储在数据库中,事实上第一个程序发出的日志消息将广播给所有消费 者者
Exchange
概念
RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
交换机类型
总共有以下类型:
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
无名 exchange
在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的
原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
channel.basicPublish("", "YJLA", MessageProperties.PERSISTENT_TEXT_PLAIN, "你好啊,岳泽霖".getBytes(StandardCharsets.UTF_8));
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
临时队列
之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。
队列的名称我对们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连 接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
绑定(bindings)
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,
它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1和 Q2 进行了绑定
Fanout 扇出
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
所有队列中。系统中默认有些 exchange 类型
生产者
routkingKey 没有值
public class FanoutMessageProducer {
private static String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
Channel channel = connection.createChannel();
// 创建交换机, 交换机 名称是 logs
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,
true, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
// routingKey 为 空
channel.basicPublish(EXCHANGE_NAME, "",
null, new String("发布广播消息,内容为:" + scanner.next()).getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
控制台消费者1
public class FanoutMessageConsumer1 {
private static String EXCHANGE_NAME = "fanout_logs";
private static String QUEUE_NAME = "debug_console";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.FANOUT, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//7. 绑定队列与 交换机。 其中 routingKey 为 ""
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(QUEUE_NAME + ">>> 获取到消息 :" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(
QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
文件消费者2
public class FanoutMessageConsumer2 {
private static String EXCHANGE_NAME = "fanout_logs";
private static String QUEUE_NAME = "debug_file";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.FANOUT, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//7. 绑定队列与 交换机。 其中 routingKey 为 ""
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
String textMessage = new String(message.getBody());
// 将消息保存到 文件里面,追加形式。
FileUtil.appendUtf8Lines(Collections.singletonList(textMessage), "D:\\rabbitMq\\debug.log");
System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + textMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
数据库消费者3
public class FanoutMessageConsumer3 {
private static String EXCHANGE_NAME = "fanout_logs";
private static String QUEUE_NAME = "debug_db";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.FANOUT, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//7. 绑定队列与 交换机。 其中 routingKey 为 ""
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
String textMessage = new String(message.getBody());
// 假装存储到数据库里面
System.out.println(">>>>> 插入到数据库中的信息是:" + textMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
验证
启动生产者 ,依次启动3个消费者
生产者依次发送3条消息
发送消息1
发送消息2
发送消息3
查询控制台消费者
debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息1
debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息2
debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息3
查询文件消费者
>>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息1
>>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息2
>>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息3
查看数据库消费者
>>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息1
>>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息2
>>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息3
没有通过 routingKey 进行绑定, 三个消费者均可以收到。
Direct exchange
在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。
在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。
例如我们只把 较严重错误消息定向存储到日志文件(以节省磁盘空间),严重错误消息发送给数据库,
同时仍然能够在控制台上打印所有日志消息。
我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,
创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);
绑定之后的 意义由其交换类型决定。
介绍
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,
例如我们希望将 warn 和 error 级别的错误发送给 数据库, 将 info, warn, error 的发送给文件, 将 debug,info,warn,error 全部打印到控制台中。
Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
生产者
/**
* @ClassName:work
* @Description 消息发布者
* @Author 岳建立
* @Date 2020/12/22 19:55
* @Version 1.0
* <p>
* direct 直连, routingKey 路由 key 一致的,才进行发送。
* <p>
* console 接收 debug info warn error
* file 接收 info warn error
* db 接收 warn error
* routingKey 有值。
**/
public class DirectMessageProducer {
private static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
Channel channel = connection.createChannel();
// 创建交换机, 交换机 名称是 logs
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
true, false, null);
// 定义一个路由 routingKey 和 信息 的map, 由 map 进行处理。
Map<String,String> messageMap = new HashMap<>();
messageMap.put("debug","一个 debug 消息 1");
messageMap.put("debug","一个 debug 消息 2");
messageMap.put("info","一个 info 消息 1");
messageMap.put("info","一个 info 消息 2");
messageMap.put("info","一个 info 消息3 ");
messageMap.put("warn","一个 warn 消息 1");
messageMap.put("warn","一个 warn 消息 2");
messageMap.put("warn","一个 warn 消息 3");
messageMap.put("error","一个 error 消息 1");
messageMap.put("error","一个 error 消息 2");
messageMap.put("error","一个 error 消息 3");
messageMap.put("error","一个 error 消息 4");
messageMap.forEach((routingKey,message)->{
try {
channel.basicPublish(EXCHANGE_NAME,routingKey,
null,message.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
});
//输入流等待
System.in.read();
channel.close();
connection.close();
// Scanner scanner = new Scanner(System.in);
// // 传入的格式为 debug 一个 debug 消息 1
// while (scanner.hasNextLine()) {
// // routingKey 为 空
// String inputMessage = scanner.nextLine();
//
// String[] splitMessage = inputMessage.split("\\ ");
//
// channel.basicPublish(EXCHANGE_NAME, splitMessage[0],
// null, splitMessage[1].getBytes("UTF-8"));
// }
// channel.close();
// connection.close();
}
}
消费者1 控制台
public class DirectMessageConsumer1 {
private static String EXCHANGE_NAME = "direct_logs";
private static String QUEUE_NAME = "log_console";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.DIRECT, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
//3. 绑定队列与 交换机。 其中 routingKey 为 debug,info,warn,error ,可以绑定多个。
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "debug");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>> 消息 :" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(
QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
消费者2 文件
public class DirectMessageConsumer2 {
private static String EXCHANGE_NAME = "direct_logs";
private static String QUEUE_NAME = "log_file";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.DIRECT, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
//7. 绑定队列与 交换机。 其中 routingKey 为 info ,warn, error
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>> 消息 :" + new String(message.getBody());
FileUtil.appendUtf8Lines(Collections.singletonList(fileMessage), "D:\\rabbitMq\\log.log");
System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + fileMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
消费者3 数据库
public class DirectMessageConsumer3 {
private static String EXCHANGE_NAME = "direct_logs";
private static String QUEUE_NAME = "log_db";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.DIRECT, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
//7. 绑定队列与 交换机。 其中 routingKey 为 warn error
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>> 消息 :" + new String(message.getBody());
// 假装存储到数据库里面
System.out.println(">>>>> 插入到数据库中的信息是:" + fileMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
验证
启动生产者, 依次启动 三个消费者
消费者1 控制台打印:
消费者2 文件打印:
消费者3 数据库打印
Topics
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是
使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有
info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。
这个时候 就只能使用 topic 类型
要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单
词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。
当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
匹配案例
下图绑定关系如下
Q1–>绑定的是 中间带 orange 带 3 个单词的字符串(.orange.)
Q2–>绑定的是 最后一个单词是 rabbit 的 3 个单词(..rabbit) 第一个单词是 lazy 的多个单词(lazy.#)
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
quick.orange.rabbit 被队列 Q1Q2接收到
lazy.orange.elephant 被队列 Q1Q2接收到
quick.orange.fox 被队列 Q1接收到
lazy.brown.fox 被队列 Q2接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
生产者
public class TopicMessageProducer {
private static String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
Channel channel = connection.createChannel();
// 创建交换机, 交换机 名称是 logs
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,
true, false, null);
// 定义一个路由 routingKey 和 信息 的map, 由 map 进行处理。
Map<String, String> messageMap = new HashMap<>();
messageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
messageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
messageMap.put("quick.orange.fox", "被队列 Q1 接收到");
messageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
messageMap.put("info", "一个 info 消息3 ");
messageMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
messageMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
messageMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
messageMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
messageMap.forEach((routingKey, message) -> {
try {
channel.basicPublish(EXCHANGE_NAME, routingKey,
null, message.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
});
//输入流等待
System.in.read();
channel.close();
connection.close();
// Scanner scanner = new Scanner(System.in);
// while(scanner.hasNextLine()) {
// // routingKey 为 空
// String inputMessage = scanner.nextLine();
//
// String[] splitMessage = inputMessage.split("\\ ");
//
// channel.basicPublish(EXCHANGE_NAME,splitMessage[0],
// null,splitMessage[1].getBytes("UTF-8"));
// }
// channel.close();
// connection.close();
}
}
消费者1
public class TopicMessageConsumer1 {
private static String EXCHANGE_NAME = "topic_logs";
private static String QUEUE_NAME = "topic_log_console";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
//3. 绑定队列与 交换机。 其中 routingKey 为 debug,info,warn,error ,可以绑定多个。
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>> 消息 :" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(
QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
消费者2
public class TopicMessageConsumer2 {
private static String EXCHANGE_NAME = "topic_logs";
private static String QUEUE_NAME = "topic_log_file";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryUtil.createConnection();
final Channel channel = connection.createChannel();
//1. 创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, null);
//2. 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
//7. 绑定队列与 交换机。 其中 routingKey 为 info ,warn, error
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
// 打印到控制台
DeliverCallback deliverCallback = (consumerTag, message) -> {
String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>> 消息 :" + new String(message.getBody());
FileUtil.appendUtf8Lines(Collections.singletonList(fileMessage), "D:\\rabbitMq\\log.log");
System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + fileMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
//输入流等待
System.in.read();
//关闭
channel.close();
connection.close();
}
}
验证
消费者1:
消费者2: