rabbitmq+netcore6 【4】Routing:路由

news2025/1/24 8:50:30

文章目录

    • 1)前言
    • 2)Direct exchange 直接类型的交换机
    • 3)Multiple bindings 多绑定
    • 4)Emitting logs 发送日志
    • 5)Subscribing 订阅
    • 6)综合以上代码
      • 准备工作
      • 生产者
      • 消费者1
      • 消费者2
      • 消费者3
      • 运行结果

官网参考链接: https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
其他人的翻译版参考: https://www.cnblogs.com/grayguo/p/5377552.html
以下工作是本人在参考官网的教程下,结合自己的理解做的代码流程,更深刻的理解还需要参考官网进行学习哦

1)前言

在上一篇文章中我们构建了一个简单的日志系统,我们可以向多个接受者广播消息。

在本文章中,我们将为其添加一个功能使得针对部分消息的接受成为可能。例如:我们将能够将关键错误消息定向到日志文件(即存到磁盘),同时仍然能够在控制台上打印所有日志消息(不对非错误信息进行存储,节约了磁盘空间)

上一篇文章中,我们写了如下的绑定,将logs交换机绑定给所有的队列,这样所有的队列都能接收到转发的消息

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

若想指定某队列接收消息需要设定参数routingKey的值,而不是向上面那样指定为空,为了避免和BasicPublish 方法的参数混淆,我们暂且称之为binding key,下面是我们创建一个带有指定binding key的绑定:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs", // 交换机的名字也进行了修改
                  routingKey: "black");

2)Direct exchange 直接类型的交换机

我们之前的日志系统,把接受到的消息广播给所有的接受者,我们将要扩展它使得其能够根据消息的级别来过滤发送消息,例如我们想让记录日志的接受者仅仅接受严重性级别的错误消息,而不用在警告和信息级别的消息上浪费磁盘空间。

我们知道交换机类型分为 directtopicheadersfanout 。其中direct类型的交换机会将消息会被发送到其binding key 和消息的routing key 完全匹配的队列上,所以可以使用该类型的交换机实现消息的过滤。
在这里插入图片描述
一个带有routing key为"orange"的消息,会被路由到队列Q1上;带有routing key为"black" 或 "green"的消息将会被路由到队列Q2上。

3)Multiple bindings 多绑定

在这里插入图片描述
使用一个binding key 绑定多个队列完全是合法的,在我们案例中我们可以在 路由X 和 队列Q1,Q2中同时添加一个binding key为"black"的绑定,在这种情况下路由 X 将会像 fanout交换机一样把匹配到的消息发送给所有的接受者即,路由会把binding key 为"black"的消息发送给Q1和Q2。

4)Emitting logs 发送日志

我们把日志级别作为路由的rouing key,这样接收者就可以根据日志级别选择接受其感兴趣的日志。
首先我们需要创建一个交换机

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

然后准备发送的消息, 其中severity变量 分为以下几种:“info”,“warning”,“error”.
在这里插入图片描述

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

之前的写法如下(可以对比来看):
在这里插入图片描述

5)Subscribing 订阅

接受消息和之前的一样,唯一的区别就是我们将会为我们感兴趣的每一个级别的消息新建绑定:

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity); // 添加了bindingKey,用来指定队列
}

6)综合以上代码

准备工作

新建一个netcore6的控制台项目,添加RabbitMQ的包依赖

NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

在这里插入图片描述
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2,消费者3

此代码为本人的简化版与官网代码略有不同,先运行两个消费者(一个只接收error信息,另一个只接收info信息),再运行生产者(生产者要先输入信息来源回车,再输入具体信息)

生产者

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectSendDirect
{
    public class MainClass
    {
        static void Main()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211",
            };
            using(var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange:"direct_logs", type:ExchangeType.Direct);
                    var tmp = Console.ReadLine();
                    var severity = string.IsNullOrEmpty(tmp)?"info":tmp;
                    var message = Console.ReadLine();
                    while(!string.IsNullOrEmpty(severity) && !string.IsNullOrEmpty(message))
                    {
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange:"direct_logs",routingKey:severity,basicProperties:null,body:body);
                        Console.WriteLine("[x] Sent {0}:{1}", severity, message);
                        tmp = Console.ReadLine();
                        severity = string.IsNullOrEmpty(tmp) ? "info" : tmp;
                        message = Console.ReadLine();
                    }
                    Console.WriteLine("Press [Enter] to exit");
                    Console.ReadLine();
                }
            }

        }
    }
}

