diskqueue第五篇 - 追尾检测,错误处理,如何正常关闭

news2025/3/13 7:31:48

diskqueue是nsq消息持久化的核心,内容较多,故分为多篇

1. diskqueue第一篇 - 是什么,为什么需要它,整体架构图,对外接口

2. diskqueue第二篇 - 元数据文件,数据文件,启动入口,元数据文件的读写及保存

3. diskqueue第三篇 - 数据定义详解,运转核心ioloop()源码详解

4. diskqueue第四篇 - 怎么写入消息,怎么对外发送消息

5. diskqueue第五篇 - 追尾检测,错误处理,如何正常关闭

6. diskqueue第六篇 - 如何使用diskqueue

经过前面四篇的介绍,相信大家对diskqueue的整体架构,运转逻辑应该有很清楚的了解了,这篇博客做个总结,先给大家看下diskqueue内部如何进行追尾检测,错误处理,最后给大家讲下如何正常关闭一个diskqueue

1. diskqueue追尾检测

在第一篇博客中我们就讲了diskqueue的整体架构图,也就是前面写,后面读,如下

写快读慢时,结果是消息积累,消息文件越来越多

写慢读快时,只要持续的时间够长,读的位置一定会追上写的位置,也就是我们要讲的追尾

在运转核心 ioloop()函数中,我们可以看到,读取的数据会压入通道readChan,当外部从readChan取走后,select中会响应,代码如下(这里仅给出核心代码片段)

case r <- dataRead: // 把读出来的这个消息压入到readChan,外部从readChan取走以后,这里会立即返回
    // 每取走一个消息,count值+1
    count++
    // 重新计算下次读位置,读文件号
    d.moveForward()

响应操作

1. count值加1

2. 调用moveForward()函数,该函数内部会重新设置readPos的值,readFileNum的值

实现如下(已添加详细注释)

// 向前移动一个消息,前进读位置和读文件号(读出来的消息成功投递给外部后,会调用本函数)
func (d *diskQueue) moveForward() {
	oldReadFileNum := d.readFileNum
	d.readFileNum 	= d.nextReadFileNum	// 下次要读的文件号
	d.readPos 		= d.nextReadPos		// 下次要读的位置
	d.depth 		-= 1				// 未处理消息数减1
	// 如果旧的读文件号和新的读文件号不一致,说明旧文件已读完,可以删除了,下次开始读新文件
	if oldReadFileNum != d.nextReadFileNum {
		d.needSync = true				// 当我们开始读新文件的时候,强制把写的数据同步一次磁盘

		fn := d.fileName(oldReadFileNum)// 旧文件名字
		err := os.Remove(fn)			// 删除旧文件
		if err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
		}
	}
	// 检测有没有追尾问题
	d.checkTailCorruption(d.depth)
}

可以看到,刷新完readPos,readFileNum,读完消息删除文件后

最后调用了checkTailCorruption()函数,也就是我们要讲的追尾函数,代码如下(已添加详细注释)

// 检测追尾问题
func (d *diskQueue) checkTailCorruption(depth int64) {
	// 读文件比写文件的编号小 或 读位置比写位置小,说明没追尾,中间还有消息,这是正常情况
	if d.readFileNum < d.writeFileNum || d.readPos < d.writePos {
		return
	}

	// 到这里:要么消息已全部读完(即发生追尾),要么出错
	if depth != 0 { // depth应该为0,如果不为0,说明出错,强制置零吧
		if depth < 0 {
			d.logf(ERROR, "DISKQUEUE(%s) negative depth at tail (%d), metadata corruption, resetting 0...", d.name, depth)	// depth<0说明元数据错了
		} else if depth > 0 {
			d.logf(ERROR, "DISKQUEUE(%s) positive depth at tail (%d), data loss, resetting 0...", d.name, depth)				// depth>0说明消息丢了
		}
		d.depth 	= 0		// 消息数强制置0
		d.needSync	= true  // 强制同步一次磁盘
	}

	// 读文件号不等于写文件号 或者 读位置不等于写位置,说明严重错误
	if d.readFileNum != d.writeFileNum || d.readPos != d.writePos {
		// 读的文件号 > 写的文件号
		if d.readFileNum > d.writeFileNum {
			d.logf(ERROR,"DISKQUEUE(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readFileNum, d.writeFileNum)
		}
		if d.readPos > d.writePos {
			d.logf(ERROR,"DISKQUEUE(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readPos, d.writePos)
		}
		d.skipToNextRWFile()// 删除所有的消息文件
		d.needSync = true	// 强制同步一次磁盘
	}
}

对上面的代码解释下

1. 正常可读(即没有发生追尾)的两个条件:

要么readFileNum(读文件号)小于writeFileNum(写文件号),说明还有很多消息

要么readFile等于writeFileNum,即读写同一个文件,但需readPos(读位置)小于writePos(写位置)

2. 如果不满足上面的两个可读条件,那很可能就是发生追尾(读写同一个文件,且读写位置一样,且depth为0,也就是未处理消息数为0)

3. 也有可能是其他错误

    3.1 如果depth不为0,无论是>0,还是<0,都强制置零

    3.2 如果 readFileNum > writeFileNum,readPos>writePos,都属于严重错误,删除所有数据吧

2. diskqueue的错误处理

因为整个diskqueue是建立在多协程读写的基础上,读写分离,且元数据和消息数据分离,所以很可能会发生各种各样的异常,都会造成无法正常运转的错误,因此错误处理必不可少

刚才讲的追尾处理函数checkTailCorruption()中也有对错误的处理,比如发生追尾时,depth值错误,readFileNum>writeFileNm,readPos>writePos等等

现在我们开始讲读取消息时的错误处理,先看下触发位置,在ioloop函数中,代码如下

		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// 读位置已经移动(说明外部取走了数据),才可读下一个消息
			if d.nextReadPos == d.readPos {
				dataRead, err = d.readOne()
				if err != nil {
					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err)
					// 读出错时的处理
					d.handleReadError()
					continue
				}
			}
        }

