diskqueue怎么写入消息,怎么对外发送消息

news2024/11/24 14:42:32

nsq中diskqueue是nsq消息持久化的核心,内容较多,一共分为多篇

1. diskqueue是什么,为什么需要它,整体架构图,对外接口_YZF_Kevin的博客-CSDN博客

2. diskqueue的元数据文件,数据文件,启动入口,元数据文件读写和保存_YZF_Kevin的博客-CSDN博客

3. diskqueue的数据定义,运转核心ioloop()源码详解_YZF_Kevin的博客-CSDN博客

4. diskqueue怎么写入消息,怎么对外发送消息_YZF_Kevin的博客-CSDN博客

第三篇博客中我们讲了diskqueue的定义,核心的ioloop()函数处理,这篇博客我们讲diskqueue对消息的写入,读取,对外发送等流程

1. diskqueue对写入消息时的处理

diskqueue对外提供的写入消息接口为

Put([]byte) error

这个接口的实现不复杂,就是利用writeChan通道,把传进来的数据压入写入到当前在写文件,会提前检测,如果发现写入数据后当前文件会超上限,那就不写了,关闭当前的写文件,新开一个写入文件从0位置开始写

源码如下(已添加详细注释)

// 把指定数据压入接收队列
func (d *diskQueue) Put(data []byte) error {
	d.RLock()
	defer d.RUnlock()

	// 如果队列正在退出,返回吧
	if d.exitFlag == 1 {
		return errors.New("exiting")
	}

	d.writeChan <- data			// 压入接收队列,因为无缓冲,所以会阻塞等待ioloop()循环中取走才会返回
	return <-d.writeResponseChan// 阻塞等待结果,ioloop()从writeChan读取执行后会立马出结果
}

可以看到Put()函数操作很简单,核心操作就是往d.writeChan中压入数据,由于d.writeChan是无缓冲通道,所以会阻塞等待ioloop()函数中select的取走

ioloop()函数中select对此的处理如下

case dataWrite := <-d.writeChan: // 新接收到消息
    count++    // 每收到一个消息,count值+1
    d.writeResponseChan <- d.writeOne(dataWrite) // 消息写入到文件缓冲区,结果压入返回通道

select对此的处理也很简单,count值加1后,调用了d.writeOne(dataWrite),然后把writeOne()的结果写入writeResponseChan 

我们看下writeOne()函数的处理,源码如下(已添加详细注释)

// 写入一个消息到缓冲区(函数内部会自动创建新文件)
func (d *diskQueue) writeOne(data []byte) error {
	var err error

	dataLen 	:= int32(len(data))		// 数据长度
	totalBytes 	:= int64(4 + dataLen)	// 本次写入的总字节数(4字节的数据长度 + 真正数据部分)

	// 数据大小检测
	if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
		return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
	}

	// 如果加上本次写入量会超过文件最大限制,就关闭当前文件,创建新的
	if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
		// 如果当前已经在读这个文件
		if d.readFileNum == d.writeFileNum {
			d.maxBytesPerFileRead = d.writePos	// 标识当前文件的最大可读字节数即writePos(因为不再写入这个文件了,下面会往新文件里面写了)
		}
		d.writeFileNum++	// 新文件编号
		d.writePos = 0		// 新文件的写入起始点
		// 当前文件的内容刷到磁盘
		err = d.sync()
		if err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
		}
		// 关闭当前文件
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	// 要写的文件还不存在,新建
	if d.writeFile == nil {
		// 格式化文件名
		curFileName := d.fileName(d.writeFileNum)
		// 创建文件
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			return err
		}
		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
		// 如果已有写入位置
		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0) // 偏移文件游标,0表从文件开头进行偏移
			if err != nil {
				d.writeFile.Close()
				d.writeFile = nil
				return err
			}
		}
	}

	d.writeBuf.Reset()

	// 先把数据长度(4字节)写入buf
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
	if err != nil {
		return err
	}
	// 再把数据写入buf
	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}
	// 把buf写入(注意这里其实是写入到文件的缓冲区,并没有刷到磁盘中,只有调用writeFile.fsync()才是真正刷到磁盘)
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	if err != nil {
		d.writeFile.Close()
		d.writeFile = nil
		return err
	}

	d.writePos 	+= totalBytes	// 更新写入位置
	d.depth		+= 1			// 更新消息数

	return err
}

