GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流

news2025/1/18 4:51:16

前言

GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185

基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387

本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。

UDP发流

流程图

在这里插入图片描述

发送端流程

  • 初始化rtp参数;
  • 裸流数据做PS复用;
  • 组RTP包发送;

设计

  1. 初始化rtp参数
int CUdp::InitRtp_()
{
	RTPSessionParams sessionParams;
	sessionParams.SetMinimumRTCPTransmissionInterval(10);
	sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);
	sessionParams.SetAcceptOwnPackets(true);
	sessionParams.SetMaximumPacketSize(1450);

	RTPUDPv4TransmissionParams transParams;
	transParams.SetRTPSendBuffer(2*1024*1024);
	transParams.SetBindIP(m_ip);
	transParams.SetPortbase((uint16_t)m_port);

	if (0 != Create(sessionParams, &transParams))
	{
		return -1;
	}

	SetDefaultPayloadType((uint8_t)m_payload);
	SetDefaultTimestampIncrement(3600);
	SetDefaultMark(true);

	RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);
	if(0 != AddDestination(addr))
    {
		return -1;
	}

	return 0;
}
  1. 流数据复用为PS
// 使用ireader开源库进行ps复用
// 初始化
CData2PS::CData2PS()
{
	struct ps_muxer_func_t func;
	func.alloc = Alloc;
	func.free = Free;
	func.write = Packet;
	m_ps = ps_muxer_create(&func, this);
	// TODO codecid待补充
	m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}


// 塞数据
int CData2PS::InputData(void* data, int len)
{
	if (!m_ps)
		return -1;

	uint64_t clock = time64_now();
	if (0 == m_ps_clock)
		m_ps_clock = clock;

	return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
  1. 发送rtp包
// 调用jrtplib中SendPacket(data, len);接口发送数据

// 以下为SendPacket部分源码
// 主要流程:
// 1. 构建packet
// 2. 发送rtp数据
int RTPSession::SendPacket(const void *data,size_t len,
                uint8_t pt,bool mark,uint32_t timestampinc)
{
	int status;

	if (!created)
		return ERR_RTP_SESSION_NOTCREATED;
	
	BUILDER_LOCK
	if ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0)
	{
		BUILDER_UNLOCK
		return status;
	}
	if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0)
	{
		BUILDER_UNLOCK
		return status;
	}
	BUILDER_UNLOCK
	
	SOURCES_LOCK
	sources.SentRTPPacket();
	SOURCES_UNLOCK
	PACKSENT_LOCK
	sentpackets = true;
	PACKSENT_UNLOCK
	return 0;
}

// 构建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,
	                  uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,
	                  uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{
	RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,
	            (uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());
	int status = p.GetCreationError();

	if (status < 0)
		return status;
	packetlength = p.GetPacketLength();

	if (numpackets == 0) // first packet
	{
		lastwallclocktime = RTPTime::CurrentTime();
		lastrtptimestamp = timestamp;
		prevrtptimestamp = timestamp;
	}
	else if (timestamp != prevrtptimestamp)
	{
		lastwallclocktime = RTPTime::CurrentTime();
		lastrtptimestamp = timestamp;
		prevrtptimestamp = timestamp;
	}
	
	numpayloadbytes += (uint32_t)p.GetPayloadLength();
	numpackets++;
	timestamp += timestampinc;
	seqnr++;

	return 0;
}

// 发送包
int RTPSession::SendRTPData(const void *data, size_t len)
{
	if (!m_changeOutgoingData)
		return rtptrans->SendRTPData(data, len);

	void *pSendData = 0;
	size_t sendLen = 0;
	int status = 0;

	status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);
	if (status < 0)
		return status;

	if (pSendData)
	{
		status = rtptrans->SendRTPData(pSendData, sendLen);
		OnSentRTPOrRTCPData(pSendData, sendLen, true);
	}

	return status;
}

// 底层实现
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)	
{
	if (!init)
		return ERR_RTP_UDPV4TRANS_NOTINIT;

	MAINMUTEX_LOCK
	
	if (!created)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_UDPV4TRANS_NOTCREATED;
	}
	if (len > maxpacksize)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;
	}
	
	destinations.GotoFirstElement();
	while (destinations.HasCurrentElement())
	{
        // 调用sendto函数实现udp包的发送
		sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));
		destinations.GotoNextElement();
	}
	
	MAINMUTEX_UNLOCK
	return 0;
}

