NSQ 实现逻辑探秘

news2025/1/5 8:52:47

1 什么是 NSQ

NSQ 是一个消息队列中间件,用 go 实现,有如下特点:

  1. 分布式: 它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。

  2. 易于扩展: 它支持水平扩展,没有中心化的消息代理( Broker ),内置的发现服务让集群中增加节点非常容易。

  3. 运维方便: 它非常容易配置和部署,灵活性高。

  4. 高度集成: 现在已经有官方的 Golang、Python 和 JavaScript 客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。

2 名词解释

名词

释义

Topic

一个 topic 就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建 topic

Channel

类似 kafka 中的消费组,是消费者之间的负载均衡。每当一个发布者发送一条消息到一个 topic,消息会被复制到所有消费者连接的 channel 上,然后将消息随机推送到其中一个消费者

nsqd

nsq 核心逻辑所在,负责接收消息、排队消息、并投递消息给消费者。可以同时运行多个 nsqd,不同的 nsqd 相互独立。如果存在 nsqlookup, 则会连接到 nsqlookup, 并向其同步 topic 和 channel 信息。

nsqlookup

负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定 topic 和 channel 的 nsqd 地址。有两个接口: TCP 接口, nsqd 用它来广播; HTTP 接口,客户端用它来发现和管理。

3 整体介绍

3.1 数据流动

topic 中的消息会被复制到多个 channel 中,并将消息推送到其中一个消费这个 channel 的消费者手中。

3.2 nsqlookup 和 nsqd

图中表明了 3 中类型的连接:

  1. 黑实线带箭头。consumer 会直连 nsqd,并从 nsqd 获取消息

  2. 蓝虚线。consumer 会询问 nsqlookup 某个 topic 在哪些 nsqd 上存在,nsqlookup 会返回 nsqd 的信息

  3. 灰虚线。nsqd 和 nsqlookup 会建立一个长连接,并在 topic 或者 channel 发生变更时,将 topic 和 channel 信息同步到 nsqlookup

4 详细介绍

4.1 生产者

4.1.1 生产的负载均衡

go sdk 中提供的 NewProducer 函数需要提供 nsqd 的地址,因此生产者会直连某个 nsqd,并向其投递消息。如果希望生产也能实现负载均衡,则需要我们自己做进一步的封装,大概逻辑是:

  1. 向 nsqlookup 获取所有 nsqd 的信息

  2. 建立多个 producer, 每个 producer 与不同的 nsqd 建立连接

  3. 每次生产消息时,随机挑选一个 producer 发布消息

初次之外,还需要一个守护任务,定时从 nsqlookup 刷新 nsqd 信息,添加新上线的 nsqd,移除异常的 nsqd

4.1.2 消息类型

nsq 支持的消息类型包括:

  1. 普通消息。该消息发布后,会尽快的推送到消费者那儿。

  2. 延迟消息。生产者可以指定消息延迟多少时间后再推送给消费者。需要注意的是,延迟时间并不保证精确,即并不是在延迟时间到达后,消息一定会被消费,包括如下原因:

    1. 在延迟时间到达后,消息只是可以推送给消费者,但是如果消费已经积压,则延迟消息依然需要排队

    2. 延迟时间的计算并不精确,这在 4.2.3 节会进行介绍

    3. 延迟消息一旦被存储到磁盘,则会丢失延迟时间信息,因而就会退化成普通消息。消息何时会存储到磁盘将在 4.2.1 节中介绍

  3. 临时消息。当 topic 名称以 "#ephemeral" 结尾时,就被认为是临时 topic,临时 topic 中的消息不会被持久化,且当不再有消费者消费时,临时 topic 会被删除。

4.2 Nsqd

4.2.1 消息接收

在 nsq 实现中,topic 和 channel 都维护了内存队列 msgChan 和磁盘队列 backendQueue,消息会被优先写入内存队列中。消息一旦写入磁盘队列,消息将会丢失延迟信息。生产者在发布消息后,消息首先会进入 topic 的消息队列,然后会被复制到这个 topic 关联的 channel。

临时 topic 没有磁盘队列,所以一旦 msgChan 满,则临时 topic 中的消息将会被丢弃。

nsq 支持延迟消息,因而 channel 还会额外维护一个延迟队列。在消息到期后,消息会从延迟队列中取出并发送到 channel 的消息队列中等待发送。延迟队列的实现将在 4.2.3 节中介绍。

整体流程大致如下

4.2.2 消息推送

