RabbitMQ-发布/订阅模式

news2024/11/17 3:47:42

1、发布/订阅模式介绍

在普通的生产者、消费者模式,rabbitmq会将消息依次传递给每一个消费者,一个worker一个,平均分配,这就是Round-robin调度方式,为了实现更加复杂的调度,我们就需要使用发布/订阅的方式。

2、交换机(exchange)

RabbitMQ中,消息模型的核心理念就是,生产者从来不能直接将消息发送到队列,甚至生产者都不知道消息要被发送到队列中。

相反,生产者只能将消息发送到交换机中,交换机一侧从生产者接收消息,一侧将消息发送到队列中,交换机需要知道如何处理接收到的消息,是发送给一个队列还是多个队列?这是由交换机的类型决定的。

交换机共分为四类:  directtopicheaders and fanout. 本章节以扇形交换机为例说明rabbitmq的使用。

3、fanout交换机的使用方式

扇形交换机,就像你猜测的那样,他可以将他接收到的全部消息广播到所有队列里。

3.1 声明交换机

首先声明一个扇形交换机,type参数设置为『fanout』

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

3.2 发送消息到交换机

交换机设定完成后,就可以往该交换机发送消息:

	body := "Hello World!"
	err = ch.Publish("logs", "", false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})

如果要在rabbitmq的页面上查看发送的消息,需要提前创建一个队列,并绑定到该交换机[logs]上,就可以查看发送的消息:

扇形交换机的特性,就是他会将收到的消息广播给所有绑定到该交换机的队列,我们可以创建多个队列,并绑定到该交换机上,我们发送一次消息,就会看到,所有绑定到该交换机的队列中都会有一条消息,先创建三个队列,并分别绑定到logs交换机:

之后运行脚本,发送两次消息:

 可以看到,三个队列当中都有两条消息。

3.2 扇形交换机发送消息代码

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
	if err != nil {
		fmt.Println("Failed to declare an exchange")
		return
	}
	body := "Hello World!"
	err = ch.Publish("logs", "", false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
	if err != nil {
		fmt.Println("Failed to publish a message")
		return
	}
}

 3.2 声明队列,用于接收消息

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)

声明队列时,没有指定队列名称,这时,系统会返回一个随机名称存储在q变量中。 

3.3 binding

队列声明完成后,需要将该队列绑定到交换机上,这样交换机才能把消息广播给该队列:

绑定代码: 

    err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
	)

消费者侧全部代码如下:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
	if err != nil {
		fmt.Println("Failed to declare an exchange")
		return
	}
	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
	)

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	var forever chan struct{}

	go func() {
		for d := range msgs {
			fmt.Printf(" [x] %s\n", d.Body)
		}
	}()

	fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

程序启动后,控制台上会增加一个随机命名的队列。

 运行【3.2】的生产者程序,发送消息到扇形交换机,这个时候消费者就会同步消费到消息,并进行打印:

4、总结

关于扇形交换机,核心的一点需要我们记住,发送到扇形交换机的消息,他会将消息广播给所有绑定到该交换机的队列上,无脑广播,所有队列会同时接受到交换机上全部的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1716984.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

现货白银的交易时间有多连贯?

国际市场上的现货白银优势很多&#xff0c;它除了具备国内同类型品种所不具备的数十倍资金杠杆外&#xff0c;也基本上实现了全天24小时不间断的交易时间&#xff0c;所以投资者可以在全天候连贯的行情中&#xff0c;寻找属于自己的交易获利机会。 但对于内地的投资者来说&…

【香橙派 AIpro】新手保姆级开箱教程:Linux镜像+vscode远程连接

香橙派 AIpro 开发板 AI 应用部署测评 写在最前面一、开发板概述官方资料试用印象适用场景 二、详细开发前准备步骤1. 环境准备2. 环境搭建3. vscode安装ssh插件4. 香橙派 AIpro 添加连接配置5. 连接香橙派 AIpro6. SSH配置 二、详细开发步骤1. 登录 juypter lab2. 样例运行3. …

HQChart使用教程100-uniapp如何在vue3运行微信小程序

HQChart使用教程100-uniapp如何在vue3运行微信小程序 症状原因分析解决思路解决步骤1. 修改vender.js2. 修改HQChartControl.js 完整实例HQChart代码地址 症状 HQChart插件在uniappvue3的项目编译成小程序以后&#xff0c; 运行会报错&#xff0c;见下图。 原因分析 查了下…

从了解到掌握 Spark 计算框架(二)RDD

文章目录 RDD 概述RDD 组成RDD 的作用RDD 算子分类RDD 的创建1.从外部数据源读取2.从已有的集合或数组创建3.从已有的 RDD 进行转换 RDD 常用算子大全转换算子行动算子 RDD 算子综合练习RDD 依赖关系窄依赖宽依赖宽窄依赖算子区分 RDD 血统信息血统信息的作用血统信息的组成代码…

【C语言回顾】预处理

前言1. 简单概要2. 预处理命令讲解结语 上期回顾: 【C语言回顾】编译和链接 个人主页&#xff1a;C_GUIQU 归属专栏&#xff1a;【C语言学习】 前言 各位小伙伴大家好&#xff01;上期小编给大家讲解了C语言中的编译和链接&#xff0c;接下来我们讲解一下预处理&#xff01; …

k8s自定义资源你会创建吗