对上面的writeOne()函数源码解释下

1. 每个消息的前面4字节存放消息大小,每次写入数据 = 4字节 + 消息

2. 每个消息的大小需在当初设置的 minMsgSize 和 maxMsgSize之间

3. 如果加上本次写入量,在写文件会超最大限制,就关闭当前文件,writeFileNum+1后,新开一个文件从0写

4. 如果写入的文件还没打开,就打开文件

5. 写入消息流程:先写入4字节表消息大小,再写入消息

6. 最后更新写入位置,消息总数

2. diskqueue对读取消息时的处理

diskqueue对外提供的读取消息接口为

ReadChan() <-chan []byte

具体实现如下

// 返回读消息的通道(外部只读)
func (d *diskQueue) ReadChan() <-chan []byte {
	return d.readChan
}

也就是返回d.readChan,这是一个只读的通道,元素类型为[]byte

大家回顾下初始化diskqueue时New()函数,d.readChan是一个无缓冲的通道,如下

所以往d.readChan中压入数据后,必须等待外部读取后才能返回

为了避免阻塞,压入方,读取方都是在select中执行

好了,我们看diskqueue对readChan是怎么使用的吧

再回顾下ioloop()循环中对于,读通道,读消息的处理

当时也讲了,这里充分利用了golang中通道的特性,select对于为nil的通道会直接跳过

核心点:

1. 有消息可读时,就给r赋值为d.readChan,读取到的数据dataRead就能压入到r

2. 无消息可读时,r就会被赋值为nil,select会直接跳过。外部的读取发现通道为nil也会直接跳过

现在我们看下消息可读条件的判断,两个条件:

1. d.readFileNum < d.writeFileNum 即读的是已写入完成的文件,当然可以放心读

2. d.readPos < d.writePos 即读写的是同一个文件,但读的位置比写的位置小,说明已经有消息写入后未处理,也可以读

好,下面开始看真正的读取消息函数readOne(),源码如下(已加详细注释)

// 读取一个最早的消息,以[]byte格式返回
func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32

	// 没有读文件指针,那就新打开文件
	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum) // 要读的文件名字
		// 以只读的方式打开文件
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		if err != nil {
			return nil, err
		}

		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

		// 跳到读的游标点
		if d.readPos > 0 {
			_, err = d.readFile.Seek(d.readPos, 0)
			if err != nil {
				d.readFile.Close()
				d.readFile = nil
				return nil, err
			}
		}

		// 赋值当前文件的最大可读字节数
		d.maxBytesPerFileRead = d.maxBytesPerFile // 默认是文件极限值
		// 如果是已写完的文件,必须要用实际文件大小
		if d.readFileNum < d.writeFileNum {
			stat, err := d.readFile.Stat()
			if err == nil {
				d.maxBytesPerFileRead = stat.Size()
			}
		}

		// 使用文件对象构建reader
		d.reader = bufio.NewReader(d.readFile)
	}

	// 先读消息大小(大端方式,4字节)
	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}
	// 如果消息大小错误,报错返回吧
	if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
		d.readFile.Close()
		d.readFile = nil
		return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
	}

	// 再读消息体
	readBuf := make([]byte, msgSize)
	_, err = io.ReadFull(d.reader, readBuf)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	totalBytes := int64(4 + msgSize)
	d.nextReadPos 		= d.readPos + totalBytes	// 更新下次读取点(不能直接把readPos后移,是因为消息由外部接收了才算处理,那时候readPos才能往后移;外部没接收的话,)
	d.nextReadFileNum 	= d.readFileNum				// 更新下次读取文件名(默认还是本文件,是否跳新文件下面判断)

	// 如果是之前已写完的文件,并且下次读取点超过本文件可读最大值,说明已经读完了,那下次就读下一个文件
	if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
		// 关闭当前文件
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}
		d.nextReadFileNum++	// 标记下次读新文件
		d.nextReadPos = 0	// 新文件肯定从0开始读了
	}

	return readBuf, nil
}

对上面的代码解释下

1. readFile为nil,说明指向的文件还没打开,就执行打开操作,再调用seek()函数跳转到读取位置

2. 取到readFile指向文件的实际大小,如果是已经写完的文件,则需标记只能读这么多