如上所述,消息会被复制到所有的 channel,然后将其推送到消费端。但是消息的推送需要兼顾消费端的消费能力。nsq 通过 rdy(ready 的缩写)和 InFlight 队列实现推送速度的控制。

rdy 是消费端通过 TCP 请求设置的,表明当前消费端允许 nsqd 推送多少个消息过来。InFlight 队列是 channel 维护的,存储的是当前已推送但是没有收到响应的消息,响应包括 FIN 和 REQ 两种。FIIN 表示消息成功消费,REQ 表示消费失败,消息需要重新推送。如果 InFlight 中的消息数量已经大于了 rdy,则 nsqd 会停止推送消息。

nsqd 会定时处理 InFlight 队列中的消息,如果发现消息超时未回复,则会从 InFlight 队列中移除,重新推送。

若 nsqd 收到 REQ 回复,则会将消息放入到延迟队列中,延迟时间是消费端在 REQ 回复中设置的。

4.2.3 定时逻辑的处理

nsqd 有两个场景涉及到定时任务:

  1. 延迟消息的延迟推送

  2. InFlight 队列中消息的超时判断

nsqd 借鉴了 redis 的过期算法,主要逻辑如下:

  1. 每隔 QueueScanInterval (默认值 100ms) 时间唤醒一次

  2. 随机从所有 channel 中选择 QueueScanSelectionCount (默认20) 数量的 channel 开始处理。

  3. InFlight 队列和延迟消息队列使用最小堆实现,因此可以非常快速的找到最早过期的消息。nsqd 的定时逻辑每次被唤醒的时候,都会从 InFlight 队列和延迟消息队列中找出所有到期的消息,然后将其推送出去

  4. 如果有超过 25% 的 channel 存在过期的消息,则回到第 2 步,继续处理。

4.2.4 与 nsqlookup 的交互

  1. nsqd 会开启一个守护任务,在 topic 新增/删除,channel 新增/删除的时候,将事件告知 nsqlookup。

  2. 在 nsqd 刚与 nsqlookup 建立连接时,将会同步当前 nsqd 的 topic 和 channel 信息。

  3. 在 nsqd 同步 topic 和 channel 失败的时候,nsqd 会和 nsqlookup 断开连接,并在下次需要和 nsqlookup 通信时,尝试重新建立连接。

通过这种机制,保证 nsqlookup 始终能够保存最新的 topic 和 channel 信息。

4.3 消费端

4.3.1 连接 nsqd

go sdk 提供了多种方法去和 nsqd 建立连接:

  1. ConnectToNSQD: 提供单个 nsqd 的地址,并与之建立连接

  2. ConnectToNSQDs: 提供一组 nsqd 地址,分别和他们建立连接

  3. ConnectToNSQLookupd:提供一个 nsqlookup 地址,消费端从 nsqlookup 查询 nsqd 地址

  4. ConnectToNSQLookupds:提供一组 nsqlookup 地址,每次随机挑选一个 nsqlookup 查询 nsqd 地址

根据 nsqd 的实现,首选第 4 种方法

如果是通过 nsqlookup 发现 nsqd,消费端会定时查询 nsqlookup, 刷新本地的 nsqd 地址。

4.3.2 分配 rdy

在 4.2.2 节中了解到,消费速度是由消费端控制的。消费端会根据自己的情况,向 nsqd 发送 “RDY” 命令,从而控制 nsqd 最多发送多少个消息过来。

与此同时,消费端会维护 MaxInFlight 配置,表示消费端可以并发处理的消费总数。分配给每个 nsqd 的 rdy 之和必须小于 MaxInFlight. 

因此,给每个 nsqd 分配多少个 rdy,什么时候会分配 rdy,就成了消费速度控制的关键。

有如下几个场景消费端会发送 RDY 命令:

  1. 在和 nsqd 建立连接的时候。消费端会将 MaxInFlight 平均分配给每个 nsqd,但是至少会分配 1,即最坏情况下,每个 nsqd 只能串形推送消息

  2. 在消费失败的时候,消费端会默认进入退避模式,此时会将所有 nsqd 的 rdy 设置为0。在等待一段时间后,会开始消费恢复流程,此时会随机选择一个 nsqd,为其分配 rdy = 1。重试逻辑将在 4.3.3 节中详细介绍

  3. 消费端会启动一个守护任务,它会在 nsqd 数量大于 MaxInFlight 时,会将长时间未收到消息或者长时间未修改过 rdy 的 nsqd 的 rdy 置为0,并尽可能的将更多的 nsqd 的 rdy 置为1,从而保证在 nsqd 数量大于 MaxInFlight 时,每个 nsqd 的消息都有可能被消费到

