1 Redis分布式缓存数据库结合“StackExchange.Redis”中间件实现消息队列(MessageQueuing)
Redis分布式缓存数据库消息队列(MessageQueuing)队列的实现模式有:
1、【生产(Producter)】--【消费(Worker)】模式。
变种模式:【基于异步消息队列List lpush-brpop(rpush-blpop)】 模式。
2、【双向链表(List)】模式。
3、【发布(Publish)】--【订阅(Subscription)】模式。
4、【数据流(Stream)】模式
变种模式:【消费组(Consumer Group)】 模式
本章的重点将放在RabbitMQ服务器结合“RabbitMQ.Client”中间件实现消息队列(MessageQueuing)上,所以只有Redis的【发布(Publish)】--【订阅(Subscription)】模式示例。
1.1 【发布(Publish)】端定义实现
using StackExchange.Redis;
namespace Publish
{
public class Program
{
public static void Main()
{
//连接到Redis分布式缓存数据库,通过“StackExchange.Redis”中间件实现:消息队列(MessageQueuing)。
using (ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("127.0.0.1:6379"))
{
//通过“StackExchange.Redis”中间件实例化“Redis发布端”实例。
ISubscriber sub = redis.GetSubscriber();
Console.WriteLine("“Redis--【发布】端(Publish)”:已经成功构建,并成功启动中...");
//通过“发布端”的输入控制字符串,通过该字符串来限定“Redis订阅端”所显示的信息。
string? input;
//如果输入控制字符串不等“exit”,则“发布端”将一直通过“RedisPublishChannel”通道,来控制限定“Redis订阅端”所显示的信息,
//反之将退出“发布端”所构建的“RedisPublishChannel”通道。
do
{
//输入控制字符串。
input = Console.ReadLine();
//“Redis发布端”实例,通过“RedisPublishChannel”通道输入控制字符串,来控制限定“Redis订阅端”所显示的信息。
sub.Publish("RedisPublishChannel", input);
} while (input != "exit");
}
}
}
}
1.2 【订阅(Subscription)】端定义实现
using StackExchange.Redis;
namespace Subscription
{
public class Program
{
public static void Main()
{
//连接到Redis分布式缓存数据库,通过“StackExchange.Redis”中间件获取“发布端”实例的“RedisPublishChannel”通道。
using (ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("127.0.0.1:6379"))
{
//通过“StackExchange.Redis”中间件实例化“Redis订阅端”实例。
ISubscriber sub = redis.GetSubscriber();
//“订阅端”接收“发布端”的“RedisPublishChannel”通道的输入,从而显示相应的信息
//channel:“Redis发布端”的“RedisPublishChannel”。
//message:“Redis发布端”“RedisPublishChannel”所输入的控制字符串。
sub.Subscribe("RedisPublishChannel", (channel, message) =>
{
//打印出发布端”“RedisPublishChannel”所输入的控制字符串。
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {message}");
if (message == "yes")
{
ToDo();
}
else
{
Console.WriteLine("未获取指定的信息,什么也不做!");
}
});
Console.WriteLine("“Redis--【订阅】端(Subscription)”:已经成功构建,并成功启动中...");
Console.ReadKey();
}
}
/// <summary>
/// 【执行操作】
/// <remarks>
/// 摘要:
/// 在“Redis发布端”输入的控制字符串后,通过该方法在控制台中打印出相应的信息。
/// </remarks>
/// </summary>
public static void ToDo()
{
Console.WriteLine("执行你要做的操作....");
Console.WriteLine("已经执行完你要做的操作。");
}
}
}
2 RabbitMQ服务器结合“RabbitMQ.Client”中间件实现消息队列
2.1 准备工作
安装软件见:“windows环境下安装RabbitMQ(超详细)_luckySnow-julyo的博客-CSDN博客_windows安装rabbitmq”。
2.2 生产消费模式
2.2.1 【生产(ProducterPW)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
namespace ProducterPW
{
public class Program
{
public static void Main()
{
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection? connection;
connection = _connectionFactory.CreateConnection();
//注意:【生产】--【消费】模式,是RabbitMQ消息队列的最简单实现模式,该模式中不包含交换机实例化的操作定义,实际上交换器实例(Exchange=Publish)集成定义在了【生产】端。
WriteLine("“RabbitMQ--【生产】端(ProducterPW)”:已经成功构建,并成功启动中...");
WriteLine("输入生产内容:");
while (true)
{
//通过RabbitMQ服务器实例化【生产】端与【消费】端之间的通道,实现把【生产】端中的信息发送到【生产】端。
using (var channel = connection?.CreateModel())
{
//通道名常量字符串值。
const string _queueName = "queueProducterPW";
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//exclusive:
// 如果你想创建一个只有自己可见(true)的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
// 注意:
//自动删除(true),不能自动删除未曾经绑定过的交换器或队列。
//arguments:
// RabbitMQ队列的参数集实例,该实例用于存储RabbitMQ队列的多个配置数据实例。
channel?.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
string _message = DateTime.Now.ToString("HH:mm:ss") + "-----";
string? _input = ReadLine();
_message = _message + _input;
byte[] _body = Encoding.UTF8.GetBytes(_message);
IBasicProperties? _basicProperties = channel?.CreateBasicProperties();
_basicProperties.Persistent = true;
//发送信息。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【生产】--【消费】模式不需要交换器(Exchange=Publish),所以当前被设定为空字符串。
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//routingKey:
// 路由键,这里特指通道名常量字符串。
//basicProperties:
// 其它的一些属性,如:{@link MessageProperties.PERSISTENT_TEXT_PLAIN}。
//body:
// 【生产】端向【消费】端所发送的信息。
channel.BasicPublish(exchange: "",
routingKey: _queueName,
basicProperties: _basicProperties,
body: _body);
}
}
}
}
}
2.2.2 【消费(Worker)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Worker
{
public class Program
{
public static void Main()
{
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【消费】端(Worker)”:已经成功构建,并成功启动中...");
//通过RabbitMQ服务器实例化【生产】端与【消费】端之间的通道,实现把【生产】端中的信息发送到【生产】端。
using (IModel channel = _connection.CreateModel())
{
//通道名常量字符串值。
const string _queueName = "queueProducterPW";
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//exclusive:
// 如果你想创建一个只有自己可见(true)的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
// 注意:
//自动删除(true),不能自动删除未曾经绑定过的交换器或队列。
//arguments:
// RabbitMQ队列的参数集实例,该实例用于存储RabbitMQ队列的多个配置数据实例。
channel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,而应该设计有完善的异常处理机制。
//在出现错误的时候知道在哪里出现什么样子的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。
//在接收消息出现故障的时候我们可以通过RabbitMQ重发机制来处理。重发就有重发次数的限制,
//有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式。
//prefetchSize:
// 可接收【生产】端消息的大小的,但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,
//程序会在运行到这一行的时候报错,说还没有实现不为0的情况。
//prefetchCount:
// 处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,
//消息只会在队列中阻塞。如果输入3,那么可以最多有3个消息不应答,如果到达了3个,则发送端发给这个接收方得消息只会在队列中,
//而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。
//global:
// 指示是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
//实例化消费事件实例,
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//执行消费事件实例,领奖获取并打印出RabbitMQ--【生产】端(ProducterPW)传送的信息。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
string _message = Encoding.Default.GetString(body.ToArray());
WriteLine("“RabbitMQ--【消费】端(Worker)”:接收RabbitMQ--【生产】端(ProducterPW)传送的信息:{0}", _message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//autoAck:
// RabbitMQ是默认开启自动删除(true)的,这样当rabbitMQ将消息发给消费者,就会从内存中将消息删除,
//这样会带来一个问题,如果消费者未处理完消息而宕机,那么消息就会丢失。所以,我们将关闭自动删除(false),
//当rabbitMQ收到消费者处理完消息的回应后才会从内存中删除消息。
//consumer:
// 1、Consumers就是消费者,主要就是消费当前的消息队列中的消息
// 2、消费消息有两种模式一种就是推送消费和拉取消费。
channel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
WriteLine("按【回车】键退出。");
ReadLine();
}
}
}
}
2.3 发布订阅模式
2.3.1 【生产(ProducterPPS)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
namespace ProducterPPS
{
public class Program
{
public static void Main()
{
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【生产】端(ProducterPPS)”:已经成功构建,并成功启动中...");
WriteLine("输入生产内容:");
while (true)
{
//交换器名常量字符串值。
const string _exchangePPS = "ExchangePPS";
//通过RabbitMQ服务器实例化【生产】端与【交换器】端之间的通道,实现把【生产】端中的信息发送到【交换器】端。
//实例化1个通道(channel)实例;1个通道(channel)实例包含多个队列(Queue)实例。
IModel? _channel = _connection.CreateModel();
//实例化交换机实例。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//type:
// 交换机类型的枚举实例:Fanout(生产--消费/发布--订阅模式交换机类型)/direct(路由模式交换机类型)/topic(主题模式交换机类型)。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。 _channel.ExchangeDeclare(exchange:_exchangePPS,
type:ExchangeType.Fanout,
durable:true,
autoDelete:false);
string? _input = ReadLine();
//发送信息。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//routingKey:
// 路由键,为空字符串。
//basicProperties:
// 其它的一些属性,如:{@link MessageProperties.PERSISTENT_TEXT_PLAIN}。
//body:
// 【生产】端向【消费】端所发送的信息。
_channel.BasicPublish(exchange:_exchangePPS,
routingKey:"",
basicProperties: null,
body:Encoding.Default.GetBytes(_input));
}
}
}
}
2.3.2 【消费A(WorkerPPS_A)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace WorkerPPS_A
{
public class Program
{
public static void Main()
{
//通道名常量字符串值。
const string _queueName = "queueWorkerPPS_A";
//交换器名常量字符串值。
const string _exchangePPS = "ExchangePPS";
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【消费_A】端(WorkerPPS_A)”:已经成功构建,并成功启动中...");
//实例化1个通道(channel)实例;1个通道(channel)实例包含多个队列(Queue)实例。
IModel _channel = _connection.CreateModel();
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//exclusive:
// 如果你想创建一个只有自己可见(true)的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
// 注意:
//自动删除(true),不能自动删除未曾经绑定过的交换器或队列。
//arguments:
// RabbitMQ队列的参数集实例,该实例用于存储RabbitMQ队列的多个配置数据实例。
_channel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//实例化交换机实例。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//type:
// 交换机类型的枚举实例:Fanout(生产--消费/发布--订阅模式交换机类型)/direct(路由模式交换机类型)/topic(主题模式交换机类型)。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
_channel.ExchangeDeclare(exchange: _exchangePPS,
type: ExchangeType.Fanout,
durable: true,
autoDelete: false);
//在【发布】--【订阅】模式下,将来1个指定通道(这里特指“A”通道)绑定到交换机中。
//queue:队列名常量字符串。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//routingKey:
// 路由键,为空字符串。
_channel.QueueBind(queue :_queueName,
exchange : _exchangePPS,
routingKey : "");
//实例化消费事件实例,
EventingBasicConsumer _consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (a, e) =>
{
WriteLine($"{DateTime.Now}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
//根据配置数据,消费者确认收到1/n个消息。
//deliveryTag:
// 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息唯一标识,是一个递增的正整数。
//multiple:
// true:确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false:只确认deliveryTag指定的消息,
// 收到回复后,RabbitMQ会直接在队列中删除这条消息
_channel.BasicAck(deliveryTag: e.DeliveryTag,
multiple: true);
};
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//autoAck:
// RabbitMQ是默认开启自动删除(true)的,这样当rabbitMQ将消息发给消费者,就会从内存中将消息删除,
//这样会带来一个问题,如果消费者未处理完消息而宕机,那么消息就会丢失。所以,我们将关闭自动删除(false),
//当rabbitMQ收到消费者处理完消息的回应后才会从内存中删除消息。
//consumer:
// 1、Consumers就是消费者,主要就是消费当前的消息队列中的消息
// 2、消费消息有两种模式一种就是推送消费和拉取消费。
_channel.BasicConsume(queue:_queueName,
autoAck:false,
consumer:_consumer);
ReadLine();
}
}
}
2.2.3 【消费B(WorkerPPS_B)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace WorkerPPS_B
{
public class Program
{
public static void Main()
{
//通道名常量字符串值。
const string _queueName = "queueWorkerPPS_B";
// const string QUEUENAME = "WorkerPPS_A";
//交换器名常量字符串值。
const string _exchangePPS = "ExchangePPS";
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【消费_B】端(WorkerPPS_B)”:已经成功构建,并成功启动中...");
//实例化1个通道(channel)实例;1个通道(channel)实例包含多个队列(Queue)实例。
IModel _channel = _connection.CreateModel();
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//exclusive:
// 如果你想创建一个只有自己可见(true)的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
// 注意:
//自动删除(true),不能自动删除未曾经绑定过的交换器或队列。
//arguments:
// RabbitMQ队列的参数集实例,该实例用于存储RabbitMQ队列的多个配置数据实例。
_channel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//实例化交换机实例。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//type:
// 交换机类型的枚举实例:Fanout(生产--消费/发布--订阅模式交换机类型)/direct(路由模式交换机类型)/topic(主题模式交换机类型)。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
_channel.ExchangeDeclare(exchange: _exchangePPS,
type: ExchangeType.Fanout,
durable: true,
autoDelete: false);
//在【发布】--【订阅】模式下,将来1个指定通道(这里特指“B”通道)绑定到交换机中。
//queue:队列名常量字符串。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//routingKey:
// 路由键,为空字符串。
_channel.QueueBind(queue: _queueName,
exchange: _exchangePPS,
routingKey : "");
//实例化消费事件实例,
EventingBasicConsumer _consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (a, e) =>
{
WriteLine($"{DateTime.Now}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
//根据配置数据,消费者确认收到1/n个消息。
//deliveryTag:
// 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息唯一标识,是一个递增的正整数。
//multiple:
// true:确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false:只确认deliveryTag指定的消息,
// 收到回复后,RabbitMQ会直接在队列中删除这条消息
_channel.BasicAck(deliveryTag:e.DeliveryTag,
multiple:true);
};
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//autoAck:
// RabbitMQ是默认开启自动删除(true)的,这样当rabbitMQ将消息发给消费者,就会从内存中将消息删除,
//这样会带来一个问题,如果消费者未处理完消息而宕机,那么消息就会丢失。所以,我们将关闭自动删除(false),
//当rabbitMQ收到消费者处理完消息的回应后才会从内存中删除消息。
//consumer:
// 1、Consumers就是消费者,主要就是消费当前的消息队列中的消息
// 2、消费消息有两种模式一种就是推送消费和拉取消费。
_channel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: _consumer);
ReadLine();
}
}
}
2.4 路由模式
2.4.1 【生产(ProducterRouting)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
namespace ProducterRouting
{
public class Program
{
public static void Main()
{
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【生产】端(ProducterRouting)”:已经成功构建,并成功启动中...");
WriteLine("输入生产内容:");
while (true)
{
//交换器名常量字符串值。
const string _exchangeRouting = "ExchangeRouting";
//通过RabbitMQ服务器实例化【生产】端与【交换器】端之间的通道,实现把【生产】端中的信息发送到【交换器】端。
//实例化1个通道(channel)实例;1个通道(channel)实例包含多个队列(Queue)实例。
IModel? _channel = _connection.CreateModel();
//实例化交换机实例。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【路由】模式需要交换器。
//type:
// 交换机类型的枚举实例:Fanout(生产--消费/发布--订阅模式交换机类型)/direct(路由模式交换机类型)/topic(主题模式交换机类型)。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
_channel.ExchangeDeclare(exchange: _exchangeRouting,
type: ExchangeType.Direct,
durable: true,
autoDelete: false);
//路由键字符串常量。
string _routingKey = "Routing_A";
string _message = string.Empty;
//通过“发布端”的输入控制字符串,通过该字符串来限定“订阅端”所显示的信息。
string _input = string.Empty;
//输入控制字符串。
_input = ReadLine();
if (_input == "Routing_A")
{
_message = "QQ";
}
else if (_input == "Routing_B")
{
_routingKey = "Routing_B";
_message = "微信";
}
//发送信息。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【发布】--【订阅】模式需要交换器。
//routingKey:
// 路由键,为空字符串。
//basicProperties:
// 其它的一些属性,如:{@link MessageProperties.PERSISTENT_TEXT_PLAIN}。
//body:
// 【生产】端向【消费】端所发送的信息。
_channel.BasicPublish(exchange: _exchangeRouting,
routingKey: _routingKey,
basicProperties: null,
body: Encoding.Default.GetBytes(_message));
}
}
}
}
2.4.2 【消费A(WorkerRouting_A)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace WorkerRouting_A
{
public class Program
{
public static void Main()
{
//通道名常量字符串值。
const string _queueName = "queueWorkerRouting_A";
//交换器名常量字符串值。
const string _exchangeRouting = "ExchangeRouting";
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【消费_A】端(WorkerRouting_A)”:已经成功构建,并成功启动中...");
//实例化1个通道(channel)实例;1个通道(channel)实例包含多个队列(Queue)实例。
IModel _channel = _connection.CreateModel();
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//exclusive:
// 如果你想创建一个只有自己可见(true)的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
// 注意:
//自动删除(true),不能自动删除未曾经绑定过的交换器或队列。
//arguments:
// RabbitMQ队列的参数集实例,该实例用于存储RabbitMQ队列的多个配置数据实例。
_channel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//实例化交换机实例。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【路由】模式需要交换器。
//type:
// 交换机类型的枚举实例:Fanout(生产--消费/发布--订阅模式交换机类型)/direct(路由模式交换机类型)/topic(主题模式交换机类型)。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
_channel.ExchangeDeclare(exchange: _exchangeRouting,
type: ExchangeType.Direct,
durable: true,
autoDelete: false);
//在【路由】模式下,将来1个指定通道(这里特指“A”通道)绑定到交换机中。
//queue:队列名常量字符串。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【路由】模式需要交换器。
//routingKey:
// 路由键,为空字符串。
_channel.QueueBind(queue: _queueName,
exchange: _exchangeRouting,
routingKey: "Routing_A");
//实例化消费事件实例,
EventingBasicConsumer _consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (a, e) =>
{
WriteLine($"{DateTime.Now}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
//根据配置数据,消费者确认收到1/n个消息。
//deliveryTag:
// 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息唯一标识,是一个递增的正整数。
//multiple:
// true:确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false:只确认deliveryTag指定的消息,
// 收到回复后,RabbitMQ会直接在队列中删除这条消息
_channel.BasicAck(deliveryTag: e.DeliveryTag,
multiple: true);
};
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//autoAck:
// RabbitMQ是默认开启自动删除(true)的,这样当rabbitMQ将消息发给消费者,就会从内存中将消息删除,
//这样会带来一个问题,如果消费者未处理完消息而宕机,那么消息就会丢失。所以,我们将关闭自动删除(false),
//当rabbitMQ收到消费者处理完消息的回应后才会从内存中删除消息。
//consumer:
// 1、Consumers就是消费者,主要就是消费当前的消息队列中的消息
// 2、消费消息有两种模式一种就是推送消费和拉取消费。
_channel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: _consumer);
ReadLine();
}
}
}
2.4.3 【消费B(WorkerRouting_B)】端定义实现
using System.Text;
using static System.Console;
//Nuget--RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace WorkerRouting_B
{
public class Program
{
public static void Main()
{
//通道名常量字符串值。
const string _queueName = "queueWorkerRouting_B";
//交换器名常量字符串值。
const string _exchangeRouting = "ExchangeRouting";
//根据配置数据,构建当前程序与RabbitMQ服务器连接。
ConnectionFactory _connectionFactory = new ConnectionFactory()
{
HostName = "127.0.0.1",//或:HostName = "localhost",
Port = 5672,//RabbitMQ服务器默认端口,可以被注释掉。
UserName = "guest",
Password = "guest"
};
//当前程序连接到RabbitMQ服务器。
IConnection _connection = _connectionFactory.CreateConnection();
WriteLine("“RabbitMQ--【消费_B】端(WorkerRouting_B)”:已经成功构建,并成功启动中...");
//实例化1个通道(channel)实例;1个通道(channel)实例包含多个队列(Queue)实例。
IModel _channel = _connection.CreateModel();
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//exclusive:
// 如果你想创建一个只有自己可见(true)的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
// 注意:
//自动删除(true),不能自动删除未曾经绑定过的交换器或队列。
//arguments:
// RabbitMQ队列的参数集实例,该实例用于存储RabbitMQ队列的多个配置数据实例。
_channel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//实例化交换机实例。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【路由】模式需要交换器。
//type:
// 交换机类型的枚举实例:Fanout(生产--消费/发布--订阅模式交换机类型)/direct(路由模式交换机类型)/topic(主题模式交换机类型)。
//durable:
// RabbitMQ默认将消息存储在内存(false)中,若rabbitMQ宕机,那么所有数据就会丢失,所以在声明队列的时候可以声明将数据持久化(true),
//但是如果已经声明了一个未持久化(false)的队列,如果该队列不能修改,只能将这个队列删除或重新声明一个持久化数据。
//autoDelete:
// 如果RabbitMQ中交换器或队列不被需要了,即交换器下曾经绑定过的交换器或队列解除绑定,队列下的消费者解除订阅,则自动删除(true)将自动删除交换器或队列。
_channel.ExchangeDeclare(exchange: _exchangeRouting,
type: ExchangeType.Direct,
durable: true,
autoDelete: false);
//在【路由】模式下,将来1个指定通道(这里特指“B”通道)绑定到交换机中。
//queue:队列名常量字符串。
//exchange:
// 要将消息发送到的交换器(Exchange=Publish),由于【路由】模式需要交换器。
//routingKey:
// 路由键,为空字符串。
_channel.QueueBind(queue: _queueName,
exchange: _exchangeRouting,
routingKey: "Routing_B");
//实例化消费事件实例,
EventingBasicConsumer _consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (a, e) =>
{
WriteLine($"{DateTime.Now}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
//根据配置数据,消费者确认收到1/n个消息。
//deliveryTag:
// 服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息唯一标识,是一个递增的正整数。
//multiple:
// true:确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false:只确认deliveryTag指定的消息,
// 收到回复后,RabbitMQ会直接在队列中删除这条消息
_channel.BasicAck(deliveryTag: e.DeliveryTag,
multiple: true);
};
//根据配置数据,构建当前程序与RabbitMQ服务器之间通道实例。
//queue:队列名常量字符串。
//autoAck:
// RabbitMQ是默认开启自动删除(true)的,这样当rabbitMQ将消息发给消费者,就会从内存中将消息删除,
//这样会带来一个问题,如果消费者未处理完消息而宕机,那么消息就会丢失。所以,我们将关闭自动删除(false),
//当rabbitMQ收到消费者处理完消息的回应后才会从内存中删除消息。
//consumer:
// 1、Consumers就是消费者,主要就是消费当前的消息队列中的消息
// 2、消费消息有两种模式一种就是推送消费和拉取消费。
_channel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: _consumer);
ReadLine();
}
}
}
2.5 主题(通配符)模式
同路由模式
3 参考
Redis:
1、(1条消息) 浅谈三种使用Redis实现MQ的方式_默辨的博客-CSDN博客_redis mq
2、(1条消息) .NetCore 3.1 项目中如何使用Redis的发布订阅(MQ)_风中极乐鸟的博客-CSDN博客
RabbitMQ:
- 如何用.NETCore操作RabbitMQ_C#教程_脚本之家 (jb51.net)
- (1条消息) .NET Core 中使用 RabbitMQ_sundna的博客-CSDN博客_core rabbitmq
- (1条消息) RabbitMQ之basicConsume、basicCancel、basicPublish等方法详解_艾米莉Emily的博客-CSDN博客_basicconsume
- C#调用RabbitMQ实现消息队列 - kiba518 - 博客园 (cnblogs.com)
对以上功能更为具体实现和注释见:221208_11MessageQueuing(初识消息队列)。