目录
- 1. RTMP_ConnectStream函数
- 1.1 读取packet(RTMP_ReadPacket)
- 1.2 解析packet(RTMP_ClientPacket)
- 1.2.1 设置Chunk Size(HandleChangeChunkSize)
- 1.2.2 用户控制信息(HandleCtrl)
- 1.2.3 设置应答窗口大小(HandleServerBW)
- 1.2.4 设置对端带宽(HandleClientBW)
- 1.2.5 音频数据(HandleAudio)
- 1.2.6 视频数据(HandleVideo)
- 1.2.7 元数据(HandleMetadata)
- 1.2.8 命令消息(HandleInvoke)
- 2.小结
RTMP协议相关:
【流媒体】RTMP协议概述
【流媒体】RTMP协议的数据格式
【流媒体】RTMP协议的消息类型
【流媒体】RTMPDump—主流程简单分析
【流媒体】RTMPDump—RTMP_Connect函数(握手、网络连接)
参考雷博的系列文章(可以从一篇链接到其他文章):
RTMPdump 源代码分析 1: main()函数
1. RTMP_ConnectStream函数
RTMP_ConnectStream()的作用是建立流连接,先回顾一下RTMP标准文档当中是如何进行流的连接的,以client向server发送play命令为例,流程图如下所示。从流程中看,在进行了握手和RTMP连接之后,由client向server发送一个命令 “createStream”,随后由server返回一个命令消息 _result,表示对这个 “createStream” 的反馈。随后进行play命令
RTMPDump中负责完成 “createStream” 这一流连接过程的函数为RTMP_ConnectStream(),这个函数的实现比较简单,主要有两个步骤:
(1)读取packet(RTMP_ReadPacket)
(2)解析packet(RTMP_ClientPacket)
/*
 * Pump incoming packets until the stream is reported as playing.
 *
 * If seekTime is positive it is stored in r->Link.seekTime.  The media
 * channel is cleared, then packets are read with RTMP_ReadPacket(); each
 * complete, non-empty packet that is not audio/video/info is handed to
 * RTMP_ClientPacket().  The loop stops as soon as r->m_bPlaying is set,
 * the connection is no longer up, or a read fails.
 *
 * Returns the final value of r->m_bPlaying.
 */
int
RTMP_ConnectStream(RTMP * r, int seekTime)
{
    RTMPPacket pkt = { 0 };

    /* seekTime was already set by SetupStream / SetupURL.
     * This is only needed by ReconnectStream.
     */
    if (seekTime > 0)
        r->Link.seekTime = seekTime;

    r->m_mediaChannel = 0;

    for (;;)
    {
        int isFlvData;

        /* Step 1: read the next chunk (same test order as a && chain). */
        if (r->m_bPlaying || !RTMP_IsConnected(r) || !RTMP_ReadPacket(r, &pkt))
            break;

        /* A message may span several chunks; wait until it is complete. */
        if (!RTMPPacket_IsReady(&pkt))
            continue;

        /* Nothing to dispatch for an empty message. */
        if (pkt.m_nBodySize == 0)
            continue;

        isFlvData = pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO
            || pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO
            || pkt.m_packetType == RTMP_PACKET_TYPE_INFO;

        if (isFlvData)
        {
            RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");
        }
        else
        {
            /* Step 2: interpret the packet. */
            RTMP_ClientPacket(r, &pkt);
        }

        RTMPPacket_Free(&pkt);
    }

    return r->m_bPlaying;
}
1.1 读取packet(RTMP_ReadPacket)
RTMP_ReadPacket()函数的实现如下
/*
 * Read a single chunk from the connection and merge it into *packet.
 *
 * The basic header (1-3 bytes) yields the header format and the chunk
 * stream id; the message header (11/7/3/0 bytes depending on the format)
 * and an optional 4-byte extended timestamp follow.  For formats shorter
 * than the full header, the fields are first inherited from the last
 * packet stored for the same channel in r->m_vecChannelsIn.  At most
 * r->m_inChunkSize body bytes are read per call, so a message may need
 * several calls; it is complete once RTMPPacket_IsReady(packet) is true.
 *
 * Returns TRUE when a chunk was read, FALSE on a read or allocation
 * failure.
 */