tcp passive发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
  • 下级平台或者设备(数据发送端)启动端口监听;
  • 接收上级平台或sip服务器tcp连接请求;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类

int CGBTcpServer::Start()
{
	if (0 != m_localPort || m_tcpServer.get())
		return 0;

	int ret = -1;
	do 
	{
		m_tcpServer = std::make_shared<TcpServer>(nullptr, this);
		if (!m_tcpServer.get())
			break;

		ret = m_tcpServer->TcpCreate();
		if (0 != ret)
			break;

		ret = m_tcpServer->TcpBind(m_localPort);
		if (0 != ret)
			break;

		ret = m_tcpServer->TcpListen(5);
		if (0 != ret)
			break;

		m_thread = std::thread(TCPData2PSThread, this);
		return 0;
	} while (0);

	Stop();
	return ret;
}
  1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServer::TCPData2PSWorker()
{
	if (!m_pspacker)
		m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);

	bool bAccept = false;
	while (m_running)
	{
		if (!bAccept)
		{
			if (0 == m_tcpServer->TcpAccept())
			{
				bAccept = true;
				if (0 != InitRtp_())
				{
					break;
				}
			}

			continue;
		}

		std::this_thread::sleep_for(std::chrono::seconds(1));
	}
}
  1. 初始化rtp参数
int CGBTcpServer::InitRtp_()
{
	const int packetSize = 45678;
	RTPSessionParams sessionparams;
	sessionparams.SetProbationType(RTPSources::NoProbation);
	sessionparams.SetOwnTimestampUnit(1.0 / packetSize);
	sessionparams.SetMaximumPacketSize(packetSize + 64);

	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
	m_rtpTcpTransmitter->Init(true);
	m_rtpTcpTransmitter->Create(65535, 0);

	int status = Create(sessionparams, m_rtpTcpTransmitter);
	if (status < 0)
	{
		return status;
	}

	status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));
	if (0 != status)
		return status;

	SetDefaultPayloadType(96);
	SetDefaultMark(false);
	SetDefaultTimestampIncrement(160);
	return 0;
}
  1. 将数据复用为PS;
  2. tcp方式发包
// 调用jrtplib中SendPacket(data, len);接口发送数据

// 以下为tcp方式SendPacket部分源码
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)	
{
	return SendRTPRTCPData(data, len);
}

int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{
	if (!m_init)
		return ERR_RTP_TCPTRANS_NOTINIT;

	MAINMUTEX_LOCK
	
	if (!m_created)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_TCPTRANS_NOTCREATED;
	}
    
    // #define RTPTCPTRANS_MAXPACKSIZE							65535
	if (len > RTPTCPTRANS_MAXPACKSIZE)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;
	}
	
	std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();
	std::map<SocketType, SocketData>::iterator end = m_destSockets.end();

	vector<SocketType> errSockets;
	int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNAL
	flags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNAL

	while (it != end)
	{
		uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };
		SocketType sock = it->first;

        // 调用send接口发送数据
        // 1. 先发送2字节头(固定格式)
        // 2. 再发送数据
		if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||
			send(sock,(const char *)data,len,flags) < 0)
			errSockets.push_back(sock);
		++it;
	}
	
	MAINMUTEX_UNLOCK

	if (errSockets.size() != 0)
	{
		for (size_t i = 0 ; i < errSockets.size() ; i++)
			OnSendError(errSockets[i]);
	}

	// Don't return an error code to avoid the poll thread exiting
	// due to one closed connection for example

	return 0;
}

tcp active发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
  • 下级平台或者设备(数据发送端)发起tcp连接;
  • 接收上级平台或sip服务器tcp响应;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类

int CGBTcpClient::Start()
{
	if (0 != m_localPort || m_tcpClient.get())
		return 0;

	int ret = -1;
	do
	{
		m_tcpClient = std::make_shared<TcpClient>(nullptr, this);
		if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())
			break;

		ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);
		if (0 != ret)
			break;

		ret = InitRtp_();
		if (0 != ret)
			break;

		m_thread = std::thread(RTPPackerThread, this);
		return 0;
	} while (0);

	Stop();
	return ret;
}
  1. 初始化rtp参数
