1.回顾:
- 1.在路由模式中,我改进了日志记录系统。我没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
- 2.尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们
想接收的日志类型有info.base 和 info.advantage
,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了
。这个时候就只能使用 topic 类型
2Topic说明:
2.1. Topic的要求
- 1.发送到类型是
topic 交换机的消息的 routing_key 不能随意写
,必须满足一定的要求,它必须是一个单词列表,以点号分隔开
。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".
这种类型的。 - 2.当然这个单词列表
最多不能超过 255 个字节
。 - 3.在这个规则列表中,其中有两个替换符是大家需要注意的:
*(星号)可以代替一个单词
,#(井号)可以替代零个或多个单词
2.2.Topic 匹配案例
a.下图绑定关系如下
- Q1-->绑定的是: `中间带 orange 带 3 个单词的字符串(*.orange.*)`
- Q2-->绑定的是:`最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)和第一个单词是 lazy 的多个单词(lazy.#)`
b.举例:
c.当队列绑定关系是下列这种情况时需要引起注意
- 当一个
队列绑定键是#,
那么这个队列将接收所有数据,就有点像 fanout 了
- 如果队列绑定键
当中没有#和*出现,
那么该队列绑定类型就是 direct 了
3.编码实现主题模式:
3.1.主体模式结构图
3.2.编码实现:
- 2.消费者1:
- 3.消费者2:
- 3.生产者:
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1-->绑定的是
* 中间带 orange 带 3 个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
* 第一个单词是 lazy 的多个单词(lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
- 4.测试:分别启动生产者,消费者