4.3.3 失败重试

当我们消费失败时,会进行重试,重试是通过消费端向 nsqd 发送 “REQ” 命令实现,“REQ” 命令包含一个 delay 字段,用于告知 nsqd 应该延迟多久再推送。delay 的默认计算公式如下:

// DefaultRequeueDelay 是默认的重试时间,默认 90s
// Attempts 是这个消息重试的次数
// MaxRequeueDelay 是延迟时间的上限,默认 15m
delay = MIN(DefaultRequeueDelay * Attempts, MaxRequeueDelay)

当重试次数大于 MaxAttempts,消费端会直接向 nsqd 发送 “FIN”,从而结束重试。

默认情况下,即我们使用了自动 ACK 机制,重试会进入退避模式,具体逻辑如下:

  1. 维护了 backoffCounter 字段,表示进入到退避模式的次数。

  2. 计算退避的时间。当前提供两种策略,指数退避以及随机退避,时间的计算和 backoffCounter 正相关,但是最大不会大于 MaxBackoffDuration (默认 2m)

  3. 将当前所有 nsqd 的 rdy 置为 0

  4. 在退避时间到期后,会尝试进行恢复,即随机找到一个 nsqd,将它的rdy置为1,尝试重新开始消费

  5. 每次消费成功时,会将 backoffCounter - 1,如果 backoffCounter 为0,则重新平均分配 rdy,结束退避模式

QA

是否会丢消息

这个和生产者,消费者,nsqd 三方都有关系,我们这里只考虑 nsqd 会不会可能丢消息。

如果是临时 topic 则可能会丢失消息,但是其他消息,正常情况下不会丢失消息。但是若 nsqd 下线,则该 nsqd 上的消息将无法消费到。

是否能实现有序消费

nsq 的设计就没有考虑支持有序消费的场景,即使消息都发布到了同一个 nsqd,消费端也只开启了一个协程进行消费,但是由于以下两个原因,也很难保证一定是顺序消费:

  1. 消息重新推送是重新入队,此时排在了消息队列的末尾

  2. nsqd 在内存队列满的时候会将消息写入磁盘,而磁盘中的队列和内存队列的顺序是无法保证的

是否能更改消费进度,对消息进行重放

当前不支持这个能力

在发布/消费延迟消息的时候需要注意什么

  1. 延迟消息会进入延迟队列,延迟队列是内存中的最小堆,如果延迟消息很多又没有被及时消费,则可能导致内存/cpu占用高

  2. 延迟消息在一些场景下会被写入磁盘,一旦写入磁盘,则会丢失延迟信息,变成一个普通的消息

重试策略会影响消费速度吗

在如下环境下做了一个测试:

  1. 使用的是单机版部署的 nsq

  2. 消费者和生产者直连 nsqd

  3. 有足够的数据进行消费

控制变量包括:

  1. 消费失败率:failRate

结论:

即使消费失败率是万分之一,即成功率达到4个9,对消费速度的影响还是显而易见的。随着失败率的提高,消费速度会越来越慢

failRate = 0

curCount 表示每秒消费的数量

failRate = 1

failRate = 10

测试代码如下

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/nsqio/go-nsq"
	gonsq "github.com/nsqio/go-nsq"
)

var maxInFlight = 100
var failRate = 10 // 单位:万分之一,即如果 failRate = 100,表示错误率是 1%

var ticker = time.NewTicker(time.Second)
var count = int32(0)
var mux sync.Mutex

func init() {
	go func() {
		for {
			select {
			case <-ticker.C:
				mux.Lock()
				fmt.Printf("curCount: %d\n", atomic.LoadInt32(&count))
				atomic.StoreInt32(&count, 0)
				mux.Unlock()
			}
		}
	}()
}

type NsqHandler struct {
	id int
}

func InitNSQConsumer(topic string) *nsq.Consumer {
	config := nsq.NewConfig()
	config.MaxInFlight = maxInFlight
	config.MaxAttempts = 3
	consumer, err := nsq.NewConsumer(topic, "main", config)

	if err != nil {
		fmt.Printf("new consumer fail, error: %+v\n", err)
		return nil
	}
	consumer.AddHandler(&NsqHandler{id: 1})

	if err := consumer.ConnectToNSQD("169.254.1.10:4150"); err != nil {
		fmt.Printf("look up fail, error: %+v\n", err)
		return nil
	}
	return consumer
}