int CGBTcpClient::InitRtp_()
{
	const int packSize = 45678;
	RTPSessionParams sessionParams;
	sessionParams.SetProbationType(RTPSources::NoProbation);
	sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
	sessionParams.SetMaximumPacketSize(packSize + 64);

	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
	m_rtpTcpTransmitter->Init(true);
	m_rtpTcpTransmitter->Create(65535, 0);

	if (0 != Create(sessionParams, m_rtpTcpTransmitter))
		return -1;

	if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))
		return -1;

	return 0;
}
  1. 视音频数据复用为PS
  2. 发送数据,同tcp passive发流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1239373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小游戏上线流程

微信小游戏上线是一个需要经过一系列步骤的过程。以下是一个一般性的微信小游戏上线流程&#xff0c;请注意&#xff0c;上述步骤可能会有微信平台的政策和规定的变化&#xff0c;因此建议在开发过程中及时查阅微信小游戏的官方文档和最新政策。北京木奇移动技术有限公司&#…

DB2—03(DB2中常见基础操作)

DB2—03&#xff08;DB2中常见基础操作&#xff09; 1. 前言1.1 oracle和mysql相关 2. db2中的"dual"2.1 SYSIBM.SYSDUMMY12.2 使用VALUES2.3 SYSIBM.SYSDUMMY1 "变" dual 3. db2中常用函数3.1 nvl()、value()、COALESCE()3.2 NULLIF() 函数3.3 LISTAGG() …

含分布式电源的配电网可靠性评估matlab程序

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 参考文献&#xff1a; 基于仿射最小路法的含分布式电源配电网可靠性分析——熊小萍 主要内容&#xff1a; 通过概率模型和时序模型分别进行建模&#xff0c;实现基于概率模型最小路法的含分布式电源配电网…

Python BDD之Behave测试报告

behave 本身的测试报告 behave 本身提供了四种报告格式&#xff1a; pretty&#xff1a;这是默认的报告格式&#xff0c;提供颜色化的文本输出&#xff0c;每个测试步骤的结果都会详细列出。plain&#xff1a;这也是一种文本格式的报告&#xff0c;但没有颜色&#xff0c;并且…

【C++】泛型编程 ⑫ ( 类模板 static 关键字 | 类模板 static 静态成员 | 类模板使用流程 )

文章目录 一、类模板使用流程1、类模板 定义流程2、类模板 使用3、类模板 函数 外部实现 二、类模板 static 关键字1、类模板 static 静态成员2、类模板 static 关键字 用法3、完整代码示例 将 类模板 函数声明 与 函数实现 分开进行编码 , 有 三种 方式 : 类模板 的 函数声明…

超详细!新手必看!STM32-通用定时器简介与知识点概括

一、通用定时器的功能 在基本定时器功能的基础上新增功能&#xff1a; 通用定时器有4个独立通道&#xff0c;且每个通道都可以用于下面功能。 &#xff08;1&#xff09;输入捕获&#xff1a;测量输入信号的周期和占空比等。 &#xff08;2&#xff09;输出比较&#xff1a;产…

电大搜题——让学习变得轻松高效

作为一名现代学者&#xff0c;您一定时刻关注着教育领域的进展和创新。今天&#xff0c;我将向大家介绍一个名为“电大搜题”的神奇工具&#xff0c;它将为您的学习之路带来一场完美的革命。 在快节奏的现代社会中&#xff0c;学习已经成为每个人追求成功的必经之路。然而&…

Linux超简单部署个人博客

1 安装halo 1.1 切换到超级用户 sudo -i 1.2 新建halo文件夹 mkdir ~/halo && cd ~/halo 1.3 编辑docker-compose.yml文件 vim ~/halo/docker-compose.yml 英文输入法下&#xff0c;按 i version: "3"services:halo:image: halohub/halo:2.10container_…

2023年【起重机械指挥】考试题及起重机械指挥找解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 起重机械指挥考试题考前必练&#xff01;安全生产模拟考试一点通每个月更新起重机械指挥找解析题目及答案&#xff01;多做几遍&#xff0c;其实通过起重机械指挥作业考试题库很简单。 1、【多选题】按照事故造成的人…

新手必看!!附源码!!STM32通用定时器输出PWM

