rabbitmq+netcore6 【2】Work Queues:一个生产者两个消费者

news2024/11/16 18:59:03

文章目录

    • 1)准备工作
    • 2)新建消费者1
    • 3)新建消费者2
    • 4)生产者
    • 5)知识点解读
      • 1、autoAck: true
      • 2、重复声明/前后不一致
      • 3、Message durability 消息持久化
      • 4、Fair Dispatch 公平调度
    • 5、综合以上知识点的代码:

官网参考链接:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
其他人的翻译版参考:https://www.cnblogs.com/longlongogo/p/6489574.html
以下工作是本人在参考官网的教程下,结合自己的理解做的简易版,更深刻的理解还需要参考官网进行学习哦

1)准备工作

rabbitmqctl status

运行成功后显示:
在这里插入图片描述
新建一个用户,设置密码,并授予权限,并将其设置为管理员

rabbitmqctl  add_user  JC JayChou   //创建用户JC密码为JayChou
rabbitmqctl  set_permissions  JC ".*"  ".*"  ".*"    //赋予JC读写所有消息队列的权限
rabbitmqctl  set_user_tags JC administrator    //分配用户组

进入本机rabbitmq的可视化网址http://localhost:15672/,使用上述账号密码进行登录

2)新建消费者1

新建一个netcore6的控制台项目,新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口,在MainClass中写下如下代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceive
{
    public class MainClass
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory(); // 定义用于连接RabbitMQ节点的工厂类
            factory.HostName = "localhost"; // RabbitMQ服务在本地运行
            factory.UserName = "lyh";
            factory.Password = "1211";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "task_queue2", durable: false, exclusive: false, autoDelete: false, arguments: null);  // 声明一个队列

                    var consumer = new EventingBasicConsumer(channel); //定义同步的消费者

                    consumer.Received += (model, r) => //接收消息
                    {
                        var body = r.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body); //将二进制数据转为字符串
                        Console.WriteLine("[x1] 已接收:{0}", message);
                        int dots = message.Split('.').Length; //根据传过来数据中的.个数等待,模拟耗时任务
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine("[x1] Done");
                    };
                    channel.BasicConsume(queue: "task_queue2", autoAck: true, consumer: consumer); //进行消费,消费后就不会被另一消费者获取到了(autoAck: true 自动标记已消费)
                    Console.ReadLine();
                }
            }
        }
    }
}

3)新建消费者2

步骤及内容同消费者1,输出内容有区别

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceive2
{
    public class MainClass
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "lyh";
            factory.Password = "1211";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "task_queue2", durable: false, exclusive: false, autoDelete: false, arguments: null);  // 声明一个队列

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, r) =>
                    {
                        var body = r.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("[x2] 已接收:{0}", message);
                        int dots = message.Split('.').Length;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine("[x2] Done");
                        channel.BasicAck(r.DeliveryTag, false); // 手动标记已消费
                    };
                    channel.BasicConsume(queue: "task_queue2", autoAck: false, consumer: consumer); // (autoAck: false 手动标记已消费)
                    Console.ReadLine();
                }
            }
        }
    }
}

4)生产者

注:队列进行持久化设置即 csharpchannel.QueueDeclare("task_queue2", durable:false, exclusive:false, autoDelete:false, arguments:null);,仅代表队列的相关属性会被持久化,队列中的消息需要单独设置

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Net.Mime.MediaTypeNames;

namespace ProjectSend
{
    public class MainClass
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost"; // 本地运行
            factory.UserName = "lyh"; 
            factory.Password = "1211";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("task_queue2", durable:false, exclusive:false, autoDelete:false, arguments:null); // 创建一个名为task_queue的消息队列 
                    var message = Console.ReadLine();
                    while (!string.IsNullOrWhiteSpace(message)) //循环接收直到输入空格,回车停止程序
                    {
                        var body = Encoding.UTF8.GetBytes(message); //信息按照UTF-8编码为二进制数组 || 实体类需要序列化再转为二进制数组
                        channel.BasicPublish(exchange: "", routingKey: "task_queue2", basicProperties: properties, body: body);//发布信息
                        Console.WriteLine("已发送:{0}", message);
                        message = Console.ReadLine(); //接收传递的信息
                    }
                }
            }
            Console.ReadLine();
        }
    }
}

先运行两个消费者,再运行生产者并输入如下数据再回车(保证每行后面都要有回车),即可查看消费者的消费记录

