文章目录
- 1)前言
- 2)Topic exchange 主题交换机
- 3)举例
- 4)总结
- 5)综合以上代码
- 准备工作
- 生产者
- 消费者1
- 消费者2
- 结果验证
官网参考链接: https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html
其他人的翻译版参考: https://www.cnblogs.com/grayguo/p/5581323.html
以下工作是本人在参考官网的教程下,结合自己的理解做的代码流程,更深刻的理解还需要参考官网进行学习哦
1)前言
在之前的系统中,我们改进了日志系统,使用direct 交换机代替fanout交换机可以实现选择性的接受日志,但是还是有局限性如他不能根据多个条件来进行路由。在我们的日志系统中,我们不仅希望他能够根据日志的严重程度来进行订阅,还想根据发送日志的源(即日志的生产者)来进行订阅。
你也许已经通过unix 的工具syslog知道了这个概念,它同时通过级别(info/warn/crit…)和源(auth/cron/kern…)。这种方式给了我们很大的灵活性–我们可以同时监听来自cron的严重级别的错误消息以及来自kern的所有消息。为了在我们的系统中实现这种功能,我们需要学习更为复杂的交换机类型 –topic
注:交换机类型分为 direct,topic,headers,fanout
2)Topic exchange 主题交换机
发送到topic的交换机的消息不能带有随意写法的routing_key,下面是之前的写法,可以看出routingKey是按照我们的想法随便起的队列的名字。
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "task_queue3", // 这里
basicProperties: null,
body: body);
现在若exchange的类型为Topic,则routingKey必须是一串使用"."分隔开的单词串。这些单词可以是任意的单词但通常都是跟当前消息有关的一些功能,例如:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.你可以指定任意多个字符但不能超过255字节。
bindingkey必须也是同样的格式,topic交换机背后的原理类似于direct交换机,两个都有这样的特性:带有特定routingKey(生产者的channel.BasicPublish时参数的routingKey)的消息会被发送到绑定了该bindingKey(消费者的channel.BasicPublish时参数的bindingKey)的队列。
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "task_queue3");// 这里的routingKey为了区分称为bindingKey
需要注意的是对于bindingKey这里有两个特殊的情况:
- "*“可以代表一个单词(是单词不是字符–即”."分割的单词)
- "#"可以代表0个或多个单词
3)举例
在这个例子中,我们将要发送描述动物的消息,消息的routingkey将会有3部分组成,routingkey的第一个单词描述的是速度,第二个单词描述的是颜色,第三个单词是特殊描述: “..”。
我们创建了三个绑定,Q1队列的bingingkey为*.orange.*
,Q2队列的bingingkey为*.*.rabbit
和lazy.#
,换句话说Q1只关注颜色为orange的动物消息,Q2关注种族为rabbits的动物消息,速度为layz的动物消息。
- routingkey为
quick.orange.rabbit
的消息会两个队列都接收 - routingkey为
lazy.orange.elephant
的消息也会被转发到两个队列上 - routingkey为
quick.orange.fox
的消息仅仅进入Q1 - routingkey为
lazy.brown.fox
的消息只能被Q2接收 - routingkey为
lazy.pink.rabbit
的消息只会被转发到Q2上一次,即使它匹配上了两个bindingKey - routingkey为
quick.brown.fox
的消息不会匹配到任何绑定,所有消息将会被忽略
如果我们打破约定发送带有4个单词的消息将会发生什么,例如orange
或者quick.orange.male.rabbit
?答案是这些routingkey将不会匹配到任何的bindingkey,因此将会丢失。但是lazy.orange.new.rabbit
因为最后一个词匹配上了Q2的bindingkey,所有可以被Q2接收。
4)总结
Topic 交换机功能非常强大,可以完成像其他交换机那样工作。
- 当队列绑定了bindingkey为
#
时,不论routingkey是什么它都会接收所有的消息,功能就像fanout交换机那样 - 当特殊的
*
和#
都没在bindingkey中使用时,其功能就像direct交换机那样
5)综合以上代码
我们将要在我们的日志系统中使用topic交换机,假设我们的日志系统的routingkey有2个单词:<facility>.<severity>
代码和之前的案例基本上是一样的。
准备工作
新建一个netcore6的控制台项目,添加RabbitMQ的包依赖
NuGet\Install-Package RabbitMQ.Client -Version 6.4.0
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2
此代码为本人的简化版与官网代码略有不同,生产者先输入routingKey再输入message;消费者先输入bindingKey才能接收消息。先运行两个消费者,再运行生产者
生产者
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectSentTopic
{
public class MainClass
{
static void Main()
{
var connectionFactory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using(var connection = connectionFactory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange:"topic_logs",type:ExchangeType.Topic);
var routingKey = Console.ReadLine();
var message = Console.ReadLine();
while (!string.IsNullOrWhiteSpace(routingKey) && !string.IsNullOrWhiteSpace(message))
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine("[x] Sent {0}:{1}", routingKey, message);
routingKey = Console.ReadLine();
message = Console.ReadLine();
}
}
}
}
}
}
消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ProjectReceiveTopic
{
public class MainClass
{
static void Main()
{
var connectionFactory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using(var connection = connectionFactory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
var queueName = channel.QueueDeclare().QueueName;
var routingKey = Console.ReadLine(); // 输入队列绑定的bindingKey
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: routingKey);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (mdoel, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x1] bindingKey:{0} received message:{1}", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
消费者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ProjectReceiveTopic2
{
public class MainClass
{
static void Main()
{
var connectionFactory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using (var connection = connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
var queueName = channel.QueueDeclare().QueueName;
var routingKey = Console.ReadLine(); // 输入队列绑定的bindingKey
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: routingKey);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (mdoel, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x2] bindingKey:{0} received message:{1}", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
结果验证
(1)bindingKey为#
、kern.*
时
先运行消费者,后运行生产者。
消费者1输入 (接收所有消息)
#
消费者2输入 (接收源为kernel的所有消息)
kern.*
生产者 输入(发布源为kernel、严重度为fatal的消息 + 源为outter、严重度为info的消息)
kern.critical
A critical kernel error
outter.info
this is a infomation of lyh
当消费者的bindingKey为#
时(如receive1),效果和fanout交换机一样;消费者的bindingKey为kern.*
时(如receive2)只能接收源为kern的消息
(2)bindingKey为*.critical
、*.*
时
消费者1输入 (接收严重程度为critical的消息)
*.critical
消费者2输入 (接收所有消息,相当于#
)
*.*
生产者 输入(发布源为kernel、严重度为fatal的消息 + 源为outter、严重度为info的消息)
kern.critical
Another critical kernel error
outter.info
this is another infomation of lyh