消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceiveDirect1
{
    public class MainClass
    {
        static void Main()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using(var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
                    var queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");
                    Console.WriteLine("[x1] Waiting for message.");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[x1] Received {0}:{1}", routingKey, message);
                    };
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

消费者2

基本与消费者一致,只有输出部分有所不同

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceiveDirect2
{
    public class MainClass
    {
        static void Main()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
                    var queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info");
                    Console.WriteLine("[x2] Waiting for message.");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[x2] Received {0}:{1}", routingKey, message);
                    };
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

消费者3

与前两个消费者基本一致,只有输出略有不同

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceiveDirect3
{
    public class MainClass
    {
        static void Main()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
                    var queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info");
                    Console.WriteLine("[x3] Waiting for message.");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[x3] Received {0}:{1}", routingKey, message);
                    };
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

运行结果

验证:一个交换机可以绑定多个队列,绑定相同routingkey(如info)的队列接收到相同的消息
在这里插入图片描述
验证:一个交换机可以绑定多个队列,绑定多个routingkey(如info、error)该队列接收到两个routingkey绑定的消息。
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/150533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

USB大容量存储设备浅析

一 USB 设备类 SB 引入了设备类的概念,根据每一类驱动程序的功能将USB设备分为几大类,标准的几大类包括: 大容量存储类 网络类 集线器类 串行转换器 音频类 视频类 图像类 调制解调器 打印机 HID(Human Interface Device 人机接口设备)每一…

我记不住的那些编程语言的语法(数组)-1

背景:我记不住各种语言的语法,例如C、Java、Go、Python、JavaScript,大概就是常用的这几种语言,每种语言有其自己的语法规范,有的时候会记混了,所以想记录一下细节。这个系列会不定期的更新,本期…

一路坎坷,入局到突破【2022年度总结】

秃秃 1> 来到CSDN: 2019年5月时决定只身一人去外省实习,顺便在CSDN这个“资源库”注册了一下账号。直到我20年在公司做技术分享时,才真正开始在CSDN上的创作; 21年的时候也只是把CSDN当做笔记,就自己写一写&…

行业洞察|猴子可以打字,动物走进元宇宙还有多远?

很多学者和专家认为,人类与动物的区别在于语言的使用。人类可以使用语言,但是动物不会。其实也许是我们人类听不懂动物的语言,并不是他们不会使用。本质在于沟通的媒介不同,导致我们无法相互交流。但是,埃隆马斯克&…

App原型设计规范

一、界面尺寸 1.ios分辨率 2.android界面尺寸 ① 安卓分辨率 ②常见安卓手机分辨率及尺寸 一般情况下大家在设计app端原型的时候,由于现在ios和安卓慢慢在趋向一致,所以基本上都只会设计一套原型,尺寸方面一般都是按照iphone6的750*1344(2倍…

Django 数据备份dumpdata 踩的坑

项目背景: 项目使用的是sqlite数据库,要求备份除了网络表之外的所有数据 实施方案: python3 manage.py dumpdata --exclude network.TRoute --indent 2 --format json > aq3.json 方案操作结果是: 查看aq3.json如下&#xff…

一文详解GCC7、CUDA 11.2、CUDNN部署

在部署之前,需要了解下python-tensorflow-cuDNN-CUDA版本对应关系,以便能够完全兼容下文以此版本为例部署gcc-7.3.1gpu driver-460.106.00cuda-11.2cudnn-8.1.1一.gcc部署1.安装[rootgpu ~]# yum -y install centos-release-scl [rootgpu ~]# yum install devtoolse…

《收获,不止Oracle》索引细化

1.索引知识图框 2.索引探秘 2.1 BTREE索引 索引是建在表的具体列上的,其存在的目的是让表的查询变得更快,效率更高。表记录丢失关乎生死,而索引丢失只需重建即可。 索引却是数据库学习中最实用的技术之一。谁能深刻地理解和掌握索引的知识&…

Spring gateway websocket自定义负载均衡

业务需求 公司IM服务主要基于netty实现websocket,为保证在线用户channel通道畅通故一直使用单机运行。现由于公司业务增加需要增加IM集群,由于channel通道不能缓存,故急需一套可以完整兼容之前功能的方案。 技术选型 1、采用spring websocke…

Bonree ONE荣获信通院“2022IT新治理年度明星产品”

今日,由信通院主办的“GOLFIT新治理领导力论坛”正式召开,论坛上公布了2022IT新治理年度评选活动的结果,博睿数据一体化智能可观测平台Bonree ONE凭借卓越的产品力以及优秀的用户体验,从一众产品中脱颖而出,获得“2022…

spring 事务@Transantional 失效及解决方案和总结

1、线程中方法,事务会失效 2、线程中方法,事务会失效。即使在线程方法上增加Transactional注解 3、事务正常回滚,A方法调用B的普通方法 4、事务正常回滚。A方法调用B的private普通方法 6、会抛出NullPointerException异常。 Methods ann…

rock3a: 基于自建数据集+yolov5s模型的rknn模型训练部署全流程

上一篇文章其实已经详述了模型训练到部署的整个流程,但是数据集到模型都是用的官方的coco数据集,这里为了记录开发板的模型训练到部署的整个流程,重新开了一篇文章进行记录。 首先准备数据集和rockchip官方推荐的yolov5源代码 这里需要注意的…

基于Node.js Vue企业产品展示网站

摘 要随着信息技术和网络技术的飞速发展,人类已进入全新信息化时代,传统管理技术已无法高效,便捷地管理信息。为了迎合时代需求,优化管理效率,各种各样的管理系统应运而生,各行各业相继进入信息管理时代&am…

CAN201-Computer Network(2)

文章目录4. Network Layer4.1 Router4.1.1 Input port functions4.1.2 Destination-based forwarding4.1.3 Switching fabrics4.1.4 Input port queueing4.1.5 Output ports4.2 Internet Protocol4.2.1 IP fragmentation, reassembly4.3 IPv4 addressing4.3.1 Subnets4.3.2 Net…

高等数学(第七版)同济大学 习题11-4 个人解答

高等数学(第七版)同济大学 习题11-4 函数作图软件:Mathematica 1.设有一分布着质量的曲面Σ,在点(x,y,z)处它的面密度为μ(x,y,z),用对面积的曲面积分表示这曲面对于x轴的转动惯量.\begin{aligned}&1. \ 设有一分…

IB生物课程介绍与Topic 1: Cell Biology考点分享

准备让孩子就读国际学校或者孩子正在国际学校就读的家长肯定听说过“IB”或者“IB班”,那IB究竟是什么呢?IB与IB课程 IB是International Baccalaureate(国际文凭)的简称,其课程体系国际文凭大学预科课程(In…

Educational Codeforces Round 141 (Rated for Div. 2)A——C

ps:先自我检讨...自从世界杯开始后,就一直摆烂到现在。直到打了今年的第一场cf,看见打的这么菜,真是想remake/。后面我会陆陆续续的补完前段时间没有打的比赛... Dashboard - Educational Codeforces Round 141 (Rated for Div. …

ReentrantLock

目录 ReentrantLock ReentrantLock语法 ReentrantLock可重入 ReentrantLock可打断 ReentrantLock锁超时 ReentrantLock解决哲学家就餐问题 ReentrantLock公平锁 ReentrantLock条件变量 ReentrantLock ReentrantLock 相比于synchronized的特点 : 可中断:比如A线程拥有…

基于移动最小二乘法的曲线曲面拟合论文阅读笔记

基于移动最小二乘法的曲线曲面拟合论文阅读笔记 论文地址:http://www.cnki.com.cn/Article/CJFDTotal-GCTX200401016.htm 一、Problem Statement 传统的曲线(曲面)拟合方法一般使用最小二乘法, 通过使误差的平方和最小, 得到一个线性方程组&#xff0…

通过alist挂在阿里网盘的方法

1、在github官网https://github.com/alist-org/alist/releases/download/v3.8.0/alist-windows-amd64.zip下载alist软件客户端,双击运行,可以看到默认的密码和服务器地址,打开网页http://localhost:5244/manage/accounts,填写密码…