在读取消息函数readOne()之后,发现返回的errr不为nil,则调用函数handleReadError()进行读错误处理

我们先总结下函数readOne()有哪些地方可能返回错误,切记,函数readOne()是正常未追尾的情况下才会调用

在博客 diskqueue第四篇 - 怎么写入消息,怎么对外发送消息 中有对函数readOne()的详解

这里不再重复,直接对函数readOne()的错误给出总结

1、打开读文件出错,跳转文件指针位置出错

2、读取消息大小错误,消息大小范围错误

3、读取消息数据错误

大家可以看下上面的几种错误,不能通过重试或数据修正来解决,也就是说没法挽救,所以diskqueue对此处理也很简单,既然挽救不了,那就不要这个文件了,直接跳过

现在看下diskqueue的handleReadError()函数,内部实现就是放弃这个文件,打印下错误日志,读下一个文件,源码如下(已添加详细注释)

// 读文件出错时的处理
func (d *diskQueue) handleReadError() {
	// 处理规则:把当前读的文件重命名为xxx.bad,读下一个文件
	// 如果读写的是同一个文件,又是出错,那写文件也新开一个写吧
	if d.readFileNum == d.writeFileNum {
		// 关闭当前写文件
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
		// 新开一个写文件(后面发现writeFile为nil时会创建文件)
		d.writeFileNum++
		d.writePos = 0
	}

	badFn := d.fileName(d.readFileNum)	// 当前读的文件名
	badRenameFn := badFn + ".bad"
	d.logf(WARN,"DISKQUEUE(%s) jump to next file and saving bad file as %s", d.name, badRenameFn)
	err := os.Rename(badFn, badRenameFn)// 把当前读的文件重命名为xxx.bad
	if err != nil {
		d.logf(ERROR,"DISKQUEUE(%s) failed to rename bad diskqueue file %s to %s", d.name, badFn, badRenameFn)
	}

	// 新开一个读文件
	d.readFileNum++
	d.readPos = 0
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = 0
	// 重大的状态变更, 下一步同步一次文件
	d.needSync = true
	// 再检测一次是否追尾
	d.checkTailCorruption(d.depth)
}

对上面的代码,再解释下

1. 如果读写的是同一个文件,又是出错,那写操作也新开一个文件重新写

2. 打印一行错误,把这个读出错的文件加个.bad后缀,标明是出错的文件,文件也不删除,留给用户自己解决吧

3. 新打开一个读文件,从0开始读

4. 强制同步一次(如果读写是不同的文件,写操作同步到文件以供读取),再检测一次是否追尾

3. 如何正常关闭一个diskqueue

结论:调用Close()可以正常关闭一个diskqueue

Close()函数的实现也很简单,关闭协程ioloop(),关闭读写文件,最后保存

给大家看下Close()函数内部做了哪些操作,源码如下

// 关闭队列(有保存数据)
func (d *diskQueue) Close() error {
	err := d.exit(false) // 退出
	if err != nil {
		return err
	}
	return d.sync() // 保存数据
}

Close()函数第一步:调用了exit()函数进行退出操作

exit()函数源码如下(已添加详细注释)