一、什么是PWM? PWM&#xff08;脉冲宽度调制&#xff09;是一种用于控制电子设备的技术。它通过调整信号的脉冲宽度来控制电压的平均值。PWM常用于调节电机速度、控制LED亮度、产生模拟信号等应用。 二、PWM的原理 PWM的基本原理是通过以一定频率产生的脉冲信号&#xff0…

Java 编码

编码: 加密: 通过加密算法和密钥进行 也可通过码表进行加密 对称加密: 缺点:可被截获 元数据---加密算法密钥密文 ----> 解密算法密钥元数据 算法:DES(短 56位),AES(长 128位)破解时间加长 非对称加密: 元数据-加密算法加密密钥 密文 --->加密算法解密密钥元数据 …

亚马逊买家号用邮箱怎么注册

想要用邮箱注册亚马逊买家号&#xff0c;那么准备好能接受验证码的邮箱后打开相应的亚马逊官网即可。打开官网后点击注册——输入昵称——输入邮箱——输入密码——接受邮箱验证码并输入&#xff0c;如果遇到需要手机号验证就输入手机号&#xff0c;如果不需要验证&#xff0c;…

Mac M1 安装Docker打包arm64的python项目的镜像包

1、首先安装Docker&#xff0c;到官网下载&#xff0c;选择apple chip版 Docker中文网 官网 2、双击下载的dmg文件&#xff0c;在弹出框中之间拖拽到右边 3、打开docker&#xff0c;修改国内镜像源&#xff0c;位置在配置-DockerEngine "registry-mirrors": ["…

2023年,人工智能在医疗行业领域的应用场景

本期行业洞察将带领大家了解人工智能在医疗行业领域的应用&#xff0c;主要了解在患者治疗和运营中的应用、人工智能作为预防工具以及大型医院目前如何使用人工智能。未来的智慧医疗时代已经悄然到来。 人工智能在患者治疗和机构运营中的应用 人工智能有望彻底改变医疗护理的…

USB驱动开发基础

USB标准 USB1.0&#xff0c; 1996&#xff0c;低速1.5Mbps和高速12Mbps&#xff0c;USB1.1 iMac G3&#xff0c;Type A和Type B接口USB 2.0 2000&#xff0c; 480Mpbs&#xff0c;Type A/B/C接口、Micro A/BUSB 3.0 5Gbps, 随着USB 3.2命名规定&#xff0c;现在也叫USB 3.2 Ge…

羊大师提示,羊奶都有哪些惊人功效?

羊奶不仅是一种美味的健康饮品&#xff0c;在近年来备受瞩目的的健康圈子里&#xff0c;羊奶还被赋予了更多的功效&#xff0c;成为一种备受推崇的保健品。羊奶不但富含营养&#xff0c;而且还有着非常多的益处&#xff0c;它能够用来美容、保健&#xff0c;甚至还可以治疗某些…

Epub书籍阅读工具

Epub书籍阅读工具 前言WIndows总结Neat ReaderAquile ReaderWPS Android总结Neat Reader掌阅 前言 Epub文件为电子书文件格式&#xff0c;此格式的电子书相比txt书籍&#xff0c;增加了目录跳转功能&#xff0c;并可以显示图片。本文介绍WIndows和Android端的epub书籍阅读工具…

Bug等级划分

Bug是指在程序或系统中存在的错误、缺陷或异常&#xff0c;是由于编码错误、设计问题、逻辑错误或其他因素导致的。 常见的Bug分类方法 功能性Bug与软件的功能有关&#xff0c;软件无法正常工作、功能与需求不符或功能执行不正确。 用户界面Bug与软件的用户界面有关&#xff…

C++电脑组装项目(涉及知识点:多态)

需求&#xff1a; #include <iostream> #include "Computer.h" #include "AbstractCpu.h" #include "AbstractMemory.h" #include "AbstractVideoCard.h" #include "IntelCpu.h" #include "IntelMemory.h" …

langchain 部署组件-LangServe

原文&#xff1a;&#x1f99c;️&#x1f3d3; LangServe | &#x1f99c;️&#x1f517; Langchain LangServe &#x1f6a9; We will be releasing a hosted version of LangServe for one-click deployments of LangChain applications. Sign up here to get on the wa…