3. 读消息时,先读4字节表消息大小,再读消息体

4. 计算出下次的读取点(但不可直接后移,需等外面取走数据),如果超过了当前文件最大可读值,就读下一个文件

3. 本篇总结

这篇博客,我们详细讲了diskqueue的写入消息,读取消息,对外发送消息的机制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/974715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

成都睿趣科技:现在开一家抖音小店还来得及吗

随着社交媒体的迅猛发展&#xff0c;抖音已经成为了一个全球范围内广受欢迎的社交平台。在这个短视频应用上&#xff0c;人们分享着各种各样的内容&#xff0c;从搞笑段子到美食教程&#xff0c;再到时尚搭配和手工艺品制作。随着用户数量的不断增长&#xff0c;很多人都在思考…

Python第三方库 - matplotlib库

1 matplotlib了解 Matplotlib 可能是 Python 2D - 绘图领域使用最广泛的套件。它能让使用者很轻松地将数据图形化&#xff0c;并且提供多样化的输出格式。这里将会探索 matplotlib 的常见用法。 2 matplotlib学习 2.1 引用 plt 表示当前子图&#xff0c;若没有就创建一个子图 …

x64dbg的安装

一、安装地址&#xff1a; 地址 解压目录 点击x96dbf.exe 二、使用 1.反汇编窗口 这个位置显示的是需要分析的程序的反汇编代码。在第一个区域的最左侧例如“7712EAA3”这一列就是内存地址区域&#xff0c;接着“E8 07”就是汇编指令的opcode&#xff0c;“jmp xxxxxxxxx”这…

天津Java培训机构 Java的发展空间如何?

在当今互联网时代&#xff0c;计算机技术的发展日新月异&#xff0c;越来越多人看到IT行业的广泛前景&#xff0c;纷纷想要转行成为一名程序员&#xff0c;作为一名IT从业人员&#xff0c;学习一门编程语言是必不可少的&#xff0c;而在众多编程语言中&#xff0c;Java无疑是较…

存储数据恢复- raid5多块硬盘出现坏道的数据恢复案例

存储数据恢复环境&#xff1a; 某单位一台存储&#xff0c;1个机头4个扩展柜&#xff0c;有两组分别由27块和23块硬盘组建的RAID5阵列。其中由27块磁盘组建的那一组RAID5阵列崩溃&#xff0c;这组RAID5阵列存放是Oracle数据库文件。存储系统上层共划分了11个卷。 存储故障&…

【工具】Linux下常用录屏软件

&#x1f41a;作者简介&#xff1a;花神庙码农&#xff08;专注于Linux、WLAN、TCP/IP、Python等技术方向&#xff09;&#x1f433;博客主页&#xff1a;花神庙码农 &#xff0c;地址&#xff1a;https://blog.csdn.net/qxhgd&#x1f310;系列专栏&#xff1a;善假于物&#…

深圳企业宣传片怎么做

要拍摄企业宣传片&#xff0c;首先要搞清楚客户宣传片的目的和用途&#xff0c;然后根据自身情况拟定预算以及制作周期&#xff0c;再与甲方沟通具体需求&#xff0c;最后进入制作流程。整体制作流程可以分为以下步骤&#xff0c;由深圳企业宣传片制作公司老友记小编为您解答&a…

LeetCode--HOT100题(48)

目录 题目描述&#xff1a;437. 路径总和 III&#xff08;中等&#xff09;题目接口解题思路代码 PS: 题目描述&#xff1a;437. 路径总和 III&#xff08;中等&#xff09; 给定一个二叉树的根节点 root &#xff0c;和一个整数 targetSum &#xff0c;求该二叉树里节点值之和…

Python数据分析实战-将字符串中的空格替换为逗号且要保留特定词组(附源码和实现效果)

实现功能 将字符串中的空格替换为逗号且要保留特定词组 实现代码 import restring "Linux Python Cloud Native Distributed System AI C Deep Learning Framework Micro Service Automation Git IoT"# 定义要保留的特定词组 special_phrases ["Deep Learn…

新手可以选黄金代理吗?

我们都知道选择现货黄金平台的时候&#xff0c;一定要选择一个正规的、合法的平台&#xff0c;这样投资者才可以安心进行交易&#xff0c;但是目前市面上我们看到很多的是黄金代理&#xff0c;而不是直接与现货黄金平台发生接触&#xff0c;那么&#xff0c;这种黄金代理在市场…

