rtmp协议转websocketflv的去队列积压

news2025/1/9 14:57:26

websocket server的优点

websocket server的好处:WebSocket 服务器能够实现实时的数据推送,服务器可以主动向客户端发送数据

1 不需要客户端不断轮询。
2 不需要实现httpserver跨域。
在需要修改协议的时候比较灵活,我们发送数据的时候比较方便,因为两边可以随时发送协议, 且做客户端的程序更为方便,websocket协议头部已经定义了包长,使用大部分库可以直接收数据,解决了粘包的问题,所以websocket协议是一个使用比较顺畅的协议

实现websocket server

先用boost 的协程顶一下,主要是需要将http协议升级到websocket, 因此在一个函数里面实现两种server的接收,http协议顺便就接收了,同时在客户端里面存储所有的链接对象,以下是主要实现的握手协议代码,以供参考

bool func_hand_shake(boost::asio::yield_context &yield)
	{
		DEFINE_EC
		asio::streambuf content_;
		size_t length = asio::async_read_until(v_socket, content_, "\r\n\r\n", yield[ec]);
		ERROR_RETURN_FALSE
		asio::streambuf::const_buffers_type bufs = content_.data();
		std::string lines(asio::buffers_begin(bufs), asio::buffers_begin(bufs) + length);
		//std::cout<<lines<<std::endl;
		c_header_map hmap;
		//std::string get;
		int protocol = fetch_head_info(lines, hmap, v_app_stream);
		if (protocol != GET)
			return false;
		cout << "GET:" << v_app_stream << endl; //like this--> live/1001 rtmp server must like this

		auto iter = hmap.find("Upgrade");
		if (iter == hmap.end())
		{
			//it is the http protocol ,not websocket
			//func_hand_http(m, yield);
			size_t ret = boost::asio::async_write(v_socket, boost::asio::buffer(FLV_HTTP_HEADERS,
				FLV_HTTP_HEADERS_LEN), yield[ec]);
			//ERROR_RETURN_FALSE
		
			v_key = hash_add(v_app_stream.c_str(), HASH_PRIME_MIDDLE);
			if (c_hubs::instance()->push(v_key, shared_from_this(), true) !=0)
			{
				//we can not find the stream 
				//return 404 error
				if (ret == -1)
				{
					size_t len_ = sizeof(buffer404) - 1; //remove the '\0' one bytes
					asio::async_write(v_socket, asio::buffer(buffer404, len_), yield[ec]);
					//ERROR_RETURN_FALSE
					//return false;
				}
				return false;
			}
			return true;
		}
		else
		{
			v_iswebsocket = true;
			std::string response, key, encrypted_key;
			//find the get
			//std::string request;
			size_t n = lines.find_first_of('\r');
			//find the Sec-WebSocket-Key
			size_t pos = lines.find("Sec-WebSocket-Key");
			if (pos == lines.npos)
				return false;
			size_t end = lines.find("\r\n", pos);
			key = lines.substr(pos + 19, end - pos - 19) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
			//get the base64 encode string with sha1
#if 0
			boost::uuids::detail::sha1 sha1;
			sha1.process_bytes(key.c_str(), key.size());
			unsigned int digest[5];
			sha1.get_digest(digest);
#endif
#if 1
			SHA1 sha;
			unsigned int digest[5];
			sha.Reset();
			sha << key.c_str();
			sha.Result(digest);
#endif


			for (int i = 0; i < 5; i++) {
				digest[i] = htonl(digest[i]);
			}

			encrypted_key = base64_encode(reinterpret_cast<const uint8_t*>(&digest[0]), 20);
			//base64_encode(first, encrypted_key);

			/*
			The handshake from the server looks as follows :

			HTTP / 1.1 101 Switching Protocols
			Upgrade : websocket
			Connection : Upgrade
			Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK + xOo =
			Sec-WebSocket-Protocol: chat
			 */
			 //set the response text
			response.append("HTTP/1.1 101 WebSocket Protocol Handshake\r\n");
			response.append("Upgrade: websocket\r\n");
			response.append("Connection: Upgrade\r\n");
			response.append("Sec-WebSocket-Accept: " + encrypted_key + "\r\n\r\n");
			//response.append("Sec-WebSocket-Protocol: chat\r\n");
			//response.append("Sec-WebSocket-Version: 13\r\n\r\n");
			size_t ret = boost::asio::async_write(v_socket, boost::asio::buffer(response), yield[ec]);
			//ERROR_RETURN_FALSE
				//calculate the hash key 
			v_key = hash_add(v_app_stream.c_str(), HASH_PRIME_MIDDLE);

			c_hubs::instance()->push(v_key, shared_from_this(),false);
		}
		return true;
	}

