【RabbitMQ六】——RabbitMQ主题模式(Topic)

news2024/9/28 15:21:21

RabbitMQ主题模式(通配符模式)

  • 前言
  • 什么是Topic模式
  • 使用Topic模式的要点
  • 通配符规则
    • 示例
  • 代码示例
    • Pom文件引入RabbtiMQ依赖
    • RabbitMQ工具类
    • 生产者
    • 消费者1
    • 消费者2
    • 效果
  • 总结

前言

通过本篇博客能够简单使用RabbitMQ的主题模式。
本篇博客主要是博主通过官网总结出的RabbitMQ主题模式。其中如果有误欢迎大家及时指正。

什么是Topic模式

Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这使我们相较于Direct模式灵活性更大。

使用Topic模式的要点

routing key必须是由"."进行分隔的单词列表,最大限制为255字节

通配符规则

  • "*"可以代替一个单词。
  • "#"可以代替零个或多个单词。

示例

创建了三个绑定:Q1绑定了绑定键“.orange”。和Q2的".*.rabbit"和“lazy.#”。
在这里插入图片描述

1.一个消息的路由键为"quick.orange.rabbit" 时,它将会被送到队列Q1和Q2。
2.一个消息的路由键为"quick.orange.fox"时,它将会背诵到队列Q1
3.一个消息的路由键为"lazy.brown.fox"时,它将被送到队列Q2
4.一个消息的路由键为"quick.brown.fox",没有匹配任何队列,消息将会丢失。
5.一个消息的路由键为"lazy.orange.new.rabbit",它将被送到队列Q2.
6.一个消息的路由键为"orang"或者"quick.orange.new.rabbit"没有匹配到任何队列消息将丢失。

代码示例

Pom文件引入RabbtiMQ依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

RabbitMQ工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */
public class RabbitMQUtils {

