nsq中diskqueue详解 - 第一篇

news2024/11/26 20:39:20

一、diskqueue是什么,为什么需要它

在nsq中消息主要存在于两种队列

一种是内存队列,内部是用go的通道实现,所以处理速度很快,缺点是一旦nsqd进程挂掉消息就丢失了,这让人难以接受,数据丢了不得被用户骂死?

一种是持久化队列,内部实现是把消息保存在磁盘文件中,即使nsqd进程突然挂掉或者服务器重启,文件仍然存在,只需重启nsqd即可,nsqd进程会重新加载再进行投递。持久化队列也是nsq能保证消息可靠投递的重要原因之一

贴出来源码,给大家看下nsqd中的两种队列,以Topic举例,文件在nsq/nsqd/topic.go

type Topic struct {
	// 省略无关代码......
	sync.RWMutex
	name              string				// topic名字
	channelMap        map[string]*Channel	// 注册的所有的channel,k:通道名,v:通道对象
	backend           BackendQueue			// 持久化队列
	memoryMsgChan     chan *Message			// 内存中的消息队列
    // 省略无关代码......
}

字段 memoryMsgChan 即内存队列,使用go语言的chan来实现,内部元素为Message,也就是消息对象

字段 backend 即持久化队列,类型为BackendQueue,是个接口类型,每个人可以提供自己喜欢的实现。nsq作者为该接口实现的是diskqueue包,就是我们接下来两篇博客要详细讲的了

二、diskqueue地址

可能基于替换性考虑,也为了把diskqueue包提供给有数据持久化需求的人用,nsq作者并没有把代码嵌入到nsq中,而是把disqueue做成了一个独立的包,大家可以很方便地嵌入到自己的项目中

官方github地址为:GitHub - nsqio/go-diskqueue: A Go package providing a filesystem-backed FIFO queue

建议大家下载下来,边看博客,边看代码,这样效率更高

三、代码目录

大家先看下diskqueue的整体代码,有个感觉,如下图

 整个包只有两个有用的文件,

diskqueue.go 这个文件是整个diskqueue的实现代码

diskqueue_test.go 这个文件是对diskqueue的测试代码

四、diskqueue整体实现流程

有的博客一上来直接贴源码一通讲,连diskqueue是什么,做什么用,整体实现如何都没讲,让人看得云里雾里,这样效果很差。为了让大家更好更快地理解diskqueue的实现,我专门画了它的架构图和实现流程,大家可以先了解下它的整体结构,如下图

对上面处理模型解释下:

1. diskqueue为每个队列保存一份meta文件(以.meta结尾),这个文件一共就3行,描述了队列的整体信息(总的消息数,当前读文件编号,读位置,当前写文件编号)

2. diskqueue把队列的消息保存在了一个或多个数据文件(以.dat结尾)中,单个文件写入达到最大限值就新创建一个,比如某个topic的名字为TestTopic,那么最终文件名字格式如下

TestTopic.diskqueue.000001.dat

TestTopic.diskqueue.000002.dat

TestTopic.diskqueue.000003.dat

即前面是topic的名字,最后是文件编号从1开始增加

3. 这些消息文件组成了一个消息队列,前面在接收写入,后面在读取处理,处理结束的文件就删掉,这也就是为什么diskqueue可以宣称自己是FIFO的持久化队列,因为确实是队列的处理模型

4. 单个消息的格式:前面4个字节表消息体大小,后面是消息体。单个文件内所有消息按数组的格式紧密排列,虽然上面图中两个消息之间有空隙,这只是方便大家看,实际是紧挨着的,大家要注意

5. 队列头是正在写的文件,队列尾是正在读的文件

    如果写快读慢,那文件队列长度就会一直增长

    如果写慢读快,那最终会读的位置等于写的位置,像汽车追尾一样

6. nsq给每个消息文件限制最大为100M,但也有可能写入了99.7M的时候,再尝试写消息时发现若写了就超过文件最大限制,这个时候只能新创建文件。所以一个文件最终可读多少字节,要以文件的实际大小为准

五、接口介绍

diskqueue的对外接口没几个,我用的代码取自官方github 日期2023/08/08,如下(已添加注释)

// 持久化队列的所有接口,在nsq中该接口名为BackendQueue,下面的diskQueue会实现这些接口
type Interface interface {
	Put([]byte) error			// 向队列中存一个消息
	ReadChan() <-chan []byte 	// 获取一个无缓冲的只读通道(真正弹出了消息),外部使用者可以多协程读取处理
	PeekChan() <-chan []byte 	// 获取一个无缓冲的只读通道(本通道仅查看,并不算弹出消息),外部使用者可以多协程读取处理
	Close() error			 	// 关闭队列(退出,有保存数据)
	Delete() error				// 删除队列(退出,无保存数据,注意本函数并不是真正删除消息或文件,仅仅是退出。所以想真正删除数据时需先调用Empty()再调用Delete())
	Depth() int64				// 当前未处理的消息数
	Empty() error				// 清空队列(删除所有文件)
}

