Zinx框架学习 - 消息队列及多任务

news2025/1/10 6:14:47

Zinx - V0.8 消息队列及多任务

  • 之前zinxV0.7我们已经实现了读写分离,对应每个client,我们有3个go程,分别是reader、writer、DoMsgHandle
  • 假设服务器有10W个client请求,那么server就会有10W个reader的go、10W个writer的go程,10W个DoMsgHandler的go程;其中10W个reader的go程和10W个writer的go程处于阻塞状态并不会抢占CPU资源,CPU仍然会在10W个DoMsgHandler的go程中来回切换,这个切换成本还是很高的;我们希望有固定的go程数量来处理DoMsgHandler的业务
  • 接下来我们就需要给Zinx添加消息队列和多任务Worker机制了。我们可以通过worker的数量来限定处理业务的固定goroutine数量,⽽不是⽆限制的开辟Goroutine,虽然我们知道go的调度算法已经做的很极致了,但是⼤数量的Goroutine依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。我们可以⽤消息队列来缓冲worker⼯作的数据

创建消息队列

  • msgHandler.go

MsgHandle增加消息队列和worker池

//消息处理模块的实现
type MsgHandle struct {
	//存放每个MsgID 所对应的处理方法
	Apis map[uint32]ziface.IRouter
	//负责Worker取任务的消息队列
	TaskQueue []chan ziface.IRequest
	//业务工作Worker池的worker数量
	WorkerPoolSize uint32
}
//初始化/创建MsgHandle方法
func NewMsgHandle() *MsgHandle {
	return &MsgHandle{
		Apis:           make(map[uint32]ziface.IRouter),
		WorkerPoolSize: utils.GlobalObject.WorkerPoolSize, //从全局配置中获取
		TaskQueue:      make([]chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen),
	}
}
  • globalobj.go

将消息队列和worker数量配置化

package utils

import (
	"encoding/json"
	"io/ioutil"
	"zinx/ziface"
)


//存储一切有关Zinx框架的全局参数, 供其他模块使用
//一些参数是可以通过zinx.json由用户进行配置
type GlobalObj struct {
	//Server
	TcpServer ziface.IServer //当前Zinx全局的Server对象
	Host      string         //当前服务器主机监听的IP
	TcpPort   int            //当前服务器主机监听的端口号
	Name      string         //当前服务器的名称

	//Zinx
	Version          string //当前Zinx的版本号
	MaxConn          int    //当前服务器主机允许的最大链接数
	MaxPackageSize   uint32 //当前Zinx框架数据包的最大值
	WorkerPoolSize   uint32 //当前业务工作Worker池的Goroutine数量
	MaxWorkerTaskLen uint32 //Zinx框架允许用户最多开辟多少个Worker(限定条件)
}

//定义一个全局的对外Globalobj
var GlobalObject *GlobalObj

//从 zinx.json去加载用于自定义的参数
func (g *GlobalObj) Reload() {
	data, err := ioutil.ReadFile("conf/zinx.json")
	if err != nil {
		panic(err)
	}
	//将json文件数据解析到struct中
	err = json.Unmarshal(data, &GlobalObject)
	if err != nil {
		panic(err)
	}
}

//提供一个init方法,初始化当前的GlobalObject
func init() {
	//如果配置文件没有加载,默认的值
	GlobalObject = &GlobalObj{
		Name:             "ZinxServerApp",
		Version:          "V0.8",
		TcpPort:          8999,
		Host:             "0.0.0.0",
		MaxConn:          1000,
		MaxPackageSize:   4096,
		WorkerPoolSize:   10,   //Worker工作池的队列的个数
		MaxWorkerTaskLen: 1024, //每个worker对应的消息队列的任务的数量最大值
	}

	//应该尝试从conf/zinx.json去加载一些用户自定义的参数
	GlobalObject.Reload()
}