int
RTMP_ReadPacket(RTMP * r, RTMPPacket * packet)
{
    uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
    char* header = (char*)hbuf;
    int nSize, hSize, nToRead, nChunk;
    int extendedTimestamp;

    RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);

    /* first byte of the chunk: the basic header */
    if (ReadN(r, (char*)hbuf, 1) == 0)
    {
        RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);
        return FALSE;
    }

    /* fmt: top two bits */
    packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
    /* chunk stream id (cs_id): low six bits */
    packet->m_nChannel = (hbuf[0] & 0x3f);
    header++;

    if (packet->m_nChannel == 0)
    {
        /* low six bits are 0: the basic header is 2 bytes long */
        if (ReadN(r, (char*)& hbuf[1], 1) != 1)
        {
            RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",
                __FUNCTION__);
            return FALSE;
        }
        packet->m_nChannel = hbuf[1];
        packet->m_nChannel += 64;
        header++;
    }
    else if (packet->m_nChannel == 1)
    {
        /* low six bits are 1: the basic header is 3 bytes long */
        int tmp;
        if (ReadN(r, (char*)& hbuf[1], 2) != 2)
        {
            RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",
                __FUNCTION__);
            return FALSE;
        }
        tmp = (hbuf[2] << 8) + hbuf[1];
        packet->m_nChannel = tmp + 64; /* compute cs_id */
        RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
        header += 2;
    }

    /* header size (including the first basic-header byte) for this fmt */
    nSize = packetSize[packet->m_headerType];

    /* grow the per-channel tables when cs_id is beyond what is allocated */
    if (packet->m_nChannel >= r->m_channelsAllocatedIn)
    {
        int n = packet->m_nChannel + 10;
        int* timestamp = realloc(r->m_channelTimestamp, sizeof(int) * n);
        RTMPPacket** packets = realloc(r->m_vecChannelsIn, sizeof(RTMPPacket*) * n);
        if (!timestamp)
            free(r->m_channelTimestamp);
        if (!packets)
            free(r->m_vecChannelsIn);
        r->m_channelTimestamp = timestamp;
        r->m_vecChannelsIn = packets;
        if (!timestamp || !packets) {
            r->m_channelsAllocatedIn = 0;
            return FALSE;
        }
        /* zero only the newly added tail of each table */
        memset(r->m_channelTimestamp + r->m_channelsAllocatedIn, 0, sizeof(int) * (n - r->m_channelsAllocatedIn));
        memset(r->m_vecChannelsIn + r->m_channelsAllocatedIn, 0, sizeof(RTMPPacket*) * (n - r->m_channelsAllocatedIn));
        r->m_channelsAllocatedIn = n;
    }

    if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */
        packet->m_hasAbsTimestamp = TRUE;
    else if (nSize < RTMP_LARGE_HEADER_SIZE)
    { /* using values from the last message of this channel */
        if (r->m_vecChannelsIn[packet->m_nChannel])
            memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],
                sizeof(RTMPPacket));
    }

    nSize--; /* message header length only: one of {11, 7, 3, 0} */

    /* read the message header */
    if (nSize > 0 && ReadN(r, header, nSize) != nSize)
    {
        RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",
            __FUNCTION__, (unsigned int)hbuf[0]);
        return FALSE;
    }

    hSize = nSize + (header - (char*)hbuf);

    /* decode whichever fields this header format carries */
    if (nSize >= 3)
    {
        /* timestamp (or timestamp delta) */
        packet->m_nTimeStamp = AMF_DecodeInt24(header);

        if (nSize >= 6)
        {
            /* message length; a new length restarts the byte counter.
             * NOTE(review): when the fields were inherited from a partially
             * read message, m_body is still attached and is reused below
             * without checking it against the new length - confirm whether
             * a guard is needed here. */
            packet->m_nBodySize = AMF_DecodeInt24(header + 3);
            packet->m_nBytesRead = 0;

            if (nSize > 6)
            {
                /* message type id */
                packet->m_packetType = header[6];

                if (nSize == 11) /* message stream id (little endian) */
                    packet->m_nInfoField2 = DecodeInt32LE(header + 7);
            }
        }
    }

    /* a 24-bit timestamp of 0xffffff announces a 4-byte extended timestamp */
    extendedTimestamp = packet->m_nTimeStamp == 0xffffff;
    if (extendedTimestamp)
    {
        if (ReadN(r, header + nSize, 4) != 4)
        {
            RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",
                __FUNCTION__);
            return FALSE;
        }
        packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
        hSize += 4;
    }

    RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t*)hbuf, hSize);

    if (packet->m_nBodySize > 0 && packet->m_body == NULL)
    {
        if (!RTMPPacket_Alloc(packet, packet->m_nBodySize))
        {
            RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
            return FALSE;
        }
        packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
    }

    /* bytes of this message still outstanding, capped at one chunk */
    nToRead = packet->m_nBodySize - packet->m_nBytesRead;
    nChunk = r->m_inChunkSize;
    if (nToRead < nChunk)
        nChunk = nToRead;

    /* Does the caller want the raw chunk? */
    if (packet->m_chunk)
    {
        packet->m_chunk->c_headerSize = hSize;
        memcpy(packet->m_chunk->c_header, hbuf, hSize);
        packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;
        packet->m_chunk->c_chunkSize = nChunk;
    }

    /* read this chunk's share of the body */
    if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)
    {
        RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u",
            __FUNCTION__, packet->m_nBodySize);
        return FALSE;
    }

    RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t*)packet->m_body + packet->m_nBytesRead, nChunk);

    packet->m_nBytesRead += nChunk;

    /* keep the packet as ref for other packets on this channel */
    if (!r->m_vecChannelsIn[packet->m_nChannel])
    {
        r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
        /* malloc can fail; copying into a NULL slot would crash */
        if (!r->m_vecChannelsIn[packet->m_nChannel])
        {
            RTMP_Log(RTMP_LOGERROR, "%s, failed to allocate channel reference packet",
                __FUNCTION__);
            return FALSE;
        }
    }
    memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));
    if (extendedTimestamp)
    {
        /* remember the marker so a later chunk inheriting this header
         * also takes the extended-timestamp branch above */
        r->m_vecChannelsIn[packet->m_nChannel]->m_nTimeStamp = 0xffffff;
    }

    if (RTMPPacket_IsReady(packet))
    {
        /* make packet's timestamp absolute */
        if (!packet->m_hasAbsTimestamp)
            packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */

        r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;

        /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */
        /* arrives and requests to re-use some info (small packet header) */
        r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
        r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
        r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */
    }
    else
    {
        packet->m_body = NULL; /* so it won't be erased on free */
    }

    return TRUE;
}
1.2 解析packet(RTMP_ClientPacket)
该函数的主要作用是解析接收到的数据报,根据数据报的类型进行相应的操作。这些操作包括:
(1)RTMP_PACKET_TYPE_CHUNK_SIZE
设置chunk size
(2)RTMP_PACKET_TYPE_BYTES_READ_REPORT
应答消息,表示已经接收到了传输过来的数据报,返回的是已读取的字节数
(3)RTMP_PACKET_TYPE_CONTROL
用户控制信息
(4)RTMP_PACKET_TYPE_SERVER_BW
设置服务器带宽
(5)RTMP_PACKET_TYPE_CLIENT_BW
设置用户带宽
(6)RTMP_PACKET_TYPE_AUDIO
音频数据
(7)RTMP_PACKET_TYPE_VIDEO
视频数据
(8)RTMP_PACKET_TYPE_FLEX_STREAM_SEND
数据消息,发送元数据或任何用户数据到对端,AMF3 = 15
(9)RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT
共享对象消息, AMF3 = 16
(10)RTMP_PACKET_TYPE_FLEX_MESSAGE
传递AMF编码命令,AMF3 = 17
(11)RTMP_PACKET_TYPE_INFO
数据消息,发送元数据或任何用户数据到对端,AMF0 = 18
(12)RTMP_PACKET_TYPE_SHARED_OBJECT
共享对象消息,AMF0 = 19
(13)RTMP_PACKET_TYPE_INVOKE
传递AMF编码命令,AMF0 = 20
(14)RTMP_PACKET_TYPE_FLASH_VIDEO
聚合消息,一个单一的包含一系列的RTMP子消息的消息;FLV视频
/*
 * Dispatch a complete packet according to its message type.
 *
 * Control-type messages are forwarded to their Handle* routine; audio and
 * video packets additionally record the media channel and, unless pausing,
 * the media timestamp; aggregate (FLV) packets are walked tag by tag so
 * embedded metadata tags can be handled.
 *
 * Returns 0 when the packet carried no media, 1 for audio / video /
 * aggregate packets and for notify packets whose HandleMetadata() call
 * returned non-zero, and 2 when HandleInvoke() returned 1.
 */
