nsq中diskqueue详解 - 第二篇

news2025/1/16 21:49:29

上一篇博客 nsq中diskqueue详解 - 第一篇_YZF_Kevin的博客-CSDN博客 中我们讲了diskqueue是什么,为什么需要它,它的整体架构流程,以及对外接口等等,如果你还没了解过,强烈建议先看一下,不然直接看这篇博客的话可能会有点儿懵

这篇博客开始,我们从源码角度分析disqueue是怎么实现的。为了避免纯代码分析的枯燥性,我会尽量把内容讲解占一大部分,纯代码分析占一小部分

1、diskqueue的文件存储

diskqueue的文件分为两种类型:元数据文件消息数据文件

1.1 元数据文件

也叫描述数据,记载了disqueue的整体信息,文件格式为 xxxx.diskqueue.meta.dat,其中的xxxx即队列名字。之所以要求队列名字相互不重复,也是为了防止元数据文件重名的问题。注意:一个diskqueue只会有一个元数据文件

文件一共只有3行

第一行    ->    总消息数

第二行    ->    当前读的文件编号,读的位置

第三行    ->    当前写的文件编号,写的位置

diskqueue在初始化的时候,会尝试从磁盘中加载元数据文件

1.2 消息数据文件

存储了所有未处理的消息,文件格式为xxxx.00000n.dat,其中的xxxx即队列名字,n表第几个文件。一个diskqueue至少有一个消息数据文件,也可能有多个

文件内只有消息数据,所有消息紧密排列;每个消息的格式为:消息长度(4字节) + 消息体数据;

如下图

2. diskqueue的启动入口

diskquque对外提供的启动入口是New()函数,使用者需通过调用diskqueue.New(xxx,...)来创建一个diskqueue对象

举例:nsq中创建topic对象后就会调用diskqueue.New()函数来创建topic的持久化队列,如下

// 新创建topic
func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
    // 省略无关代码......
    t.backend = diskqueue.New( // 永久topic的持久化队列
		topicName,
		nsqd.getOpts().DataPath,
		nsqd.getOpts().MaxBytesPerFile,
		int32(minValidMsgLength),
		int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
		nsqd.getOpts().SyncEvery,
		nsqd.getOpts().SyncTimeout,
		dqLogf,
	)
    // 省略无关代码......
}

可以看到,调用diskqueue.New()函数时需要传入一些参数:队列名字,数据存储目录,单个数据文件最大等等

现在我们看下diskqueue.New()函数源码,如下(已添加详细注释)

// 新建一个diskQueue的实例
// name				:	队列名字,需唯一
// dataPath			:	保存文件时的路径
// maxBytesPerFile	:	单个文件的最大字节数
// minMsgSize		:	单个msg的最小字节数
// maxMsgSize		:	单个msg的最大字节数
// syncEvery		:	读写多少次触发刷到磁盘
// syncTimeout		:	同步到文件的间隔
// logf				:	日志函数
func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
	d := diskQueue{
		name:              name,				// 队列名字
		dataPath:          dataPath,			// 目录
		maxBytesPerFile:   maxBytesPerFile,		// 单个文件最大字节数,写满了就建下一个文件
		minMsgSize:        minMsgSize,			// 单个msg最小字节数
		maxMsgSize:        maxMsgSize,			// 单个msg最大字节数
		readChan:          make(chan []byte),	// 对外的读消息通道
		peekChan:          make(chan []byte),	// 对外的查看通道
		depthChan:         make(chan int64),	// 无缓冲的通道
		writeChan:         make(chan []byte),
		writeResponseChan: make(chan error),
		emptyChan:         make(chan int),
		emptyResponseChan: make(chan error),
		exitChan:          make(chan int),
		exitSyncChan:      make(chan int),
		syncEvery:         syncEvery,			// 读写多少次,向磁盘同步一次
		syncTimeout:       syncTimeout,			// 每多少秒检测一次是否需同步磁盘
		logf:              logf,				// 日志函数
	}

	// 从文件中恢复队列的元数据(消息个数,读文件号,读的位置,写文件号,写的位置)
	err := d.retrieveMetaData()
	if err != nil && !os.IsNotExist(err) { // 文件不存在的情况是正常的,其他错误需报错
		d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
	}

	// 新开一个协程,专门处理读写
	go d.ioLoop()

	return &d
}

对上面的代码解释下

1. 根据New()函数传入的参数,初始化本队列的属性,例如队列名字,数据存取的目录,单个数据文件的最大字节数,单个msg的最大最小字节数,日志函数等等。值得一提的是,diskqueue自己不提供日志函数,必须使用方传入