创建及启动Worker工作池

  • StartWorkerPool() ⽅法是启动Worker⼯作池,这⾥根据⽤户配置好的 WorkerPoolSize 的数量来启动,然后分别给每个Worker分配⼀个 TaskQueue ,然后⽤⼀个goroutine来承载⼀个Worker的⼯作业务
  • StartOneWorker() ⽅法就是⼀个Worker的⼯作业务,每个worker是不会退出的(⽬前没有设定worker的停⽌⼯作机制),会不断的阻塞从对应的TaskQueue中等待消息,并处理
//启动一个Worker工作池(开启工作池的动作只能发生一次,一个zinx框架只能有一个worker工作池)
func (mh *MsgHandle) StartWorkerPool() {
	//根据workerPoolSize 分别开启Worker,每个Worker用一个go来承载
	for i := 0; i < int(mh.WorkerPoolSize); i++ {
		//一个worker被启动
		// 1 当前的worker对应的channel消息队列 开辟空间 第0个worker 就用第0个channel ...
		mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
		//2 启动当前的Worker, 阻塞等待消息从channel传递进来
		go mh.StartOneWorker(i, mh.TaskQueue[i])
	}
}

//启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
	fmt.Println("Worker ID = ", workerID, " is started ...")
	//不断的阻塞等待对应消息队列的消息
	for {
		select {
		//如果有消息过来,出列的就是一个客户端的Request, 执行当前Request所绑定业务
		case request := <-taskQueue:
			mh.DoMsgHandler(request)
		}
	}
}

发送消息给消息队列

//将消息交给TaskQueue, 由Worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
	//1 将消息平均分配给不通过的worker
	//根据客户端建立的ConnID来进行分配
	workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
	fmt.Println("Add ConnID = ", request.GetConnection().GetConnID(),
		" reqeust MsgID = ", request.GetMsgID(),
		" to WorkerID = ", workerID)
	//2 将消息发送给对应的worker的TaskQueue即可
	mh.TaskQueue[workerID] <- request
}

SendMsgToTaskQueue() 作为⼯作池的数据⼊⼝,接收传入的request消息,这⾥⾯采⽤的是轮询的分配机制,因为不同链接信息都会调⽤这个⼊⼝,那么到底应该由哪个worker处理该链接的请求处理,整理⽤的是⼀个简单的求模运算。⽤ConnID取余和workerID的匹配来进⾏分配

Zinx集成消息队列及工作池机制

  • 将消息交给消息队列处理
//链接的读业务方法
func (c *Connection) StartReader() {
	...
		//得到当前conn数据的Request请求数据
		req := Request{
			conn: c,
			msg:  msg,
		}

		if utils.GlobalObject.WorkerPoolSize > 0 {
			//已经开启了工作池机制,将消息发送给Worker工作池处理即可
			c.MsgHandler.SendMsgToTaskQueue(&req)
		} else {
			//从路由中,找到注册绑定的Conn对应的router调用
			//根据绑定好的MsgID 找到对应处理api业务 执行
			go c.MsgHandler.DoMsgHandler(&req)
		}
	}
}

这⾥并没有强制使⽤多任务Worker机制,⽽是判断⽤户配置 WorkerPoolSize 的个数,如果⼤于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启⼀个临时的Goroutine处理客户端请求消息

  • 工作池开启
//启动服务器
func (s *Server) Start() {
	fmt.Printf("[Zinx] Server Name : %s, listenner at IP : %s, Port:%d is starting\n",
		utils.GlobalObject.Name, utils.GlobalObject.Host, utils.GlobalObject.TcpPort)
	fmt.Printf("[Zinx] Version %s, MaxConn:%d, MaxPackeetSize:%d\n",
		utils.GlobalObject.Version,
		utils.GlobalObject.MaxConn,
		utils.GlobalObject.MaxPackageSize)

	go func() {
		//0 开启消息队列及Worker工作池
		s.MsgHandler.StartWorkerPool()
		...
		}
	}()
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/608775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python编程——环境搭建

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 本文专栏&#xff1a;python专栏 专栏介绍&#xff1a;本专栏为免费专栏&#xff0c;并且会持续更新python基础知识&#xff0c;欢迎各位订阅关注。 目录 一 、安装python 1、进入官网下载python 2、打开安装…