First message.
Second message..
Third message...
Fourth message....
Fifth message.....

当发送程序输入空格并回车,则结束发送程序。可以看出两个消费方式不同的程序都对发送的消息进行了均匀的消费
在这里插入图片描述】、

5)知识点解读

1、autoAck: true

如果不设置 autoAck: true ,如下:

channel.BasicConsume(queue: "task_queue2", autoAck: true, consumer: consumer); // autoAck: true 自动标记已消费

则需要在接收一个消息后手动进行标记

channel.BasicAck(deliveryTag:r.DeliveryTag, multiple:false); // 手动标记已消费

否则程序结束后再次运行,仍会得到上次已经消费过的消息(因为没有在已经消费过的消息上打上已消费的标记,所以还在队列中存在,这会导致rabbitmq无法释放掉已消费的消息而消耗越来越多的内存)。

这个策略是rabbitmq用来处理当一个消费者在消息处理中途挂掉,来避免消息丢失的方法,只有当一个消息被处理完,才会回复一个ack以标记该消息已被处理,此时无需转交给其他消费者,可由rabbitmq来自行销毁。

同时,可以在可视化程序中查看历史消费记录
在这里插入图片描述

2、重复声明/前后不一致

声明队列时,不可以声明同名但与上次配置不同的队列,如上次运行send声明如下队列

channel.QueueDeclare("task_queue2", durable:false, exclusive:false, autoDelete:false, arguments:null); 

下次运行时不可以在不改名的情况下修改其他配置,来测试各个属性功能(会报错)

channel.QueueDeclare("task_queue2", durable:true, exclusive:false, autoDelete:false, arguments:null);  // 错误写法

声明队列和消费时,必须使用同一个队列,否则会报错
在这里插入图片描述

3、Message durability 消息持久化

上面的ack消息确认保证了当一个消费者挂掉,消息不会丢失而是重新回到队列由其他消费者消费。但是不能保证当rabbitmq服务器挂掉后队列以及消息的丢失,所以需要设置队列持久化、消息持久化(二者缺一不可)。需要注意的是,这里的持久化只是将队列、消息存到内存中,保存在内存中仍有丢失的风险如电脑死机(保存到硬盘上才能确保不会真正的丢失,这时候就需要学习publisher confirms)

1)队列持久化durable:true(注意修改名称,同时在消费者处也要同步修改)

channel.QueueDeclare("task_queue3", durable:true, exclusive:false, autoDelete:false, arguments:null);

2)消息持久化 :(注意只需要修改生产者代码,添加消息持久化)

var properties = channel.CreateBasicProperties(); //避免因为服务器重启而丢失生产者的数据
properties.Persistent = true;

生产者代码的添加位置如下:
在这里插入图片描述

4、Fair Dispatch 公平调度

消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重,但是偶数个消息任务比较轻时,奇数个工作者始终处理忙碌状态,而偶数个工作者始终处理空闲状态。但是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。

为了改变这一状态,我们可以使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个消费者发送超于1个消息。或者换句话说,在一个消费者还在处理消息,并且没有响应消息之前不要给他分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的消费者。

此时如果不存在这样一个不忙碌的消费者,那么这条消息就会一直堆积在队列中,若此种情况一直发生队列中的消息就会被不断地积压。为了防止消息积压,你一定想要监视这种情况以便增加消费者或者更改消费策略。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

在生产者的代码下添加:
在这里插入图片描述

5、综合以上知识点的代码:

生产者:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Net.Mime.MediaTypeNames;

namespace ProjectSend
{
    public class MainClass
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost"; // 本地运行
            factory.UserName = "lyh";
            factory.Password = "1211";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("task_queue3", durable:true, exclusive:false, autoDelete:false, arguments:null); // 创建一个名为task_queue的消息队列
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    var message = Console.ReadLine();
                    while (!string.IsNullOrWhiteSpace(message)) //输入 空格 再回车停止程序
                    {
                        var body = Encoding.UTF8.GetBytes(message); //信息按照UTF-8编码为二进制数组 || 实体类需要序列化再转为二进制数组
                        var properties = channel.CreateBasicProperties(); //避免因为服务器重启而丢失生产者的数据
                        properties.Persistent = true;
                        channel.BasicPublish(exchange: "", routingKey: "task_queue3", basicProperties: properties, body: body);//发布信息
                        Console.WriteLine("已发送:{0}", message);
                        message = Console.ReadLine();//传递的信息
                    }
                }
            }
            Console.ReadLine();
        }
    }
}