int
RTMP_ClientPacket(RTMP * r, RTMPPacket * packet)
{
    int bHasMediaPacket = 0;
    switch (packet->m_packetType)
    {
    case RTMP_PACKET_TYPE_CHUNK_SIZE:
        /* chunk size */
        HandleChangeChunkSize(r, packet);
        break;

    case RTMP_PACKET_TYPE_BYTES_READ_REPORT:
        /* bytes read report (acknowledgement of received bytes) */
        RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__);
        break;

    case RTMP_PACKET_TYPE_CONTROL:
        /* ctrl (user control message) */
        HandleCtrl(r, packet);
        break;

    case RTMP_PACKET_TYPE_SERVER_BW:
        /* server bw */
        HandleServerBW(r, packet);
        break;

    case RTMP_PACKET_TYPE_CLIENT_BW:
        /* client bw */
        HandleClientBW(r, packet);
        break;

    case RTMP_PACKET_TYPE_AUDIO:
        /* audio data */
        HandleAudio(r, packet);
        bHasMediaPacket = 1;
        if (!r->m_mediaChannel)
            r->m_mediaChannel = packet->m_nChannel;
        if (!r->m_pausing)
            r->m_mediaStamp = packet->m_nTimeStamp;
        break;

    case RTMP_PACKET_TYPE_VIDEO:
        /* video data */
        HandleVideo(r, packet);
        bHasMediaPacket = 1;
        if (!r->m_mediaChannel)
            r->m_mediaChannel = packet->m_nChannel;
        if (!r->m_pausing)
            r->m_mediaStamp = packet->m_nTimeStamp;
        break;

    case RTMP_PACKET_TYPE_FLEX_STREAM_SEND:
        /* flex stream send (AMF3 data message) */
        RTMP_Log(RTMP_LOGDEBUG,
            "%s, flex stream send, size %u bytes, not supported, ignoring",
            __FUNCTION__, packet->m_nBodySize);
        break;

    case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT:
        /* flex shared object (AMF3 shared object message) */
        RTMP_Log(RTMP_LOGDEBUG,
            "%s, flex shared object, size %u bytes, not supported, ignoring",
            __FUNCTION__, packet->m_nBodySize);
        break;

    case RTMP_PACKET_TYPE_FLEX_MESSAGE:
        /* flex message (AMF3 command message) */
    {
        RTMP_Log(RTMP_LOGDEBUG,
            "%s, flex message, size %u bytes, not fully supported",
            __FUNCTION__, packet->m_nBodySize);

        /* some DEBUG code */
#if 0
        RTMP_LIB_AMFObject obj;
        int nRes = obj.Decode(packet.m_body + 1, packet.m_nBodySize - 1);
        if (nRes < 0) {
            RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__);
            /*return; */
        }

        obj.Dump();
#endif

        /* the first body byte is skipped before the invoke payload */
        if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1)
            bHasMediaPacket = 2;
        break;
    }
    case RTMP_PACKET_TYPE_INFO:
        /* metadata (notify), AMF0 data message */
        RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__,
            packet->m_nBodySize);
        if (HandleMetadata(r, packet->m_body, packet->m_nBodySize))
            bHasMediaPacket = 1;
        break;

    case RTMP_PACKET_TYPE_SHARED_OBJECT:
        /* shared object, AMF0 variant */
        RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring",
            __FUNCTION__);
        break;

    case RTMP_PACKET_TYPE_INVOKE:
        /* invoke (AMF0 command message) */
        RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,
            packet->m_nBodySize);

        if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1)
            bHasMediaPacket = 2;
        break;

    case RTMP_PACKET_TYPE_FLASH_VIDEO:
    {
        /* aggregate message: go through FLV packets and handle metadata packets */
        unsigned int pos = 0;
        uint32_t nTimeStamp = packet->m_nTimeStamp;

        while (pos + 11 < packet->m_nBodySize)
        {
            uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */

            if (pos + 11 + dataSize + 4 > packet->m_nBodySize)
            {
                RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!");
                break;
            }
            if (packet->m_body[pos] == 0x12)
            {
                HandleMetadata(r, packet->m_body + pos + 11, dataSize);
            }
            else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9)
            {
                nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4);
                /* Go through unsigned char before shifting: m_body is a
                 * plain char buffer, and shifting a negative promoted value
                 * left is undefined behaviour. */
                nTimeStamp |= ((uint32_t)(unsigned char)packet->m_body[pos + 7] << 24);
            }
            pos += (11 + dataSize + 4);
        }
        if (!r->m_pausing)
            r->m_mediaStamp = nTimeStamp;

        /* FLV tag(s) */
        bHasMediaPacket = 1;
        break;
    }
    default:
        RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
            packet->m_packetType);
