【RabbitMQ】golang客户端教程3——发布订阅(使用fanout交换器)

news2024/12/28 23:16:52

发布订阅

在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务只传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一个消息。这就是所谓的“订阅/发布模式”

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,每一个运行的接收器程序副本都会收到消息。这样,我们就可以运行一个接收器并将日志定向到磁盘;同时,我们还可以运行另一个接收器并在屏幕上查看日志。

本质上,已发布的日志消息将被广播到所有接收者。

Exchanges(交换器)

在本教程的前面部分中,我们向队列发送消息和接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。

让我们快速回顾一下先前教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换器。交换器是非常简单的东西。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交换器必须确切知道如何处理接收到的消息。它应该被附加到特定的队列吗?还是应该将其附加到许多队列中?或者它应该被丢弃。这些规则由交换器的类型定义。

在这里插入图片描述
有几种交换器类型可用:direct, topic, headersfanout。我们将集中讨论最后一个——fanout。让我们创建一个这种类型的交换器,并给它起个名字叫logs:

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。

交换器清单

rabbitmqctl list_exchanges


在此列表中,将有一些`amq.*`交换器和一个默认的(未命名)交换器。这些是默认创建的,但是你现在不太可能需要使用它们。

默认交换器

在本教程的前面部分中,我们还不知道交换器的存在,但仍然能够将消息发送到队列。之所以能这样做,是因为我们使用的是默认交换器,该交换器由空字符串(`""`)标识。

回想一下我们之前是怎么发布消息的:


err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})


在这里,我们使用默认或无名称的交换器:消息将以`route_key`参数指定的名称路由到队列(如果存在)。
现在,我们可以改为发布到我们的命名交换器:
err = ch.ExchangeDeclare(
 "logs",   // 使用命名的交换器
   "fanout", // 交换器类型
    true,     // durable
    false,    // auto-deleted
    false,    // internal
    false,    // no-wait
    nil)     // arguments
  failOnError(err, "Failed to declare an exchange")
  body := bodyFrom(os.Args)
  err = ch.Publish(
  "logs", // exchange
    "",     // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
      })

临时队列

你可能还记得,先前我们使用的是具有特定名称的队列(还记得hello和task_queue吗?)能够命名队列对我们来说至关重要——我们需要将工作人员指向同一个队列。当你想在生产者和消费者之间共享队列时,给队列一个名称非常重要。

但对于我们的记录器来说,情况并非如此。我们希望收到所有日志消息,而不仅仅是它们的一部分。我们也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。为了解决这个问题,我们需要两件事。

首先,当我们连接到Rabbit时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。

其次,一旦我们断开消费者的连接,队列就会自动删除。

在amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列:

q, err := ch.QueueDeclare(
  "",    // 空字符串作为队列名称
  false, // 非持久队列
  false, // delete when unused
  true,  // 独占队列(当前声明队列的连接关闭后即被删除)
  false, // no-wait
  nil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。

你可以在队列指南中了解有关exclusive标志和其他队列属性的更多信息。

绑定

在这里插入图片描述
我们已经创建了一个扇出交换器和一个队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为绑定

err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

列出绑定关系
你猜也猜到了,我们可以使用下面的命令列出绑定关系

rabbitmqctl list_bindings

完整示例

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:


import (
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp" )

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        } }

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body) }

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s 
        }

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.QueueBind(
                q.Name, // queue name
                "",     // routing key
                "logs", // exchange
                false,
                nil,
        )
        failOnError(err, "Failed to bind a queue")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}

如果要将日志保存到文件,只需打开控制台并输入:

 go run receive_logs.go > logs_from_rabbit.log

在这里插入图片描述
如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

在这里插入图片描述
在这里插入图片描述
使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

源自:https://www.rabbitmq.com/getstarted.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/839877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vulnhub: DriftingBlues: 6靶机

