rabbitmq+netcore6 【5】Topics:主题

news2025/2/23 23:49:00

文章目录

    • 1)前言
    • 2)Topic exchange 主题交换机
    • 3)举例
    • 4)总结
    • 5)综合以上代码
      • 准备工作
      • 生产者
      • 消费者1
      • 消费者2
      • 结果验证

官网参考链接: https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html
其他人的翻译版参考: https://www.cnblogs.com/grayguo/p/5581323.html
以下工作是本人在参考官网的教程下,结合自己的理解做的代码流程,更深刻的理解还需要参考官网进行学习哦

1)前言

在之前的系统中,我们改进了日志系统,使用direct 交换机代替fanout交换机可以实现选择性的接受日志,但是还是有局限性如他不能根据多个条件来进行路由。在我们的日志系统中,我们不仅希望他能够根据日志的严重程度来进行订阅,还想根据发送日志的源(即日志的生产者)来进行订阅。

你也许已经通过unix 的工具syslog知道了这个概念,它同时通过级别(info/warn/crit…)和源(auth/cron/kern…)。这种方式给了我们很大的灵活性–我们可以同时监听来自cron的严重级别的错误消息以及来自kern的所有消息。为了在我们的系统中实现这种功能,我们需要学习更为复杂的交换机类型 –topic

注:交换机类型分为 direct,topic,headers,fanout

2)Topic exchange 主题交换机

发送到topic的交换机的消息不能带有随意写法的routing_key,下面是之前的写法,可以看出routingKey是按照我们的想法随便起的队列的名字。

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "task_queue3", // 这里
                     basicProperties: null,
                     body: body);

现在若exchange的类型为Topic,则routingKey必须是一串使用"."分隔开的单词串。这些单词可以是任意的单词但通常都是跟当前消息有关的一些功能,例如:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.你可以指定任意多个字符但不能超过255字节。

bindingkey必须也是同样的格式,topic交换机背后的原理类似于direct交换机,两个都有这样的特性:带有特定routingKey(生产者的channel.BasicPublish时参数的routingKey)的消息会被发送到绑定了该bindingKey(消费者的channel.BasicPublish时参数的bindingKey)的队列。

channel.QueueBind(queue: queueName, 
				  exchange: "logs",
				  routingKey: "task_queue3");// 这里的routingKey为了区分称为bindingKey

需要注意的是对于bindingKey这里有两个特殊的情况:

  • "*“可以代表一个单词(是单词不是字符–即”."分割的单词)
  • "#"可以代表0个或多个单词

3)举例

在这里插入图片描述

在这个例子中,我们将要发送描述动物的消息,消息的routingkey将会有3部分组成,routingkey的第一个单词描述的是速度,第二个单词描述的是颜色,第三个单词是特殊描述: “..”。

我们创建了三个绑定,Q1队列的bingingkey为*.orange.*,Q2队列的bingingkey为*.*.rabbitlazy.#,换句话说Q1只关注颜色为orange的动物消息,Q2关注种族为rabbits的动物消息,速度为layz的动物消息。

  • routingkey为quick.orange.rabbit的消息会两个队列都接收
  • routingkey为lazy.orange.elephant的消息也会被转发到两个队列上
  • routingkey为quick.orange.fox的消息仅仅进入Q1
  • routingkey为 lazy.brown.fox的消息只能被Q2接收
  • routingkey为lazy.pink.rabbit的消息只会被转发到Q2上一次,即使它匹配上了两个bindingKey
  • routingkey为quick.brown.fox的消息不会匹配到任何绑定,所有消息将会被忽略

如果我们打破约定发送带有4个单词的消息将会发生什么,例如orange或者quick.orange.male.rabbit?答案是这些routingkey将不会匹配到任何的bindingkey,因此将会丢失。但是lazy.orange.new.rabbit因为最后一个词匹配上了Q2的bindingkey,所有可以被Q2接收。

4)总结

Topic 交换机功能非常强大,可以完成像其他交换机那样工作。

  • 当队列绑定了bindingkey为#时,不论routingkey是什么它都会接收所有的消息,功能就像fanout交换机那样
  • 当特殊的*#都没在bindingkey中使用时,其功能就像direct交换机那样

5)综合以上代码

我们将要在我们的日志系统中使用topic交换机,假设我们的日志系统的routingkey有2个单词:<facility>.<severity>
代码和之前的案例基本上是一样的。

准备工作

新建一个netcore6的控制台项目,添加RabbitMQ的包依赖

NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

在这里插入图片描述
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2

此代码为本人的简化版与官网代码略有不同,生产者先输入routingKey再输入message;消费者先输入bindingKey才能接收消息。先运行两个消费者,再运行生产者

生产者

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectSentTopic
{
    public class MainClass
    {
        static void Main()
        {
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using(var connection = connectionFactory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange:"topic_logs",type:ExchangeType.Topic);
                    var routingKey = Console.ReadLine();
                    var message = Console.ReadLine();
                    while (!string.IsNullOrWhiteSpace(routingKey) && !string.IsNullOrWhiteSpace(message))
                    {
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body);
                        Console.WriteLine("[x] Sent {0}:{1}", routingKey, message);
                        routingKey = Console.ReadLine();
                        message = Console.ReadLine();
                    }
                }
            }
        }
    }
}