    /*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setVirtualHost("虚拟主机");
        factory.setUsername("用户名");
        factory.setPassword("密码");
        //创建连接
        Connection connection=factory.newConnection();
        return  connection;
    }

    /*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection=getConnection();
        Channel channel=connection.createChannel();
        return channel;
    }
}



生产者

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //创建topic类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //声明routingKey
        String severityInfo="info.log.test";
        String severityError="error.test";
        String severityError2="log.error.test";
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="info.log.test:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="主题模式error.test:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="log.error.test:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError2,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
    }
}

消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/2/1 9:39]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:39]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (error)
        String severityError="error.*";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );

        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        //创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (info,error,warning)
        String severityInfo="info.#";
        String severityError="*.error.*";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了3个routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityInfo );
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );
        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }

}

效果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

总结

通过使用通配符实现灵活性的应用有很多,例如nginx的请求转发,gateway为请求过滤等等都是使用了统配符的技术。通过这种联想来对知识进行结构化,找相同和不同,思考能力和学习力也会有很大的提高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/353951.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K8s集群部署(kubeadm安装部署详细手册)

1、简介 K8s部署主要有两种方式&#xff1a;1、Kubeadm Kubeadm是一个K8s部署工具&#xff0c;提供kubeadm init和kubeadm join&#xff0c;用于快速部署Kubernetes集群。 2、二进制 从github下载发行版的二进制包&#xff0c;手动部署每个组件&#xff0c;组成Kubernetes集群。…

【C++进阶】二、多态详解(总)

目录 一、多态的概念 二、多态的定义及实现 2.1 多态的构成条件 2.2 虚函数 2.3 虚函数的重写 2.4 虚函数重写的两个例外 2.4.1 协变 2.4.2 析构函数的重写 2.5 C11 override 和 final 2.5.1 final 2.5.2 override 2.6 重载、覆盖(重写)、隐藏(重定义)的对比 三、…

【C++】类与对象 (四)初始化列表 static成员 友元 内部类 匿名对象 拷贝对象时的一些编译器优化

前言 本章就是我们C中类与对象的终章了&#xff0c;不过本章的难度不大&#xff0c;都是类中一些边边角角的知识&#xff0c;记忆理解就行了&#xff0c;相信经过这么长时间的学习类与对象&#xff0c;你对面向对象也有了更加深的理解&#xff0c;最后我们学习完边边角角的一些…

2022黑马Redis跟学笔记.实战篇(五)

2022黑马Redis跟学笔记.实战篇 五4.5 Redis实现秒杀优化4.5.1 基于Redis实现秒杀减库存6.1 秒杀优化-异步秒杀思路4.5.2 基于Redis的一人一单限制4.5.3 基于阻塞队列的异步下单4.6 秒杀的异步优化4.6.1.基于消息队列的异步下单思路4.6.2.基于List结构的消息队列4.6.3.基于PubSu…

[答疑]经营困难时期谈建模和伪创新-长点心和长点良心

leonll 2022-11-26 9:53 我们今年真是太难了……&#xff08;此处删除若干字&#xff09;……去年底就想着邀请您来给我们讲课&#xff0c;现在也没有实行。我想再和我们老大提&#xff0c;您觉得怎么说个关键理由&#xff0c;这样的形势合适引进UML开发流程&#xff1f; UML…

ESXi Args勒索病毒来袭,VMware ESXi用户需提高警惕

近日&#xff0c;多国通报了一项名为“ESXi Args”的勒索软件活动。ESXi Args主要针对VMware ESXi服务器进行攻击&#xff0c;利用ESXi 服务器中的已知漏洞&#xff0c;获取访问权限并部署勒索软件&#xff0c;对ESXi服务器内配置文件进行加密并发送赎金票据。 当前&#xff0…

从零到1构建可发布的npm包

本文将介绍通过 rollup, 从零开始构建一个简易的可发布的npm包。本文可实现的目标如下&#xff1a; 通过 rollup进行构建支持 Typescript支持 npm 方式安装支持 cdn 方式&#xff0c;在页面中引入支持本地调试可发布到npm 一、从 package 开始项目分析 首先&#xff0c;在终…

港科夜闻|香港科大与香港科大(广州)两校交流开启新篇章

关注并星标每周阅读港科夜闻建立新视野 开启新思维1、香港科大与香港科大(广州)两校交流开启新篇章。2月10日&#xff0c;香港科技大学校董会主席廖长城先生、校董会副主席杨佳锠先生、校长叶玉如院士一行到访香港科大(广州)&#xff0c;共商“香港科大一体、双校互补”框架下的…

go gin学习记录3

环境 环境&#xff1a;mac m1&#xff0c;go version 1.17.2&#xff0c; goland&#xff0c; mysql 安装gorm 第二节学习了在gin中使用go的原生SQL进行操作&#xff0c;这节学习一下使用orm。 go的orm包有很多&#xff0c;gorm是使用较多较广的&#xff0c;所以我们就用gor…

车辆逆行识别检测预警算法 yolov5

车辆逆行识别检测预警算法通过Pythonyolov5网络模型计算机算法技术&#xff0c;车辆逆行识别检测预警算法对道路来往行驶车辆出现逆行行为及时预警存档。Python是一种由Guido van Rossum开发的通用编程语言&#xff0c;它很快就变得非常流行&#xff0c;主要是因为它的简单性和…

大数据之-Nifi-监控nifi数据流信息_监控数据来源_bub轻松复现---大数据之Nifi工作笔记0011

通过数据流功能可以轻松复现,数据的流向在某个时间点数据是怎么流动的,出现了什么问题,太强大了.. 真的是,可以看到通过右键,处理器,打开view data province就可以看到, 上面是处理器处理数据的详细信息 点击左侧的详情图标可以查看详情信息,details是这个事件处理的内容详情,…

【计算机网络】运输层

文章目录运输层概述运输层端口号、复用与分用的概念UDP和TCP的对比TCP的流量控制TCP的拥塞控制TCP超时重传时间的选择TCP可靠传输的实现TCP的运输连接管理TCP的连接建立(3次握手)TCP的连接释放(4次挥手)TCP报文段的首部格式运输层概述 这里我们对运输层进行概述&#xff0c;之…

【双指针问题】LeetCode 925. 长按键入

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法......感兴趣就关注我吧&#xff01;你定不会失望。 &#x1f308;个人主页&#xff1a;主页链接 &#x1f308;算法专栏&#xff1a;专栏链接 我会一直往里填充内容哒&#xff01; &…

【C++】类型转化

&#x1f308;欢迎来到C专栏~~类型转化 (꒪ꇴ꒪(꒪ꇴ꒪ )&#x1f423;,我是Scort目前状态&#xff1a;大三非科班啃C中&#x1f30d;博客主页&#xff1a;张小姐的猫~江湖背景快上车&#x1f698;&#xff0c;握好方向盘跟我有一起打天下嘞&#xff01;送给自己的一句鸡汤&…

Python-第九天 Python异常、模块与包

Python-第九天 Python异常、模块与包一、了解异常1. 什么是异常&#xff1a;2. bug是什么意思&#xff1a;二、异常的捕获方法1. 为什么要捕获异常&#xff1f;2. 捕获异常的语法3. 如何捕获所有异常&#xff1f;三、异常的传递性1.异常是具有传递性的四、Python模块1. 什么是模…

21. 合并两个有序链表

题目链接&#xff1a;解题思路&#xff1a;遍历&#xff0c;双指针&#xff1a;因为两个链表有序&#xff0c;所以只需要依次比较两个元素的大小&#xff0c;然后添加到新的链表中即可first指针指向第一个链表l1&#xff0c;second指针指向第二个链表l2&#xff0c;answer保存合…

Python3 JSON 数据解析

Python3 JSON 数据解析 JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式。 Python3 中可以使用 json 模块来对 JSON 数据进行编解码&#xff0c;它包含了两个函数&#xff1a; json.dumps(): 对数据进行编码。json.loads(): 对数据进行解码。 在 json 的编解码…

CleanMyMac4.12.5最新版安装下载教程

告别硬盘空间不足&#xff0c;让您的Mac极速如新CleanMyMac是一款强大的 Mac 清理、加速工具和健康卫士&#xff0c;让您的 Mac 加快启动速度。CleanMyMac是一款专业的Mac清理软件&#xff0c;可智能清理mac磁盘垃圾和多余语言安装包&#xff0c;快速释放电脑内存&#xff0c;轻…

01:入门篇 - 初识 CTK

作者: 一去、二三里 个人微信号: iwaleon 微信公众号: 高效程序员 CTK 是什么 CTK:支持生物医学图像计算的公共开发包 CTK 全称:Common ToolkitCTK 主页:http://www.commontk.org/Github 地址:https://github.com/commontkCTK 标志 Logo 是一个品牌的形象,对外它传递的…

关于 Docke r安装 Redis 的评论区问题总结及解答

前言&#xff1a; 文章链接&#xff1a;史上最详细Docker安装Redis &#xff08;含每一步的图解&#xff09;实战 原文标题只是想让我这篇文章&#xff0c;能够多得到大家的一些关注&#xff0c;事实证明它做到了&#xff0c;当然看到收藏量的那一刻&#xff0c;我也明白&…