kali&#xff1a;192.168.111.111 靶机&#xff1a;192.168.111.180 信息收集 端口扫描 nmap -A -sC -v -sV -T5 -p- --scripthttp-enum 192.168.111.180 查看robots.txt发现存在目录&#xff1a;/textpattern/textpattern 访问后发现是textpattern cms 目录爆破发现文件sp…

尚品汇总结三:商城首页(面试专用)

目录 首页商品分类实现 1、封装数据接口 2、页面静态化&#xff1a; 什么是页面静态化 为什么要使用静态化 首页商品分类实现 前面做了商品详情&#xff0c;我们现在来做首页分类&#xff0c;我先看看京东的首页分类效果&#xff0c;我们如何实现类似效果&#xff1a; 思路…

2020年06月《全国青少年软件编程等级考试》Python一级真题解析

一、单选题 第1题 以下哪种输入结果不可能得到以下反馈&#xff1a; 重要的事情说三遍&#xff1a;安全第一&#xff01;安全第一&#xff01;安全第一&#xff01; A&#xff1a;print(“重要事情说三遍&#xff1a;”“安全第一&#xff01;”*3) B&#xff1a;print(“重…

扩散模型实战(一):基本原理介绍

扩散模型&#xff08;Diffusion Model&#xff09;是⼀类⼗分先进的基于物理热⼒学中的扩散思想的深度学习⽣成模型&#xff0c;主要包括前向扩散和反向扩散两个过程。⽣成模型除了扩散模型之外&#xff0c;还有出现较早的VAE&#xff08;Variational Auto-Encoder&#xff0c;…

【TypeScript】初识TypeScript和变量类型介绍

TypeScript 1&#xff0c;TypeScript是什么?2&#xff0c;类型的缺失带来的影响3&#xff0c;Ts搭建环境-本博主有专门的文章专说明这个4&#xff0c;使用tsc对ts文件进行编译5&#xff0c;TS运行初体验简化Ts运行步骤解决方案1解决方案2&#xff08;常见&#xff09; 开始学习…

【探索Linux】—— 步步学习强大的命令行工具 P.1(Linux简介)

目录 前言 一、Linux简介 二、linux的不同发行版本 三、Linux的开源性质 四、Linux的特点 五、Linux代码演示&#xff08;仅供参考&#xff09; 总结 前言 前面我们讲了C语言的基础知识&#xff0c;也了解了一些数据结构&#xff0c;并且讲了有关C的一些知识&#xff…

如何将本地的conda算法库打包到无网络的服务器用于部署

如何将本地的conda算法库打包到无网络的服务器用于部署 1、先安装conda-pack库&#xff0c;2、将本地的虚拟环境进行打包3、登录远程服务器&#xff0c;切换到conda安装目录&#xff0c;将本地生成的tar文件复制到该目录下4、新建文件夹&#xff0c;例如yus_env&#xff0c;这个…

并查集练习 —岛屿数量(解法一)

题目&#xff1a; 给定一个二维数组matrix&#xff08;char[][]&#xff09;&#xff0c;里面的值不是1就是0&#xff0c;上、下、左、右相邻的1认为是一片岛。返回matrix中岛的数量。 本题共有2种解法&#xff0c;本篇先介绍最快的一种解法—递归。 分析&#xff1a; 递归的方…

65 # 实现 http-server 里的 gzip 压缩

用 zlib 来实现 gzip 压缩 服务端优化都是&#xff1a;压缩 缓存 前端可以通过 webpack 插件进行压缩 gzip 根据替换来实现的&#xff0c;重复率越高&#xff0c;压缩后的结果越小 const zlib require("zlib"); const fs require("fs"); const path …

K8S kubeadm搭建

kubeadm搭建整体步骤 1&#xff09;所有节点进行初始化&#xff0c;安装docker引擎和kubeadm kubelet kubectl 2&#xff09;生成集群初始化配置文件并进行修改 3&#xff09;使用kubeadm init根据初始化配置文件生成K8S的master控制管理节点 4&#xff09;安装CNI网络插件&am…