创建自定义资源定义 CustomResourceDefinition 当你创建新的 CustomResourceDefinition&#xff08;CRD&#xff09;时&#xff0c;Kubernetes API 服务器会为你所 指定的每一个版本生成一个 RESTful 的 资源路径。CRD 可以是名字空间作用域的&#xff0c;也可以是集群作用域的…

接口测试工具:Postman的下载安装及使用

1 Postman 介绍 1.1 Postman 是什么 Postman 是一款功能超级强大的用于发送 HTTP 请求的 测试工具 做 WEB 页面开发和测试的人员常用工具 创建和发送任何的 HTTP 请求(Get/Post/Put/Delete...) 1.2 Postman 相关资源 1.2.1 官方网站&#xff1a;https://www.postman.com/ …

算法(七)插入排序

文章目录 插入排序简介代码实现 插入排序简介 插入排序&#xff08;insertion sort)是从第一个元素开始&#xff0c;该元素就认为已经被排序过了。然后取出下一个元素&#xff0c;从该元素的前一个索引下标开始往前扫描&#xff0c;比该值大的元素往后移动。直到遇到比它小的元…

【Uniapp小程序】自定义导航栏uni-nav-bar滚动渐变色

效果图 新建activityScrollTop.js作为mixins export default {data() {return {navBgColor: "rgba(0,0,0,0)", // 初始背景颜色为完全透明navTextColor: "rgba(0,0,0,1)", // 初始文字颜色};},onPageScroll(e) {// 设置背景const newAlpha Math.min((e.s…

elasticsearch7.15实现用户输入自动补全

Elasticsearch Completion Suggester&#xff08;补全建议&#xff09; Elasticsearch7.15安装 官方文档 补全建议器提供了根据输入自动补全/搜索的功能。这是一个导航功能&#xff0c;引导用户在输入时找到相关结果&#xff0c;提高搜索精度。 理想情况下&#xff0c;自动补…

手机站怎么推广

随着手机的普及和移动互联网的快速发展&#xff0c;越来越多的人开始使用手机进行在线购物、社交娱乐、阅读资讯等&#xff0c;同时也催生了越来越多的手机站的出现。但是&#xff0c;在海量的手机站中&#xff0c;要让自己的手机站脱颖而出&#xff0c;吸引更多用户访问和使用…

β-烟酰胺单核苷酸(NMN)功能不断得到验证 市场规模呈增长态势

β-烟酰胺单核苷酸&#xff08;NMN&#xff09;功能不断得到验证 市场规模呈增长态势 β-烟酰胺单核苷酸&#xff08;β-Nicotinamide mononucleotide&#xff0c;NMN&#xff09;是一种生物活性分子&#xff0c;是一种辅酶Ⅰ&#xff08;NAD&#xff09;的前体&#xff0c;也是…

Python魔法之旅-魔法方法(04)

目录 一、概述 1、定义 2、作用 二、主要应用场景 1、构造和析构 2、操作符重载 3、字符串和表示 4、容器管理 5、可调用对象 6、上下文管理 7、属性访问和描述符 8、迭代器和生成器 9、数值类型 10、复制和序列化 11、自定义元类行为 12、自定义类行为 13、类…

Idea的相关操作

1、关闭自动更新 点击左上角File->Setting&#xff0c;进入配置页面&#xff0c;点击Appearance & Behavior > System Settings > Updates&#xff0c;取消勾选更新选项&#xff0c;如图&#xff1b; 2、代码提示忽略大小写 点击左上角File->Setting&#xf…

Llama 3-V: 比GPT4-V小100倍的SOTA

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调重新阅读。而最新科技&#xff08;Mamba&#xff0c;xLSTM,KAN&#xff09;则提供了大模…

【目标检测】基于深度学习的植物中草药智能识别系统【python源码+Pyqt5界面+数据集+训练代码 MX_001期】

系统简介&#xff1a; 这是一款基于深度学习技术的植物草药智能识别系统。系统通过分析植物草药的图像&#xff0c;能够准确地识别出不同种类的草药&#xff0c;并提供相关的信息和用途。用户只需将植物草药的图像上传至系统&#xff0c;即可快速获得识别结果。 系统利用先进…

海关接口源码:跨境贸易的数字桥梁

在全球化贸易日益频繁的今天&#xff0c;海关接口源码成为了促进国际贸易的关键技术之一。它不仅提高了通关效率&#xff0c;还确保了贸易数据的准确性和安全性。本文将探讨海关接口源码的重要性、功能以及其在现代贸易中的作用。 一、海关接口源码的重要性 海关接口源码是一…

C语言分支和循环(2)

我的相关博客&#xff1a; C语言的分支与循环&#xff08;1&#xff09; 1.switch语句 除了 if 语句外&#xff0c;C语⾔还提供了 switch 语句来实现分⽀结构。 switch 语句是⼀种特殊形式的 的 if...else 结构&#xff0c;⽤于判断条件有多个结果的情况。它把多重 else if…

大模型高级 RAG 检索策略:自动合并检索

节前&#xff0c;我们星球组织了一场算法岗技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、参加社招和校招面试的同学. 针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 汇总合集&…

C++结构体数组struct和使用sizeof 从结构体数组中取出数据并写入数字功放寄存器编程实例

C结构体数组编程实例 C结构体数组与普通数组的不同之处&#xff1a;用至少1个花括号来分隔数组。 C结构体数组定义 C结构体数组的定义和定义结构体变量的方法类似&#xff0c;struct声明其为数组即可 结构体数组实例1&#xff1a; typedef struct {u8 cmd; //定义数组中的…