【AI4DB】商用数据库-使用AI4DB技术并商用的数据库总结

目录 1.Amazon Redshift参考链接&#xff1a; 2.阿里云-DAS-Database Autonomy Service参考链接&#xff1a; 3.Oracle Autonomous Database参考链接&#xff1a; 4.阿里云-MaxCompute&#xff08;原ODPS&#xff09;参考文档&#xff1a; 5.腾讯云——DBbrain参考链接&#xf…

python 算符优先分析法的设计实现 编译原理

本文内容&#xff1a; 1、给出文法如下: G[E] E->T|ET; T->F|T*F; F->i|(E); 可以构造算符优先表如下: *()i><<><*>><><(<<<<)>>>i>>> 2、计算机中表示上述优先关系&#xff0c;优先关系的机内存放…

飞桨花滑骨骼点动作识别比赛——从 baseline 调优讲解

赛题介绍背景数据集 思路讲解backbone 模型文件结构 -- PaddleVideo 框架configs 文件夹paddlevideo 文件夹 模型介绍1. ST-GCN -- Baseline 模型整体结构GCN部分TCN部分 2. 2s-AGCN自适应图卷积双流网络 3. CTR-GCNCTR-GC 赛题介绍 背景 2021 CCF BDCI 基于飞桨实现花样滑冰…

初识JavaScript---(1)

初识JavaScript———&#xff08;1&#xff09;&#xff01;&#xff01;&#xff01; 一、初识JavaScript 1.什么是JavaScript&#xff1f; JavaScript是运行在浏览器上的脚本语言&#xff0c;简称JS。JavaScript程序不需要我们程序员手动编译&#xff0c;编写完源代码之后…

shell编程-02-变量作用域

作用域 局部变量&#xff1a;变量只能在函数内部使用 全局变量&#xff1a;变量可以在当前 Shell 进程中使用 环境变量&#xff1a;变量还可以在子进程中使用 局部变量 函数中定义的变量默认是全局变量&#xff0c;在定义时加上local命令&#xff0c;此时该变量就成了局部变…

Spring系列-10 事务机制

背景&#xff1a; 在 事务-1 事务隔离级别和Spring事务传播机制 中对事务的特性、隔离级别、Spring事务的传播机制结合案例进行了分析&#xff1b;在 事务-2 Spring与Mybatis事务实现原理 中对JDBC、Mybatis、Spring整合Mybatis实现事务的原理结合框架源码进行了介绍&#xff…

如何免费使用GPT-4模型

一、引言 OpenAI 最近发布了ChatGPT最新的 GPT-4 模型&#xff0c;这是 OpenAI 迄今为止发布的最强大的语言模型系统。它不仅有视觉能力&#xff0c;而且是多模态的&#xff0c;可以解释文本和生成图像。此外&#xff0c;它在推理测试中表现良好&#xff0c;可以支持大约26种不…

Redis的ZipList和QuickList和SkipList和RedisObject

ZipList:压缩列表&#xff0c;为了节省内存而设计的一种数据结构 ZipList是一种特殊的双端链表&#xff0c;是由一系列的特殊编码的连续内存块组成&#xff0c;不需要通过指针来进行寻址来找到各个节点&#xff0c;可以在任意一端进行压入或者是弹出操作&#xff0c;并且该操作…

RocketMQ的学习历程(5)----broker内部设计

文章目录 概要整体架构流程技术名词解释CommitLog和ConsumeQueue页缓存和内存映射刷盘机制 小结 概要 在首个学习历程中&#xff0c;我们已经了解了&#xff0c;RokctMQ简单的工作流程。 如果想要更深的理解RokcetMQ消息处理的流程&#xff0c;broker内部流程的理解是必要的&…

【挑战全站最全】Linux系统的安装与配置教程——以CentOS为例