在这里插入图片描述

rtmp协议

这个协议太过出名,实在没什么好说的

实现http协议

实现websocket协议的时候顺带实现,使用map数据结构存储

转发单线程,去除队列

1 数据共享
在发送数据的时候,rtmp 和 httpflv发送 以及 websocket发送使用同一缓存,这样有一个问题,即使我们使用共享的数据结构同时使用同一个内存,也不一定会共享申请内存时多余的头部,以下是数据结构

typedef struct s_memory
{
	//head 
	uint8_t *v_data = NULL;
	//v_data_h =>rtmp use it
	uint8_t *v_data_h = NULL;
	//real data
	uint8_t *v_data_r = NULL;
	//uint8_t *v_data   = NULL;
	size_t   v_len;
	uint32_t v_ts; // timestamp
	en_flv_header v_av_type = en_flv_null;
	void memory_create(uint32_t size, int header = 18)
	{
		//zero copy
		//the last reserve 4 bytes for flv write 4 bytes tail for av data size
		//the header 18 bytes for max rtmp use
		v_data = new uint8_t[size + header +4];
		v_len    = size; //not include the header and tail 
		//we do not know the head where
		//v_data_h = v_data;
		v_data_r = v_data + header;
		
	}

	void memory_create(uint32_t size, int header, int tail)
	{
		v_data = new uint8_t[size + header + tail];
		v_len = size; //not include the header and tail 
		v_data_h = v_data;
		v_data_r = v_data + header;
	}
	~s_memory()
	{
		if (v_data != NULL)
			delete[] v_data;
	}
}s_memory;

这边要做的就是在申请内存时多申请上头部和尾部,这样,使用的时候就可以在数据前面增加不同协议的数据头部。
所以是下面这句话

v_data = new uint8_t[size + header + tail];

读者自行理解就好
在这里插入图片描述
收到数据以后不进行任何的数据拷贝, 在缓冲数据前面加上数据头部,立刻发送出去,上图可以看到,rtmp协议和websocket flv 同时打开,vlc的rtmp协议稍稍会延后一点时间。两路内存占用如下图所示:

在这里插入图片描述
可以看到去除队列积压,内存占用比较小

多个线程需要修改头部的情况

如果使用多个线程,如何在各类协议之间共享数据呢,这是个问题,我们退而求其次,利用tcp 协议的特点,它是可以分开来发送批量数据,下图是使用websocket协议发送flv数据的示例,包括发送tag,taglen,data,datalen, 以及自身websocket发送的头部字节,分了三次发送,head和headlen 是实现websocket的头部而写。

/*
 sock      : need send socket
 data      : flv av data
 datalen   : flv av data len
*/
bool c_flvserver::func_set_head_send(tcp::socket &sock,
	uint8_t* tag, int taglen, uint8_t *data, size_t datalen,
	asio::yield_context &yield)
{
	uint8_t buffer[10];
	uint8_t *head = NULL;// buf;// 0x82;
	int headlen = 0;
	int totallen = taglen + datalen;
	if (totallen <= 65535)
	{
		if (totallen < 126)
		{
			head = &buffer[0] +8;
			//relen += 1;
			*head = 0x82; //0x81:1000 0001 text code ; // 1000 0010 binary code
			*(head + 1) = (uint8_t)totallen;
			headlen = 2;
		}
		else //>=126 <65536
		{
			head = &buffer[0] +6;
			*head = 0x82;
			*(head + 1) = 126;
			*(head + 2) = (uint8_t)((totallen >> 8) & 0xFF);
			*(head + 3) = (uint8_t)(totallen & 0xFF);
			headlen = 4;
		}
	}
	else //>65535
	{
		head = &buffer[0];
		*head = 0x82;
		*(head + 1) = 127;
		*(head + 2) = 0;   //>>56
		*(head + 3) = 0;   //>>48
		*(head + 4) = 0;// >>40
		*(head + 5) = 0; // >> 32;
		*(head + 6) = (uint8_t)(totallen >> 24);
		*(head + 7) = (uint8_t)(totallen >> 16);
		*(head + 8) = (uint8_t)(totallen >> 8);
		*(head + 9) = (uint8_t)(totallen & 0xFF);
		headlen = 10;
	}


	DEFINE_EC
	asio::async_write(sock, asio::buffer(head, headlen), yield[ec]);
	asio::async_write(sock, asio::buffer(tag, taglen), yield[ec]);
	asio::async_write(sock, asio::buffer(data, datalen), yield[ec]);
	//send the data
	//flv_const_buffer bb(frame, framelen,tag,taglen, data, dlen);
	//asio::async_write(sock, bb, yield[ec]);
	return ec? false : true;
}

