rabbitmq+netcore6 【3】Publish/Subscribe:发布/订阅

news2024/12/26 5:35:58

文章目录

    • 1)前言
    • 2)临时队列
    • 3)绑定
    • 4)综合以上代码
      • 准备工作
      • 1、生产者
      • 2、消费者1
      • 3、消费者2
    • 5)验证

官网教程原文链接: https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
翻译版参考链接: https://www.cnblogs.com/grayguo/p/5356070.html

上一章节的文章介绍的是将多个消息发给两个消费者,现在我们来试试将一个消息发给多个消费者,这种模式称为发布/订阅。

为了说明这种模式,我们将构建一个简单的日志记录系统。它将由两个程序组成 :第一个将发出日志消息,第二个将接收并打印它们。在我们的日志系统中,每一个运行中的接收者副本将都会获得消息,这种方式可以让我们在运行一个接收者直接把消息保存在磁盘的同时,另外一个消费者可以把消息打印到屏幕上。(本质上,已发布的日志消息将广播到所有接收方)

1)前言

  • 生产者(发布者)是发送消息的用户应用程序
  • 队列是存储消息的缓冲区
  • 消费者(接收者)是接收消息的用户应用程序

实际上生产者不知道自己发送的消息会被存入队列中,生产者是直接将消息发送给交换机的,然后根据交换类型由交换机决定将消息发送给哪个队列或哪些队列或直接丢弃该消息。
在这里插入图片描述
交换机类型分为 directtopicheadersfanout ,为了达到日志系统的功能,我们将创建一个fanout类型的交换机,名字叫做logs。fanout类型的交换机会将收到的所有消息发给已知的所有队列

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

可以细心的看一下,当只写两个参数时,duiable和autoDelete默认设置为false:
在这里插入图片描述
这里你会有个疑惑,上一篇work queues的文章中没有提到过交换机的事,是怎么将消息发送给队列的呢?
实际上我们使用的是一个默认的交互机,名字为空(“”),如下
在这里插入图片描述
当指定routingkey后会将消息发给指定的队列,若不指定则将发给所有的队列

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

注:使用如下命令可以列出rabbitmq中的已添加的交换机

rabbitmqctl list_exchanges

在这里插入图片描述

2)临时队列

上一篇文章中,我们给每一个队列起了一个名字,这样在消费者代码中指定同样的队列就能对该队列中发布的消息进行消费(生产者、消费者共享一个队列),但是我们这章想做的有所区别:

  • 监听到所有的日志消息,而非其子集
  • 只得到当前正在流转的消息,而非旧的消息

为了解决以上两点我们需要做两件事:

  • 1、无论何时我们连接到rabbitmq都需要新建一个崭新的空队列,换就话说我们可以每次创建一个随机名称的队列,更好的方式是让服务器随机选取一个名字来给我们的队列。使用var queueName = channel.QueueDeclare().QueueName;可以查看名字
  • 2、当消费者断开连接时队列应当同时被删除
  • 在.Net Client 我们使用无参的channel.QueueDeclare()方法来创建一个随机命名的、非持久的、自动删除的、的队列.

3)绑定

交换机与队列之间的关系叫做绑定,添加了绑定交换机才知道给那些队列转发消息。

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

使用以下代码可以查看已有的绑定信息

rabbitmqctl list_bindings

4)综合以上代码

准备工作

新建一个netcore6的控制台项目,添加RabbitMQ的包依赖

NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

在这里插入图片描述
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2

1、生产者

输入空格再回车即停止输入

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProgjectSentLog
{
    public class MainClass
    {
        static void Main() 
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };

            using(var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                    var message = Console.ReadLine();
                    while(!String.IsNullOrWhiteSpace(message))
                    {
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange:"logs",routingKey:"",basicProperties:null,body:body);
                        Console.WriteLine("已发送:{0}", message);
                        message= Console.ReadLine();
                    }
                }
            }
            Console.ReadLine();
        }
    }
}

