I read through the librtmp source code and jotted down some brief notes.
First, some background on the AMF format.
1 The AMF format
AMF is short for Action Message Format, a binary data format. It was designed to serialize ActionScript data (Object, Array, Boolean, Number, and so on) into a binary stream that can be sent to another program, such as a remote server; the receiver can then decode the stream and reconstruct the original data, which is how data gets transferred.
1.1 AMFObject
AMF comes in two flavors: 1. AMF0, the basic set of encoding rules; 2. AMF3, an extension of AMF0.
AMF0 data types:
// AMF0 data types
typedef enum
{
    AMF_NUMBER = 0,     // number (double)
    AMF_BOOLEAN,        // boolean
    AMF_STRING,         // string
    AMF_OBJECT,         // object
    AMF_MOVIECLIP,      // reserved, not used
    AMF_NULL,           // null
    AMF_UNDEFINED,      // undefined
    AMF_REFERENCE,      // reference
    AMF_ECMA_ARRAY,     // ECMA array
    AMF_OBJECT_END,     // object end
    AMF_STRICT_ARRAY,   // strict array
    AMF_DATE,           // date
    AMF_LONG_STRING,    // long string
    AMF_UNSUPPORTED,    // unsupported
    AMF_RECORDSET,      // reserved, not used
    AMF_XML_DOC,        // XML document
    AMF_TYPED_OBJECT,   // typed object
    AMF_AVMPLUS,        // switch to AMF3
    AMF_INVALID = 0xff  // invalid
} AMFDataType;
AMF3 data types:
// AMF3 data types
typedef enum
{
    AMF3_UNDEFINED = 0, // undefined
    AMF3_NULL,          // null
    AMF3_FALSE,         // false
    AMF3_TRUE,          // true
    AMF3_INTEGER,       // integer
    AMF3_DOUBLE,        // double
    AMF3_STRING,        // string
    AMF3_XML_DOC,       // XML document
    AMF3_DATE,          // date
    AMF3_ARRAY,         // array
    AMF3_OBJECT,        // object
    AMF3_XML,           // XML
    AMF3_BYTE_ARRAY     // byte array
} AMF3DataType;
AMF defines its own string type:
// AMF's own string type
typedef struct AVal
{
    char *av_val;
    int av_len;
} AVal;
// quick initialization of an AVal from a string literal
#define AVC(str) {str,sizeof(str)-1}
// compare two AVal strings
#define AVMATCH(a1,a2) ((a1)->av_len == (a2)->av_len && !memcmp((a1)->av_val,(a2)->av_val,(a1)->av_len))
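A minimal usage sketch (my own example, not from the librtmp sources; it only assumes librtmp's amf.h):
#include <string.h>
#include "amf.h"   /* AVal, AVC, AVMATCH from librtmp */

static const AVal av_connect = AVC("connect");   /* {"connect", 7} */

/* returns nonzero when *method holds exactly the string "connect" */
static int is_connect(const AVal *method)
{
    return AVMATCH(method, &av_connect);
}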
AMFObject represents an AMF object: o_num is the number of entries in o_props, and a single object can contain any number of properties.
// an AMF object is simply a collection of properties
typedef struct AMFObject
{
    int o_num;                         // number of properties
    struct AMFObjectProperty *o_props; // property array
} AMFObject;
AMFObjectProperty represents one property of an AMF object, i.e. a key-value pair: p_name is the key, p_type is the type of the value, and p_vu holds the value itself.
// a property of an AMF object
typedef struct AMFObjectProperty
{
    AVal p_name;          // property name
    AMFDataType p_type;   // property type
    union
    {
        double p_number;
        AVal p_aval;
        AMFObject p_object;
    } p_vu;               // property value
    int16_t p_UTCoffset;  // UTC offset
} AMFObjectProperty;
p_vu is a union because which member is valid depends on p_type:
when p_type is a number, p_vu holds the double p_number;
when p_type is a string, p_vu holds the AVal p_aval;
when p_type is an object, p_vu holds the AMFObject p_object.
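To make the union concrete, here is a hedged sketch (my own example, not librtmp code) that decodes an AMF0 command body with AMF_Decode and reads the first two values through the amf.h accessors, which internally return p_vu.p_aval and p_vu.p_number:
#include <stdio.h>
#include "amf.h"   /* AMFObject, AMF_Decode, AMF_GetProp, AMFProp_* */

/* decode an AMF0-encoded command body and print its first two fields,
 * assuming the usual "command name (string) + transaction ID (number)" layout */
static void dump_command(const char *body, int len)
{
    AMFObject obj;
    AVal method;

    if (AMF_Decode(&obj, body, len, 0) < 0)   /* 0: top-level values carry no names */
        return;

    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);      /* p_vu.p_aval */
    double txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));  /* p_vu.p_number */

    printf("method=%.*s, transaction id=%f\n", method.av_len, method.av_val, txn);
    AMF_Reset(&obj);   /* free the decoded properties */
}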
1.2 Encoding format
Number (double):
0x00 + 8-byte big-endian double
Boolean:
0x01 + 1-byte boolean value
Short string:
0x02 + 2-byte length + string bytes
Long string:
0x0C + 4-byte length + string bytes
Object:
0x03 + 2-byte length of property-1 name + property-1 name + 1-byte property-1 type + n bytes of property-1 value + 2-byte length of property-2 name + property-2 name + 1-byte property-2 type + n bytes of property-2 value + ... + 3-byte end marker (0x00 0x00 0x09)
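As an illustration of these rules, a minimal sketch (my own example, not code from librtmp; it only assumes the AMF_Encode* helpers and the AVC macro declared in librtmp's amf.h) that encodes an object with one string property and one number property:
#include <stdio.h>
#include "amf.h"   /* AMF_Encode* helpers, AVal, AVC */

int main(void)
{
    char buf[256], *enc = buf, *pend = buf + sizeof(buf);
    AVal name  = AVC("app");
    AVal value = AVC("live");
    AVal cap   = AVC("capabilities");

    *enc++ = AMF_OBJECT;                                    /* 0x03: object start */
    enc = AMF_EncodeNamedString(enc, pend, &name, &value);  /* 2-byte name len + "app" + 0x02 + 2-byte len + "live" */
    enc = AMF_EncodeNamedNumber(enc, pend, &cap, 15.0);     /* 2-byte name len + name + 0x00 + 8-byte double */
    *enc++ = 0;                                             /* object end marker: 0x00 0x00 0x09 */
    *enc++ = 0;
    *enc++ = AMF_OBJECT_END;

    printf("encoded %ld bytes\n", (long)(enc - buf));
    return 0;
}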
2 librtmp source code analysis
2.1 RTMP_ParseURL
Parses the URL and extracts the protocol, the host name, and the application name (app), as well as the port and play path.
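A hedged usage sketch based on the declaration in rtmp.h (the URL is just a placeholder):
#include <stdio.h>
#include "rtmp.h"   /* RTMP_ParseURL, AVal */

int main(void)
{
    AVal host, playpath, app;
    unsigned int port = 0;
    int protocol = 0;

    if (!RTMP_ParseURL("rtmp://example.com:1935/live/stream", &protocol,
                       &host, &port, &playpath, &app))
        return 1;

    printf("protocol=%d host=%.*s port=%u app=%.*s playpath=%.*s\n",
           protocol, host.av_len, host.av_val, port,
           app.av_len, app.av_val, playpath.av_len, playpath.av_val);
    return 0;
}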
2.2 HandShake
The HandShake in handshake.h contains extra code for the encrypted RTMP variants; since we only consider plain RTMP, we analyze the HandShake in rtmp.c.
static int
HandShake(RTMP *r, int FP9HandShake)
{
int i;
uint32_t uptime, suptime;
int bMatch;
char type;
char clientbuf[RTMP_SIG_SIZE + 1], *clientsig = clientbuf + 1;
char serversig[RTMP_SIG_SIZE];
clientbuf[0] = 0x03; /* not encrypted */
uptime = htonl(RTMP_GetTime());
memcpy(clientsig, &uptime, 4);
memset(&clientsig[4], 0, 4);
#ifdef _DEBUG
for (i = 8; i < RTMP_SIG_SIZE; i++)
clientsig[i] = 0xff;
#else
for (i = 8; i < RTMP_SIG_SIZE; i++)
clientsig[i] = (char)(rand() % 256);
#endif
if (!WriteN(r, clientbuf, RTMP_SIG_SIZE + 1))
return FALSE;
if (ReadN(r, &type, 1) != 1) /* 0x03 or 0x06 */
return FALSE;
RTMP_Log(RTMP_LOGDEBUG, "%s: Type Answer : %02X", __FUNCTION__, type);
if (type != clientbuf[0])
RTMP_Log(RTMP_LOGWARNING, "%s: Type mismatch: client sent %d, server answered %d",
__FUNCTION__, clientbuf[0], type);
if (ReadN(r, serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
return FALSE;
/* decode server response */
memcpy(&suptime, serversig, 4);
suptime = ntohl(suptime);
RTMP_Log(RTMP_LOGDEBUG, "%s: Server Uptime : %d", __FUNCTION__, suptime);
RTMP_Log(RTMP_LOGDEBUG, "%s: FMS Version : %d.%d.%d.%d", __FUNCTION__,
serversig[4], serversig[5], serversig[6], serversig[7]);
/* 2nd part of handshake */
if (!WriteN(r, serversig, RTMP_SIG_SIZE))
return FALSE;
if (ReadN(r, serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
return FALSE;
bMatch = (memcmp(serversig, clientsig, RTMP_SIG_SIZE) == 0);
if (!bMatch)
{
RTMP_Log(RTMP_LOGWARNING, "%s, client signature does not match!", __FUNCTION__);
}
return TRUE;
}
1) Fill C0 = 0x03; fill C1 with a timestamp plus random bytes, 1536 bytes in total
2) Send C0 and C1 to the server
3) Read S0 from the server and check that it is 0x03, then read S1
4) Send S1 back to the server as C2
5) Read S2 from the server and compare it with C1; if they are identical the handshake succeeded
2.3 RTMP_Connect
Establishes the NetConnection.
It mainly calls two functions, RTMP_Connect0 and RTMP_Connect1.
RTMP_Connect0
Establishes the socket (TCP) connection.
RTMP_Connect1
Establishes the RTMP connection: HandShake performs the handshake, then SendConnectPacket sends the "connect" command to set up the RTMP connection.
SendConnectPacket
Fill in the packet header fields:
m_nChannel --> chunk stream ID
m_headerType --> fmt in the basic header of the chunk header
m_packetType --> Message Type ID; here 0x14, a command message
m_nTimeStamp --> timestamp
m_nInfoField2 --> the last four bytes of the header when the chunk fmt is 0, i.e. the Message Stream ID
m_hasAbsTimestamp --> whether the timestamp is absolute or relative: absolute when the chunk type is 0, a timestamp delta for the other types
packet.m_nChannel = 0x03; /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
packet.m_packetType = RTMP_PACKET_TYPE_INVOKE;
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
SAVC(x) turns x into a string wrapped in the AVal av_x; for example, SAVC(connect) defines av_connect holding "connect".
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(app);
SAVC(connect);
SAVC(flashVer);
SAVC(swfUrl);
SAVC(pageUrl);
SAVC(tcUrl);
SAVC(fpad);
SAVC(capabilities);
SAVC(audioCodecs);
SAVC(videoCodecs);
SAVC(videoFunction);
SAVC(objectEncoding);
SAVC(secureToken);
SAVC(secureTokenResponse);
SAVC(type);
SAVC(nonprivate);
The "connect" command is sent following section 7.2.1.1 of the RTMP specification.
AMF-encode the command name "connect":
enc = AMF_EncodeString(enc, pend, &av_connect);
AMF-encode the transaction ID:
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
Set up the command object (the name-value pairs) carried by the connect command.
Object start marker (0x03):
*enc++ = AMF_OBJECT;
Property "app", taken from r->Link.app:
enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);
Property "flashVer":
enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);
Property "swfUrl":
enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl);
Property "tcUrl":
enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl);
Property "fpad":
enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE);
Property "capabilities":
enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0);
Property "audioCodecs":
enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs);
Property "videoCodecs":
enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs);
Property "videoFunction":
enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0);
Property "pageUrl":
enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl);
Property "objectEncoding":
enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding);
Object end marker (0x00 0x00 0x09):
*enc++ = 0;
*enc++ = 0;
*enc++ = AMF_OBJECT_END;
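Putting the pieces together, the body built above starts out roughly as follows for the first invoke (transaction ID 1); this is a hand-worked AMF0 layout per the rules in section 1.2, not an actual capture:
02 00 07 63 6f 6e 6e 65 63 74        "connect": string marker + 2-byte length (7) + bytes
00 3f f0 00 00 00 00 00 00           transaction ID 1.0: number marker + 8-byte double
03                                   object start
00 03 61 70 70 02 ...                property "app": 2-byte name length + "app" + string value
...                                  remaining properties
00 00 09                             object end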
Wireshark capture illustration
2.4 RTMP_ConnectStream
Creates a NetStream on top of the NetConnection.
It mainly calls two functions, RTMP_ReadPacket and RTMP_ClientPacket.
RTMP_ReadPacket
Receives a chunk from the socket and parses it into an RTMPPacket. Note that the unit here is the chunk, not the message: messages are split into chunks for transport, so one message may arrive as several chunks, and it is only processed once all of its chunks have been read.
The chunk is parsed as described in section 5.3.1 of the RTMP specification.
int
RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)
{
uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
char *header = (char *)hbuf;
int nSize, hSize, nToRead, nChunk;
int didAlloc = FALSE;
int extendedTimestamp;
RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);
if (ReadN(r, (char *)hbuf, 1) == 0)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);
return FALSE;
}
packet->m_headerType = (hbuf[0] & 0xc0) >> 6; // fmt (2 bits)
packet->m_nChannel = (hbuf[0] & 0x3f); // chunk stream ID (2-63)
header++;
if (packet->m_nChannel == 0) // first byte 0: the chunk stream ID takes 2 bytes and covers IDs 64-319 (second byte + 64)
{
if (ReadN(r, (char *)&hbuf[1], 1) != 1)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",
__FUNCTION__);
return FALSE;
}
packet->m_nChannel = hbuf[1];
packet->m_nChannel += 64;
header++;
}
else if (packet->m_nChannel == 1) // first byte 1: the chunk stream ID takes 3 bytes and covers IDs 64-65599 (third byte * 256 + second byte + 64)
{
int tmp;
if (ReadN(r, (char *)&hbuf[1], 2) != 2)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",
__FUNCTION__);
return FALSE;
}
tmp = (hbuf[2] << 8) + hbuf[1];
packet->m_nChannel = tmp + 64;
RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
header += 2;
}
nSize = packetSize[packet->m_headerType]; // the 4 chunk message header types are 11/7/3/0 bytes; the array values are one larger (they include the 1-byte basic header)
if (packet->m_nChannel >= r->m_channelsAllocatedIn)
{
int n = packet->m_nChannel + 10;
int *timestamp = realloc(r->m_channelTimestamp, sizeof(int) * n);
RTMPPacket **packets = realloc(r->m_vecChannelsIn, sizeof(RTMPPacket*) * n);
if (!timestamp)
free(r->m_channelTimestamp);
if (!packets)
free(r->m_vecChannelsIn);
r->m_channelTimestamp = timestamp;
r->m_vecChannelsIn = packets;
if (!timestamp || !packets) {
r->m_channelsAllocatedIn = 0;
return FALSE;
}
memset(r->m_channelTimestamp + r->m_channelsAllocatedIn, 0, sizeof(int) * (n - r->m_channelsAllocatedIn));
memset(r->m_vecChannelsIn + r->m_channelsAllocatedIn, 0, sizeof(RTMPPacket*) * (n - r->m_channelsAllocatedIn));
r->m_channelsAllocatedIn = n;
}
if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */
packet->m_hasAbsTimestamp = TRUE; // the timestamp in a full 11-byte chunk message header is absolute
else if (nSize < RTMP_LARGE_HEADER_SIZE)
{ /* using values from the last message of this channel */
if (r->m_vecChannelsIn[packet->m_nChannel])
memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],
sizeof(RTMPPacket));
}
nSize--; // the real chunk message header size; subtract the 1 that packetSize[] added for the basic header
if (nSize > 0 && ReadN(r, header, nSize) != nSize)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",
__FUNCTION__, (unsigned int)hbuf[0]);
return FALSE;
}
hSize = nSize + (header - (char *)hbuf); // bytes read so far = basic header + chunk msg header
if (nSize >= 3) // chunk msg header is 11/7/3 bytes, i.e. fmt 0/1/2
{
packet->m_nTimeStamp = AMF_DecodeInt24(header); // the first 3 header bytes are the timestamp
/*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */
if (nSize >= 6) // chunk msg header is 11 or 7 bytes, i.e. fmt 0 or 1
{
packet->m_nBodySize = AMF_DecodeInt24(header + 3); // msg length
packet->m_nBytesRead = 0;
if (nSize > 6)
{
packet->m_packetType = header[6]; // msg type id
if (nSize == 11)
packet->m_nInfoField2 = DecodeInt32LE(header + 7); // msg stream id, little-endian
}
}
}
extendedTimestamp = packet->m_nTimeStamp == 0xffffff; // a timestamp of 0xffffff means a 4-byte extended timestamp follows
if (extendedTimestamp)
{
if (ReadN(r, header + nSize, 4) != 4)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",
__FUNCTION__);
return FALSE;
}
packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
hSize += 4;
}
RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);
if (packet->m_nBodySize > 0 && packet->m_body == NULL)
{
if (!RTMPPacket_Alloc(packet, packet->m_nBodySize))
{
RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
return FALSE;
}
didAlloc = TRUE;
packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
}
nToRead = packet->m_nBodySize - packet->m_nBytesRead;
nChunk = r->m_inChunkSize;
if (nToRead < nChunk)
nChunk = nToRead;
/* Does the caller want the raw chunk? */
if (packet->m_chunk)
{
packet->m_chunk->c_headerSize = hSize; // chunk header size
memcpy(packet->m_chunk->c_header, hbuf, hSize); // copy the chunk header bytes
packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead; // chunk payload buffer
packet->m_chunk->c_chunkSize = nChunk; // chunk payload size
}
// read one chunk's worth of data into the message body buffer
if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u",
__FUNCTION__, packet->m_nBodySize);
return FALSE;
}
RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);
packet->m_nBytesRead += nChunk;
/* keep the packet as ref for other packets on this channel */
if (!r->m_vecChannelsIn[packet->m_nChannel])
r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));
if (extendedTimestamp)
{
r->m_vecChannelsIn[packet->m_nChannel]->m_nTimeStamp = 0xffffff;
}
if (RTMPPacket_IsReady(packet)) // the whole message has been read
{
/* make packet's timestamp absolute */
if (!packet->m_hasAbsTimestamp)
packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */
r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;
/* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */
/* arrives and requests to re-use some info (small packet header) */
r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */
}
else
{
packet->m_body = NULL; /* so it won't be erased on free */
}
return TRUE;
}
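RTMP_ReadPacket returns after every chunk, so callers keep reading until a message is complete and only then hand it to RTMP_ClientPacket. A hedged sketch of that loop (essentially what RTMP_GetNextMediaPacket and RTMP_ConnectStream do internally):
#include "rtmp.h"

/* Read chunks until one complete message is assembled, then dispatch it.
 * *packet must be zero-initialized before the first call; on a nonzero
 * return the caller owns the packet body and frees it when done. */
static int read_one_message(RTMP *r, RTMPPacket *packet)
{
    while (RTMP_IsConnected(r) && RTMP_ReadPacket(r, packet))
    {
        if (!RTMPPacket_IsReady(packet))   /* more chunks of this message still pending */
            continue;

        int ret = RTMP_ClientPacket(r, packet);  /* chunk size / ctrl / invoke / media ... */
        if (!ret)
        {
            RTMPPacket_Free(packet);       /* protocol-level packet, nothing for the caller */
            continue;
        }
        return ret;                        /* nonzero: media/metadata arrived or an invoke completed */
    }
    return -1;                             /* connection closed or read error */
}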
RTMP_ClientPacket
Responds differently depending on the type of the received message (Message).
int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
int bHasMediaPacket = 0;
switch (packet->m_packetType)
{
case RTMP_PACKET_TYPE_CHUNK_SIZE: // msg type 0x01: Set Chunk Size
/* chunk size */
HandleChangeChunkSize(r, packet);
break;
case RTMP_PACKET_TYPE_BYTES_READ_REPORT: // msg type 0x03: Acknowledgement
/* bytes read report */
RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__);
break;
case RTMP_PACKET_TYPE_CONTROL: // msg type 0x04: User Control Message
/* ctrl */
HandleCtrl(r, packet);
break;
case RTMP_PACKET_TYPE_SERVER_BW: // msg type 0x05: Window Acknowledgement Size
/* server bw */
HandleServerBW(r, packet);
break;
case RTMP_PACKET_TYPE_CLIENT_BW: // msg type 0x06: Set Peer Bandwidth
/* client bw */
HandleClientBW(r, packet);
break;
case RTMP_PACKET_TYPE_AUDIO: // msg type 0x08: audio data
/* audio data */
/*RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize); */
HandleAudio(r, packet);
bHasMediaPacket = 1;
if (!r->m_mediaChannel)
r->m_mediaChannel = packet->m_nChannel;
if (!r->m_pausing)
r->m_mediaStamp = packet->m_nTimeStamp;
break;
case RTMP_PACKET_TYPE_VIDEO: // msg type 0x09: video data
/* video data */
/*RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize); */
HandleVideo(r, packet);
bHasMediaPacket = 1;
if (!r->m_mediaChannel)
r->m_mediaChannel = packet->m_nChannel;
if (!r->m_pausing)
r->m_mediaStamp = packet->m_nTimeStamp;
break;
case RTMP_PACKET_TYPE_FLEX_STREAM_SEND: // msg type 0x0f: AMF3-encoded data message
/* flex stream send */
RTMP_Log(RTMP_LOGDEBUG,
"%s, flex stream send, size %u bytes, not supported, ignoring",
__FUNCTION__, packet->m_nBodySize);
break;
case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT: // msg type 0x10: AMF3-encoded shared object message
/* flex shared object */
RTMP_Log(RTMP_LOGDEBUG,
"%s, flex shared object, size %u bytes, not supported, ignoring",
__FUNCTION__, packet->m_nBodySize);
break;
case RTMP_PACKET_TYPE_FLEX_MESSAGE: // msg type 0x11: AMF3-encoded command message
/* flex message */
{
RTMP_Log(RTMP_LOGDEBUG,
"%s, flex message, size %u bytes, not fully supported",
__FUNCTION__, packet->m_nBodySize);
/*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */
/* some DEBUG code */
#if 0
RTMP_LIB_AMFObject obj;
int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1);
if(nRes < 0) {
RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__);
/*return; */
}
obj.Dump();
#endif
if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1)
bHasMediaPacket = 2;
break;
}
case RTMP_PACKET_TYPE_INFO: // msg type 0x12: AMF0-encoded data message (metadata)
/* metadata (notify) */
RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__,
packet->m_nBodySize);
if (HandleMetadata(r, packet->m_body, packet->m_nBodySize))
bHasMediaPacket = 1;
break;
case RTMP_PACKET_TYPE_SHARED_OBJECT: // msg type 0x13: AMF0-encoded shared object message
RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring",
__FUNCTION__);
break;
case RTMP_PACKET_TYPE_INVOKE: // msg type 0x14: AMF0-encoded command message
/* invoke */
RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,
packet->m_nBodySize);
/*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */
if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1)
bHasMediaPacket = 2;
break;
case RTMP_PACKET_TYPE_FLASH_VIDEO: // msg type 0x16: aggregate message (FLV tags)
{
/* go through FLV packets and handle metadata packets */
unsigned int pos = 0;
uint32_t nTimeStamp = packet->m_nTimeStamp;
while (pos + 11 < packet->m_nBodySize)
{
uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */
if (pos + 11 + dataSize + 4 > packet->m_nBodySize)
{
RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!");
break;
}
if (packet->m_body[pos] == 0x12)
{
HandleMetadata(r, packet->m_body + pos + 11, dataSize);
}
else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9)
{
nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4);
nTimeStamp |= (packet->m_body[pos + 7] << 24);
}
pos += (11 + dataSize + 4);
}
if (!r->m_pausing)
r->m_mediaStamp = nTimeStamp;
/* FLV tag(s) */
/*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */
bHasMediaPacket = 1;
break;
}
default:
RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
packet->m_packetType);
#ifdef _DEBUG
RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize);
#endif
}
return bHasMediaPacket;
}
The case to focus on is msg type 0x14, the AMF0-encoded command message. It is very common on an RTMP connection and carries the various control commands: play, pause, stop, and so on. Its handler is HandleInvoke.
HandleInvoke
Here we mainly follow the "createStream" flow.
After RTMP_Connect has established the network connection, the server returns "_result" to the client.
If it is the response to "connect", RTMP_SendCreateStream is called to send "createStream" to the server, and the server again replies with "_result".
if (AVMATCH(&methodInvoked, &av_connect))
{
...
RTMP_SendCreateStream(r);
...
}
如果是"createStream"的回应,使能推流调用SendPublish,发送"publish",反之调用SendPlaylist获取播放列表和SendPlay 发送"play"开始播放流媒体数据
else if (AVMATCH(&methodInvoked, &av_createStream))
{
r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
SendPublish(r);
}
else
{
if (r->Link.lFlags & RTMP_LF_PLST)
SendPlaylist(r);
SendPlay(r);
RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
}
}
The log printed by rtmpdump while pulling a stream shows this execution flow even more clearly (log output omitted here).
2.5 RTMP_Read
Reads data at the FLV level. It mainly calls Read_1_Packet to read RTMPPacket data from the network; that data does not include the FLV header, so RTMP_Read prepends it: the FLV header plus the first PreviousTagSize, 13 bytes in total.
static const char flvHeader[] = { 'F', 'L', 'V', 0x01,
0x00, /* 0x04 == audio, 0x01 == video */
0x00, 0x00, 0x00, 0x09,
0x00, 0x00, 0x00, 0x00
};
Read_1_Packet
Mainly calls RTMP_ReadPacket and RTMP_ClientPacket(): the former reads data from the network, the latter processes it. The RTMPPacket data that RTMP_ReadPacket returns is the bare FLV Tag Data; Read_1_Packet adds the Tag Header.
/* audio (0x08), video (0x09) or metadata (0x12) packets :
* construct 11 byte header then add rtmp packet's data */
if (packet.m_packetType == RTMP_PACKET_TYPE_AUDIO
|| packet.m_packetType == RTMP_PACKET_TYPE_VIDEO
|| packet.m_packetType == RTMP_PACKET_TYPE_INFO)
{
nTimeStamp = r->m_read.nResumeTS + packet.m_nTimeStamp;
prevTagSize = 11 + nPacketLen;
*ptr = packet.m_packetType;
ptr++;
ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);
#if 0
if(packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) {
/* H264 fix: */
if((packetBody[0] & 0x0f) == 7) { /* CodecId = H264 */
uint8_t packetType = *(packetBody+1);
uint32_t ts = AMF_DecodeInt24(packetBody+2); /* composition time */
int32_t cts = (ts+0xff800000)^0xff800000;
RTMP_Log(RTMP_LOGDEBUG, "cts : %d\n", cts);
nTimeStamp -= cts;
/* get rid of the composition time */
CRTMP::EncodeInt24(packetBody+2, 0);
}
RTMP_Log(RTMP_LOGDEBUG, "VIDEO: nTimeStamp: 0x%08X (%d)\n", nTimeStamp, nTimeStamp);
}
#endif
ptr = AMF_EncodeInt24(ptr, pend, nTimeStamp);
*ptr = (char)((nTimeStamp & 0xFF000000) >> 24);
ptr++;
/* stream id */
ptr = AMF_EncodeInt24(ptr, pend, 0);
}
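A hedged sketch of the whole pull path built on the functions discussed so far (my own example; the URL, file name and buffer size are placeholders):
#include <stdio.h>
#include "rtmp.h"

int main(void)
{
    char url[] = "rtmp://example.com/live/stream";  /* placeholder URL */
    char buf[64 * 1024];
    int n;

    RTMP *r = RTMP_Alloc();
    RTMP_Init(r);

    if (!RTMP_SetupURL(r, url) ||
        !RTMP_Connect(r, NULL) ||       /* socket + handshake + "connect" (2.2/2.3) */
        !RTMP_ConnectStream(r, 0))      /* createStream + play (2.4) */
    {
        RTMP_Free(r);
        return 1;
    }

    FILE *fp = fopen("out.flv", "wb");
    while (fp && (n = RTMP_Read(r, buf, (int)sizeof(buf))) > 0)  /* FLV data, header included */
        fwrite(buf, 1, (size_t)n, fp);

    if (fp) fclose(fp);
    RTMP_Close(r);
    RTMP_Free(r);
    return 0;
}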
2.6 RTMP_Write
Parses the FLV header and the tag headers, and calls RTMP_SendPacket to send the Tag Data over the network.
RTMP_SendPacket
Wraps the Tag Data into chunks and sends them: essentially the inverse of RTMP_ReadPacket, so it is not analyzed further here.
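For the publishing direction, a hedged sketch (again my own example, not librtmp code): enable write mode before connecting, then feed FLV file data to RTMP_Write, which parses the tag headers and sends the tag data via RTMP_SendPacket.
#include <stdio.h>
#include "rtmp.h"

int main(void)
{
    char url[] = "rtmp://example.com/live/stream";   /* placeholder target URL */
    char buf[64 * 1024];
    size_t n;

    RTMP *r = RTMP_Alloc();
    RTMP_Init(r);
    RTMP_SetupURL(r, url);
    RTMP_EnableWrite(r);                  /* publish instead of play */

    if (!RTMP_Connect(r, NULL) || !RTMP_ConnectStream(r, 0))
    {
        RTMP_Free(r);
        return 1;
    }

    FILE *fp = fopen("in.flv", "rb");     /* FLV header + tags, as RTMP_Write expects */
    while (fp && (n = fread(buf, 1, sizeof(buf), fp)) > 0)
        if (RTMP_Write(r, buf, (int)n) <= 0)   /* chunked and sent via RTMP_SendPacket */
            break;

    if (fp) fclose(fp);
    RTMP_Close(r);
    RTMP_Free(r);
    return 0;
}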
2.7 Field annotations
/*
Represents a raw chunk.
c_header, c_headerSize: the chunk header bytes and their size
c_chunk, c_chunkSize: the chunk payload bytes and their size
*/
typedef struct RTMPChunk
{
    int c_headerSize;
    int c_chunkSize;
    char *c_chunk;
    char c_header[RTMP_MAX_HEADER_SIZE];
} RTMPChunk;
/*
Represents a Message.
m_headerType: the chunk type, i.e. fmt in the basic header of the chunk header
m_packetType: the Message Type ID
m_hasAbsTimestamp: whether the timestamp is absolute or relative, i.e. absolute for chunk type 0, a delta for the other types
m_nChannel: the chunk stream ID
m_nTimeStamp: the timestamp
m_nInfoField2: the last four bytes of the header when the chunk fmt is 0, i.e. the Message Stream ID
m_nBodySize: size of the message body
m_nBytesRead: number of body bytes read so far
m_chunk: if not NULL, the caller wants the raw chunk, and this field is filled in while the message is read
m_body: the message body
*/
typedef struct RTMPPacket
{
    uint8_t m_headerType;
    uint8_t m_packetType;
    uint8_t m_hasAbsTimestamp; /* timestamp absolute or relative? */
    int m_nChannel;
    uint32_t m_nTimeStamp; /* timestamp */
    int32_t m_nInfoField2; /* last 4 bytes in a long header */
    uint32_t m_nBodySize;
    uint32_t m_nBytesRead;
    RTMPChunk *m_chunk;
    char *m_body;
} RTMPPacket;
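To show how these fields are filled in on the sending side, a hedged sketch (my own example) of allocating, filling, and sending an RTMPPacket on the invoke channel with RTMPPacket_Alloc and RTMP_SendPacket; the body contents are left as a placeholder:
#include <string.h>
#include "rtmp.h"

/* send an already AMF0-encoded command body on the control/invoke channel */
static int send_raw_body(RTMP *r, const char *body, int len)
{
    RTMPPacket packet;

    if (!RTMPPacket_Alloc(&packet, len))       /* reserves RTMP_MAX_HEADER_SIZE in front of m_body */
        return FALSE;

    packet.m_nChannel = 0x03;                       /* chunk stream ID: control/invoke */
    packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;  /* fmt 1 */
    packet.m_packetType = RTMP_PACKET_TYPE_INVOKE;  /* 0x14 */
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_nBodySize = len;
    memcpy(packet.m_body, body, len);

    int ret = RTMP_SendPacket(r, &packet, FALSE);   /* FALSE: do not queue it to match a _result */
    RTMPPacket_Free(&packet);
    return ret;
}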
/*
RTMPSockBuf: the transport-level socket and its read buffer.
Represents one TCP connection together with its read cache.
sb_socket: the socket
sb_size: number of unprocessed bytes in the buffer, i.e. how much buffered data is left
sb_start: points to the next byte in the buffer to be processed
sb_buf: the read buffer
sb_timedout: whether the socket has timed out
sb_ssl: SSL-related data
*/
typedef struct RTMPSockBuf
{
    int sb_socket;
    int sb_size; /* number of unprocessed bytes in buffer */
    char *sb_start; /* pointer into sb_pBuffer of next byte to process */
    char sb_buf[RTMP_BUFFER_CACHE_SIZE]; /* data read from socket */
    int sb_timedout;
    void *sb_ssl;
} RTMPSockBuf;
/*
RTMP_LNK: the connection parameters, i.e. everything needed to establish the RTMP connection.
Note: these are supplied by the client-side user.
The fields map closely to rtmpdump's command-line options, so the rtmpdump documentation is a good aid in understanding them.
*/
typedef struct RTMP_LNK
{
    AVal hostname;       // host name of the server to connect to
    AVal sockshost;      // SOCKS proxy host name
    AVal playpath0;      /* parsed from URL */
    AVal playpath;       /* passed in explicitly */
    AVal tcUrl;          // URL of the target stream; defaults to rtmp[e]://host[:port]/app/playpath, assembled from the parsed fields
    AVal swfUrl;         // URL of the SWF player for the media; unset by default
    AVal pageUrl;        // URL of the web page the media is embedded in; unset by default
    AVal app;            // name of the application to connect to on the server
    AVal auth;
    AVal flashVer;       // version of the Flash plugin running the SWF player; defaults to "LNX 10,0,32,18" on Linux
    AVal subscribepath;  // name of the stream to subscribe to
    AVal usherToken;
    AVal token;          // key used in the SecureToken response, when the server requires SecureToken authentication
    AVal pubUser;
    AVal pubPasswd;
    AMFObject extras;
    int edepth;
    int seekTime;
    int stopTime;
#define RTMP_LF_AUTH 0x0001 /* using auth param */
#define RTMP_LF_LIVE 0x0002 /* stream is live */
#define RTMP_LF_SWFV 0x0004 /* do SWF verification */
#define RTMP_LF_PLST 0x0008 /* send playlist before play */
#define RTMP_LF_BUFX 0x0010 /* toggle stream on BufferEmpty msg */
#define RTMP_LF_FTCU 0x0020 /* free tcUrl on close */
#define RTMP_LF_FAPU 0x0040 /* free app on close */
    int lFlags;
    int swfAge;
    int protocol;        // RTMP protocol variant used to reach the server
    int timeout;         /* connection timeout in seconds */
    int pFlags;          /* unused, but kept to avoid breaking ABI */
    unsigned short socksport; // port of the SOCKS proxy
    unsigned short port;      // port of the server
#ifdef CRYPTO
#define RTMP_SWF_HASHLEN 32
    void *dh;            /* for encryption */
    void *rc4keyIn;
    void *rc4keyOut;
    uint32_t SWFSize;
    uint8_t SWFHash[RTMP_SWF_HASHLEN];
    char SWFVerificationResponse[RTMP_SWF_HASHLEN+10];
#endif
} RTMP_LNK;
// RTMP read-layer state: the parameters needed to operate on the stream once it
// is established, e.g. for seek and resume
/* state for read() wrapper */
typedef struct RTMP_READ
{
    char *buf;           // the read buffer
    char *bufpos;        // pointer to the not-yet-consumed data
    unsigned int buflen; // amount of not-yet-consumed data
    uint32_t timestamp;  // current timestamp of the RTMP stream
    uint8_t dataType;    // data type of the stream, i.e. whether it carries audio and/or video; 0x04 audio, 0x01 video, following the FLV convention
    uint8_t flags;       // parser flags, made up of the values below
#define RTMP_READ_HEADER 0x01    // whether the FLV header is inserted at the start of the stream; unset by default, set once the header has been emitted
#define RTMP_READ_RESUME 0x02    // whether a resume is requested
#define RTMP_READ_NO_IGNORE 0x04
#define RTMP_READ_GOTKF 0x08     // whether the resume has completed
#define RTMP_READ_GOTFLVK 0x10
#define RTMP_READ_SEEKING 0x20   // whether a seek is in progress
    int8_t status;       // current read status, i.e. the result of parsing the stream; one of the four values below, 0 meaning normal
#define RTMP_READ_COMPLETE -3
#define RTMP_READ_ERROR -2
#define RTMP_READ_EOF -1
#define RTMP_READ_IGNORE 0
    /* if bResume == TRUE */    // fields used only when resuming, to locate the resume position in the stream
    uint8_t initialFrameType;   // type of the frame to locate, i.e. video or audio
    uint32_t nResumeTS;         // timestamp of the frame to locate
    char *metaHeader;           // metadata of the stream being resumed
    char *initialFrame;         // data of the frame to locate
    uint32_t nMetaHeaderSize;   // size of the metadata of the stream being resumed
    uint32_t nInitialFrameSize; // data length of the frame to locate
    uint32_t nIgnoredFrameCounter;
    uint32_t nIgnoredFlvFrameCounter;
} RTMP_READ;
typedef struct RTMP_METHOD
{
    AVal name;
    int num;
} RTMP_METHOD;
// Represents one RTMP stream/session and holds its parameters
typedef struct RTMP
{
    int m_inChunkSize;       // incoming max chunk size
    int m_outChunkSize;      // outgoing max chunk size
    int m_nBWCheckCounter;   // bandwidth-check counter
    int m_nBytesIn;          // total number of bytes received
    int m_nBytesInSent;      // number of received bytes already reported back to the peer (acknowledgements)
    int m_nBufferMS;         // current buffer length, in milliseconds
    int m_stream_id;         /* returned in _result from createStream */ // Message Stream ID
    int m_mediaChannel;      // chunk stream ID currently carrying the media
    uint32_t m_mediaStamp;   // current media timestamp
    uint32_t m_pauseStamp;   // media timestamp at the moment of pausing
    int m_pausing;           // whether we are paused
    int m_nServerBW;         // window size (Window Acknowledgement Size)
    int m_nClientBW;         // window size from the Set Peer Bandwidth message
    uint8_t m_nClientBW2;    // limit type from the Set Peer Bandwidth message
    uint8_t m_bPlaying;      // whether we are currently playing
    uint8_t m_bSendEncoding;
    uint8_t m_bSendCounter;
    int m_numInvokes;        // number of invokes issued on this connection
    int m_numCalls;          // number of entries in m_methodCalls
    RTMP_METHOD *m_methodCalls; /* remote method calls queue */
    int m_channelsAllocatedIn;
    int m_channelsAllocatedOut;
    RTMPPacket **m_vecChannelsIn;
    RTMPPacket **m_vecChannelsOut;
    int *m_channelTimestamp; /* abs timestamp of last packet */
    double m_fAudioCodecs;   /* audioCodecs for the connect packet */
    double m_fVideoCodecs;   /* videoCodecs for the connect packet */
    double m_fEncoding;      /* AMF0 or AMF3 */
    double m_fDuration;      /* duration of stream in seconds */
    int m_msgCounter;        /* RTMPT stuff */
    int m_polling;
    int m_resplen;
    int m_unackd;
    AVal m_clientID;
    RTMP_READ m_read;        // context for RTMP_Read()
    RTMPPacket m_write;      // reusable packet object used by RTMP_Write()
    RTMPSockBuf m_sb;        // socket/buffer context used by RTMP_ReadPacket()
    RTMP_LNK Link;           // RTMP connection parameters
} RTMP;