// 退出操作
func (d *diskQueue) exit(deleted bool) error {
	d.Lock()
	defer d.Unlock()

	// 标记正在进行退出操作
	d.exitFlag = 1
	// 你会发现本函数内deleted标记仅仅只是打印的区别,并没有真正执行什么删除操作,是因为Close()在exit()之后又调用了sync()进行保存,而Delete()仅调用exit()
	if deleted {
		d.logf(INFO, "DISKQUEUE(%s): deleting", d.name)
	} else {
		d.logf(INFO, "DISKQUEUE(%s): closing", d.name)
	}

	// 关闭退出通道(ioloop()协程的的select都会收到信号,处理就是退出其协程)
	close(d.exitChan)
	// 阻塞等待ioloop()协程退出
	<-d.exitSyncChan
	// 关闭消息个数通道(如果此时外部取消息个数,因为从通道读取失败,会直接返回d.depth)
	close(d.depthChan)

	// 关闭读文件
	if d.readFile != nil {
		d.readFile.Close()
		d.readFile = nil
	}
	// 关闭写文件
	if d.writeFile != nil {
		d.writeFile.Close()
		d.writeFile = nil
	}
	return nil
}

对上面的代码解释下

1. 因为需把exitFlag置1,有读写操作,所以加了写锁

2. 关闭通道exitChan,协程ioloop()会收到信号,退出其协程

3. 关闭读文件,写文件

Close()函数第二步:调用sync()函数,保存消息数据,保存元数据

sync()函数的源码如下(已添加详细注释)

// 把所有数据同步到磁盘
func (d *diskQueue) sync() error {
	// 保存消息部分
	if d.writeFile != nil {
		err := d.writeFile.Sync() // 真正刷新到磁盘
		if err != nil {
			d.writeFile.Close()	// 如果出错,也要即时关闭写入文件
			d.writeFile = nil
			return err
		}
	}
	// 保存元数据部分
	err := d.persistMetaData()
	if err != nil {
		return err
	}
	// 标记本次同步已完成,后面不需要再同步了
	d.needSync = false
	return nil
}

4. 本篇总结

本篇博客从源码角度讲了diskqueue对读写时追尾的检测和处理,对读出错时的处理方法,希望让大家明白对于文件操作错误的处理方法和技巧。最后介绍了如何正常关闭一个diskqueue,也就是调用Close()函数,并分析了Close()函数的原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997688.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

熟悉Redis6

NoSQL数据库简介 技术发展 技术的分类 1、解决功能性的问题&#xff1a;Java、Jsp、RDBMS、Tomcat、HTML、Linux、JDBC、SVN 2、解决扩展性的问题&#xff1a;Struts、Spring、SpringMVC、Hibernate、Mybatis 3、解决性能的问题&#xff1a;NoSQL、Java线程、Hadoop、Nginx…

嵌入式-Linux基本操作 pwd cd ls touch clear mkdir rm cp mv

目录 一.Linux文件系统 二.Linux目录结构 三.Linux基本命令 3.1shell脚本 3.2pwd命令 3.3cd命令 3.4ls命令 3.5touch命令 3.6clear命令 3.7mkdir命令 3.8rm命令 3.9cp命令 3.10mv命令 一.Linux文件系统 Linux文件系统是Linux操作系统中用于组织和管理文件和目录的…

【算法训练-链表 六】【查找】:链表中倒数第k个节点

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【查找链表】&#xff0c;使用【链表】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为&…

C盘清理教程

C盘清理教程 首先使用space Sniffer 扫一下c盘&#xff0c;然后看一下到底是哪个文件这么大 第二步&#xff0c;创建软链接。 首先将我们需要移动的文件的当前路径拷贝下来&#xff1a;C:\Users\Tom\Desktop\test-link\abc\ghi.txt 然后假设剪切到D盘下&#xff1a;D:\ghi.…

MOOC软件系统外包开发

MOOC&#xff08;大规模开放在线课程&#xff09;系统是用于创建、管理和交付在线教育课程的软件平台。这些系统通常具有多种功能&#xff0c;旨在支持大规模的在线学习。以下是MOOC系统主要实现的功能以及一些常见的开源系统&#xff0c;希望对大家有所帮助。北京木奇移动技术…

vite项目框架搭建

vite项目框架搭建 1. 使用vite初始化项目 开始 | Vite 官方中文文档 (vitejs.dev) pnpm create vite # 依次设置项目名称、选择框架【vue】、选择语言【typescript】 √ Project name: ... vite-project √ Select a framework: Vue √ Select a variant: TypeScript2. ele…

【strtok函数和strerror函数的介绍和使用以及扩展】