#ifdef _DEBUG
        RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize);
#endif
    }

    return bHasMediaPacket;
}
1.2.1 设置Chunk Size(HandleChangeChunkSize)
/*
 * Apply a Set Chunk Size message to the incoming chunk size.
 *
 * Bodies shorter than 4 bytes are ignored.  The decoded value is accepted
 * only in the range 1..0x7fffffff: RTMP_ReadPacket() copies
 * r->m_inChunkSize into an int chunk length, so zero would make every read
 * consume no body bytes and a value with the top bit set would turn into a
 * negative length.
 */
static void
HandleChangeChunkSize(RTMP * r, const RTMPPacket * packet)
{
    unsigned int chunkSize;

    if (packet->m_nBodySize < 4)
        return;

    /* 4-byte big-endian chunk size */
    chunkSize = AMF_DecodeInt32(packet->m_body);
    if (chunkSize == 0 || chunkSize > 0x7fffffff)
    {
        RTMP_Log(RTMP_LOGWARNING, "%s, ignoring invalid chunk size %u", __FUNCTION__,
            chunkSize);
        return;
    }

    r->m_inChunkSize = (int)chunkSize;
    RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__,
        r->m_inChunkSize);
}
1.2.2 用户控制信息(HandleCtrl)
/*
 * Process a user control message.
 *
 * The first two body bytes hold the event type; when at least six bytes
 * are present, the following four are decoded as the event data and the
 * event is logged and, for a few types, acted on (EOF while pausing, ping,
 * BufferEmpty).  Event type 0x1A (SWFVerification) is handled afterwards
 * regardless of the body length.
 */