2. 创建diskquue对象后,调用 retrieveMetaData() 函数来读取原来的数据,注意:原数据可能存在,也可能不存在,都是正常的。比如nsq启动时可能有以前的旧数据,那么启动时就应加载再次分发。如果没有,说明消息都处理完了或者是nsq第一次启动,也属正常

3. 启动一个新协程 ioloop() ,这个ioloop()是整个diskqueue的运转核心,负责真正接收消息,文件的创建销毁,文件读写光标的移动,存盘等,后面会详细讲解

3. 元数据文件的读取

我们已经了解到diskqueue启动时会调用retrieveMetaData()函数进行元数据的读取,

前面也讲了,元数据文件的格式很简单,文件一共只有3行

第一行    ->    总消息数

第二行    ->    当前读的文件编号,读的位置

第三行    ->    当前写的文件编号,写的位置

读取元数据,函数retrieveMetaData()的代码如下(已添加详细注释)

// 从文件中恢复disQueue的元数据
func (d *diskQueue) retrieveMetaData() error {
	var f *os.File
	var err error

	// meta文件名(带全路径)
	fileName := d.metaDataFileName()

	// 以只读的方式打开文件
	f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	var depth int64

	// 从文件中读取,文件一共三行
	// 第一行格式:%d  	表总的消息数量
	// 第二行格式:%d,%d  表 readFileNum(当前读的文件编号),readPos(读的位置)
	// 第三行格式:%d,%d  表 writeFileNum(当前写的文件编号),writePos(写的位置)
	_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&depth,
		&d.readFileNum, &d.readPos,
		&d.writeFileNum, &d.writePos)
	if err != nil {
		return err
	}
	d.depth 			= depth			// 总的消息数
	d.nextReadFileNum 	= d.readFileNum	// 要读文件的编号
	d.nextReadPos 		= d.readPos		// 要读文件的位置,即文件读取游标

	// 要写入的数据文件名(带全路径)
	fileName = d.fileName(d.writeFileNum)
	fileInfo, err := os.Stat(fileName)	// os.Stat是获取文件的整体信息,例如大小,路径,修改时间,是否目录等
	if err != nil {
		return err
	}
	fileSize := fileInfo.Size() // 取得文件大小

	// 正常情况下writePos应等于文件size;如果出现了writePos比文件size小,可能是消息数据有写入但元数据没同步,安全的做法是新开一个文件重新写,至于这个文件的内容也承认
	if d.writePos < fileSize {
		d.logf(WARN, "DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file", d.name, fileName, d.writePos, fileSize)
		d.writeFileNum += 1		// 要写入的文件编号(后面发现writeFile指针为nil,会新建)
		d.writePos 		= 0		// 写入位置从0开始
		// 当前的写入文件关闭吧,后面建新的写入文件了
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	return nil
}

对上面的代码解释下

1. 调用metaDataFileName()函数,该函数内组装得到元数据文件的全路径名,然后调用os.OpenFile()函数以只读的方式打开文件,文件不存在的话err不为空就返回了

2. 调用 fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n"),读取3行数据,分别赋值给 d.depth,d.readFileNum, d.readPos,d.writeFileNum,d.writePos

3. 取得写文件的信息,正常情况下,d.writePos应该等于写文件的实际大小。如果d.writePos比文件实际大小要小,说明出错了。有可能是数据写入后,但是d.writePos没更新导致的。这个时候就新建一个写文件,写入位置重新从0开始

4. 元数据文件的写入

看完了读取元数据,再来看下元数据的写入

diskqueue会在以下几种情况下调用persistMetaData()函数进行元数据的保存

分别是:队列关闭或退出时,写入的文件写满时,读写次数达到syncEvery时

persistMetaData()的代码如下(已添加详细注释)

// 把元数据保存到文件中
func (d *diskQueue) persistMetaData() error {
	var f *os.File
	var err error

	// 元数据的最终文件名
	fileName := d.metaDataFileName()
	// 元数据的临时文件名
	tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

	// 以读写方式(没有则新建)打开文件
	f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return err
	}

	// 使用Fprintf()往文件缓冲区写
	// 第一行格式:%d  	存未读的消息数量
	// 第二行格式:%d,%d  存 readFileNum(当前读的文件编号),readPos(读的位置)
	// 第三行格式:%d,%d  存 writeFileNum(当前写的文件编号),writePos(写的位置)
	_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
		d.depth,
		d.readFileNum, d.readPos,
		d.writeFileNum, d.writePos)
	if err != nil {
		f.Close()
		return err
	}
	f.Sync()	// 刷到磁盘
	f.Close()	// 关闭文件

	// 重命名为最终文件名
	return os.Rename(tmpFileName, fileName)
}