strtok函数和strerror函数的介绍和使用以及扩展 一.strtok函数 1.strtok函数介绍 资源来源于cplusplus网站 它的作用&#xff1a; 对此函数的一系列调用将 str 拆分为标记&#xff0c;这些标记是由分隔符中的任何字符分隔的连续字符序列。 在第一次调用时&#xff0c;该函数…

vue学习之Javascript 表达式内容渲染和属性绑定

Javascript 表达式内容渲染和属性绑定 创建 demo4.html,内容如下 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…

Vuex核心概念 - actions 和 getters

文章目录 actions 和 getters一、actions作用使用目的&#xff1a; 二、actions的使用执行原理代码示例&#xff1a; 三、actions中的辅助函数mapActions代码示例&#xff1a; 四、核心-getters1. 什么是getters&#xff1f;2. getters的作用&#xff1a;3. 访问 getters 的两种…

Compose的一些小Tips - 可组合项的生命周期

系列文章 Compose的一些小Tips - 可组合项的生命周期&#xff08;本文&#xff09; 前言 本系列介绍Compose的一些常识&#xff0c;了解这些tips并不会让人摇身一变成为大佬&#xff0c;但可以帮助到一些学习Compose的安卓开发者避免一些误区&#xff0c;也是对入门详解中遗漏…

python的包管理

要在 mypackage 包外使用 mypackage 包里的 speak.py 文件以及 newpackage 包里的 jump.py 文件&#xff0c;你需要确保以下几个步骤&#xff1a; 确保目录结构正确&#xff0c;如下所示&#xff1a; mypackage/__init__.pyspeak.pynewpackage/__init__.pyjump.py在 speak.py…

解决在cmd中输入mongo出现‘mongo‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件的问题~

当我想通过shell连接mongoDB时&#xff0c;输入mongo命令&#xff0c;出现下述错误&#xff1a; 起初我以为我是忘记配置环境变量了&#xff0c;但检查后发现自己配置了&#xff0c;如果你和我是一样的问题&#xff0c;明明配置了环境变量&#xff0c;但上述问题依然没有被解决…

Go语言的[GPM模型]

在go中,线程是运行Groutine的实体,调度器的功能是把可以运行的Groutine分配到工作线程上 GPM模型 M与P的数量没有绝对的数量关系,当一个M阻塞时,P就会创建一个或者切换到另一个M,所以即使设置了runtime.GOMAXPROCS(1) 也可能创建多个M出来; 当M发现给自己输送G协程的那个P队列为…

懒加载指令实现

问题&#xff1a;页面过长&#xff0c;下面的图片不一定访问到&#xff0c;存在一定浪费。 解决方案&#xff1a;图片懒加载&#xff0c;仅当进入视口区&#xff0c;才发送请求显示图片。 全局指令 // 全局指令 app.directive(指令名称,{mounted(el, binding){// el:指令绑定…

《JUC》万万万万字长文解析!

JUC 四万字长文解析 juc&#xff0c;涵盖线程、内存模型、锁、线程池、原子类、同步器、并发容器、并发编程模式、并发编程应用等。 版本: jdk: 11spring boot: 2.7.0 JUC 是 java.util.concurrent 包的缩写&#xff0c;是 java 提供的用来并发编程的工具包。juc 提供了多种用…

ClickHouse的Join算法

ClickHouse的Join算法 ClickHouse是一款开源的列式分析型数据库&#xff08;OLAP&#xff09;&#xff0c;专为需要超低延迟分析查询大量数据的场景而生。为了实现分析应用可能达到的最佳性能&#xff0c;分析型数据库&#xff08;OLAP&#xff09;通常将表组合在一起形成一个…

MOS管为什么会存在寄生电感

说到MOS管的寄生参数&#xff0c;我们一般都只想到mos管各极间的寄生电容&#xff0c;很少会想到MOS管的寄生电感。 其实分立的MOS管它是存在寄生电感的&#xff0c;并且栅极&#xff0c;源极和漏极都存在。 在一些MOS的数据手册会提到这个寄生电感。 那么MOS管寄生电感是怎么产…

9月4日上课内容 第七章 案例:MHA高可用配置及故障切换

本章结构 案例概述 案例前置知识点 1&#xff0e;什么是 MHA&#xff08;MHA概念&#xff09; MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中&…

ComPtr源码分析

ComPtr源码分析 ComPtr是微软提供的用来管理COM组件的智能指针。DirectX的API是由一系列的COM组件来管理的&#xff0c;形如ID3D12Device&#xff0c;IDXGISwapChain等的接口类最终都继承自IUnknown接口类&#xff0c;这个接口类包含AddRef和Release两个方法&#xff0c;分别用…

BUUCTF easyre 1

使用die工具进行文件信息的查看 可以看到是64位程序 使用IDA64打开 f5 反汇编 得到flag