static void
HandleCtrl(RTMP * r, const RTMPPacket * packet)
{
    short eventType = -1;
    unsigned int eventData;

    /* the first 2 bytes are the event type */
    if (packet->m_body && packet->m_nBodySize >= 2)
        eventType = AMF_DecodeInt16(packet->m_body);

    RTMP_Log(RTMP_LOGDEBUG, "%s, received ctrl. type: %d, len: %d", __FUNCTION__, eventType,
        packet->m_nBodySize);

    if (packet->m_nBodySize >= 6)
    {
        /* every event below starts from the same 4-byte payload */
        eventData = AMF_DecodeInt32(packet->m_body + 2);

        switch (eventType)
        {
        case 0: /* Stream Begin */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream Begin %d", __FUNCTION__, eventData);
            break;

        case 1: /* Stream EOF */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream EOF %d", __FUNCTION__, eventData);
            if (r->m_pausing == 1)
                r->m_pausing = 2;
            break;

        case 2: /* Stream Dry */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream Dry %d", __FUNCTION__, eventData);
            break;

        case 4: /* Stream IsRecorded */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream IsRecorded %d", __FUNCTION__, eventData);
            break;

        case 6: /* server ping. reply with pong. */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Ping %d", __FUNCTION__, eventData);
            RTMP_SendCtrl(r, 0x07, eventData, 0);
            break;

            /* FMS 3.5 servers send the following two controls to let the client
             * know when the server has sent a complete buffer. I.e., when the
             * server has sent an amount of data equal to m_nBufferMS in duration.
             * The server meters its output so that data arrives at the client
             * in realtime and no faster.
             *
             * The rtmpdump program tries to set m_nBufferMS as large as
             * possible, to force the server to send data as fast as possible.
             * In practice, the server appears to cap this at about 1 hour's
             * worth of data. After the server has sent a complete buffer, and
             * sends this BufferEmpty message, it will wait until the play
             * duration of that buffer has passed before sending a new buffer.
             * The BufferReady message will be sent when the new buffer starts.
             * (There is no BufferReady message for the very first buffer;
             * presumably the Stream Begin message is sufficient for that
             * purpose.)
             *
             * If the network speed is much faster than the data bitrate, then
             * there may be long delays between the end of one buffer and the
             * start of the next.
             *
             * Since usually the network allows data to be sent at
             * faster than realtime, and rtmpdump wants to download the data
             * as fast as possible, we use this RTMP_LF_BUFX hack: when we
             * get the BufferEmpty message, we send a Pause followed by an
             * Unpause. This causes the server to send the next buffer immediately
             * instead of waiting for the full duration to elapse. (That's
             * also the purpose of the ToggleStream function, which rtmpdump
             * calls if we get a read timeout.)
             *
             * Media player apps don't need this hack since they are just
             * going to play the data in realtime anyway. It also doesn't work
             * for live streams since they obviously can only be sent in
             * realtime. And it's all moot if the network speed is actually
             * slower than the media bitrate.
             */
        case 31: /* Stream BufferEmpty */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream BufferEmpty %d", __FUNCTION__, eventData);
            if (r->Link.lFlags & RTMP_LF_BUFX)
            {
                if (r->m_pausing == 0)
                {
                    /* pause at the last timestamp seen on the media channel */
                    if (r->m_mediaChannel < r->m_channelsAllocatedIn)
                        r->m_pauseStamp = r->m_channelTimestamp[r->m_mediaChannel];
                    else
                        r->m_pauseStamp = 0;
                    RTMP_SendPause(r, TRUE, r->m_pauseStamp);
                    r->m_pausing = 1;
                }
                else if (r->m_pausing == 2)
                {
                    RTMP_SendPause(r, FALSE, r->m_pauseStamp);
                    r->m_pausing = 3;
                }
            }
            break;

        case 32: /* Stream BufferReady */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream BufferReady %d", __FUNCTION__, eventData);
            break;

        default: /* any other event */
            RTMP_Log(RTMP_LOGDEBUG, "%s, Stream xx %d", __FUNCTION__, eventData);
            break;
        }
    }

    /* everything below concerns SWFVerification only */
    if (eventType != 0x1A)
        return;

    RTMP_Log(RTMP_LOGDEBUG, "%s, SWFVerification ping received: ", __FUNCTION__);
    if (packet->m_nBodySize > 2 && packet->m_body[2] > 0x01)
    {
        RTMP_Log(RTMP_LOGERROR,
            "%s: SWFVerification Type %d request not supported! Patches welcome...",
            __FUNCTION__, packet->m_body[2]);
    }
#ifdef CRYPTO
    /* respond with HMAC SHA256 of decompressed SWF, key is the 30byte player key, also the last 30 bytes of the server handshake are applied */
    else if (r->Link.SWFSize)
    {
        RTMP_SendCtrl(r, 0x1B, 0, 0);
    }
    else
    {
        RTMP_Log(RTMP_LOGERROR,
            "%s: Ignoring SWFVerification request, use --swfVfy!",
            __FUNCTION__);
    }
#else
    RTMP_Log(RTMP_LOGERROR,
        "%s: Ignoring SWFVerification request, no CRYPTO support!",
        __FUNCTION__);
#endif
}
1.2.3 设置应答窗口大小(HandleServerBW)
从RTMPDump代码中看,这条命令消息通常由client发出到server,用于设置应答窗口大小
/*
 * Store the 4-byte value carried by a server-bandwidth message in
 * r->m_nServerBW.
 *
 * The body length is checked first, as HandleChangeChunkSize() does, so a
 * truncated message cannot cause a read past the end of the body.
 */