分页Demo

目录 一、分页对象封装 分页数据对象 分页查询实体类 实体类用到的utils ServiceException StringUtils SqlUtil BaseMapperPlus,> BeanCopyUtils 二、示例 controller service dao 一、分页对象封装 分页数据对象 import cn.hutool.http.HttpStatus; import com.…

EditPlus连接Linux系统远程操作文件

EditPlus是一套功能强大的文本编辑器&#xff01; 1.File ->FTP->FTP Settings&#xff1b; 2.Add->Description->FTP server->Username->Password->Subdirectory->Advanced Options 注意&#xff1a;这里的Subdirectory设置的是以后上传文件的默认…

C. Mark and His Unfinished Essay - 思维

分析&#xff1a; 直接模拟操作会mle&#xff0c;可以每次复制记录对应源字符串的下标&#xff0c;可以记录每次字符串增加的长度的左右端点下标&#xff0c;可以发现左端点与读入的l是对应的&#xff0c;因此就可以向前移到l的位置&#xff0c;这样层层递归&#xff0c;就能找…

HCIE-Datacom真题和机构资料

通过认证验证的能力 具备坚实的企业网络跨场景融合解决方案理论知识&#xff0c;能够使用华为数通产品及解决方案进行企业园区网络、广域互联网络及广域承载网络的规划、建设、维护及优化&#xff0c;能够胜任企业网络全场景专家岗位&#xff08;包括客户经理、项目经理、售前…

jenkins安装部署

安装jdk 方式一&#xff1a;压缩包 cd / mkdir java_home cd / cd java_home tar -zxvf jdk-8u311-linux-x64.tar.gz ls 修改配置 vi /etc/profile 最后一行追加内容&#xff1a; #Java Env export JAVA_HOME/java_home/jdk1.8.0_311 export JRE_HOME${JAVA_HO…

微波光子的参数:动态范围

微波光子的参数&#xff1a;无杂散动态范围 无杂散动态范围的定义 微波光子链路中的非线性失真主要由电光调制器的非线性调制产生&#xff0c;这些非线性失真可以分为谐波失真和交调失真两类。图1.2&#xff08;a&#xff09;给出了光信号在调制器内被一个频率为10 GHz的射频…

两个PPT内置神器,速成页面排版

1.word转PPT &#xff08;1&#xff09;调整word文字层级 word中设置好文档一级标题、二级标题、正文等层级。 tips&#xff1a;大纲视图&#xff0c;ctrl选中多个文字&#xff0c;可以同时定义标题层级。 &#xff08;2&#xff09;ppt中导入word文档 &#xff08;3&#x…

webpack基础知识五:说说Loader和Plugin的区别?编写Loader,Plugin的思路?

一、区别 前面两节我们有提到Loader与Plugin对应的概念&#xff0c;先来回顾下 loader 是文件加载器&#xff0c;能够加载资源文件&#xff0c;并对这些文件进行一些处理&#xff0c;诸如编译、压缩等&#xff0c;最终一起打包到指定的文件中plugin 赋予了 webpack 各种灵活的…

esp32c3 xiao 脚本记录

oled显示网络时间, wifi链接网络 // ntp_get_date.h #include "time.h"String week[8] {"Sun", "Mon", "Tues", "Wednes", "Thur", "Fri", "Sat" };void printLocalTime(Adafruit_SSD1306 …

[ MySQL ] — 数据库环境安装、概念和基本使用

目录 安装MySQL 获取mysql官⽅yum源 安装mysql yum 源 安装mysql服务 启动服务 登录 方法1&#xff1a;获取临时root密码 方法2&#xff1a;无密码 方法3&#xff1a;跳过密码认证 配置my.cnf 卸载环境 设置开机启动(可以不设) 常见问题 安装遇到秘钥过期的问题&…