文章目录
- 1)前言
- 2)临时队列
- 3)绑定
- 4)综合以上代码
- 准备工作
- 1、生产者
- 2、消费者1
- 3、消费者2
- 5)验证
官网教程原文链接: https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
翻译版参考链接: https://www.cnblogs.com/grayguo/p/5356070.html
上一章节的文章介绍的是将多个消息发给两个消费者,现在我们来试试将一个消息发给多个消费者,这种模式称为发布/订阅。
为了说明这种模式,我们将构建一个简单的日志记录系统。它将由两个程序组成 :第一个将发出日志消息,第二个将接收并打印它们。在我们的日志系统中,每一个运行中的接收者副本将都会获得消息,这种方式可以让我们在运行一个接收者直接把消息保存在磁盘的同时,另外一个消费者可以把消息打印到屏幕上。(本质上,已发布的日志消息将广播到所有接收方)
1)前言
- 生产者(发布者)是发送消息的用户应用程序
- 队列是存储消息的缓冲区
- 消费者(接收者)是接收消息的用户应用程序
实际上生产者不知道自己发送的消息会被存入队列中,生产者是直接将消息发送给交换机的,然后根据交换类型由交换机决定将消息发送给哪个队列或哪些队列或直接丢弃该消息。
交换机类型分为 direct
,topic
,headers
,fanout
,为了达到日志系统的功能,我们将创建一个fanout类型的交换机,名字叫做logs。fanout类型的交换机会将收到的所有消息发给已知的所有队列
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
可以细心的看一下,当只写两个参数时,duiable和autoDelete默认设置为false:
这里你会有个疑惑,上一篇work queues的文章中没有提到过交换机的事,是怎么将消息发送给队列的呢?
实际上我们使用的是一个默认的交互机,名字为空(“”),如下
当指定routingkey后会将消息发给指定的队列,若不指定则将发给所有的队列
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
注:使用如下命令可以列出rabbitmq中的已添加的交换机
rabbitmqctl list_exchanges
2)临时队列
上一篇文章中,我们给每一个队列起了一个名字,这样在消费者代码中指定同样的队列就能对该队列中发布的消息进行消费(生产者、消费者共享一个队列),但是我们这章想做的有所区别:
- 监听到所有的日志消息,而非其子集
- 只得到当前正在流转的消息,而非旧的消息
为了解决以上两点我们需要做两件事:
- 1、无论何时我们连接到rabbitmq都需要新建一个崭新的空队列,换就话说我们可以每次创建一个随机名称的队列,更好的方式是让服务器随机选取一个名字来给我们的队列。使用
var queueName = channel.QueueDeclare().QueueName;
可以查看名字 - 2、当消费者断开连接时队列应当同时被删除
- 在.Net Client 我们使用无参的
channel.QueueDeclare()
方法来创建一个随机命名的、非持久的、自动删除的、的队列.
3)绑定
交换机与队列之间的关系叫做绑定,添加了绑定交换机才知道给那些队列转发消息。
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
使用以下代码可以查看已有的绑定信息
rabbitmqctl list_bindings
4)综合以上代码
准备工作
新建一个netcore6的控制台项目,添加RabbitMQ的包依赖
NuGet\Install-Package RabbitMQ.Client -Version 6.4.0
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2
1、生产者
输入空格再回车即停止输入
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProgjectSentLog
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using(var connection = factory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var message = Console.ReadLine();
while(!String.IsNullOrWhiteSpace(message))
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:"logs",routingKey:"",basicProperties:null,body:body);
Console.WriteLine("已发送:{0}", message);
message= Console.ReadLine();
}
}
}
Console.ReadLine();
}
}
}
2、消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceiveLog1
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "lyh",
Password = "1211"
};
using (var connection = factory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
Console.WriteLine("[*1] Waiting for logs");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x1] {0}", message);
};
channel.BasicConsume(queue:queueName,autoAck:true,consumer:consumer);
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
}
}
3、消费者2
代码同消费者1,只是输出略有差别
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceiveLog2
{
public class MainClass
{
static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName= "lyh",
Password="1211"
};
using(var connection = factory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", "");
Console.WriteLine("[*2] Waiting for logs");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x2] {0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.ReadLine();
}
}
}
}
}
先运行两个消费者,再运行生产者,在生产者处输入的消息可以被两个消费者接收到。
5)验证
上面说到:当消费者断开连接时队列应当同时被删除,这里可以看出当程序结束时,队列、绑定都已经被删除
如果想了解如何监听子集要看下一篇文章奥