消费者1:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceive
{
    public class MainClass
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "lyh";
            factory.Password = "1211";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "task_queue3", durable: true, exclusive: false, autoDelete: false, arguments: null);  // 声明一个队列

                    var consumer = new EventingBasicConsumer(channel); //定义同步的消费者

                    consumer.Received += (model, r) => //接收消息
                    {
                        var body = r.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body); //将二进制数据转为字符串
                        Console.WriteLine("[x1] 已接收:{0}", message);
                        int dots = message.Split('.').Length; //根据传过来数据中的.个数等待,模拟耗时任务
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine("[x1] Done");
                    };
                    channel.BasicConsume(queue: "task_queue3", autoAck: true, consumer: consumer); //进行消费,消费后就不会被另一消费者获取到了
                    Console.ReadLine();
                }
            }
        }
    }
}

消费者2:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceive2
{
    public class MainClass
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "lyh";
            factory.Password = "1211";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "task_queue3", durable: true, exclusive: false, autoDelete: false, arguments: null);  // 声明一个队列

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, r) =>
                    {
                        var body = r.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("[x2] 已接收:{0}", message);
                        int dots = message.Split('.').Length;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine("[x2] Done");
                        channel.BasicAck(r.DeliveryTag, false);
                    };
                    channel.BasicConsume(queue: "task_queue3", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/136693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux的运行级别

Linux的运行级别: Linux系统有7种运行级别(runlevel): 运行级别 0:系统停机状态,系统默认运行级别不能设为0,否则不能正常启动运行运行级别 1:单用户工作状态,root权限,用于系统维护,找回丢失…

少儿Python每日一题(9):约瑟夫环

原题解答 本次的题目如下所示(原题出处:蓝桥杯) 【编程实现】 有n个人围成一个圈,按顺序排好号。然后从第一个人开始报数(从1到3 报数),报到3的人退出圈子,然后继续从1到3报数,直到最后留下一个 人游戏结束,问最后留下的是原来第几号。 输入描述:输入一个正整数n 输…

国际手机号码检查纠正 API 接口

国际手机号码检查纠正 API 接口 有效性检查及智能纠正,遵循 E.164 标准,智能统一格式。 1. 产品功能 智能检测国际手机号码有效性;可根据提供的国家编码参数,判断提供的手机号码是否为该国家有效手机号码;智能纠正提…

场景题:假设10W人突访,你的系统如何做到不 雪崩?

文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 : 免费赠送 :《尼恩Java面试宝典》 持续更新 史上最全 面试必备 2000页 面试必备 大厂必备 涨薪必备 免费赠送 经典…

头歌:Ping服务端创建UDP套接字(底部附全关完整答案)

头歌实践教学平台 (educoder.net)在 Ping 的 服务程序中 创建一个使用 UDP 协议的 套接字数据包套接字类型套接字三种类型:流式套接字(SOCK_STREAM),数据包套接字(SOCK_DGRAM)及原始套接字(SOCK_RAW)数据包格式套接字(Datagram So…

JavaSE---多用户网络通信系统

目录 项目开发流程 多用户网络通信系统的架构设计 客户端 界面层 服务层 管理层 服务端 服务层 功能层 管理层 总结 项目开发流程 多用户网络通信系统的架构设计 整体 作为一个可供多个用户使用的通信系统,那么每个用户和其他用户之间的连接必定不是直接…

电脑宽带连接提示错误代码769怎么办?

有用户反映,在使用宽带连接网络时,出现错误代码769,无法连接到指定目标怎么办?这里整理了错误代码769的可能原因和修复方法,带大家顺利连接网络。错误代码769的原因:连接线松动或损坏网卡被禁用网卡驱动过时…

【信息论与编码 沈连丰】第四章:离散信源的信源编码

【信息论与编码 沈连丰】第四章:离散信源的信源编码第四章 离散信源的信源编码4.1 信源编码的模型4.2 信息传输速率和编码效率4.3 单义可译定理4.4 无失真信源编码定理4.5 几种典型的信源编码方法4.6 汉字编码方法及其讨论4.7 图像的信源编码4.8 误码对信源译码的影…

openFeign远程调用返回页面404 ,对应配置文件不生效,排除数据源等问题

在使用上架商品功能时,在debug时候,发现在将数据发送给ES保存时,无法远程调用es的服务,报错404找不到接口,如下图: 一开始以为是openFeign的问题,经过检查,各种接口、注解都没问题&…

2022尚硅谷SSM框架跟学(一)MyBatis基础一

2022尚硅谷SSM框架跟学 一MyBatisSSM框架整合课程优势课程体系框架图MyBatis1、MyBatis简介1.1MyBatis历史1.2MyBatis特性1.3MyBatis下载1.4和其它持久化层技术对比JDBCHibernate 和 JPAMyBatis2.搭建MyBatis2.1开发环境2.2创建maven工程(1)打包方式:jar(2)引入依赖…

【UE4 第一人称射击游戏】20-添加瞄准十字线

上一篇:【UE4 第一人称射击游戏】19-修复冲刺或换弹时可以进行射击的bug本篇效果:步骤:先下载一个瞄准的十字线图片,可以从阿里巴巴矢量图库下载:https://www.iconfont.cn/search/index?searchTypeicon&q%E7%9E%8…

反射Reflection

目录1. 反射快速入门1. 需求2. 运用反射2. 反射原理图2.1 反射相关的主要类2.1 反射优点和缺点2.1.1 反射调用优化-关闭访问检查4. Class类分析4.1 Class类常用方法4.2 获取Class类对象【六种】4.3 哪些类型有class对象4.4 动态和静态加载4.5 类加载流程图5. 获取类结构信息5.1…

RabbitMQ、Kafka、RocketMQ三种消息中间件对比总结

文章目录前言侧重点架构模型消息通讯其他对比总结参考文档前言 不论Kafka还是RabbitMQ和RocketMQ,作为消息中间件,其作用为应用解耦、异步通讯、流量削峰填谷等。 拿我之前参加的一个电商项目来说,订单消息通过MQ从订单系统到支付系统、库存…

【国科大模式识别】第一次作业

【题目一】设 ωmax⁡\omega_{\max }ωmax​ 为类别状态, 此时对所有的 i(i1,…,c)i(i1, \ldots, c)i(i1,…,c), 有 P(ωmax⁡∣x)≥P\left(\omega_{\max } \mid \boldsymbol{x}\right) \geqP(ωmax​∣x)≥ P(ωi∣x)P\left(\omega_i \mid \boldsymbol{x}\right)P(ωi​∣x) …

理解 mysql 之 count(*)的性能问题

一、 count(*) 为什么性能差 在Mysql中,count()的作用是统计表中记录的总行数。而count()的性能跟存储引擎有直接关系,并非所有的存储引擎,count(*)的性能都很差。在Mysql中使用最多的存储引擎是:innodb 和 myisam 。 在 myisam…

手写RPC框架-整合注册中心模块设计与实现

源码地址:https://github.com/lhj502819/IRpc/tree/v2 思考 如果同一个服务有10台不同的机器进行提供,那么客户端该从哪获取这10台目标机器的ip地址信息呢?随着调用方的增加,如何对服务调用者的数据进行监控呢?服务提…

十五、类加载器、反射、xml

类加载器 1类加载器【理解】 作用 负责将.class文件(存储的物理文件)加载在到内存中 2类加载的过程【理解】 类加载时机 创建类的实例(对象)调用类的类方法访问类或者接口的类变量,或者为该类变量赋值使用反射方式来…

【C++编程调试秘籍】| 总结归纳要点

文章目录一、编译器是捕捉缺陷的最好场合1 如何使用编译器捕捉缺陷二、在运行时遇见错误该如何处理1 该输出哪些错误信息2 执行安全检查则会减低程序效率,该如何处理呢3 当运行时遇到错误时,该如何处理四、索引越界1 动态数组2 静态数组3 多维数组5 指针…

uboot驱动和Linux内核驱动有什么区别?

一、前言 uboot启动后,一些外设如DDR、EMMC、网口、串口、音频、显示等等已经被初始化,为什么Linux内核中还需要写Linux驱动呢? 二、uboot驱动和Linux驱动的区别 1、直观理解 驱动,不仅仅是为了初始化,还实现了一组…

《Linux》1.权限

1.用户 首先介绍一下Linux中的用户概念。Linux下有两种用户:超级用户(root),普通用户。 超级用户:可以再linux系统下做任何事情,不受限制 普通用户:在linux下做有限的事情。 超级用户的命令提示…