&#x1f680;作者&#xff1a;那个叫马尔的大夫&#x1f680; ⭐专栏&#xff1a;操作系统⭐ &#x1f33c;内容&#xff1a;主要分享一些关于Linux操作系统的知识 &#x1f967;不忘初心&#xff0c;砥砺前行~ 目录 一、用到的软件环境——虚拟机软件&#xff08;必需&#…

调用函数不仅仅只是传递正确的参数类型

这里有一个新手犯下的一个典型错误。 假设&#xff0c;我们想调用这个函数&#xff0c;GetBinaryType。 void sample() { if (GetBinaryType(TEXT(“explorer.exe”), ????)) { … } } 请问&#xff0c;这里的问号处应该传递什么类型的参数&#xff1f;你可能会说&#x…

python、pyqt5实现人脸检测、性别和年龄预测

摘要&#xff1a;这篇博文介绍基于opencv&#xff1a;DNN模块自带的残差网络的人脸、性别、年龄识别系统&#xff0c;系统程序由OpenCv, PyQt5的库实现。如图系统可通过摄像头获取实时画面并识别其中的人脸表情&#xff0c;也可以通过读取图片识别&#xff0c;本文提供完整的程…

设计模式入门:策略模式

现有一套模拟鸭子游戏&#xff0c;可以一边游泳&#xff0c;一边呱呱叫。 每种鸭子都会呱呱叫和游泳&#xff0c;只是外观不同。因此&#xff0c;quack和swim放在父类中&#xff0c;display放在子类中实现。 增加新的功能&#xff1a;鸭子飞翔。 1 我们可能想到直接在父类中增…

LeetCode——最小化字符串长度

目录 一、题目 二、题目解读 三、代码 1、set去重 2、用一个二进制数记录每个字母是否出现过 一、题目 6462. 最小化字符串长度 - 力扣&#xff08;Leetcode&#xff09; 给你一个下标从 0 开始的字符串 s &#xff0c;重复执行下述操作 任意 次&#xff1a; 在字符串…

聊一聊数据库事务的那些事(隔离级别,传播行为)

我们平时使用事务的时候&#xff0c;可能脑子里面想到和事务有关的知识点无非就是&#xff0c;ACID&#xff0c;事务隔离级别那一套&#xff0c;使用的事务也就是是通过注解的形式&#xff0c;或者手动开启事务。更细致一点的问题或许没有深究下去&#xff0c;比如事务的传播行…

STM32F407的PWM

文章目录 32的PWM资源PWM输出原理捕获/比较模式寄存器&#xff08;TIMx_CCMR1/2&#xff09;捕获/比较使能寄存器&#xff08;TIMx_CCER&#xff09;捕获/比较寄存器&#xff08;TIMx_CCR1~4&#xff09; 库函数版本的PWM波输出开启 TIM3 时钟以及复用功能时钟置 &#xff0c;配…

对CT数据进行最小最大值归一化(Min-Max Normalization)和消除过暗过亮值处理

文章目录 PIL库的图像失真问题使用最小最大值归一化&#xff08;Min-Max Normalization&#xff09;预处理消除过暗过亮值pytorch中对tensor使用最小最大值归一化处理&#xff08;torchvision.transforms&#xff09; 我们在处理CT图像时&#xff08;以dcm格式为例&#xff09;…

全栈小程序开发路线

目录 个人心得&#xff1a; 我的学习路线 个人心得&#xff1a; 我擅长的是小程序开发和技术变现&#xff0c;从2021年至今开发上线20于个小程序&#xff0c;矩阵用户超过10万&#xff0c;变现10万左右。 以下是部分小程序截图&#xff0c;追风口做的小程序&#xff0c;基本…

「Win」Windows注册表介绍与操作

✨博客主页&#xff1a;何曾参静谧的博客 &#x1f4cc;文章专栏&#xff1a;「Win」Windows程序设计 相关术语 Windows的注册表&#xff1a;是一个重要的系统组件&#xff0c;用于存储操作系统和应用程序的配置信息。它类似于一个数据库&#xff0c;包含了各种键值对、参数、设…