一共只有7个函数,我已经写了详细的注释,特别要注意的是

Put()             :        新写进一个消息,立刻有结果

ReadChan():        获取只读通道,只要还有消息,就能从该通道读消息,没有消息时该通道为nil

Empty()        :        清空队列

Delete()         :       无保存退出,并非真正的删除,函数名有点坑人

切记:如果想删除某个队列,必须先调用Empty()再调用Delete()

nsq中就是这样做的,无论是删除topic还是删除channel,对于持久化队列的处理都是先调用Empty()再调用Delete()

举例:删除topic时会调用Delete()函数,如下(已添加注释)

// 删除topic的处理(先清空所有队列,channel中的队列,再执行删除)
func (t *Topic) Delete() error {
	return t.exit(true)
}

也就是调用了exit()函数,exit()如下(已添加注释)

// topic删除/关闭的处理
// deleted	:	true表删除topic,false表nsqd正常关闭
func (t *Topic) exit(deleted bool) error {
	// 省略无关代码......
	// 删除topic
	if deleted {
		t.Lock()
		for _, channel := range t.channelMap { // 删除本topic下的所有channel
			delete(t.channelMap, channel.name)
			channel.Delete()
		}
		t.Unlock()
		// 清空队列(内存队列,调用bckend.Empty())
		t.Empty()
		return t.backend.Delete() // 再删除持久化队列
	}
    // 省略无关代码......
}

可以看到exit()函数中判断deleted为true时,

1. 对所有的channel执行Delete()

2. 再执行Empty(),该函数会对持久化队列backend执行Empty()

3. 最后对backend执行Delete()

好了,这篇博客大家先对diskqueue有个整体印象,了解其实现原理。下一篇博客我们会从源码角度进行分析diskqueue是如何实现的,它是怎么维持FIFO队列的,有哪些要注意的点等等

todo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/877709.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

强训第31天

选择 传输层叫段 网络层叫包 链路层叫帧 A 2^16-2 C D C 70都没收到&#xff0c;确认号代表你该从这个号开始发给我了&#xff0c;所以发70而不是71 B D C 248&123120 OSI 物理层 数据链路层 网络层 传输层 会话层 表示层 应用层 C 记一下304读取浏览器缓存 502错误网关 编…

论文总结《Adversarial Personalized Ranking for Recommendation(APR)》

原文链接 APR、 本篇论文是对抗训练在RS领域的先锋作&#xff0c;在这篇文章前对抗训练应用在图像领域&#xff0c;以提高模型鲁棒性。本篇论文填补了对抗训练在RS领域的空缺&#xff0c;首次基于BPR进行对抗训练&#xff0c;以提高RS排序模型的鲁棒性。 Motivation 文章在…

Python脚本之连接MySQL【四】

本文为博主原创&#xff0c;未经授权&#xff0c;严禁转载及使用。 本文链接&#xff1a;https://blog.csdn.net/zyooooxie/article/details/124640412 之前写了篇 Python脚本之连接MySQL【三】&#xff0c;日常使用过程中&#xff0c;代码实际有很多改动&#xff0c;特此更新…

数据分析--帆软报表--大数据大屏

进入国企公司学习有一段时间了&#xff0c;岗位是数据分析方向------ 母前使用的是帆软工具进行的开发。 可以进行大数据大屏 也可使嵌入到手机端。 下面是例子

IntelliJ IDEA快捷键大全 + 动图演示

IntelliJ IDEA快捷键大全 动图演示 &#x1f60d; 一、构建/编译Ctrl F9&#xff1a;构建项目Ctrl Shift F9&#xff1a;重新编译当前类 &#x1f60d; 二、文本编辑Ctrl X&#xff1a;剪切Ctrl C&#xff1a;复制Ctrl V&#xff1a;粘贴Ctrl Alt Shift V&#xff1a…

Nginx安装及Minio集群反向动态代理配置(二)

安装所需插件 1、安装gcc gcc是linux下的编译器在此不多做解释&#xff0c;感兴趣的小伙伴可以去查一下相关资料&#xff0c;它可以编译 C,C,Ada,Object C和Java等语言 命令&#xff1a;查看gcc版本 [rootwww ~]# gcc -v -bash: gcc: 未找到命令 说明没有安装: 那就直接yu…

mclust学习总结

mclust example1 import numpy as np import pandas as pd from matplotlib import pyplot as plt from sklearn.datasets import make_blobs X, y make_blobs(n_samples300, centers4, cluster_std0.60, random_state0) # n_features2是默认的 plt.scatter(X[:,0], X[:,1]) …

Java【数据结构】二分查找

&#x1f31e; 题目&#xff1a; &#x1f30f;在有序数组A中&#xff0c;查找目标值target &#x1f30f;如果找到返回索引 &#x1f30f;如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A&#xff0c;满足A0<A1<A2<<An-1,一个待查值target1设…