static void
HandleServerBW(RTMP * r, const RTMPPacket * packet)
{
    if (!packet->m_body || packet->m_nBodySize < 4)
    {
        RTMP_Log(RTMP_LOGWARNING, "%s: message too short (%u bytes), ignoring",
            __FUNCTION__, packet->m_nBodySize);
        return;
    }

    r->m_nServerBW = AMF_DecodeInt32(packet->m_body);
    RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r->m_nServerBW);
}
1.2.4 设置对端带宽(HandleClientBW)
从RTMPDump代码中看,这条命令通常由server发送给client,用于设置client发送带宽
/*
 * Store the contents of a client-bandwidth (set peer bandwidth) message.
 *
 * The first 4 bytes are the bandwidth (r->m_nClientBW); an optional fifth
 * byte is the limit type (r->m_nClientBW2, -1 when absent):
 *   0 (Hard)    - the peer should limit its output bandwidth to the
 *                 indicated window size;
 *   1 (Soft)    - the peer should limit to the indicated window size or to
 *                 the limit already in effect, whichever is smaller;
 *   2 (Dynamic) - treated as Hard if the previous limit type was Hard,
 *                 otherwise ignored.
 *
 * Messages shorter than 4 bytes are ignored and leave both fields
 * untouched, so the 4-byte decode never reads past the body.
 */
static void
HandleClientBW(RTMP * r, const RTMPPacket * packet)
{
    if (!packet->m_body || packet->m_nBodySize < 4)
    {
        RTMP_Log(RTMP_LOGWARNING, "%s: message too short (%u bytes), ignoring",
            __FUNCTION__, packet->m_nBodySize);
        return;
    }

    /* bandwidth */
    r->m_nClientBW = AMF_DecodeInt32(packet->m_body);

    /* limit type, when present */
    if (packet->m_nBodySize > 4)
        r->m_nClientBW2 = packet->m_body[4];
    else
        r->m_nClientBW2 = -1;

    RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, r->m_nClientBW,
        r->m_nClientBW2);
}
1.2.5 音频数据(HandleAudio)
这个函数在RTMPDump中是空实现(函数体为空)
/* Hook for audio packets; intentionally does nothing. */
static void
HandleAudio(RTMP * r, const RTMPPacket * packet)
{
    (void)r;
    (void)packet;
}
1.2.6 视频数据(HandleVideo)
这个函数在RTMPDump中是空实现(函数体为空)
/* Hook for video packets; intentionally does nothing. */
static void
HandleVideo(RTMP * r, const RTMPPacket * packet)
{
    (void)r;
    (void)packet;
}
1.2.7 元数据(HandleMetadata)
/*
 * Decode an AMF data message and, if it is an onMetaData notification,
 * print it and pick up the values used later on.
 *
 * On a match the duration (when present) is stored in r->m_fDuration, and
 * bits 1 / 4 of r->m_read.dataType are set when a property with the video /
 * audio prefix is found.
 *
 * Returns TRUE for an onMetaData message, FALSE for anything else or when
 * decoding fails.
 */
static int
HandleMetadata(RTMP * r, char* body, unsigned int len)
{
    AMFObject meta;
    AVal firstString;
    AMFObjectProperty found;

    /* allright we get some info here, so parse it and print it */
    /* also keep duration or filesize to make a nice progress bar */
    if (AMF_Decode(&meta, body, len, FALSE) < 0)
    {
        RTMP_Log(RTMP_LOGERROR, "%s, error decoding meta data packet", __FUNCTION__);
        return FALSE;
    }

    AMF_Dump(&meta);
    AMFProp_GetString(AMF_GetProp(&meta, NULL, 0), &firstString);

    /* anything other than onMetaData is of no interest here */
    if (!AVMATCH(&firstString, &av_onMetaData))
    {
        AMF_Reset(&meta);
        return FALSE;
    }

    /* Show metadata */
    RTMP_Log(RTMP_LOGINFO, "Metadata:");
    DumpMetaData(&meta);

    if (RTMP_FindFirstMatchingProperty(&meta, &av_duration, &found))
        r->m_fDuration = found.p_vu.p_number;

    /* Search for audio or video tags */
    if (RTMP_FindPrefixProperty(&meta, &av_video, &found))
        r->m_read.dataType |= 1;
    if (RTMP_FindPrefixProperty(&meta, &av_audio, &found))
        r->m_read.dataType |= 4;

    AMF_Reset(&meta);
    return TRUE;
}
1.2.8 命令消息(HandleInvoke)
在RTMPDump中,该函数主要被用于处理server返回过来的命令消息
/*
 * Handle a command (invoke) message received from the peer.
 *
 * The body is decoded as an AMF object whose property 0 is the method name
 * and property 1 the transaction id.  For "_result" the pending call with
 * the same transaction id is looked up in r->m_methodCalls and the next
 * step of the connect / createStream / play (or publish) sequence is sent;
 * the remaining branches react to the other method names checked below
 * (onBWDone, ping, _error, close, onStatus, ...).
 *
 * An empty body is rejected up front, because the sanity check reads the
 * first body byte.
 *
 * Returns 0 for OK/Failed/error, 1 for 'Stop or Complete'
 */