func InitNSQProducer() *nsq.Producer {
	nsqdUrl := "169.254.1.10:4150"
	config := nsq.NewConfig()
	p, err := nsq.NewProducer(nsqdUrl, config)
	if err != nil {
		fmt.Printf("new producer fail, error: %+v\n", err)
		return nil
	}
	return p
}

func main() {
	topic := "detector.cloudwalker.detect_result.test"

	consumer := InitNSQConsumer(topic)
	if consumer == nil {
		fmt.Printf("consumer is nil\n")
		return
	}
	time.Sleep(3 * time.Second)
	producer := InitNSQProducer()
	if producer == nil {
		fmt.Printf("producer is nil\n")
		return
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(gi int) {
			defer wg.Done()
			for j := 0; j < 500000; j++ {
				if err := producer.Publish(topic, []byte(fmt.Sprintf("%s-%d-%d", "aaaabbbbbcccccddddd", gi, j))); err != nil {
					fmt.Printf("publish error: %+v\n", err)
					return
				}
			}
		}(i)
	}
	wg.Wait()
	time.Sleep(time.Minute)
}

func (h *NsqHandler) HandleMessage(message *gonsq.Message) error {
	if rand.Intn(10000) < failRate {
		return fmt.Errorf("always error")
	}
	mux.Lock()
	defer mux.Unlock()
	atomic.AddInt32(&count, 1)
	return nil
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/710837.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Echarts】echarts饼图、圆环图配置代码详解

前言 简介&#xff1a;本文将从头开始&#xff0c;带你快速上手 echarts最常用图例—饼图 准备&#xff1a;请自行先将echarts图例引入你的项目&#xff0c;本文不多介绍。&#xff08;引入 echarts教程&#xff1a;http://t.csdn.cn/mkTa4&#xff09; 心得&#xff1a;echar…

递归函数:

含义&#xff1a;自己调自己 递归三要素&#xff1a;定义函数、终止条件和等价关系式 小案例&#xff1a;排序 let arr1 [8, 8, 9, 13, 45, 8, 0, 1, 9, 66];//定义函数function quickSort(arr) {//终止条件if (arr.length < 1) return arr;const baseIndex Math.floor(…

十五、docker学习-docker核心docker数据卷

什么是数据卷 当我们在使用docker容器的时候&#xff0c;会产生一系列的数据文件&#xff0c;这些数据文件在我们删除docker容器时是会消失的&#xff0c;但是其中产生的部分内容我们是希望能够把它给保存起来另作用途的&#xff0c;Docker将应用与运行环境打包成容器发布&…

【游戏逆向】D3D HOOK实现透视讲解

实现目的: 目前大部分游戏通过Direct3D实现3D效果,通过挂钩相应函数,可以实现3D透视,屏幕挂字效果。而透视,屏蔽特定效果,设置透明在很多游戏(特别是FPS)中发挥着巨大的作用! 实现思路: [D3D] DirectX的功能都是以COM组件的形式提供的。在Direct3D中,主要通过采…

Unity新输入系统

1、导入新输入系统 &#xff08;1&#xff09; 这里改成.NET Framework&#xff0c;下面改成input system package(New) 2、使用新系统 &#xff08;1&#xff09; 在你的player物体上添加Player Input组件&#xff0c;然后CreateAction &#xff08;2&#xff09; 创建出…

连接器信号完整性仿真教程 五

本文将详细介绍CST电磁仿真的激励源&#xff08;Excitation Source&#xff09;及其设置。CST微波工作室根据具体应用和结构类型提供多种不同的激励源&#xff0c;总得来说包含激励端口&#xff08;Excitation Port&#xff09;和场源&#xff08;Field Sources&#xff09;。 …

3.Mysql子查询练习

1.子查询概述 子查询指一个查询语句嵌套在另一个查询语句内部的查询&#xff0c;内部的查询是外部查询的条件&#xff0c;这个特性从MySQL4.1开始引入 子查询(内查询)在主查询之前执行完成 子查询的结果被主查询(外查询)使用 注意事项&#xff1a; 子查询要包含在括号内 将子查…

Vue 时间格式转换

文章目录 将秒转换成简单时间格式方式一 表格渲染方式二 js转换 将时间转换为字符串方式一 年、月、日、时、分、秒、星期等信息方式二 返回多久之前的时间 将秒转换成简单时间格式 方式一 表格渲染 element-ui 表格为例&#xff0c;duration 单位为秒 <el-table-column …

逻辑回归精讲

一、从线性回归到逻辑回归 对于分类问题&#xff0c;我们该如何解决 可以通过线性回归阈值解决吗&#xff1f; 就上面的这张图而言&#xff0c;横轴蓝色的那条线是可以将正负样本区分开的。那我们再看一个例子 就上面的图而言&#xff0c;横轴蓝色的那条线无法将正负例正确划…

基于springboot的垃圾分类网站的设计与实现

系统设计 本垃圾分类网站主要包括三大功能模块&#xff0c;即用户功能模块和管理员功能模块、垃圾分类管理员功能模块。源码下载 &#xff08;1&#xff09;管理员模块&#xff1a;系统中的核心用户是管理员&#xff0c;管理员登录后&#xff0c;通过管理员功能来管理后台系统…

SpringMVC (四) 数据处理及跳转

学习回顾&#xff1a;SpringMVC &#xff08;三&#xff09; RestFul和控制器 现在我们来看看SpringMVC参数接收处理和结果跳转处理吧&#xff01; 结果跳转方式 一、ModelAndView 设置ModelAndView对象 , 根据view的名称 , 和视图解析器跳到指定的页面 . 页面 : {视图解析器前…

测试员该如何向七大姑八大姨解释你的工作?

过年回家&#xff0c;走亲访友带来了一年未见的七大姑八大姨们&#xff0c;必不可少会出现一系列“灵魂拷问”&#xff0c;比如“二狗&#xff0c;在做啥工作呢&#xff1f;” 相比“有对象了么&#xff1f;”、“啥时候生娃&#xff1f;”等硬核话题&#xff0c;合理地向七大姑…

如果只能推荐3本关于python的书,你会推荐哪3本?

如果只能推荐3本Python书的话&#xff0c;我推荐这3本。 第一本&#xff1a;Python编程快速上手 让繁琐工作自动化 第2版  豆瓣评分8.9 本书是一本面向初学者的Python编程实用指南。本书不仅介绍了Python语言的基础知识&#xff0c;而且通过案例实践教读者如何使用这些知识和…

css自学框架之栅格化12格布局、flex布局下两端对齐,不满左对齐

flex基础知识 1.flex-direction 容器内元素的排列方向(默认横向排列) flex-direction:row; 沿水平主轴让元素从左向右排列flex-direction:column; 让元素沿垂直主轴从上到下垂直排列flex-direction:row-reverse;沿水平主轴让元素从右向左排列 2.flex-wrap 容器内元素的换行(…

java feign的使用详细步骤及okhttp的使用

1、首先创建一个feign的模块并配置依赖&#xff0c;如图&#xff1a; 1、引入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency&g…

kali换源

sudo vim /etc/apt/sources.list&#xff08;打开sources.list 文件&#xff09; 官方源 deb http://http.kali.org/kali kali-rolling main non-free contrib deb-src http://http.kali.org/kali kali-rolling main non-free contrib 中科大源 deb http://mirrors.ustc.edu.cn…

小主机折腾记15

海鲜市场买到个华硕的h61主板&#xff0c;支持笔记本内存又带独显插槽&#xff0c;40大洋包邮…… 到货后把老笔记本上拆下来的两根威刚2g 1066的内存条安上&#xff0c;上集提到的i5 2390t安上&#xff0c;之前买的gt440安上&#xff0c;最后安上了之前买的惠普侧吹风散热器&…

北大2019计算机学科夏令营上机考试

目录 A:数与字符串【找规律】 B:打印月历【暴力水题】 C:Hopscotch【BFS】 D:上楼梯【动态规划】 E:Life Line 【图】 F:跳蛙【DSP】 G:Falling Leaves【二叉搜索树】 H&#xff1a;昂贵的聘礼【图】 I:Connect【放弃】 A:数与字符串【找规律】 #include<iostream&…

idea项目提交到git 这一篇就够了

1. 下载git 到本地文件夹 2. 在windows端打开命令行 winR 然后cmd 首先查看git是否安装成功 从这里就可以看出git已经安装成功 然后使用 git config --list 查看git的基本配置 如果是第一次使用&#xff0c;需要创建用户名和邮箱 配置成功后再次使用 git config --list …

生成式AI, 新兴职业?

动动发财的小手&#xff0c;点个赞吧&#xff01; 生成式AI是一种基于人工智能技术的创新领域&#xff0c;它的目标是通过机器学习和自然语言处理等技术来模拟人类的创造力和智慧&#xff0c;从而生成全新的内容&#xff0c;如文本、图像、音频等。生成式AI在近年来取得了巨大的…