消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ProjectReceiveTopic
{
    public class MainClass
    {
        static void Main()
        {
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using(var connection = connectionFactory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
                    var queueName = channel.QueueDeclare().QueueName;
                    var routingKey = Console.ReadLine(); // 输入队列绑定的bindingKey
                    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: routingKey);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (mdoel, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x1] bindingKey:{0}  received message:{1}", routingKey, message);
                    };
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}

消费者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ProjectReceiveTopic2
{
    public class MainClass
    {
        static void Main()
        {
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using (var connection = connectionFactory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
                    var queueName = channel.QueueDeclare().QueueName;
                    var routingKey = Console.ReadLine(); // 输入队列绑定的bindingKey
                    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: routingKey);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (mdoel, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x2] bindingKey:{0}  received message:{1}", routingKey, message);
                    };
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}

结果验证

(1)bindingKey为#kern.*

先运行消费者,后运行生产者。
消费者1输入 (接收所有消息)

#

消费者2输入 (接收源为kernel的所有消息)

kern.*

生产者 输入(发布源为kernel、严重度为fatal的消息 + 源为outter、严重度为info的消息)

kern.critical
A critical kernel error

outter.info
this is a infomation of lyh

当消费者的bindingKey为#时(如receive1),效果和fanout交换机一样;消费者的bindingKey为kern.*时(如receive2)只能接收源为kern的消息
在这里插入图片描述
(2)bindingKey为*.critical*.*

消费者1输入 (接收严重程度为critical的消息)

*.critical

消费者2输入 (接收所有消息,相当于#

*.*

生产者 输入(发布源为kernel、严重度为fatal的消息 + 源为outter、严重度为info的消息)

kern.critical
Another critical kernel error

outter.info
this is another infomation of lyh

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/158969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

臻图信息搭建智慧水务管理平台,保障供水安全运行

伴随着城市智慧化进程&#xff0c;供水系统也在朝着高度集成化、数字化、智能化的管理模式发展。在2022年&#xff0c;水利部也印发了相关水务保障规划&#xff0c;对供水工程的建设、运行管理、水源保护等管理提出了明确要求&#xff0c;采取物联网、互联网等措施&#xff0c;…

从“以旧换新”送手机看年礼消费新风尚

千门万户曈曈日&#xff0c;总把新桃换旧符。每年的春节&#xff0c;都是中国人辞旧迎新的重要时刻。在新春年礼的选择上&#xff0c;曾经的“烟酒糖茶”老四样正在逐渐被其他新潮年礼所替代&#xff0c;手机等诸多科技好物被纳入到送年礼清单。手机年货很“潮”&#xff0c;让…

Redis整理合集

SQL和NOSQL的区别?SQLNOSQL数据结构结构化非结构化数据关联关联的非关联的查询方式SQL查询非SQL查询事物特性ACID&#xff08;事务&#xff09;BASE存储方式磁盘内存扩展性垂直水平使用场景数据结构固定相对业务对数据的安全性一致性需求较高数据结构不固定对一致性、安全性需…

论文投稿指南——中文核心期刊推荐(地质学 2)

【前言】 &#x1f680; 想发论文怎么办&#xff1f;手把手教你论文如何投稿&#xff01;那么&#xff0c;首先要搞懂投稿目标——论文期刊 &#x1f384; 在期刊论文的分布中&#xff0c;存在一种普遍现象&#xff1a;即对于某一特定的学科或专业来说&#xff0c;少数期刊所含…

【性能优化】Mybatis Plus:优化查询速度 - SQL替换Service

优化查询速度 - SQL替换Service Service 接口问题 下面是原先的 Service 实现类代码&#xff0c;有门店 ID、订单状态、查询时间段&#xff0c;然后查出了所有的结果&#xff0c;继续使用 java8 的特性获取汇总结果&#xff0c;随着项目的推移&#xff0c;数据量越来越大&…

Webpack 中使用source map 在开发过程中进行调试

我们都知道webpack在打包的时候会将源代码打包成一个bundle文件&#xff0c;bundle文件就是经过了loader转换&#xff0c;还有webpack的一些插件处理&#xff0c;以及webpack构建过程中的一些转换&#xff0c;最后会生成一个大的JS文件&#xff0c;直接去看这个文件是没法调试的…

【React】一.React基本使用

目录 基本介绍 一.React基本使用 安装命令 使用方法 记录问题 使用React脚手架初始化项目 基本介绍 构建用户界面的js库用户界面可以理解为html页面&#xff08;前端&#xff09;react主要用来写html页面或者构建web应用只负责视图层&#xff08;V&#xff09;的渲染。&am…

【ROS2 入门】虚拟机环境 ubuntu 18.04 ROS2 安装

大家好&#xff0c;我是虎哥&#xff0c;从今天开始&#xff0c;我将花一段时间&#xff0c;开始将自己从ROS1切换到ROS2&#xff0c;做为有别于ROS1的版本&#xff0c;做了很多更新和改变&#xff0c;我还是很期待自己逐步去探索ROS2中的惊喜。在安装过程中我也遇到的一些坑&a…

阿里云服务器安装wireshark图形界面与远程连接配置(使用tigervnc)

tags: Server Ubuntu Wireshark 写在前面 昨天折腾了一下透视HTTP协议这门课的实验环境, 通过阿里云的轻量应用服务器来完成了, 但是还差一步, 那就是wireshark的安装, 虽然通过apt安装好了, 但是打不开实在是烦人, 后来经过各种搜索, 我发现问题出在了tightvnc上, 这个vnc服…

vue2中swiper6不能正常使用的解决

第一步安装swiper6 第二步在main.js中引入swiper6 注意&#xff1a;也可以在其他地方引入&#xff0c;但是在main.js中引入&#xff0c;所有的组件都能用swiper的样式 样式的引入&#xff0c;不是平常的引入&#xff0c;引入代码如下 注意&#xff1a;一定要这样引入样式 …

【自学Python】Python比较运算符

Python比较运算符 Python比较运算符教程 在 Python 中&#xff0c;比较运算符的结果都是 bool 型&#xff0c;也就是要么是 True&#xff0c;要么是 False。关系表达式经常用在 if 结构的条件中或 循环结构 的条件中。 Python比较运算符语法 比较运算符功能说明>大于如果…

正则表达式表单校验实例

描述 一个简单的注册页面&#xff0c;对输入框进行了简单的正则表达式校验 代码 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title></title><link rel"stylesheet" href"css/index2.css" …

深入思考 Schema 管理的几个基本问题

本文作者&#xff1a;王大龙&#xff0c;数据分析领域资深工程师&#xff0c;观远产品中一切数据的风暴降生之主&#xff0c;元数据世界的精神领袖&#xff0c;数据治理的永恒守望者。前言我发现理解某一个具体「事物」最好的方式是先去理解其背后所遵循的「范式」。范式是一个…

java-Spring集成定时器使用方法

文章目录定时器配置文件测试文件配置参数说明SpringBoot集成schedulepom.xml文件启动类运行结果定时器配置文件 spring核心配置文件 <?xml version"1.0" encoding"UTF-8"?> <beans:beans xmlns:xsi"http://www.w3.org/2001/XMLSchema-in…

python笔记之转义问题 字符串前缀 正则表达式

Python的字符串自己也用\转义 s ABC\\-001 # Python的字符串 # 对应的正则表达式字符串变成&#xff1a; # ABC\-001建议使用Python的r前缀&#xff0c;就不用考虑转义的问题了 s rABC\-001 # Python的字符串 # 对应的正则表达式字符串不变&#xff1a; # ABC\-001关于斜杠…

RGB、LVDS、MIPI和EDP接口液晶屏

RGB、LVDS、MIPI和EDP接口液晶屏概述一、RGB_TTL二、LVDS三、MIPI&#xff08;手机、平板等数码产品应用场合&#xff09;四、eDP接口&#xff08;笔记本、工控机、工业平板等应用场合&#xff0c;用来取代LVDS接口&#xff09;五、应用概述 液晶屏有RGB、LVDS、MIPI DSI和EDP等…

《MYSQL实战45讲》笔记(1-10)

1&#xff1a;一条SQL查询语句是如何执行的&#xff1f; 下面我们来结合一张图来了解MySQL的基本架构 总体来看&#xff0c;MySQL分为服务层和存储引擎两个部分。其中存储引擎负责数据的存储和提取&#xff0c;而服务层负责连接的建立、分析、优化、执行等其他步骤。 常见的…

虚幻引擎中GPU Lightmass全局光照的使用步骤

GPU Lightmass (GPULM) 是一种光烘焙方法&#xff0c;它预先计算来自具有 Stationary 或 Static 移动性的灯光的复杂光交互&#xff0c;并将该数据存储在创建的应用于场景几何体的光照贴图纹理中。GPU Lightmass 显着减少了为复杂场景计算、构建和生成光照数据所需的时间&#…

MQTT QoS 0, 1, 2 介绍

什么是 QoS 很多时候&#xff0c;使用 MQTT 协议的设备都运行在网络受限的环境下&#xff0c;而只依靠底层的 TCP 传输协议&#xff0c;并不能完全保证消息的可靠到达。因此&#xff0c;MQTT 提供了 QoS 机制&#xff0c;其核心是设计了多种消息交互机制来提供不同的服务质量&…

自监督学习之掩码自动编码器(Masked Autoencoders, MAE)——音频识别方面

自监督学习之掩码自动编码器(Masked Autoencoders, MAE)——音频识别方面 1.参考文献 《Masked Autoencoders that Listen》 2.背景 Transformers和self-supervised learning(自监督学习)占据了计算机视觉(Computer Vision,CV)和自然语言处理(natural language processing, …