GoLong的学习之路,进阶,RabbitMQ (消息队列)

news2025/1/8 11:59:49

快有一周没有写博客了。前面几天正在做项目。正好,项目中需要MQ(消息队列),这里我就补充一下我对mq的理解。其实在学习java中的时候,自己也仿照RabbitMQ自己实现了一个单机的mq,但是mq其中一个特点也就是,分布式我在项目中没有涉及。这里我用go语言将RabbitMQ的操作进行一次整理

文章目录

  • MQ概念
  • 操作RabbitMQ
    • 安装
    • 连接
      • 生产者
      • 消费者
      • 例子
        • 生成者
        • 消费者
  • 注意常见的问题:
    • 匹配规则

MQ概念

MQ是消息队列(Message Queue)的缩写,是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。

市面上有许多成熟的消息队列非常多,例如:

  • RabbitMQ(⾮常知名, 功能强⼤, ⼴泛使⽤的消息队列)(今天要说明的)
  • Kafka(高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,大量用于大数据领域)
  • RocketMQ(具有高性能、高可靠、高实时、分布式特点)
  • ActiveMQ (高可用、高性能、可伸缩的企业级面向消息服务的系统)

而RabbitMQ为什么叫rabbit,其实这就不得不说一个时代,在16世纪左右有那么个事件(🐏吃人事件)简单而言就是🐏比人贵。而为了让羊有更多的食物。就需要和人以及动物(rabbit)抢地盘,但是兔子一生就是一窝,兔子行动非常迅速而且繁殖起来也非常疯狂,所以就把Rabbit用作这个分布式软件的命名(就是真么简单)。

它能做什么?

  • 可靠性:RabbitMQ 提供了各种功能,让你权衡性能与可靠性,其中包括持久性,交付确认和高可用性。
  • 灵活的路由:消息在到达队列之前,通过交换机的路由。RabbitMQ 为典型的路由逻辑提供了几个内置的交换机类型。对于更复杂的路由,则可以绑定几种交换机一起使用甚至可以自己实现交换机类型,并且把它作为一个插件的来使用。
  • 集群:在本地网络上的几个 RabbitMQ 服务器可以聚集在一起,作为一个独立的逻辑代理来使用。
  • 联合:对于服务器来说,它比集群需要更多的松散和非可靠链接。为此 RabbitMQ 提供了联合模型。
  • 高度可用队列:在群集中,队列可以被镜像到几个机器中,确保您的消息即使在出现硬件故障的安全。
  • 多协议:RabbitMQ 支持上各种消息传递协议的消息传送.
  • 许多客户端:有你能想到的几乎任何语言 RabbitMQ 客户端。
  • 管理用户界面:RabbitMQ 附带一个简单使用管理用户界面,允许您监视和控制您的消息代理的各个方面。
  • 追踪:如果您的消息系统行为异常,RabbitMQ 提供跟踪支持,让你找出问题是什么。
  • 插件系统:RabbitMQ 附带各种插件扩展,并且你也可以写你自己插件.
  • 商业支持:提供商业支持、 培训和咨询。
  • 大型社区:有一个庞大的社区 RabbitMQ,有各种各样的客户端、 插件、 指南等。

其基本能力:

  1. 跨进程通信
    指两个或多个进程之间进行数据交换的过程。 跨进程通信的方式有很多,比如管道、消息队列、共享内存、信号量等
  2. 异步处理
    指在执行某个操作时,不需要等待其完成,可以继续执行其他操作的一种处理方式
  3. 削峰填谷
    指在同一刻传入的数据过大使得服务无法处理,需要等待一些时间,将峰值通过时间一批一批的处理

这个就是基础的架构
在这里插入图片描述

  • N–M :这个代表的是多对多的关系,这个关系比较重要(意味着必须要用多线程处理);在图中 我表示的是多个交换机可以通过绑定去去控制多个队列
  • N–1 :这个代表多对1的关系。这个也比较重要(也意味着我们需要多线程处理);在图中表示,多个客户端去访问一个服务器。

可以看出MQ其实主要由两部分构成,一部分客户端,一部分由服务器构成。而客户端的又分为生产客户端(提交消息)和消费客户端(消费消息)。服务器的构建就有些复杂,需要构图:

在这里插入图片描述
在这里插入图片描述

  • 生产者(Producer):其实就是将消息上传到服务器
  • 消费者(Consumer):其实就是从服务器中拿出消息
  • 发布(Publish):就是生产者将消息上传给服务器的过程
  • 订阅(Subscribe):就是消费者将消息从服务器拿出的过程
  • 中间服务器(BrokerServer):核心服务器,负责给生产者和消费者提供发布和订阅以及构建交换机和绑定以及队列的API
    • 虚拟机(VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost
    • 交换机(Exchange):⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue
    • 队列(Queue):真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息
    • 绑定(Bind): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.(不理解的可以看上面的图)
    • 消息(Message):生产者和消费者之间传递的内容
    • 内存存储(Memory):方便使用,快速调用
    • 硬盘存储(Disk):重启数据不丢失

在这里插入图片描述

操作RabbitMQ

安装

说起安装我就不得不说一下,官方文件的安装简直令人炸毛。个人建议,诺是自己有云,就去看云官方文件如何实现的。如果安装在本地那就没有那么麻烦了,看官网就行。我这边使用的是阿里云,所以这个里放一个阿里云的文档

连接

首选就是在项目中拿到相应的操作远程的mq

go get github.com/streadway/amqp

补充:

  1. 端口号:15672是管理rabbit的接口
  2. 端口号:5672是我们,调用,并使用的端口
    (如果发现端口调用不了,就必须要在防火墙中将这个端口号打开)(强调一下我这里用的阿里云)

生产者

创建一个生产者的步骤:

  1. 连接 Connection
  2. 创建 Channel
  3. 创建或连接一个交换器
  4. 创建或连接一个队列
  5. 交换器绑定队列(不绑定的话,队列在运行完毕后会消失)
  6. 投递消息
  7. 关闭 Channel
  8. 关闭 Connection

创建连接

connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

创建通道

// channel
channel, err := connection.Channel()

创建交换机

err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil)

参数依次说明:

  • name 交换机名称
  • kind 交换机类型
  • durable 持久化标识
  • autoDelete 是否自动删除
  • internal 是否是内置交换机
  • noWait 是否等待服务器确认
  • args 其它配置

参数说明要点:

  • autoDelete :

自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。

  • internal :

内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

  • noWait

noWaittrue 时,声明时无需等待服务器的确认。

该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive ),他主要是假定交换已存在,并尝试连接到不存在的交换将导致 RabbitMQ 引发异常,可用于检测交换器的存在。

创建队列

q, err := channel.QueueDeclare("q1", true, false, false, true, nil)

参数说明:

  • name 队列名称
  • durable 持久化
  • autoDelete 自动删除
  • exclusive 排他
  • noWait 是否等待服务器确认
  • args Table

参数说明要点:

  • exclusive 排他

排他队列只对首次创建它的连接可见,排他队列是基于连接( Connection )可见的,并且该连接内的所有信道( Channel)都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。

同一连接中不允许建立同名的排他队列的这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。

非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

  • autoDelete 设置是否自动删除。

true 则设置队列为自动删除。

自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。

不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

创建队列还有一个差不多的方法( QueueDeclarePassive ),他主要是假定队列已存在,并尝试连接到不存在的队列将导致 RabbitMQ 引发异常,可用于检测队列的存在

绑定交换器和队列

err = channel.QueueBind("q1", "q1Key", "e1", true, nil)

参数解析:

  • name 队列名称
  • key BindingKey 根据交换机类型来设定
  • exchange 交换机名称
  • noWait 是否等待服务器确认
  • args Table

绑定交换器(可选)

err = channel.ExchangeBind("dest", "q1Key", "src", false, nil)

参数解析:

  • destination 目的交换器
  • key RoutingKey 路由键
  • source 源交换器
  • noWait 是否等待服务器确认
  • args Table  其它参数

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination ,井把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue 中,某种程度上来说 destination 交换器可以看作一个队列。

投递消息

err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
    Timestamp:   time.Now(),
    DeliveryMode: amqp.Persistent, //Msg set as persistent
    ContentType: "text/plain",
    Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
})
  • exchange 交换器名称
  • key RouterKey
  • mandatory 是否为无法路由的消息进行返回处理
  • immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
  • msg 消息体

mandatory
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true 表示将消息返回到生产者,否则直接丢弃消息。

immediate
参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0版本开始去掉了对 imrnediate 参数的支持。

其中 amqp.PublishingDeliveryMode 如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue 也是需要设定为持久化才有效。

消费者

Rabbitmq 消费方式共有 2 种,分别是推模式拉模式

推模式

deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)

推模式是通过持续订阅的方式来消费信息, Consume 将信道( Channel )设置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者。推送消息的个数还是会受到 channel.Qos 的限制。

参数说明:

  • queue 队列名称
  • consumer 消息者名称
  • autoAck 是否确认消费
  • exclusive 排他
  • noLocal
  • noWait bool
  • args Table

noLocal

设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个Connection 中的消费者

autoAck 可以设置为 true 或者false

  • 如果设为true则消费者一接收到就从queue中去除了,如果消费者处理消息中发生意外该消息就丢失了。
  • 如果设为 false 则消费者在处理完消息后,调用 msg.Ack(false) 后消息才从 queue 中去除。即便当前消费者处理该消息发生意外,只要没有执行 msg.Ack(false) 那该消息就仍然在 queue 中,不会丢失。

如果autoAck设置为 false 则表示需要手动进行 ack 消费

v, ok := <-deliveries
if ok {
    // 手动ack确认
    // 注意: 这里只要调用了ack就是手动确认模式,
    // v.Ack的参数 multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
    // 并不是表示设置为false就不进行当前ack确认
    if err := v.Ack(true); err != nil {
        fmt.Println(err.Error())
    }
} else {
    fmt.Println("Channel close")
}

拉模式
相对来说比较简单,是由消费者主动拉取信息来消费,每次只消费一条消息,同样也需要进行 ack 确认消费。

channel.Get(queue string, autoAck bool)

例子

生成者

这个文件起名叫:new_task.go

package main

import (
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)

func main() {
	// 1. 尝试连接RabbitMQ,建立连接
	// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
		return
	}
	defer conn.Close()

	// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
	ch, err := conn.Channel()
	if err != nil {
		fmt.Printf("open a channel failed, err:%v\n", err)
		return
	}
	defer ch.Close()

	// 3. 要发送,我们必须声明要发送到的队列。
	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // 持久的
		false,        // delete when unused
		false,        // 独有的
		false,        // no-wait
		nil,          // arguments
	)
	if err != nil {
		fmt.Printf("declare a queue failed, err:%v\n", err)
		return
	}

	// 4. 然后我们可以将消息发布到声明的队列
	body := bodyFrom(os.Args)
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // 立即
		false,  // 强制
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // 持久
			ContentType:  "text/plain",
			Body:         []byte(body),
		})
	if err != nil {
		fmt.Printf("publish a message failed, err:%v\n", err)
		return
	}
	log.Printf(" [x] Sent %s", body)
}

// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}
消费者

这个文件起名叫:work.go

package main

import (
	"bytes"
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
		return
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		fmt.Printf("open a channel failed, err:%v\n", err)
		return
	}
	defer ch.Close()

	// 声明一个queue
	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // 声明为持久队列
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		fmt.Printf("ch.Qos() failed, err:%v\n", err)
		return
	}

	// 立即返回一个Delivery的通道
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // 注意这里传false,关闭自动消息确认
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		fmt.Printf("ch.Consume failed, err:%v\n", err)
		return
	}

	// 开启循环不断地消费消息
	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dotCount := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
			d.Ack(false) // 手动传递消息确认
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

自己要去终端,DOS中去执行

go run new_task.go

go run work.go

注意常见的问题:

忘记确认

忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows平台,去掉sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged


持久化

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。

如果需要更强有力的担保,那么可以使用publisher confirms。


ch.Qos

在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作,RabbitMQ对此一无所知,仍然会均匀地发送消息。

因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。

为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)

关于队列大小的说明

如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。

使用消息确认和预取计数,可以设置工作队列(work queue)。即使RabbitMQ重新启动,持久性选项也可以让任务继续存在。

临时队列

当我们连接到Rabbit时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。

其次,一旦我们断开消费者的连接,队列就会自动删除。

在amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列

当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。

匹配规则

要说匹配规则,就不得不说一个东西,交换机类型

ExchangeType 是个表示的是交换机的类型,为什么要分类型呢?其实很简单,需求决定业务,业务决定技术。

所以这里的需求情况分为3种,

  1. 一个生产者生产的消息由一个消费者去消费,此时我们叫他(DIRECT
  2. 一个生产者生产的消息由有多个消费者去消费,此时我们叫他(FANOUT
  3. 一个生产者生产的消息有锁,必须由带有锁的消费者去消费,此时我们叫他(TOPIC
    • 队列绑定在交换机的时候,会指定一个bindingKey
    • 消息哪里指定一个outingKey
    • bindingKeyoutingKey满足条件就投递到相应的队列中

所以这里的匹配规则说到底其实就是主题交换机的规则。

发送到topic交换器的消息不能具有随意的routing_key——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的routing_key示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。routing_key中可以包含任意多个单词,最多255个字节

如果我们打破约定并发送一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,这些消息将不匹配任何绑定,并且将会丢失

绑定键也必须采用相同的形式topic交换器背后的逻辑类似于direct交换器——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:

  • *(星号)可以代替一个单词。
  • (井号)可以替代零个或多个单词。

topic交换器

topic交换器功能强大,可以像其他交换器一样运行。

当队列用“#”(井号)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在fanout交换器中一样。

当在绑定中不使用特殊字符“*”(星号)和“#”(井号)时,topic交换器的行为就像direct交换器一样。

例子

生产者:emit_log_topic.go

package main

import (
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs_topic",          // exchange
                severityFrom(os.Args), // routing key
                false, // mandatory
                false, // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || os.Args[2] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}

func severityFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "anonymous.info"
        } else {
                s = os.Args[1]
        }
        return s
}

消费者:receive_logs_topic.go

package main

import (
        "log"
        "os"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        if len(os.Args) < 2 {
                log.Printf("Usage: %s [binding_key]...", os.Args[0])
                os.Exit(0)
        }
  			// 绑定topic
        for _, s := range os.Args[1:] {
                log.Printf("Binding queue %s to exchange %s with routing key %s",
                        q.Name, "logs_topic", s)
                err = ch.QueueBind(
                        q.Name,       // queue name
                        s,            // routing key
                        "logs_topic", // exchange
                        false,
                        nil)
                failOnError(err, "Failed to bind a queue")
        }

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto ack
                false,  // exclusive
                false,  // no local
                false,  // no wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}

想要接收所有的日志:

go run receive_logs_topic.go "#"

只想接收“critical”日志:

go run receive_logs_topic.go "*.critical"

如何使用rabbit到这里就结束了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1262342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单片机薪资翻倍的学习方向

今天以一个案例&#xff0c;给大家分析下做单片机开发&#xff0c;薪资翻倍的底层逻辑和方法论&#xff0c;尽量做到有理有据。 我是2011年开始做单片机开发的&#xff0c;那几年&#xff0c;单片机的工资&#xff0c;可以说是惨不忍睹。 相关贴吧也是一片哀嚎&#xff0c;有些…

伦敦金新手和有经验的投资者 他们有什么不同?

按照笼统的分类&#xff0c;伦敦金市场中的投资者可以分为新手和有经验的&#xff0c;当然其中并没有明确的界限&#xff0c;不是说投资者做伦敦金交易满2年就一定会成为有经验的投资者。但是从下面我们的对比中&#xff0c;我们或许可以看出新手和有经验投资者的一些差别。 先…

「琥珀黄」农产品销售运营大屏助力农产品销售改革

农业作为国家经济的重要支柱产业&#xff0c;农产品销售一直备受关注。农产品销售一直是农业行业关注的焦点之一。随着科技进步和市场竞争的加剧&#xff0c;传统的销售方式面临着新的挑战。为了让农产品销售实现腾飞&#xff0c;我们需要打破传统&#xff0c;采用新的销售策略…

自然资源科普交互大屏助力自然资源的保护

在当代社会&#xff0c;自然资源的科学管理和可持续利用变得愈发重要。为了提高公众对于自然资源的认知和理解&#xff0c;科普交互大屏成为一个新兴的工具。它通过生动的图像和实时数据展示&#xff0c;以及与观众的互动方式&#xff0c;让人们更深入地了解自然资源和环境保护…

【Leetcode】907. 子数组的最小值之和

给定一个整数数组 arr&#xff0c;找到 min(b) 的总和&#xff0c;其中 b 的范围为 arr 的每个&#xff08;连续&#xff09;子数组。 由于答案可能很大&#xff0c;因此 返回答案模 10^9 7 。 示例 1&#xff1a; 输入&#xff1a;arr [3,1,2,4] 输出&#xff1a;17 解释&…

Spine深入学习———— 渲染

数据有了之后&#xff0c;就开始渲染 渲染相关 绘制顺序 骨架的绘制顺序就是一个插槽列表&#xff0c;在插槽列表中上方的附件在下方之上绘制&#xff0c;绘制顺序可以在层级树中的骨架下查看。 基础流程 渲染实现 以下按照cocos2dx的实现来 &#xff08;cocos2dx 3.7 spin…

leetCode 216.组合总和 III + 回溯算法 + 剪枝 + 图解 + 笔记

找出所有相加之和为 n 的 k 个数的组合&#xff0c;且满足下列条件&#xff1a; 只使用数字1到9每个数字 最多使用一次 返回 所有可能的有效组合的列表 。该列表不能包含相同的组合两次&#xff0c;组合可以以任何顺序返回 示例 1: 输入: k 3, n 7 输出: [[1,2,4]] 解释…

两部手机数据传输后备忘录不见了怎么回事

想必很多人都遇到过&#xff0c;当两部手机进行备忘录数据传输后&#xff0c;突然发现备忘录不见了&#xff0c;这让人不禁着急上火&#xff0c;我也曾经遇到过这种事情导致很多重要的内容都丢失了。 一般出现这种情况可能是因为&#xff0c;两部手机使用的是不同的云服务&…

重生奇迹MU魔法师操作技能

重生奇迹MU魔法师增加伤害加点方式 一、智力敏捷加点&#xff1a;2点智力1点敏捷&#xff0c;这种加点就是智敏结合的加点了&#xff0c;属性是不错的&#xff0c;提升了非常多的属性点&#xff0c;智力是偏重输出的&#xff0c;也是法师最常见的一种加点了&#xff0c;输出伤…

Mac电脑音乐标签管理 Yate 激活最新 for Mac

Yate是一款非常实用的音频编辑和标记软件&#xff0c;它提供了丰富的功能和工具来帮助用户编辑、整理和管理音频文件。无论是在音乐收藏管理、DJ和音乐制作方面&#xff0c;还是在其他需要处理大量音频文件的领域&#xff0c;Yate都是非常值得推荐的工具。 Yate for Mac功能特…

Java线程同步

认识线程同步 解决方案 方法一&#xff1a;同步代码块 package com.itheima.d3;public class ThreadTest {public static void main(String[] args) {Accout acc new Accout("ICBC-110",100000);new DrawThread(acc,"小明").start();//小明new DrawThread…

phpstudy安装redis

Redis 是一个开源的高性能键值存储数据库&#xff0c;广泛用于缓存、消息队列、会话管理和实时数据分析等应用场景。 使用 PHP Redis 扩展&#xff0c;你可以在 PHP 代码中使用一系列的函数来连接到 Redis 服务器&#xff0c;并执行各种操作&#xff0c;如设置和获取键值对、操…

ubuntu配置ssh

本教程中的涉及路径的所有命令都是在root用户下的&#xff0c;读者可将路径中的/root更改为/home/用户名 1、重置密码 新安装的系统需要在服务器控制台点击“重置密码”&#xff0c;为root用户设置一个密码 ————————————————————————————————…

城市安全守护者:分析无人机在交通领域的应用

随着科技的进步&#xff0c;无人机在交通领域的应用不断增加&#xff0c;为智慧交通管理提供了新便利。无人机凭借其灵活性&#xff0c;在违章取证、交通事故侦查、交通疏导等方面展现出巨大的应用潜力。无人机在交通领域的应用有哪些&#xff1f;跟着我们一探究竟。 1、违章取…

同为科技(TOWE)模块化定制化让每条PDU实现专属供电解决方案

作为追求最高功率和空间效率的动态数据中心的理想产品&#xff0c;模块化、定制化PDU是追求最高功率和空间效率的动态数据中心的理想产品。同为科技&#xff08;TOWE&#xff09;是我国PDU行业的开创者和领导者&#xff0c;曾率先于中国电源分配单元http://www.pdu.com.cn网站上…

第13周 预习、实验与作业:Java网络编程

目录 1 课前问题列表 1.编写一个网络程序&#xff0c;为了与其他网络程序通信&#xff0c;至少要知道对方的什么信息&#xff1f; 2.TCP与UDP协议有什么不同的呢&#xff1f;什么时候该选择哪种协议&#xff1f;HTTP使用的是TCP还是UDP&#xff1f;不重要的短信息传送之类的功能…

Docker Compose;docker-compose;docker compose

(一) Docker Compose | 菜鸟教程 --> --> --> -->

借助工具落地提高外包软件项目代码提交规范

随着外包软件项目的不断增加&#xff0c;代码提交规范成为了一个必须解决的问题。由于外包项目的特殊性&#xff0c;很难保证每个开发者都按照统一的规范开发代码。为了解决这个问题&#xff0c;我们可以借助工具来提高代码提交规范。Codigger这个工具来解决外包软件项目中的代…

aikit 2023 3D与机械臂结合!

引言 今天我们主要了解3D摄像头是如何跟机械臂应用相结合的。我们最近准备推出一款新的机械臂套装AI Kit 2023 3D&#xff0c;熟悉我们的老用户应该知道&#xff0c;我们之前的AI Kit 2023套装使用的是2D摄像头。 随着技术进步&#xff0c;市场需求和领域的扩大&#xff0c;2D的…

vue3 + element-plus + ts el-table封装

vue3 element-plus ts el-table封装 博客参考https://blog.csdn.net/weixin_45291937/article/details/125523244 1. 文件位置&#xff08;根据自己的需求&#xff09; 2. 在 custom 文件夹下面 创建 mytable 文件夹 3. 直接上代码 // index.vue<template><div …