static int
HandleInvoke(RTMP * r, const char* body, unsigned int nBodySize)
{
    AMFObject obj;
    AVal method;
    double txn;
    int ret = 0, nRes;

    if (nBodySize == 0 || body[0] != 0x02) /* make sure it is a string method name we start with */
    {
        RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
            __FUNCTION__);
        return 0;
    }

    nRes = AMF_Decode(&obj, body, nBodySize, FALSE);
    if (nRes < 0)
    {
        RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
        return 0;
    }

    AMF_Dump(&obj);
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
    RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val);

    if (AVMATCH(&method, &av__result))
    {
        AVal methodInvoked = { 0 };
        int i;

        /* find the pending call this result answers and take over its name
         * (erased with FALSE, then freed at the end of this branch) */
        for (i = 0; i < r->m_numCalls; i++) {
            if (r->m_methodCalls[i].num == (int)txn) {
                methodInvoked = r->m_methodCalls[i].name;
                AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
                break;
            }
        }
        if (!methodInvoked.av_val) {
            RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request",
                __FUNCTION__, txn);
            goto leave;
        }

        RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__,
            methodInvoked.av_val);

        /* result of the connect command sent earlier */
        if (AVMATCH(&methodInvoked, &av_connect))
        {
            if (r->Link.token.av_len)
            {
                AMFObjectProperty p;
                if (RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p))
                {
                    DecodeTEA(&r->Link.token, &p.p_vu.p_aval);
                    SendSecureTokenResponse(r, &p.p_vu.p_aval);
                }
            }
            if (r->Link.protocol & RTMP_FEATURE_WRITE)
            {
                SendReleaseStream(r);
                SendFCPublish(r);
            }
            else
            {
                RTMP_SendServerBW(r);
                RTMP_SendCtrl(r, 3, 0, 300);
            }
            /* connect has been answered, so the createStream request can go out */
            RTMP_SendCreateStream(r);

            if (!(r->Link.protocol & RTMP_FEATURE_WRITE))
            {
                /* Authenticate on Justin.tv legacy servers before sending FCSubscribe */
                if (r->Link.usherToken.av_len)
                    SendUsherToken(r, &r->Link.usherToken);
                /* Send the FCSubscribe if live stream or if subscribepath is set */
                if (r->Link.subscribepath.av_len)
                    SendFCSubscribe(r, &r->Link.subscribepath);
                else if (r->Link.lFlags & RTMP_LF_LIVE)
                    SendFCSubscribe(r, &r->Link.playpath);
            }
        }
        else if (AVMATCH(&methodInvoked, &av_createStream))
        {
            /* result of createStream: property 3 is taken as the stream id */
            r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));

            if (r->Link.protocol & RTMP_FEATURE_WRITE)
            {
                SendPublish(r);
            }
            else
            {
                if (r->Link.lFlags & RTMP_LF_PLST)
                    SendPlaylist(r);
                /* createStream has been answered, so play and the buffer-length control can go out */
                SendPlay(r);
                RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
            }
        }
        else if (AVMATCH(&methodInvoked, &av_play) ||
            AVMATCH(&methodInvoked, &av_publish))
        {
            r->m_bPlaying = TRUE;
        }
        free(methodInvoked.av_val);
    }
    else if (AVMATCH(&method, &av_onBWDone))
    {
        if (!r->m_nBWCheckCounter)
            SendCheckBW(r);
    }
    else if (AVMATCH(&method, &av_onFCSubscribe))
    {
        /* SendOnFCSubscribe(); */
    }
    else if (AVMATCH(&method, &av_onFCUnsubscribe))
    {
        RTMP_Close(r);
        ret = 1;
    }
    else if (AVMATCH(&method, &av_ping))
    {
        SendPong(r, txn);
    }
    else if (AVMATCH(&method, &av__onbwcheck))
    {
        SendCheckBWResult(r, txn);
    }
    else if (AVMATCH(&method, &av__onbwdone))
    {
        int i;
        for (i = 0; i < r->m_numCalls; i++)
            if (AVMATCH(&r->m_methodCalls[i].name, &av__checkbw))
            {
                AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                break;
            }
    }
    else if (AVMATCH(&method, &av__error))
    {
#ifdef CRYPTO
        AVal methodInvoked = { 0 };
        int i;

        if (r->Link.protocol & RTMP_FEATURE_WRITE)
        {
            for (i = 0; i < r->m_numCalls; i++)
            {
                if (r->m_methodCalls[i].num == txn)
                {
                    methodInvoked = r->m_methodCalls[i].name;
                    AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
                    break;
                }
            }
            if (!methodInvoked.av_val)
            {
                RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request",
                    __FUNCTION__, txn);
                goto leave;
            }

            RTMP_Log(RTMP_LOGDEBUG, "%s, received error for method call <%s>", __FUNCTION__,
                methodInvoked.av_val);

            if (AVMATCH(&methodInvoked, &av_connect))
            {
                AMFObject obj2;
                AVal code, level, description;
                AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
                AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
                AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
                AMFProp_GetString(AMF_GetProp(&obj2, &av_description, -1), &description);
                RTMP_Log(RTMP_LOGDEBUG, "%s, error description: %s", __FUNCTION__, description.av_val);
                /* if PublisherAuth returns 1, then reconnect */
                if (PublisherAuth(r, &description) == 1)
                {
                    CloseInternal(r, 1);
                    if (!RTMP_Connect(r, NULL) || !RTMP_ConnectStream(r, 0))
                    {
                        /* the name was taken over from the call list above,
                         * so release it before skipping the normal free */
                        free(methodInvoked.av_val);
                        goto leave;
                    }
                }
            }
        }
        else
        {
            RTMP_Log(RTMP_LOGERROR, "rtmp server sent error");
        }
        free(methodInvoked.av_val);
