RabbitMQ 发布订阅

news2025/1/10 20:58:08

 RabbitMQ 发布订阅视频学习地址:

简单模式下RabbitMQ 发布者发布消息 消费者消费消息

Publist/Subscribe 发布订阅

RabbitMQ 中,发布订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特 定的接收者(订阅者)。而是将消息发送到一个交换机,交换机将消息转发到绑定到该交换机的每个队 ,每个绑定交换机的队列都将接收到消息。消费者(订阅者)监听自己的队列 并进行消费 。

 

场景 : 开放平台 开发者订阅了某个开放平台的 api 之后,数据有变化就会自动获取到最新的

 

 

 

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

 

P :生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X (交换机)
C :消费者,消息的接收者,会一直等待消息到来
Queue :消息队列,接收消息、缓存消息
Exchange :交换机( X )。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递 交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。
Exchange 有常见以下 3 种类型:
Fanout :广播,将消息交给所有绑定到交换机的队列
Direct :定向,把消息交给符合指定 routing key 的队列
Topic :通配符,把消息交给符合 routing pattern (路由模式) 的队列
Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

 

RabbitMQ 发布订阅模式的一些应用场景:  

 

1. 数据提供商与应用商 :例如中国气象局向多个门户网站提供气象数据。
2. 新闻机构 :将独家新闻发布给多个订阅者,但可能需要根据新闻类型进行更精细的路由。
3. 商城系统 :新添加商品后,同时更新缓存和数据库。
4. 用户通知 :用户充值或转账成功后,通过多种方式(如短信、邮件)通知用户。
5. 消息广播 :将消息广播到多个消费者,例如系统公告、活动通知等。
6. 降低耦合 :生产者和消费者通过 RabbitMQ 进行解耦,不需要直接连接,提高系统的灵活性和可
扩展性。
7. 异步处理 :生产者发送消息后,消费者可以异步处理,提高系统的响应速度和并发处理能力。

 

生产者
emit_log.go

 

package main
import (
"context"
"log"
"os"
"strings"
"github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func bodyForm(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", //name 交换机名称
"fanout", //交换机类型 Fanout 广播
true, //durable 持久化
false, //autoDelete 是否自动删除
false, //internal 是否内部使用 设置为 false 时,表示无论如何这个交换器都不是
内置的
false, //noWait 是否等待服务器响应 参数通常默认为False,意味着操作会同步进
行并等待服务器的响应
nil, // 其他属性
)
failOnError(err, "Failed to declare an exchange")
//发送消息
body := bodyForm(os.Args)
// 发布消息到交换机,并指定路由键
err = ch.PublishWithContext(
context.Background(),
"logs", // 交换器的名称
"", // 队列名
false, // mandatory 必须发送到队列 ,false表示如果交换器无法根据自身的类型和路
由键找到一个符合条件的队列丢弃
false, //immediate 参数设置为 false 时,表示消息不需要立即被消费者接收
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent: %s", body)
}

 

消费者
receive_log.go

 