对上面的代码解释下

1. 调用metaDataFileName()函数,该函数内组装得到元数据文件的全路径名,随机一个数字,加上.tmp作为后缀,组成一个临时文件名

2. 以读写的方式,新建这个临时文件

3. 调用 fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n") 写入3行数据,依次把d.depth,d.readFileNum, d.readPos,d.writeFileNum,d.writePos写入文件,

4. 调用f.Sync()强制刷到磁盘,关闭这个临时文件

5. 最后以文件重命名的方式,改为真正的元数据文件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/967758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AVR128单片机 USART通信控制发光二极管显示

一、系统方案 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 void port_init(void) { PORTA 0xFF; DDRA 0x00;//输入 PORTB 0xFF;//低电平 DDRB 0x00;//输入 PORTC 0xFF;//低电平 DDRC 0xFF;//输出 PORTE 0xFF; DDRE 0xfE;//输出 PO…

Leetcode Top 100 Liked Questions(序号236~347)

236. Lowest Common Ancestor of a Binary Tree 题意&#xff1a;二叉树&#xff0c;求最近公共祖先&#xff0c;All Node.val are unique. 我的思路 首先把每个节点的深度得到&#xff0c;之后不停向上&#xff0c;直到val相同&#xff0c;存深度就用map存吧 但是它没有向…

Lesson4-2:OpenCV图像特征提取与描述---Harris和Shi-Tomas算法

学习目标 理解Harris和Shi-Tomasi算法的原理能够利用Harris和Shi-Tomasi进行角点检测 1 Harris角点检测 1.1 原理 H a r r i s Harris Harris角点检测的思想是通过图像的局部的小窗口观察图像&#xff0c;角点的特征是窗口沿任意方向移动都会导致图像灰度的明显变化&#xff…

【多线程】线程间通信及状态

文章目录 1. 线程间的通信1.1 wait和notify1.2 notify随机唤醒1.3 notifyAll()1.4 join() 2. 线程间的状态3. 验证线程的状态3.1 验证NEW、RUNNABLE、TERMINATED3.2 验证WAITING3.3 验证TIMED-WAITING3.4 验证BLOCKED 4. 面试题&#xff1a;wait和sleep对比 1. 线程间的通信 1…

人工智能轨道交通行业周刊-第58期(2023.8.28-9.3)

本期关键词&#xff1a;成都智慧工厂、机务段、站台地标、备案大模型、AIGC报告 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通RailMetro轨道世界铁路…

Redis 缓存穿透击穿和雪崩

一、说明 Redis 缓存的使用&#xff0c;极大的提升了应用程序的性能和效率&#xff0c;特别是数据查询方面。但同时&#xff0c;它也带来了一些问题。其中&#xff0c;最要害的问题&#xff0c;就是数据的一致性问题&#xff0c;从严格意义上讲&#xff0c;这个问题无解。如果对…

C# Color颜色RGB对照表

序号Color色系颜色RGB图例1Color.AliceBlue蓝色艾丽丝蓝240,248,2552Color.AntiqueWhite白色古典白色250,235,2153Color.Aqua&#xff0c;Color.Cyan青色浅蓝色&#xff0c;蓝绿色&#xff0c;青色0,255,255 C# Color颜色RGB对照表_旭东怪的博客-CSDN博客 C#颜色和名称样式对照…

Nginx全家桶配置详解

源码包安装NGINX A&#xff0c;搭建Web Server&#xff0c;任意HTML页面&#xff0c;其8080端口提供Web访问服务&#xff0c;截图成功访问http(s)&#xff1a;//[Server1]:8080并且回显Web页面。保留Server1&#xff0c;但是不允许直接访问Server 1&#xff0c;再部署1套NGINX …

8.(Python数模)马尔科夫链预测

Python实现马尔科夫链预测 马尔科夫链原理 马尔科夫链是一种进行预测的方法&#xff0c;常用于系统未来时刻情况只和现在有关&#xff0c;而与过去无关。 用下面这个例子来讲述马尔科夫链。 如何预测下一时刻计算机发生故障的概率&#xff1f; 当前状态只存在0&#xff08;故…

1分钟实现 CLIP + Annoy + Gradio 文搜图+图搜图 系统

多模态图文搜索系统 CLIP 进行 Text 和 Image 的语义EmbeddingAnnoy 向量数据库实现树状结构索引来加速最近邻搜索Gradio 轻量级的机器学习 Web 前端搭建 文搜图 图搜图 CLIP图像语义提取功能&#xff01;

生信分析Python实战练习 3 | 视频21

开源生信 Python教程 生信专用简明 Python 文字和视频教程 源码在&#xff1a;https://github.com/Tong-Chen/Bioinfo_course_python 目录 背景介绍 编程开篇为什么学习Python如何安装Python如何运行Python命令和脚本使用什么编辑器写Python脚本Python程序事例Python基本语法 数…

已解决下载安装Python官网安装包下载速度慢问题

本文摘要&#xff1a;本文已解决下载安装Python官网安装包下载速度慢的问题。 &#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主、前后端开发、人工智能研究…

47、springboot 的 国际化消息支持--就是根据浏览器选择的语言,项目上的一些提示信息根据语言的选择进行对应的显示

springboot的国际化也是基于spring mvc 的。 springboot 的 国际化消息支持–就是根据浏览器选择的语言&#xff0c;项目上的一些提示信息根据语言的选择进行对应的显示。 总结下国家化自动配置&#xff1a; 功能实现就是&#xff1a; 比如一个登录页面&#xff0c;我们在浏览…

ORB-SLAM3复现的详细过程——配置安装及ROS和脚本运行---Ubuntu20.04

ORB-SLAM3配置安装及ROS和脚本运行---Ubuntu20.04 1. 安装所需要的依赖和包2. 修改代码及文件内容2.1 CMakeLists.txt文件的修改2.2 单目可视化代码修改2.3 环境配置文件的修改2.4 源码的修改 3.ORB-SLAM3的编译3.1 构建 ORB-SLAM3 库3.2 生成ROS节点 4.ORB-SLAM3的运行4.1 非R…

ModaHub魔搭社区专访百度智能云李莅:您认为向量数据库是一个刚需产品吗?

ModaHub魔搭社区&#xff1a;可以看到&#xff0c;大模型火了以后&#xff0c;向量数据库受到了特别高的关注&#xff0c;您是如何看待这种现象呢&#xff1f;您认为向量数据库是一个刚需产品吗&#xff1f; 李莅&#xff1a;是的。大模型是在今年才崭露头角&#xff0c;或者说…

【Python Odyssey】1-1 | Python初见面

文章目录 Python 由来Python 的历史Python 的优点Python的缺点Python 运用领域Python 环境配置Python 解释器安装确认Python的版本编写Python源代码运行程序 Python 由来 Python 是 吉多范罗苏姆&#xff08;Guido van Rossum&#xff09;在阿姆斯特丹为了打发无聊的圣诞节&…

三维点云转换为二维图像

文章目录 前言原理代码总结与反思实验结果展示 前言 目的&#xff1a;将三维点云转换为二维图像 作用&#xff1a; a.给点云赋予彩色信息&#xff0c;增强点云所表达物体或对象的辨识度&#xff1b; b.将三维点云中绘制的目标物体通过映射关系绘制到二维图像中&#xff0c;这个…

Lesson5-1:OpenCV视频操作---视频读写

学习目标 掌握读取视频文件&#xff0c;显示视频&#xff0c;保存视频文件的方法 1 从文件中读取视频并播放 在OpenCV中我们要获取一个视频&#xff0c;需要创建一个VideoCapture对象&#xff0c;指定你要读取的视频文件&#xff1a; 创建读取视频的对象 cap cv.VideoCapt…

Nginx安装与部署

文章目录 一,说明二,下载三,Windows下安装1,安装2,启动3,验证 四,Linux下安装1,安装2,启动3,验证 五,Nginx配置 一,说明 Nginx是一款高性能Web和反向代理服务器,提供内存少,高并发,负载均衡和反向代理服务,支持windos和linux系统 二,下载 打开浏览器,输入地址: https://ngin…

【小沐学NLP】Python使用NLTK库的入门教程

文章目录 1、简介2、安装2.1 安装nltk库2.2 安装nltk语料库 3、测试3.1 分句分词3.2 停用词过滤3.3 词干提取3.4 词形/词干还原3.5 同义词与反义词3.6 语义相关性3.7 词性标注3.8 命名实体识别3.9 Text对象3.10 文本分类3.11 其他分类器3.12 数据清洗 结语 1、简介 NLTK - 自然…