这样在发送rtmp协议的时候,使用申请内存的多余头部空间,发送flv的时候 previous tag 长度四字节放在尾部,发送http协议的时候和flv类似,不需要发送websocket的头部字节,后面加上各类协议,比如rtsp 的tcp等等,也可以这样做,我们可以拷贝,但也可以不拷贝数据而进行零拷贝,零队列发送。

下面多打开几路观察内存

在这里插入图片描述

如下图所示:和刚才区别不大,小于10路内存都在1兆多以内
在这里插入图片描述

后面需要做的实现

实现更多的具体行业应用层服务和比较标准的协议输出, 将会做客户端发流,客户端收留,服务器对接,服务调用gpu等等,会比较谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2185658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用默认不可变的Rust变量会踩什么坑

讲动人的故事&#xff0c;写懂人的代码 Rust的变量真的是名不副实。名字中明明有个“变”字&#xff0c;却默认不可变。还美其名曰“不可变变量”。要想让变量名副其实&#xff0c;还必须费心额外加个mut关键字&#xff0c;并必须称其为“可变变量”&#xff0c;才能与前者区分…

对比学习与图像去雾在24TIP和CVPR经典图像去雾论文中的良好结合,展示出模型良好的泛化能力(本期内容较详细 多)

今天主要内容是图像去雾&#xff0c;对比学习&#xff0c;无监督学习&#xff0c;要分享的三篇论文分别是&#xff1a; 21CVPR Contrastive Learning for Compact Single Image Dehazing 24 TIP UCL-Dehaze: Toward Real-World Image Dehazing via Unsupervised Contrastive Le…

stm32f103调试,程序与定时器同步设置

在调试定时器相关代码时&#xff0c;注意到定时器的中断位总是置1&#xff0c;怀疑代码有问题&#xff0c;经过增大定时器的中断时间&#xff0c;发现定时器与代码调试并不同步&#xff0c;这一点对于调试涉及定时器的代码是非常不利的&#xff0c;这里给出keil调试stm32使定时…

HTB:Vaccine[WriteUP]

目录 连接至HTB服务器并启动靶机 1.Besides SSH and HTTP, what other service is hosted on this box? 2.This service can be configured to allow login with any password for specific username. What is that username? 3.What is the name of the file downloaded…

Kafka和RabbitMQ区别

RabbitMQ的消息延迟是微秒级&#xff0c;Kafka是毫秒级&#xff08;1毫秒1000微秒&#xff09; 延迟消息是指生产者发送消息发送消息后&#xff0c;不能立刻被消费者消费&#xff0c;需要等待指定的时间后才可以被消费。 Kafka的单机呑吐量是十万级&#xff0c;RabbitMQ是万级…

10.4 Linux_并发_线程

概述 线程的共享资源&#xff1a; 可执行的指令、静态数据、文件描述符、当前工作目录、用户ID、用户组ID 线程的私有资源&#xff1a; 线程ID、程序计数器PC和相关寄存器、堆栈、错误号、优先级、执行状态和属性 线程编译&#xff1a; gcc <.c文件> -l pthread -o…

数据集-目标检测系列- 螃蟹 检测数据集 crab >> DataBall

数据集-目标检测系列- 螃蟹 检测数据集 crab >> DataBall 数据集-目标检测系列- 螃蟹 检测数据集 crab >> DataBall 数据量&#xff1a;3k 想要进一步了解&#xff0c;请联系。 DataBall 助力快速掌握数据集的信息和使用方式&#xff0c;会员享有 百种数据集&a…

加密与安全_TOTP 一次性密码生成算法

文章目录 PreTOTP是什么TOTP 算法工作原理TOTP 生成公式TOTP 与 HOTP 的对比Code生成TOTP验证 TOTP使用场景小结 TOTP 与 HOTP 的主要区别TOTP 与 HOTP应用场景比较TOTP 与 HOTP安全性分析 Pre 加密与安全_HTOP 一次性密码生成算法 https://github.com/samdjstevens/java-tot…

YOLO11改进|卷积篇|引入可变核卷积AKConv