2、消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceiveLog1
{
    public class MainClass
    {
        static void Main()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "lyh",
                Password = "1211"
            };
            using (var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
                    Console.WriteLine("[*1] Waiting for logs");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("[x1] {0}", message);
                    };
                    channel.BasicConsume(queue:queueName,autoAck:true,consumer:consumer);
                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

3、消费者2

代码同消费者1,只是输出略有差别

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;

namespace ProjectReceiveLog2
{
    public class MainClass
    {
        static void Main()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName= "lyh",
                Password="1211"
            };
            using(var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName, exchange: "logs", "");
                    Console.WriteLine("[*2] Waiting for logs");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("[x2] {0}", message);
                    };
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer);
                    Console.WriteLine("Press [Enter] to exit");
                    Console.ReadLine();
                }
            }

        }
    }
}

先运行两个消费者,再运行生产者,在生产者处输入的消息可以被两个消费者接收到。
在这里插入图片描述

5)验证

上面说到:当消费者断开连接时队列应当同时被删除,这里可以看出当程序结束时,队列、绑定都已经被删除
在这里插入图片描述
如果想了解如何监听子集要看下一篇文章奥

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/144353.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大话测试数据(二):概念测试数据的获取

在大话测试数据(一)文章中,我提到,获取数据的第一步是获取概念上数据。这一步看起来简单,其实不是那么容易。获取概念数据和获取需求的过程是交织在一起的,事实上,它们其实是一个事儿&#xff0…

Ribbon、Feign、Hystrix超时重试熔断问题

文章目录问题描述重试次数不生效开启熔断后重试次数生效fallbackFactory回退降级异常为空问题1分析问题2、3分析总结feign请求次数计算Hystrix超时时间设置公式问题描述 在使用Ribbon、Feign、Hystrix组合时,因为配置的问题出现以下现象,让我的大脑CPU烧…

[SWPU2019]Web1

目录 [SWPU2019]Web1 无列名查看表数据 不使用列名查询表中数据 [SWPU2019]Web1 首先我们先注册,登录进来后看到如下界面: 我们点击申请发布广告,并发送: 查看广告详情,发现疑似存在注入点: 于是我们在发…

Docker 应用篇 | Docker 学习笔记总结

Docker 视频内容可以参考黑马程序员的Docker篇 详细完整内容可以查询菜鸟教程:Docker 教程 本篇博文主要让读者对Docker有一个基本理解并可以借助Docker发布自己的项目 一、初识Docker 1.1 Docker概述 Docker是一个集装箱式的思想 Docker可以让开发者打包他们的…

招聘求职系统|基于Springboot+Vue+Nodejs实现求职招聘系统

作者主页:编程指南针 作者简介:Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容:Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 收藏点赞不迷路 关注作者有好处 文末获取源…

电脑系统更新后桌面的文件全部不见了怎么恢复?

电脑系统更新是很常见的一种情况,自动更新电脑系统后我们可以进行更优质的使用体验,但是最近有位小伙伴,出现了win10电脑系统更新后桌面文件丢失情况,那么电脑系统更新桌面文件没了怎么办?电脑系统更新桌面文件不见了怎…

实验二十三 基于时间的ACL配置及策略

实验二十三 基于时间的ACL配置及策略实验要求: 某公司通过router实现各部门之间的互连。公司要求禁止销售部门在上班时间(8:00 至18:00)访问工资查询服务器(IP地址为192.168.10.10),财务部门不受限制,可以 随时访问。网络拓扑图:实…

如何定义算法?10分钟带你弄懂算法的基本概念

算法是指完成一个任务所需要的具体步骤和方法。也就是说给定初始状态或输入数据,经过计算机程序的有限次运算,能够得出所要求或期望的终止状态或输出数据。 编程界的“Pascal之父”Nicklaus Wirth有一句人尽皆知的名言:“算法数据结构程序”…

【目标检测】G-GhostNet

