文章目录
- 1)前言
- 2)Direct exchange 直接类型的交换机
- 3)Multiple bindings 多绑定
- 4)Emitting logs 发送日志
- 5)Subscribing 订阅
- 6)综合以上代码
- 准备工作
- 生产者
- 消费者1
- 消费者2
- 消费者3
- 运行结果
官网参考链接: https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
其他人的翻译版参考: https://www.cnblogs.com/grayguo/p/5377552.html
以下工作是本人在参考官网的教程下,结合自己的理解做的代码流程,更深刻的理解还需要参考官网进行学习哦
1)前言
在上一篇文章中我们构建了一个简单的日志系统,我们可以向多个接受者广播消息。
在本文章中,我们将为其添加一个功能使得针对部分消息的接受成为可能。例如:我们将能够将关键错误消息定向到日志文件(即存到磁盘),同时仍然能够在控制台上打印所有日志消息(不对非错误信息进行存储,节约了磁盘空间)
上一篇文章中,我们写了如下的绑定,将logs交换机绑定给所有的队列,这样所有的队列都能接收到转发的消息
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
若想指定某队列接收消息需要设定参数routingKey的值,而不是向上面那样指定为空,为了避免和BasicPublish 方法的参数混淆,我们暂且称之为binding key,下面是我们创建一个带有指定binding key的绑定:
channel.QueueBind(queue: queueName,
exchange: "direct_logs", // 交换机的名字也进行了修改
routingKey: "black");
2)Direct exchange 直接类型的交换机
我们之前的日志系统,把接受到的消息广播给所有的接受者,我们将要扩展它使得其能够根据消息的级别来过滤发送消息,例如我们想让记录日志的接受者仅仅接受严重性级别的错误消息,而不用在警告和信息级别的消息上浪费磁盘空间。
我们知道交换机类型分为 direct
,topic
,headers
,fanout
。其中direct类型的交换机会将消息会被发送到其binding key 和消息的routing key 完全匹配的队列上,所以可以使用该类型的交换机实现消息的过滤。
一个带有routing key为"orange"的消息,会被路由到队列Q1上;带有routing key为"black" 或 "green"的消息将会被路由到队列Q2上。
3)Multiple bindings 多绑定
使用一个binding key 绑定多个队列完全是合法的,在我们案例中我们可以在 路由X 和 队列Q1,Q2中同时添加一个binding key为"black"的绑定,在这种情况下路由 X 将会像 fanout交换机一样把匹配到的消息发送给所有的接受者即,路由会把binding key 为"black"的消息发送给Q1和Q2。
4)Emitting logs 发送日志
我们把日志级别作为路由的rouing key,这样接收者就可以根据日志级别选择接受其感兴趣的日志。
首先我们需要创建一个交换机
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
然后准备发送的消息, 其中severity变量 分为以下几种:“info”,“warning”,“error”.
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
之前的写法如下(可以对比来看):
5)Subscribing 订阅
接受消息和之前的一样,唯一的区别就是我们将会为我们感兴趣的每一个级别的消息新建绑定:
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity); // 添加了bindingKey,用来指定队列
}
6)综合以上代码
准备工作
新建一个netcore6的控制台项目,添加RabbitMQ的包依赖
NuGet\Install-Package RabbitMQ.Client -Version 6.4.0
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2,消费者3
此代码为本人的简化版与官网代码略有不同,先运行两个消费者(一个只接收error信息,另一个只接收info信息),再运行生产者(生产者要先输入信息来源回车,再输入具体信息)
生产者
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectSendDirect
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211",
};
using(var connection = factory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange:"direct_logs", type:ExchangeType.Direct);
var tmp = Console.ReadLine();
var severity = string.IsNullOrEmpty(tmp)?"info":tmp;
var message = Console.ReadLine();
while(!string.IsNullOrEmpty(severity) && !string.IsNullOrEmpty(message))
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:"direct_logs",routingKey:severity,basicProperties:null,body:body);
Console.WriteLine("[x] Sent {0}:{1}", severity, message);
tmp = Console.ReadLine();
severity = string.IsNullOrEmpty(tmp) ? "info" : tmp;
message = Console.ReadLine();
}
Console.WriteLine("Press [Enter] to exit");
Console.ReadLine();
}
}
}
}
}
消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceiveDirect1
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using(var connection = factory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");
Console.WriteLine("[x1] Waiting for message.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine("[x1] Received {0}:{1}", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
}
}
消费者2
基本与消费者一致,只有输出部分有所不同
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceiveDirect2
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info");
Console.WriteLine("[x2] Waiting for message.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine("[x2] Received {0}:{1}", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
}
}
消费者3
与前两个消费者基本一致,只有输出略有不同
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceiveDirect3
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info");
Console.WriteLine("[x3] Waiting for message.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine("[x3] Received {0}:{1}", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
}
}
运行结果
验证:一个交换机可以绑定多个队列,绑定相同routingkey(如info)的队列接收到相同的消息
验证:一个交换机可以绑定多个队列,绑定多个routingkey(如info、error)该队列接收到两个routingkey绑定的消息。