目录 一、AKConv卷积1.1AKConv卷积介绍1.2MLCA核心代码 五、添加MLCA注意力机制5.1STEP15.2STEP25.3STEP35.4STEP4 六、yaml文件与运行6.1yaml文件6.2运行成功截图 一、AKConv卷积 1.1AKConv卷积介绍 AKConv允许卷积参数的数量以线性方式增加或减少&#xff0c;而不是传统的平…

C# 表达式与运算符

本课要点&#xff1a; 1、表达式的基本概念 2、常用的几种运算符 3、运算符的优先级 4、常见问题 一 表达式 表达式是由运算符和操作数组成的。、-、*和/等都是运算符&#xff0c;操作数包括文本、常量、变量和表达式等。 二 算术运算符 2.1 算术运算符的使用 三 常见错误 …

Cocotb 学习记录--V01

1. Windows 下安装cocotb pip install cocotb 其他参考&#xff1a; 1.Welcome to cocotb’s documentation! — cocotb 1.9.1 documentation

【Koa】文件上传

主要使用两个 koa 插件&#xff0c;koa-body 里面自带文件上传功能&#xff0c;还有一个 koa-staitc 用于配置静态资源目录&#xff08;可以通过路径直接访问图片&#xff09;。 router const Router require(koa/router); const {upload} require(../controller/user);con…

Spring之生成Bean

Bean的生命周期&#xff1a;实例化->属性填充->初始化->销毁 核心入口方法&#xff1a;finishBeanFactoryInitialization-->preInstantiateSingletons DefaultListableBeanFactory#preInstantiateSingletons用于实例化非懒加载的bean。 1.preInstantiateSinglet…

【RADARSAT Constellation Mission(RCM)卫星星座简介】

RADARSAT Constellation Mission&#xff08;RCM&#xff09;卫星星座是加拿大太空局&#xff08;CSA&#xff09;的下一代C波段合成孔径雷达&#xff08;SAR&#xff09;卫星星座&#xff0c;以下是对其的详细介绍&#xff1a; 一、基本信息 发射时间&#xff1a;2019年6月…

Golang | Leetcode Golang题解之第452题用最少数量的箭引爆气球

题目&#xff1a; 题解&#xff1a; func findMinArrowShots(points [][]int) int {if len(points) 0 {return 0}sort.Slice(points, func(i, j int) bool { return points[i][1] < points[j][1] })maxRight : points[0][1]ans : 1for _, p : range points {if p[0] > …

秒懂Linux之线程

目录 线程概念 线程理解 地址空间&#xff08;页表&#xff0c;内存&#xff0c;虚拟地址&#xff09; 线程的控制 铺垫 线程创建 ​编辑 线程等待 线程异常 线程终止 代码 线程优点 线程缺点 线程特点 线程概念 线程是进程内部的一个执行分支&#xff0c;线程是C…

小程序-全局数据共享

目录 1.什么是全局数据共享 2. 小程序中的全局数据共享方案 MboX 1. 安装 MobX 相关的包 2. 创建 MobX 的 Store 实例 3. 将 Store 中的成员绑定到页面中 4. 在页面上使用 Store 中的成员 5. 将 Store 中的成员绑定到组件中 6. 在组件中使用 Store 中的成员 1.什么是全…

【LeetCode】每日一题 2024_10_2 准时到达的列车最小时速(二分答案)

前言 每天和你一起刷 LeetCode 每日一题~ 大家国庆节快乐呀~ LeetCode 启动&#xff01; 题目&#xff1a;准时到达的列车最小时速 代码与解题思路 今天这道题是经典的二分答案&#xff0c;结合这道题来讲就是&#xff0c;二分列车的速度 我最擅长的两个算法&#xff1a;一…

cGANs with Projection Discriminator

基于映射鉴别器的CGAN 模型中&#xff0c;判别器&#xff08;Discriminator&#xff09;不是通过将条件信息简单地与特征向量拼接&#xff08;concatenate&#xff09;来使用条件信息&#xff0c;而是采用一种基于投影的方式&#xff0c;这种方式更加尊重条件信息在底层概率模…

进程通信——内存映射

进程通信——内存映射 什么是内存映射 内存映射是一种将文件内容映射到进程地址空间的技术&#xff0c;使得进程可以直接访问文件内容&#xff0c;而不需要通过系统调用进行读写操作。内存映射可以提高文件访问的效率&#xff0c;并且可以实现进程间的通信。 内存映射的原理…