package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func failOnError2(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError2(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//创建一个Channel
ch, err := conn.Channel()
failOnError2(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部使用
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare an exchange")
// 声明一个临时队列
q, err := ch.QueueDeclare(
"", // 队列名称,留空表示由RabbitMQ自动生成
false, // 是否持久化
false, // 是否自动删除(当没有任何消费者连接时)
true, // 是否排他队列(仅限于当前连接)
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare a queue")
// 将队列绑定到交换机上
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键,留空表示接收交换机的所有消息
"logs", // 交换机名称
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符,留空表示由RabbitMQ自动生成
true, // 是否自动应答
false, // 是否独占模式(仅限于当前连接)
false, // 是否等待服务器响应
false, // noLocal
nil, // 其他属性
)
// msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError2(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
<-forever
}

 运行

# 如果你想保存日志文件
go run receive_log.go > logs_from_rabbit.log
# 如果你想再终端看到日志
go run receive_log.go
# shell2
go run emit_log.go

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1707564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

8.微信小程序之自定义组件

目录 1. 创建-注册-使用组件 1.1 创建自定义组件 1.2 使用自定义组件 2. 自定义组件-数据和方法 3. 自定义组件-属性 4. 组件 wxml 的 slot 5. 组件样式以及注意事项 6. 组件样式隔离 7. 数据监听器 8. 组件间通信与事件 8.1 父往子传值 8.2 子往父传值 8.3 获取…

十四天学会Vue——Vue核心(理论+实战)上篇(第一天)

一、Vue核心&#xff08;上篇&#xff09; 热身tops&#xff1a;选取开发模式 ①用于开发模式 我们只需要知道 我们是开发模式&#xff0c;开发模式他会跟你提示代码出现错误的地方以及出错原因&#xff0c;而生产模式比较简洁。 ②用于生产模式 1.1 new Vue()实例 了解Vue&a…

shell脚本的基础应用

规范脚本的构成 #&#xff01;/bin/bash # 注释信息 可执行的语句 执行脚本的方法 有1.添加x权限 ,绝对路经&#xff0c;或者相对路径2. 使用解释器 不需加x,root...bash...bash..echo 3,用source&#xff0c; 开机root ...bash ...echo bash -x /opt/test01.sh &#xff…

Linux网络-Socket套接字_Windows与Linux端双平台基于Udp传输协议进行多线程跨平台的服务器与客户端网络通信的简易聊天室实现

文章目录 一、Socket套接字二、socket 常见API1. int socket(int domain, int type, int protocol);2. int bind(int socket, const struct sockaddr *address, socklen_t address_len);struct sockaddr 3. ssize_t recvfrom(int socket, void *restrict buffer, size_t length…

话术巧妙分隔沟通效果更佳看看这个小技巧

客服回复客户咨询&#xff0c;如果遇到比较复杂的问题&#xff0c;经常会有大段的文字回复&#xff0c;用聊天宝的分段符功能&#xff0c;在需要分段的地方点击右上角的“插入分隔符”&#xff0c;就可以在指定位置分段&#xff0c;实现多段发送的目的。 前言 客服回复客户咨询…

Python-3.12.0文档解读-内置函数map()详细说明+记忆策略+常用场景+巧妙用法+综合技巧

一个认为一切根源都是“自己不够强”的INTJ 个人主页&#xff1a;用哲学编程-CSDN博客专栏&#xff1a;每日一题——举一反三Python编程学习Python内置函数 Python-3.12.0文档解读 目录 详细说明 map(function, iterable, *iterables) 参数 返回值 示例 注意事项 参考…

CHI Read传输——CHI(3)

目录 一、Read操作概览 二、DMT(Direct Memory Transfer) 三、DCT (Direct Cache Transfer) 四、without Direct Data Transfer 五、ReadNoSnp and ReadOnce* structure with DMT 本篇我们来介绍一下CHI传输类型中的Read 一、Read操作概览 read操作有以下几种&#xff1…

自适应星空背景个人导航单页(附带源码)

自适应星空背景个人导航单页 效果图部分源码领取源码下期更新预报 效果图 部分源码 function Star(id, x, y) {this.id id;this.x x;this.y y;this.r Math.floor(Math.random() * 2) 1;var alpha (Math.floor(Math.random() * 10) 1) / 10 / 2;this.color "rgba(…

QColor官网文档简介

颜色可以用不同的模型来进行表示,常见的有RGB,HSV或者CMYK.QColor基于RGB值创建颜色,如果要转接到HSV或者CMYK需要使用转接函数toHsv() toCmyk() 返回期望格式的副本 get...和set...函数具体参见文档用isValid()检查RGB颜色是否合法颜色组成部分可以单独检索QRgb是一种无符号整…

第98天:权限提升-WIN 全平台MSF 自动化CS 插件化EXP 筛选溢出漏洞

目录 思维导图 前置知识 案例一&#xff1a; Web&Win2008-人工手动&全自动msf-筛选&下载&利用 手动 全自动msf 案例二: Web&Win2019-CS 半自动-反弹&插件&利用 思维导图 前置知识 提权方式&#xff0c;这里讲的是溢出漏洞 windows权限 常…

⌈ 传知代码 ⌋ 高速公路车辆速度检测软件

&#x1f49b;前情提要&#x1f49b; 本文是传知代码平台中的相关前沿知识与技术的分享~ 接下来我们即将进入一个全新的空间&#xff0c;对技术有一个全新的视角~ 本文所涉及所有资源均在传知代码平台可获取 以下的内容一定会让你对AI 赋能时代有一个颠覆性的认识哦&#x…

C++学习/复习8--STL简介/六大组件/缺陷

一、STL简介 二、六大组件 三、面试题 四、STL缺陷

实现按块复制元素的进阶技巧

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、按块复制元素的重要性 二、使用LED模块创建数组并复制 三、实现按块复制的具体步骤 四…

粤嵌—2024/5/28—最大正方形(✔)

代码实现&#xff1a; 方法一&#xff1a;模拟——超时 int maximalSquare(char **matrix, int matrixSize, int *matrixColSize) {int maxSide 0;if (matrix NULL || matrixColSize NULL || matrixSize < 0 || matrixColSize[0] < 0) {return 0;}for (int i 0; i &l…

Cesium For Unity 在Unity中无法下载的问题

Unity 下载失败&#xff0c;提供百度网盘“com.cesium.unity-1.10.0.tgz”下载链接 链接&#xff1a;https://pan.baidu.com/s/1PybXQ8EvkRofOKD6rSN66g?pwd1234 提取码&#xff1a;1234 导入方法&#xff1a; 1.打开PackageManager;Window-PackageManager 2.在PackageMan…

Golang | Leetcode Golang题解之第104题二叉树的最大深度

题目&#xff1a; 题解&#xff1a; func maxDepth(root *TreeNode) int {if root nil {return 0}queue : []*TreeNode{}queue append(queue, root)ans : 0for len(queue) > 0 {sz : len(queue)for sz > 0 {node : queue[0]queue queue[1:]if node.Left ! nil {queue…

酒店报修进入智能时代:无纸化系统的全面革新

在这个信息爆炸的时代&#xff0c;面对酒店的设备故障你还在用纸质工单来报修吗&#xff1f;那简直像是石器时代的遗风&#xff01;一场酒店服务的革命性变革正在悄然兴起&#xff0c;它将彻底颠覆你对传统报修方式的认知。想象一下&#xff0c;当客人在房间遇到水龙头漏水&…

使用大模型LLM实现销售AI

想象一个场景&#xff0c;客户通过聊天窗口咨询一款产品。销售AI首先使用LLM解析客户的问题&#xff0c;然后通过智能代理查询数据库获取产品详细信息&#xff0c;并以自然而友好的方式回应客户。 在对话过程中&#xff0c;AI可以评估客户的兴趣&#xff0c;并主动提供促销信息…

太极图形学——渲染——光线追踪概念部分

程序动画和渲染 程序动画和渲染有一些类似的地方&#xff0c;都是找到合适的像素并填上颜色&#xff0c;但是要把一个图片渲染的好看是有一些规则来指导的 光线追踪包含哪些呢 果壳中的光线追踪器 实时条件下的光线追踪&#xff1a;需要进行光栅化的处理 离线条件下做cg的话…

Hololens 2 新建自定义按钮

官方链接地址 1、创建Cube 2、添加PressableButton脚本&#xff0c;并点击AddNearin… 3、把Cube拖入到MovingButtonVisuals变量中 4、点击NearInteractionTouchable组件&#xff08;这个组件是添加和上一个脚本绑定的&#xff0c;自动添加上来的&#xff09;上的Fix… 5、…