#else
        RTMP_Log(RTMP_LOGERROR, "rtmp server sent error");
#endif
    }
    else if (AVMATCH(&method, &av_close))
    {
        RTMP_Log(RTMP_LOGERROR, "rtmp server requested close");
        RTMP_Close(r);
    }
    else if (AVMATCH(&method, &av_onStatus))
    {
        /* onStatus carries a status object in property 3; its code decides what happens */
        AMFObject obj2;
        AVal code, level;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
        AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
        AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);

        RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
        if (AVMATCH(&code, &av_NetStream_Failed)
            || AVMATCH(&code, &av_NetStream_Play_Failed)
            || AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
            || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
        {
            r->m_stream_id = -1;
            RTMP_Close(r);
            RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val);
        }

        else if (AVMATCH(&code, &av_NetStream_Play_Start)
            || AVMATCH(&code, &av_NetStream_Play_PublishNotify))
        {
            int i;
            r->m_bPlaying = TRUE;
            for (i = 0; i < r->m_numCalls; i++)
            {
                if (AVMATCH(&r->m_methodCalls[i].name, &av_play))
                {
                    AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                    break;
                }
            }
        }

        else if (AVMATCH(&code, &av_NetStream_Publish_Start))
        {
            int i;
            r->m_bPlaying = TRUE;
            for (i = 0; i < r->m_numCalls; i++)
            {
                if (AVMATCH(&r->m_methodCalls[i].name, &av_publish))
                {
                    AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                    break;
                }
            }
        }

        /* Return 1 if this is a Play.Complete or Play.Stop */
        else if (AVMATCH(&code, &av_NetStream_Play_Complete)
            || AVMATCH(&code, &av_NetStream_Play_Stop)
            || AVMATCH(&code, &av_NetStream_Play_UnpublishNotify))
        {
            RTMP_Close(r);
            ret = 1;
        }

        else if (AVMATCH(&code, &av_NetStream_Seek_Notify))
        {
            r->m_read.flags &= ~RTMP_READ_SEEKING;
        }

        else if (AVMATCH(&code, &av_NetStream_Pause_Notify))
        {
            if (r->m_pausing == 1 || r->m_pausing == 2)
            {
                RTMP_SendPause(r, FALSE, r->m_pauseStamp);
                r->m_pausing = 3;
            }
        }
    }
    else if (AVMATCH(&method, &av_playlist_ready))
    {
        int i;
        for (i = 0; i < r->m_numCalls; i++)
        {
            if (AVMATCH(&r->m_methodCalls[i].name, &av_set_playlist))
            {
                AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                break;
            }
        }
    }

leave:
    AMF_Reset(&obj);
    return ret;
}
现在假设状态为client向server发送了av_connect命令,server会给予一个反馈,client会根据这个反馈去进行下一步的操作,如果server告诉client,connect成功了,现在就可以调用RTMP_SendCreateStream()函数发送av_createStream命令,RTMP_SendCreateStream()函数定义如下
/*
 * Build and send a "createStream" invoke.
 *
 * The body is the AMF-encoded command name, the next invoke number
 * (r->m_numInvokes is pre-incremented) and an AMF NULL, written into a
 * stack buffer after room reserved for the header.
 *
 * Returns whatever RTMP_SendPacket() returns.
 */
int
RTMP_SendCreateStream(RTMP * r)
{
    char buffer[256];
    char* const limit = buffer + sizeof(buffer);
    char* cursor;
    RTMPPacket packet;

    /* header fields */
    packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
    packet.m_packetType = RTMP_PACKET_TYPE_INVOKE;
    packet.m_nChannel = 0x03; /* control channel (invoke) */
    packet.m_nInfoField2 = 0;
    packet.m_nTimeStamp = 0;
    packet.m_hasAbsTimestamp = 0;

    /* the body starts after the space kept free for the header */
    packet.m_body = buffer + RTMP_MAX_HEADER_SIZE;

    cursor = AMF_EncodeString(packet.m_body, limit, &av_createStream);
    cursor = AMF_EncodeNumber(cursor, limit, ++r->m_numInvokes);
    *cursor++ = AMF_NULL; /* NULL */

    packet.m_nBodySize = cursor - packet.m_body;

    return RTMP_SendPacket(r, &packet, TRUE);
}
2.小结
本文记录了使用RTMP进行流连接的过程,主要内容包括:
(1)读取server反馈的packet
(2)解析packet。前面client已经发送了av_connect命令,这里会解析这条命令是否成功,如果成功则可以使用RTMP_SendCreateStream()来发送av_createStream命令,申请创建流连接