nsqd的架构及源码分析

news2025/1/11 17:41:12

文章目录

一  nsq的整体代码结构

二  回顾nsq的整体架构图

三  nsqd进程的作用

四  nsqd启动流程的源码分析

五  本篇博客总结


在博客 nsq整体架构及各个部件作用详解_YZF_Kevin的博客-CSDN博客 中我们讲了nsq的整体框架,各个部件的大致作用。如果没看过的,建议大家去学习下,不然理解后续的内容会有难度

这篇博客开始我们来看下每个部件的详细功能,从源码入手分析其内部实现原理

一  nsq的整体代码结构

建议大家也下载nsq的代码,一边看博客一边看代码印象更深刻。nsq的官方git代码地址:GitHub - nsqio/nsq: A realtime distributed messaging platform

nsq代码结构如下,图中有注释,大家先有个整体印象,知道各个模块的代码在哪就行

二  回顾nsq的整体架构图

 图中最上面的四个节点就是nsqd进程,至少要有1个,可以多开。我们画了4个,分别是nsq1,nsq2,nsq3,nsq4

注意看nsqd的连接关系,每个nsqd节点和所有客户端都有连接(tcp+http),且每个nsqd节点和所有的nsqlookupd节点也有连接(tcp)

三  nsqd进程的作用

1. topic的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

2. channel的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

3. message的监听,中转,持久化(保存到文件,从文件加载),主动推送消息给各个客户端,超时重发,消息计数

4. 配置修改,运行状态(协程、内存)统计

5. 抽检channel的延迟队列,飞行队列,消息超时的重新入队

可以说,nsqd进程是整个nsq平台的核心,消息队列架构简单的话,只有一个nsqd进程就够了。

四  nsqd启动流程的源码分析

nsqd的代码主要在两块