1、论文 论文题目:《GhostNets on Heterogeneous Devices via Cheap Operations》 论文地址: https://arxiv.org/pdf/2201.03297.pdf 代码地址: https://github.com/huawei-noah/CV-Backbones 2、引言 本文针对网络部署时面临的内存和资源…

python提取excel文本框内容

就提取excel文本框的内容,提供两种方法 一、 转成pdf,识别pdf文字 该方法需要注意两点: 1.似乎只能识别选中的文字(图片不行) 2.会受到精度影响(即有可能识别出错字) 以下是代码 先转存为pdf格…

IB中文解析,助力冲7分

我们知道,IB、AP、A Level三大国际课程体系都有中文,尤其IB学生,由于必选一门母语与语言,中文成了必选项。IB中文可以说是很多IB学子的心头大患了,引发焦虑的文章比比皆是。 不少家长看到这可能会问,中国学…

【Linux 进程地址空间】

1.程序地址空间的概率&#xff08;C/C的说法不够准确&#xff09;写一段代码来来证明C/C程序地址空间是按上图分布的&#xff1a;#include<stdio.h> #include<string.h> #include<stdlib.h> int uninit; int init100; int main() {printf("code addr:%p…

Anaconda中安装CUDA版本的PyTorch

Question: GPU是一种擅长处理专业计算的处理器。这与中央处理器&#xff08;CPU&#xff09;形成鲜明对比&#xff0c;中央处理器是一种擅长处理一般计算的处理器。CPU是为我们电子设备上大多数典型计算提供动力的处理器。GPU的计算速度比CPU快得多。但是&#xff0c;情况并非…

经验证短视频账号每天最多发3个视频,超过的不予推荐

经验证短视频账号每天最多发3个视频&#xff0c;超过的不予推荐 前两天我在刷短视频的时候&#xff0c;看到一个博主推荐一天可以发几十个视频&#xff0c;感觉有点不对&#xff0c;决定还是自己试一下。 于是&#xff0c;在死亡边缘疯狂试探了好几天&#xff0c;终于得到想要…

在ESXi系统上安装pve

pve是基于debian的&#xff0c;在ESXi上选择系统时建议选择debian并开启虚拟化一、下载下载&#xff1a;https://www.proxmox.com/en/downloads点进下载网站后选择 Proxmox Virtual Environment-->ISO Images-->Proxmox VE 7.3 ISO Installer 下的download按钮二、安装系…

React组件

React组件1.组件基本介绍2.React创建组件的两种方式2.1 函数组件2.2 类与继承2.2.1 class 基本语法2.2.2 extends 实现继承1.组件基本介绍 组件是React中最基本的内容&#xff0c;使用React就是在使用组件组件表示页面中的部分功能多个组件可以实现完整的页面功能组件特点&…

【无人机路径规划】基于IRM和RRTstar进行无人机路径规划(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

java后端第二阶段:JavaWeb

一、Mysql 定义&#xff1a;关系型数据库&#xff0c;存储在硬盘数据安全。 1.SQL通用语法 注释&#xff1a; 单行注释 -- 注释内容 或 #注释内容&#xff08;MySQL特有&#xff09; 多行注释 /* 注释 */ DDL&#xff1a;操作数据库&#xff0c;表等 DML&#xff1…

PCB设计如何防止阻焊漏开窗

PCB的阻焊层&#xff08;solder mask&#xff09;&#xff0c;是指印刷电路板子上要上绿油的部分。阻焊开窗的位置是不上油墨的&#xff0c;露出来的铜做表面处理后焊接元器件的位置&#xff0c;不开窗的位置都是印上油墨的防止线路氧化、漏电。 PCB阻焊层开窗的三个原因 1.孔…

【架构设计】如何让你的应用做到高内聚、低耦合?

前言 最近review公司的代码&#xff0c;发现代码耦合程度特别高&#xff0c;修改一处&#xff0c;不知不觉就把其他地方影响到了&#xff0c;这就让我思考该如何让我们写的代码足够内聚&#xff0c;减少耦合呢&#xff1f; "高内聚、松耦合"是一个非常重要的设计思…