文章目录
- 1)准备工作
- 2)新建消费者1
- 3)新建消费者2
- 4)生产者
- 5)知识点解读
- 1、autoAck: true
- 2、重复声明/前后不一致
- 3、Message durability 消息持久化
- 4、Fair Dispatch 公平调度
- 5、综合以上知识点的代码:
官网参考链接:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
其他人的翻译版参考:https://www.cnblogs.com/longlongogo/p/6489574.html
以下工作是本人在参考官网的教程下,结合自己的理解做的简易版,更深刻的理解还需要参考官网进行学习哦
1)准备工作
rabbitmqctl status
运行成功后显示:
新建一个用户,设置密码,并授予权限,并将其设置为管理员
rabbitmqctl add_user JC JayChou //创建用户JC密码为JayChou
rabbitmqctl set_permissions JC ".*" ".*" ".*" //赋予JC读写所有消息队列的权限
rabbitmqctl set_user_tags JC administrator //分配用户组
进入本机rabbitmq的可视化网址http://localhost:15672/,使用上述账号密码进行登录
2)新建消费者1
新建一个netcore6的控制台项目,新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口,在MainClass中写下如下代码:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceive
{
public class MainClass
{
static void Main(string[] args)
{
var factory = new ConnectionFactory(); // 定义用于连接RabbitMQ节点的工厂类
factory.HostName = "localhost"; // RabbitMQ服务在本地运行
factory.UserName = "lyh";
factory.Password = "1211";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue2", durable: false, exclusive: false, autoDelete: false, arguments: null); // 声明一个队列
var consumer = new EventingBasicConsumer(channel); //定义同步的消费者
consumer.Received += (model, r) => //接收消息
{
var body = r.Body.ToArray();
var message = Encoding.UTF8.GetString(body); //将二进制数据转为字符串
Console.WriteLine("[x1] 已接收:{0}", message);
int dots = message.Split('.').Length; //根据传过来数据中的.个数等待,模拟耗时任务
Thread.Sleep(dots * 1000);
Console.WriteLine("[x1] Done");
};
channel.BasicConsume(queue: "task_queue2", autoAck: true, consumer: consumer); //进行消费,消费后就不会被另一消费者获取到了(autoAck: true 自动标记已消费)
Console.ReadLine();
}
}
}
}
}
3)新建消费者2
步骤及内容同消费者1,输出内容有区别
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceive2
{
public class MainClass
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "lyh";
factory.Password = "1211";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue2", durable: false, exclusive: false, autoDelete: false, arguments: null); // 声明一个队列
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, r) =>
{
var body = r.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x2] 已接收:{0}", message);
int dots = message.Split('.').Length;
Thread.Sleep(dots * 1000);
Console.WriteLine("[x2] Done");
channel.BasicAck(r.DeliveryTag, false); // 手动标记已消费
};
channel.BasicConsume(queue: "task_queue2", autoAck: false, consumer: consumer); // (autoAck: false 手动标记已消费)
Console.ReadLine();
}
}
}
}
}
4)生产者
注:队列进行持久化设置即 csharpchannel.QueueDeclare("task_queue2", durable:false, exclusive:false, autoDelete:false, arguments:null);
,仅代表队列的相关属性会被持久化,队列中的消息需要单独设置
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Net.Mime.MediaTypeNames;
namespace ProjectSend
{
public class MainClass
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // 本地运行
factory.UserName = "lyh";
factory.Password = "1211";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("task_queue2", durable:false, exclusive:false, autoDelete:false, arguments:null); // 创建一个名为task_queue的消息队列
var message = Console.ReadLine();
while (!string.IsNullOrWhiteSpace(message)) //循环接收直到输入空格,回车停止程序
{
var body = Encoding.UTF8.GetBytes(message); //信息按照UTF-8编码为二进制数组 || 实体类需要序列化再转为二进制数组
channel.BasicPublish(exchange: "", routingKey: "task_queue2", basicProperties: properties, body: body);//发布信息
Console.WriteLine("已发送:{0}", message);
message = Console.ReadLine(); //接收传递的信息
}
}
}
Console.ReadLine();
}
}
}
先运行两个消费者,再运行生产者并输入如下数据再回车(保证每行后面都要有回车),即可查看消费者的消费记录
First message.
Second message..
Third message...
Fourth message....
Fifth message.....
当发送程序输入空格并回车,则结束发送程序。可以看出两个消费方式不同的程序都对发送的消息进行了均匀的消费
】、
5)知识点解读
1、autoAck: true
如果不设置 autoAck: true ,如下:
channel.BasicConsume(queue: "task_queue2", autoAck: true, consumer: consumer); // autoAck: true 自动标记已消费
则需要在接收一个消息后手动进行标记
channel.BasicAck(deliveryTag:r.DeliveryTag, multiple:false); // 手动标记已消费
否则程序结束后再次运行,仍会得到上次已经消费过的消息(因为没有在已经消费过的消息上打上已消费的标记,所以还在队列中存在,这会导致rabbitmq无法释放掉已消费的消息而消耗越来越多的内存)。
这个策略是rabbitmq用来处理当一个消费者在消息处理中途挂掉,来避免消息丢失的方法,只有当一个消息被处理完,才会回复一个ack以标记该消息已被处理,此时无需转交给其他消费者,可由rabbitmq来自行销毁。
同时,可以在可视化程序中查看历史消费记录
2、重复声明/前后不一致
声明队列时,不可以声明同名但与上次配置不同的队列,如上次运行send声明如下队列
channel.QueueDeclare("task_queue2", durable:false, exclusive:false, autoDelete:false, arguments:null);
下次运行时不可以在不改名的情况下修改其他配置,来测试各个属性功能(会报错)
channel.QueueDeclare("task_queue2", durable:true, exclusive:false, autoDelete:false, arguments:null); // 错误写法
声明队列和消费时,必须使用同一个队列,否则会报错
3、Message durability 消息持久化
上面的ack消息确认保证了当一个消费者挂掉,消息不会丢失而是重新回到队列由其他消费者消费。但是不能保证当rabbitmq服务器挂掉后队列以及消息的丢失,所以需要设置队列持久化、消息持久化(二者缺一不可)。需要注意的是,这里的持久化只是将队列、消息存到内存中,保存在内存中仍有丢失的风险如电脑死机(保存到硬盘上才能确保不会真正的丢失,这时候就需要学习publisher confirms)
1)队列持久化 :durable:true
(注意修改名称,同时在消费者处也要同步修改)
channel.QueueDeclare("task_queue3", durable:true, exclusive:false, autoDelete:false, arguments:null);
2)消息持久化 :(注意只需要修改生产者代码,添加消息持久化)
var properties = channel.CreateBasicProperties(); //避免因为服务器重启而丢失生产者的数据
properties.Persistent = true;
生产者代码的添加位置如下:
4、Fair Dispatch 公平调度
消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重,但是偶数个消息任务比较轻时,奇数个工作者始终处理忙碌状态,而偶数个工作者始终处理空闲状态。但是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。
为了改变这一状态,我们可以使用basicQos方法
,设置perfetchCount=1
。这样就告诉RabbitMQ 不要在同一时间给一个消费者发送超于1个消息。或者换句话说,在一个消费者还在处理消息,并且没有响应消息之前不要给他分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的消费者。
此时如果不存在这样一个不忙碌的消费者,那么这条消息就会一直堆积在队列中,若此种情况一直发生队列中的消息就会被不断地积压。为了防止消息积压,你一定想要监视这种情况以便增加消费者或者更改消费策略。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
在生产者的代码下添加:
5、综合以上知识点的代码:
生产者:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Net.Mime.MediaTypeNames;
namespace ProjectSend
{
public class MainClass
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // 本地运行
factory.UserName = "lyh";
factory.Password = "1211";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("task_queue3", durable:true, exclusive:false, autoDelete:false, arguments:null); // 创建一个名为task_queue的消息队列
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var message = Console.ReadLine();
while (!string.IsNullOrWhiteSpace(message)) //输入 空格 再回车停止程序
{
var body = Encoding.UTF8.GetBytes(message); //信息按照UTF-8编码为二进制数组 || 实体类需要序列化再转为二进制数组
var properties = channel.CreateBasicProperties(); //避免因为服务器重启而丢失生产者的数据
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: "task_queue3", basicProperties: properties, body: body);//发布信息
Console.WriteLine("已发送:{0}", message);
message = Console.ReadLine();//传递的信息
}
}
}
Console.ReadLine();
}
}
}
消费者1:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceive
{
public class MainClass
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "lyh";
factory.Password = "1211";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue3", durable: true, exclusive: false, autoDelete: false, arguments: null); // 声明一个队列
var consumer = new EventingBasicConsumer(channel); //定义同步的消费者
consumer.Received += (model, r) => //接收消息
{
var body = r.Body.ToArray();
var message = Encoding.UTF8.GetString(body); //将二进制数据转为字符串
Console.WriteLine("[x1] 已接收:{0}", message);
int dots = message.Split('.').Length; //根据传过来数据中的.个数等待,模拟耗时任务
Thread.Sleep(dots * 1000);
Console.WriteLine("[x1] Done");
};
channel.BasicConsume(queue: "task_queue3", autoAck: true, consumer: consumer); //进行消费,消费后就不会被另一消费者获取到了
Console.ReadLine();
}
}
}
}
}
消费者2:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProjectReceive2
{
public class MainClass
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "lyh";
factory.Password = "1211";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue3", durable: true, exclusive: false, autoDelete: false, arguments: null); // 声明一个队列
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, r) =>
{
var body = r.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x2] 已接收:{0}", message);
int dots = message.Split('.').Length;
Thread.Sleep(dots * 1000);
Console.WriteLine("[x2] Done");
channel.BasicAck(r.DeliveryTag, false);
};
channel.BasicConsume(queue: "task_queue3", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
}
}
运行结果: