【RabbitMQ】golang客户端教程2——工作队列

news2024/9/20 20:23:00

任务队列/工作队列

在这里插入图片描述
在上一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。

工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。

这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。

准备工作

在本教程的上一部分,我们发送了一条包含“ Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染pdf文件,所以我们通过借助time.Sleep函数模拟一些比较耗时的任务。我们会将一些包含.的字符串封装为消息发送到队列中,其中每有一个.就表示需要耗费1秒钟的工作,例如,hello...表示一个将花费三秒钟的假任务。

我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go

body := bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
  })
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是bodyFrom函数:

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

我们以前的receive.go程序也需要进行一些更改:它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为worker.go

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))  // 数一下有几个.
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)  // 模拟耗时的任务
    log.Printf("Done")
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的假任务模拟执行时间。

然后,我们就可以打开两个终端,分别执行new_task.goworker.go了。

# shell 1
go run worker.go
# shell 2
go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。

首先,让我们尝试同时运行两个worker.go脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。

你需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

然后我们在shell1shell2 两个窗口看到如下输出结果了:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker试一下。

消息确认

work 完成任务可能需要耗费几秒钟,如果一个worker在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个worker那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个worker的尚未处理的消息。

我们不想丢失任何任务,如果一个worker意外宕机了,那么我们希望将任务交付给其他worker来处理。

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP连接丢失),RabbitMQ将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。

没有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。

在本教程中,我们将使用手动消息确认,方法是为“auto-ack”参数传递一个false,然后在完成任务后,使用d.Ack(false)worker发送一个正确的确认(这将确认一次传递)。

msgs, err := ch.Consume(
	q.Name, // queue
	"",     // consumer
	false,  // 注意这里传false,关闭自动消息确认
	false,  // exclusive
	false,  // no-local
	false,  // no-wait
	nil,    // args
)
if err != nil {
	fmt.Printf("ch.Consume failed, err:%v\n", err)
	return
}

// 开启循环不断地消费消息
forever := make(chan bool)
go func() {
	for d := range msgs {
		log.Printf("Received a message: %s", d.Body)
		dotCount := bytes.Count(d.Body, []byte("."))
		t := time.Duration(dotCount)
		time.Sleep(t * time.Second)
		log.Printf("Done")
		d.Ack(false) // 手动传递消息确认
	}
}()

使用这段代码,我们可以确保即使你在处理消息时使用CTRL+C杀死一个worker,也不会丢失任何内容。在worker死后不久,所有未确认的消息都将被重新发送。

消息确认必须在接收消息的同一通道(Channel)上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。

忘记确认

忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
> ```
>
> 在Windows平台,去掉sudo
>
> ```bash
> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
> ```

### 消息持久化

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。

首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:

```go
q, err := ch.QueueDeclare(
	"hello", // name
	true,    // 声明为持久队列
	false,   // delete when unused
	false,   // exclusive
	false,   // no-wait
	nil,     // arguments
)

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue

q, err := ch.QueueDeclare(
	"task_queue", // name
	true,         // 声明为持久队列
	false,        // delete when unused
	false,        // exclusive
	false,        // no-wait
	nil,          // arguments
)

这种持久的选项更改需要同时应用于生产者代码和消费者代码。

在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing中的持久性选项amqp.Persistent

err = ch.Publish(
	"",     // exchange
	q.Name, // routing key
	false,  // 立即
	false,  // 强制
	amqp.Publishing{
		DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)
		ContentType:  "text/plain",
		Body:         []byte(body),
	})

有关消息持久性的说明

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms

公平分发

你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
在这里插入图片描述
为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)

关于队列大小的说明

如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。

完整的代码实例

我们的new_task.go的最终代码代入如下:

package main

import (
	"github.com/streadway/amqp"
	"log"
	"os"
	"strings"
)

func failOnErrorNew(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func bodyFrom(args []string) string {
	var s string
	if len(args) < 2 || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], "")
	}
	return s
}

func main() {
	//建立连接
	conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")
	failOnErrorNew(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	//获取channel
	ch, err := conn.Channel()
	failOnErrorNew(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"task_queue", //name
		true,         //durable
		false,        //delete when unused
		false,        //exclusive
		false,        //no-wait
		nil,          //arguments
	)
	failOnErrorNew(err, "Failed to declare a queue ")

	body := bodyFrom(os.Args)

	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, //表示将消息设置为持久模式,确保在 RabbitMQ 重启后消息能够被恢复。
			ContentType:  "text/plain",
			Body:         []byte(body),
		})
	failOnErrorNew(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)

}

work.go内容如下:

package main

import (
	"bytes"
	"github.com/streadway/amqp"
	"log"
	"time"
)

func failOnErrorWork(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	//建立连接
	conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")
	failOnErrorWork(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	//获取channel
	ch, err := conn.Channel()
	failOnErrorWork(err, "Failed to open a channel")
	defer ch.Close()
	//声明队列
	q, err := ch.QueueDeclare(
		"task_queue", //name
		true,         //durable
		false,        //delete when unused
		false,        //exclusive
		false,        //no-wait
		nil,          //argument
	)
	failOnErrorWork(err, "Failed to declare a queue")
	err = ch.Qos(
		1,     //prefetch count 消费者一次能获取的最大未确认消息数量
		0,     //prefetch size 消费者一次能获取的最大未确认消息的总大小(以字节为单位)
		false, //global  是否将预取配置应用于所有通道。如果为 true,则应用于所有通道;如果为 false,则仅应用于当前通道。
	)
	//获取接收消息的Delivery通道
	msgs, err := ch.Consume(
		q.Name, //queue
		"",     //consumer
		false,  //注意这里传false,关闭自动消息确认
		false,  //exclusive
		false,  //no-local
		false,  //no-wait
		nil,    //args
	)
	failOnErrorWork(err, "Failed to register a consumer")
	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Reveived a message:%s", d.Body)
			dot_count := bytes.Count(d.Body, []byte(".")) //数一下有几个
			t := time.Duration(dot_count)
			time.Sleep(t * time.Second)
			log.Printf("Done")
			d.Ack(false) //手动传递消息确认
		}
	}()
	log.Printf("[*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

源自:https://www.rabbitmq.com/getstarted.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/812409.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[golang gin框架] 43.Gin商城项目-微服务实战之后台Rbac微服务之管理员的增删改查以及管理员和角色关联

上一节讲解了后台Rbac微服务角色增删改查微服务,这里讲解权限管理Rbac微服务管理员的增删改查微服务以及管理员和角色关联微服务功能 一.实现后台权限管理Rbac之管理员增删改查微服务服务端功能 1.创建Manager模型 要实现管理员的增删改查,就需要创建对应的模型,故在server/r…

使用Beego和MySQL实现帖子和评论的应用,并进行接口测试(附源码和代码深度剖析)

文章目录 小项目介绍源码分析main.gorouter.gomodels/user.gomodels/Post.gomodels/comment.gocontrollers/post.gocontrollers/comment.go 接口测试测试增加帖子测试查看帖子测试增加评论测试查看评论 小项目介绍 经过对需求的分析&#xff0c;我增加了一些额外的东西&#x…

Linux学习之脚本优先级控制

fork炸弹 在编写Shell脚本时不要写出不可控的死循环&#xff0c;比如func() { func | func& } ; func&#xff0c;简写版为.(){ .|.& };.。接下来见证一下这两条语句的威力。因为在root用户下许多资源没有限制&#xff0c;所以useradd userfork新建一个用户userfork&a…

fwrite函数

1、函数声明 size_t fwrite( const void *buffer, size_t size, size_t count, FILE *stream ); 2、参数说明 buffer 指向要写入的数据的指针。 size 项大小&#xff08;以字节为单位&#xff09;。 count 要写入的项的最大数量。 stream 指向 FILE 结构的指针。 3、…

OpenCloudOS 与PolarDB全面适配

近日&#xff0c;OpenCloudOS 开源社区签署阿里巴巴开源 CLA (Contribution License Agreement, 贡献许可协议), 正式与阿里云 PolarDB 开源数据库社区牵手&#xff0c;并展开 OpenCloudOS &#xff08;V8&#xff09;与阿里云开源云原生数据库 PolarDB 分布式版、开源云原生数…

AD21原理图的高级应用(四)线束的设计及应用

&#xff08;四&#xff09;线束的设计及应用 Altium Designer 21 可以使用 Signal Harnesses(信号线束)的方法来建立元件之间的连接,也可用于不同原理图间的信号对接。信号线束是一种抽象连接,操作方式类似于总线,但信号线束可对包括总线、导线和其他信号线束在内的不同信号进…

el-button增加下载功能

vue3和element-plus <el-uploadv-model:file-list="fileList"action="/api/upload"multiple:limit="1":headers="headers" ><el-button type="primary">选择文件</el-button><template #file

【前缀和】560.和为 K 的子数组

Halo&#xff0c;这里是Ppeua。平时主要更新C&#xff0c;数据结构算法&#xff0c;Linux与ROS…感兴趣就关注我bua&#xff01; 和为K的子数组 题目:示例:题解&#xff1a;解法一:解法二: 题目: 示例: 题解&#xff1a; 解法一: 暴力解法:我们很容易想到通过两个for循环去遍…

【使用 DSP 滤波器加速速度和位移】使用信号处理算法过滤加速度数据并将其转换为速度和位移研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

操作系统理论:Linux进程与进程状态(进程调度的大O(1)算法数据结构模型)

文章目录 一.进程的基本概念进程间的基本关系:父子关系 二.进程状态(1)进程的运行状态RLinux进程调度的大O(1)算法数据结构模型(运行队列哈希桶):进程的运行时间片 (2)进程的睡眠状态(S和D)(3)进程的僵尸状态和死亡状态 一.进程的基本概念 冯诺依曼体系的计算机在运行时,内存中…

基于C语言 --- 自己写一个三子棋小游戏

C语言程序设计笔记---019 初阶三子棋小游戏(开源)1、arr_main.c程序大纲2、arr_game1.h3、arr_game1.c3.1、 自定义初识化函数 InitBoard( ) 和 自定义显示函数 DisPlayBoard( )3.2、 自定义玩家下棋函数 PlayerMove( )3.4、 自定义电脑下棋函数 ComputerMove( )3.5、 输赢判断…

成为一名黑客(网络安全),需要掌握哪些黑客技能?

前言 黑客技能是一项非常复杂和专业的技能&#xff0c;需要广泛的计算机知识和网络安全知识。你可以参考下面一些学习步骤&#xff0c;系统自学网络安全。 在学习之前&#xff0c;要给自己定一个目标或者思考一下要达到一个什么样的水平&#xff0c;是学完找工作&#xff08;…

使用DataX实现mysql与hive数据互相导入导出

一、概论 1.1 什么是DataX DataX 是阿里巴巴开源的一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。 1.2 DataX 的设计 为了解决异构数据源同步问题&#xf…

【外卖系统】分类管理业务

公共字段自动填充 需求分析 对于之前的开发中&#xff0c;有创建时间、创建人、修改时间、修改人等字段&#xff0c;在其他功能中也会有出现&#xff0c;属于公共字段&#xff0c;对于这些公共字段最好是在某个地方统一处理以简化开发&#xff0c;使用Mybatis Plus提供的公共…

简单记录牛客top101算法题(初级题C语言实现)判断回文字符串 反转字符串 合并两个有序的数组

1. 判断是否为回文字符串 给定一个长度为 n 的字符串&#xff0c;请编写一个函数判断该字符串是否回文。如果是回文请返回true&#xff0c;否则返回false。   字符串回文指该字符串正序与其逆序逐字符一致。 //示例 输入&#xff1a;"ranko" 返回值&#xff1a;fa…

DevOps系列文章之 自动化测试大全(单测和集成测试)

自动化测试业界主流工具 核心目标&#xff1a; 主要是功能测试和覆盖率测试 业界常用主流工具 GoogleTest GoogleTest是一个跨平台的(Liunx、Mac OS X、Windows 、Cygwin 、Windows CE and Symbian ) C单元测试框架&#xff0c;由google公司发布&#xff0c;为在不同平台上为编…

案例:缺陷个数与返工工作量强相关

某公司积累了21个项目缺陷个数与返工工作量的数据&#xff0c;如下表所示&#xff1a; 项目序号缺陷修复工时缺陷数1943314452299040536347446471385496071061370246774066812232189276652810830213781162678126111511381110514209032015144023516516078417710010301875601239…

3.Makefile变量的用法(附示例)

一、本节概要 本专栏所有内容围绕Makefile官方文档进行刨析,给出详细具体示例做辅助理解手撕Makefile官方手册 二、Makefile中的变量 1、没有使用变量的makefile 以下是不使用变量的makefile完整示例: edit: main.o kbd.o command.o display.o insert.o search.o files…

django channels实战(websocket底层原理和案例)

1、websocket相关 1.1、轮询 1.2、长轮询 1.3、websocket 1.3.1、websocket原理 1.3.2、django框架 asgi.py在django项目同名app目录下 1.3.3、聊天室 django代码总结 小结 1.3.4、群聊&#xff08;一&#xff09; 前端代码 后端代码 1.3.5、群聊&#xff08;二&#xff09…

网络编程 IO多路复用 [epoll版] (TCP网络聊天室)

//head.h 头文件 //TcpGrpSer.c 服务器端 //TcpGrpUsr.c 客户端 通过IO多路复用实现服务器在单进程单线程下可以与多个客户端交互 API epoll函数 #include<sys/epoll.h> int epoll_create(int size); 功能&#xff1a;创建一个epoll句柄//创建红黑树根…