1. 代码框架及main函数,目录在 nsq/apps/nsqd/*

2. 实现代码,目录在 nsq/nsqd/*

值得一提的是nsqd,nsqlookupd,nsqadmin这三个进程的框架都使用了go-svc包,这个包很简单,使用者只需实现它的三个函数即可

Init()           配置,初始化等操作

Start()        真正启动

Stop()        结束时的关闭操作

好了,我们看nsqd的入口,也就是main函数,代码在nsq/apps/nsqd/main.go,代码如下(已加注释)

type program struct {
	once 		sync.Once
	nsqd 		*nsqd.NSQD	// nsqd对象
}

// nsqd的启动入口
func main() {
	prg := &program{}
	// Run内部会调用Init(),Start(),监听到这两个系统信号时会调用Stop()
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		logFatal("%s", err)
	}
}

main()函数内部只有一个对象program,也只有一处调用svc.Run(),这个函数内部会调用program.Init()program.Start()

其中program.Init()函数,主要是创建并检测nsqd的配置,然后根据配置创建出一个nsqd实例

重点在program.Start()函数,代码如下(已加注释)

// nsqd的启动,重点在调用的Main()函数
func (p *program) Start() error {
	// 加载元数据,并创建初始化出所有的topic对象,所有的channel对象
	err := p.nsqd.LoadMetadata()
	if err != nil {
		logFatal("failed to load metadata - %s", err)
	}
	// 再持久化元数据到文件(不要觉得奇怪,因为上面的LoadMetadata()函数可能会过滤掉一些无效的topic,channel,这里再重写算是刷新了元数据)
	err = p.nsqd.PersistMetadata()
	if err != nil {
		logFatal("failed to persist metadata - %s", err)
	}
	// 启动一个新协程,专门运行nsqd的Main()循环,注意这个Main()是永不退出的(除非出错)
	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()
	return nil
}

对上面的代码解释下,program.Start()函数一共干了3件事

1. nsqd.LoadMetadata(), 这个函数根据配置加载旧nsqd元数据。这些元数据包含版本号,topic,channel,过滤掉不合法的topic和channel,合法的topic和channel都创建出对象,并且为每个topic建立处理循环

2. nsqd.PersistMetadata(), 把过滤后的topic和channel再保存到文件nsqd.dat,算是把旧数据过滤了一遍

3. 新启动一个协程,调用nsqd.Main(),这个Main()是nsqd的核心,启动了nsqd的全部服务。除非遇到错误,否则永不退出

接下来看nsqd.Main()的内部实现,代码在nsq/nsqd/nsqd.go,代码如下(已加注释)

// nsqd主协程(内部启动tcp循环,http循环,https循环, 扫描队列池,和nsqlookupd循环),永不退出,除非严重错误
func (n *NSQD) Main() error {
	exitCh := make(chan error)
	var once sync.Once

	// 退出函数(独立协程运行,一直监听,遇到错误
	exitFunc := func(err error) {
		once.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}

	// TCP服务,独立协程
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})

	// HTTP服务,独立协程
	if n.httpListener != nil {
		httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
		})
	}

	// HTTPS服务,独立协程
	if n.httpsListener != nil {
		httpsServer := newHTTPServer(n, true, true)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
		})
	}

	// 独立协程,抽检扫描各个队列
	n.waitGroup.Wrap(n.queueScanLoop)

	// 独立协程,和nsqlookupd的循环(连接和重连,心跳维持,topic,channel变化通知等)
	n.waitGroup.Wrap(n.lookupLoop)

	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	err := <-exitCh
	return err
}

对上面的代码解释下,nsqd.Main()主要干了6件事

1. 开一个新协程,启动tcp服务并一直监听,为客户端一共tcp服务。我们的客户端最常用,因为生产消息,中转消息,处理消息都是这里实现的

2. 开一个新协程,启动http服务并一直监听,为客户端提供htttp服务

3. 开一个新协程,启动https服务并一直监听,为客户端提供htttps服务

4. 开一个新协程,建立并维持扫描池,这些扫描协程会扫描所有channel的延迟队列,飞行队列,如果消息超时了就重新入队。很有意思的是,nsqd作者很大方地承认他抄袭了redis的抽检策略,内部实现也确实是类redis操作,这个我们后面再讲,todo

5. 开一个新协程,和nsqlookupd建立循环,主要是连接和重连,心跳维持,实时报告自己的topic和channel变化

6. 开一个新协程,做统计操作,统计topic,channel,消息,内存,GC等等

五  本篇博客总结

1. 给大家看了nsq平台下代码整体结构,建议大家下载源码自己看下,加强印象

2. 讲了nsqd进程提供的功能实现

3. 跟踪了nsqd进程启动流程,最核心的nsqd.Main()建议大家仔细看,后面讲的nsqd内容也都是这几个协程里面干的活

下一篇博客我们讲nsqd内部各个协程的工作

todo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/830982.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cloudstack远程调试

前置条件&#xff1a;服务器安装好cloudstack的management、agent; 1、managemeng、agent启动服务文件 packaging/systemd cloudstack-agent.default # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTIC…

openmp和avx配置

实际场景&#xff1a; 项目中数据拷贝慢&#xff08;使用的是memcpy&#xff09;&#xff0c;希望能加速拷贝&#xff0c;所以尝试了使用avx的流方式&#xff0c;和openmp方式处理 问题1&#xff1a; 调用avx是报错 error: inlining failed in call to always_inline ‘__m512…

亲测有效!帮你更方便更舒服使用ubuntu20.04!!!

今天要记录的是如何更舒服的使用ubuntu20.04&#xff0c;全部内容就在上面这张图里&#xff0c;包括三方面&#xff1a;1、ubuntu美化&#xff1b;2、ubuntu扩展&#xff1b;3、必备软件。 1、ubuntu美化 这部分内容可以直接参考&#xff1a;这位大佬&#xff0c;讲的很详细也…

gRPC三种Java客户端性能测试实践

本篇文章只做性能测试实践&#xff0c;不会测试各类状况下极限性能&#xff0c;所以硬件配置和软件参数就不单独分享了。 服务端 依旧采用了fun_grpc项目的SDK内容。服务端代码如下&#xff1a; package com.funtester.grpc;import com.funtester.frame.execute.ThreadPoolU…

Python实现GA遗传算法优化卷积神经网络分类模型(CNN分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;最早是由美国的 John holland于20世…

验证码安全志:AIGC+集成环境信息信息检测

目录 知己知彼&#xff0c;黑灰产破解验证码的过程 AIGC加持&#xff0c;防范黑灰产的破解 魔高一丈&#xff0c;黑灰产AIGC突破常规验证码 双重防护&#xff0c;保障验证码安全 黑灰产经常采用批量撞库方式登录用户账号&#xff0c;然后进行违法违规操作。 黑灰产将各种方…

RL — 强化学习算法概述

一、说明 在本系列中&#xff0c;我们检查了许多强化学习&#xff08;RL&#xff09;算法&#xff0c;例如&#xff0c;MoJoCo任务的策略梯度方法&#xff0c;Atari游戏的DQN和机器人控制的基于模型的RL。虽然许多算法都是针对特定领域引入的&#xff0c;但这种联系只能是遗留的…

BKTEM-3A型热电材料性能测试仪(动态法)

BKTEM-3A型热电材料性能测试仪(动态法) 关键词&#xff1a;塞贝克&#xff08;seebeck&#xff09;&#xff0c;波尔贴&#xff08;Peltier&#xff09;效应&#xff0c;热电系数 BKTEM-3型热电材料性能测试仪热电材料也称温差电材料&#xff08;thermoelectric materials&…

c语言const修饰的说明

1、const修饰的为常量&#xff0c;不可以直接修改&#xff0c;但是可以通过指针修改 #include "stdio.h" #include <stdlib.h>int main() {//1、constconst int a 10;//a 100;//err 左值不可修改&#xff0c;const修饰的为常量&#xff0c;不可以直接修改&a…

WSL2安装CentOS7和CentOS8

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、下载ZIP包&#xff1f;二、安装1.打开Windows子系统支持2.安装到指定位置3.管理虚拟机4.配置虚拟机1.配置国内源2.安装软件3.安装第三方源 5.配置用户1.创建…

iNav飞控之FAILSAFE机制

iNav飞控之FAILSAFE机制 1. 源由2. 设计2.1 触发场景2.1.1 上锁时触发2.1.2 解锁时触发 2.2 FAILSAFE策略2.2.1 DROP2.2.2 LAND2.2.3 SET-THR2.2.4 RTH2.2.5 NONE 2.3 异常场景2.3.1 救援上锁2.3.2 救援后解锁2.3.3 FAILSAFE地面预判2.3.4 RTH丢失定位2.3.5 RC链路恢复 3. 重要…

怎样做好字幕翻译服务?

我们知道&#xff0c;字幕泛指影视作品后期加工的文字&#xff0c;往往显示在电视、电影、舞台作品中。字幕翻译就是将外国影片配上本国字幕或者是将本国影片配上外国字幕。那么&#xff0c;字幕翻译的主要流程是什么&#xff0c;怎样做好字幕翻译服务&#xff1f; 据了解&…

二进制链表转整数

给你一个单链表的引用结点 head。链表中每个结点的值不是 0 就是 1。已知此链表是一个整数数字的二进制表示形式。 请你返回该链表所表示数字的 十进制值 。 示例 1&#xff1a; 输入&#xff1a;head [1,0,1] 输出&#xff1a;5 解释&#xff1a;二进制数 (101) 转化为十进…

IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer

前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器&#xff0c;转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感&#xff0c;迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…

yxBUG记录

1、 原因&#xff1a;前端参数method方法名写错。 2、Field ‘REC_ID‘ doesn‘t have a default value 问题是id的生成问题。 项目的表不是自增。项目有封装好的方法。调用方法即可。 params.put("rec_id",getSequence("表名")) 3、sql语句有问题 检…

【iOS】App仿写--天气预报

文章目录 前言一、首页二、搜索界面三、添加界面四、浏览界面总结 前言 最近完成了暑假的最后一个任务——天气预报&#xff0c;特此记录博客总结。根据iPhone中天气App的功能大致可以将仿写的App分为四个界面——首页&#xff0c;搜索界面&#xff0c;添加界面&#xff0c;浏…

基金公司最佳实践:如何用价值流分析,洞察研发效能瓶颈?

近日&#xff0c;ONES 受邀参加 QECon 2023 全球软件质量&效能大会&#xff08;北京站&#xff09;。在会上&#xff0c;ONES 高级研发总监&首席解决方案架构师陈亮宇&#xff0c;发表了主题为《聚焦价值流分析&#xff0c;寻找研发效能的「北极星」》的演讲&#xff0…

NSX 4.1中新的网络和高级安全功能介绍

我们很高兴地宣布VMware NSX 4.1全面上市&#xff0c;该版本为私有云、混合云和多云的虚拟化网络和高级安全提供了新功能。该版本的新特性和功能将使VMware NSX客户能够利用增强的网络和高级安全&#xff0c;提高运营效率和灵活性&#xff0c;并简化了故障排除的过程。 领先于…

使用免费MES系统的成功经验

随着科技的发展和数字化时代的到来&#xff0c;越来越多的工厂开始采用生产管理软件来提高生产效率和管理水平。 本文将分享一家工厂在使用免费生产管理软件后的成功经验&#xff0c;希望对广大读者有所帮助。 “之前也是在市面上找了很多厂家咨询报价&#xff0c;少则十几万多…

shell脚本清理redis模糊匹配的多个key,并计算释放内存大小

#!/bin/bash# 定义Redis服务器地址和端口 REDIS_HOST"localhost" REDIS_PORT6380# 获取Redis当前内存使用量&#xff08;以字节为单位&#xff09; function get_redis_memory_usage() {redis-cli -h $REDIS_HOST -p $REDIS_PORT INFO memory | grep "used_memo…