如何利用 EMC 模型解决能源服务提供商的瓶颈

01. 什么是合同能源管理&#xff1f; 合同能源管理(EMC-Energy Management Contract) 是一种新型的市场化节能机制,其实质就是以减少的能源费用来支付节能项目全部成本的节能投资方式。&#xff1a;节能服务公司与用能单位以契约形式约定节能项目的节能目标&#xff0c;节能服…

在 ubuntu 18.04 上使用源码升级 OpenSSH_7.6p1到 OpenSSH_9.3p1

1、检查系统已安装的当前 SSH 版本 使用命令 ssh -V 查看当前 ssh 版本&#xff0c;输出如下&#xff1a; OpenSSH_7.6p1 Ubuntu-4ubuntu0.7, OpenSSL 1.0.2n 7 Dec 20172、安装依赖&#xff0c;依次执行以下命令 sudo apt update sudo apt install build-essential zlib1g…

springBoot 简单的demo

springBoot 学习开始 场景开发流程1、创建项目2、导入依赖3、创建启动springBoot 项目的主入口程序4、创建业务程序5、在MainApplication文件运行程序6、将文件打包成jar包 遇到的问题未解决 希望大哥们帮忙--本地运行jar包报错 场景 浏览器发送hello请求&#xff0c;返回“he…

培训报名小程序-用户注册

目录 1 创建数据源2 注册用户3 判断用户是否注册4 完整代码总结 我们的培训报名小程序&#xff0c;用户每次打开时都需要填写个人信息才可以报名&#xff0c;如果用户多次报名课程&#xff0c;每次都需要填写个人信息&#xff0c;比较麻烦。 本篇我们就优化一下功能&#xff0c…

[静态时序分析简明教程(十)]模式分析与约束管理

静态时序分析简明教程-组合电路路径 一、写在前面1.1 快速导航链接 二、模式分析三、约束管理3.1 自顶向下的方法3.2 自底向上的方法 四、总结 一、写在前面 一个数字芯片工程师的核心竞争力是什么&#xff1f;不同的工程师可能给出不同的答复&#xff0c;有些人可能提到硬件描…

新品牌怎么进行产品营销,小红书布局指南!

随着互联网的快速发展&#xff0c;新品牌们逐渐意识到小红书这一平台的潜力。平台庞大的用户群体和高度活跃的内容创作者&#xff0c;成为新品牌布局的理想之地。今天&#xff0c;就来和大家分享一下新品牌怎么进行产品营销&#xff0c;小红书布局指南&#xff01; 第一步&…

ARM 作业1

一、思维导图 二、 1. 2. .text 文本段 .globl _start 声明_start:mov r0,#0mov r1,#0fun:cmp r1,#100bhi stopadd r0,r0,r1add r1,r1,#1b fun stop:b stop .end

简单谈谈 EMP-SSL:自监督对比学习的一种极简主义风

论文链接&#xff1a;https://arxiv.org/pdf/2304.03977.pdf 代码&#xff1a;https://github.com/tsb0601/EMP-SSL 其他学习链接&#xff1a;突破自监督学习效率极限&#xff01;马毅、LeCun联合发布EMP-SSL&#xff1a;无需花哨trick&#xff0c;30个epoch即可实现SOTA 主要…

图解结构体大小和位域例子

struct A {short a; char b; int c : 1; char d : 4; short e : 7; }; 备注&#xff1a;蓝色&#xff1a;表示占一个符号位空间红色&#xff1a;表示补齐其他颜色&#xff1a;实际最大值所占空间 &#xff08;1&#xff09;图解例1 st…

ASPICE学习笔记

文章目录 1. ASPICE是什么?2. ASPICE能干什么?2.1 过程参考模型2.2 过程评估模型参考1. ASPICE是什么? ASPICE的全称是Automotive SPICE。很明显的看出ASPICE是由SPICE发展而来。而SPICE是由国际标准化组织ISO、国际电工委员会IEC、信息技术委员会JTC1发起制定的ISO15504标…

VSCODE[配置ssh免密远程登录]

配置ssh免密远程登录 本文摘录于&#xff1a;https://blog.csdn.net/qq_44571245/article/details/123031276只是做学习备份之用&#xff0c;绝无抄袭之意&#xff0c;有疑惑请联系本人&#xff01; 这里要注意如下几个地方: 1.要进入.ssh目录创建文件: 2.是拷贝带"ssh-…

Android面试官:“来给我讲讲View绘制?”

前言 迎面走来的一位中年男子&#xff0c;他一手拿着保温杯&#xff0c;一手抱着笔记本电脑&#xff0c;顶着惺忪的睡眼&#xff0c;不紧不慢地走着&#xff0c;不多的几根头发在他头顶自由飞翔。过了一会&#xff0c;他面对着我坐下&#xff0c;放下电脑和保温杯&#xff0c;…