如何根据需求正确选择适合企业的CRM销售管理系统

现代企业的销售工作离不开使用各种各样的销售管理系统&#xff0c;随着互联网的发展&#xff0c;市面上出现了许多销售管理系统&#xff0c;那么销售管理系统哪种好呢&#xff1f;如何选择一款适合自己企业的CRM销售管理系呢&#xff1f;本文将从多个角度进行分析和比较为大家提…

nc前端合计行

nc前端合计行 1.无表体和单表体的合计行加法 只要卡片下 如果是只有表头要合计行就只留ShowTotalLine&#xff1b;如果是只有表体要合计行就只留ShowTotalLineTabcodes 2.多表体的合计行加法 表头卡片下和列表下都要 3.档案的合计行加法 重写一下列表模板

C++(18):异常处理

异常处理机制允许程序中独立开发的部分能够在运行时就出现的问题进行通信并做出相应的处理。 异常使得能够将问题的检测与解决过程分离开来&#xff1a;程序的一部分负责检测问题的出现&#xff0c;然后解决该问题的任务传递给程序的另一部分。检测环节无须知道问题处理模块的…

基于 Web HID API 的HID透传测试工具(纯前端)

前言 最近再搞HID透传 《STM32 USB使用记录&#xff1a;HID类设备&#xff08;后篇&#xff09;》 。 市面上的各种测试工具都或多或少存在问题&#xff0c;所以就自己写一个工具进行测试。目前来说纯前端方案编写这个工具应该是最方便的&#xff0c;这里放上相关代码。 项目…

通过idea实现springboot集成mybatys

概述 使用springboot 集成 mybatys后&#xff0c;通过http请求接口&#xff0c;使得通过http请求可以直接直接操作数据库&#xff1b; 完成后端功能框架&#xff1b;前端是准备上小程序&#xff0c;调用https的请求接口用。简单实现后端框架&#xff1b; 详细 springboot 集…

qt中子窗口最小化后再恢复显示窗口区域显示为全白色

问题&#xff1a; qt中子窗口最小化后再恢复显示窗口区域显示为全白色&#xff0c;如下图&#xff1a; 原因&#xff1a; 恢复显示后窗口为及时刷新。 解决办法&#xff1a; 重写showEvent函数&#xff0c;如下&#xff1a; void MyClass::showEvent(QShowEvent *event) {se…

OS | 第5章 插叙:进程API

OS | 第5章 插叙&#xff1a;进程API 文章目录 OS | 第5章 插叙&#xff1a;进程API5.1 fork()系统调用代码过程分析 5.2 wait()系统调用5.3 exec() 系统调用执行过程 为什么这样设计API&#xff1f;shell执行过程shell 重定向pipe()>>>>> 欢迎关注公众号【三戒…

YOLOv5:解读metrics.py

YOLOv5&#xff1a;解读metrics.py 前言前提条件相关介绍metrics.pyfitnesssmoothbox_iouConfusionMatrix ★ ★ \bigstar\bigstar ★★bbox_iou ★ ★ \bigstar\bigstar ★★compute_apap_per_class&#xff08;难度&#xff1a; ⋆ ⋆ ⋆ ⋆ ⋆ \star\star\star\star\star ⋆…

openpnp - 底部相机高级矫正后,底部相机看不清吸嘴的解决方法

文章目录 openpnp - 底部相机高级矫正后,底部相机看不清吸嘴的解决方法概述解决思路备注补充 - 新问题 - N1吸嘴到底部相机十字中心的位置差了很多END openpnp - 底部相机高级矫正后,底部相机看不清吸嘴的解决方法 概述 自从用openpnp后, 无论版本(dev/test), 都发现一个大概…

mac建议装双系统吗,详细分析苹果电脑双系统的利弊

mac建议装双系统吗&#xff0c;Mac电脑上安装双系统有哪些利弊呢&#xff0c;一起来看看吧&#xff01; 苹果Mac电脑安装双系统利&#xff1a; 1、用来办公更加方便&#xff1a;苹果系统功能也是很强大的&#xff0c;但是用来办公非常不方便&#xff0c;是由于一些常用的exe软…