上一节我们了解了如何通过 onInputBufferAvailable 和 getInputBuffer 获取到 input buffer index,接下来我们一起学习上层如何拿到buffer并且向下写数据的。
1、获取 input Buffer
获取 MediaCodec 中的 buffer 有两种方式,一种是调用 getInputBuffers
获取端口上所有的buffer,另一种是根据索引获取某一个 buffer。
1.1、getInputBuffers
getInputBuffers 和 getOutputBuffers 实现方式相同,都是发送一条 kWhatGetBuffers 消息,阻塞获取 buffer 数组:
status_t MediaCodec::getInputBuffers(Vector<sp<MediaCodecBuffer> > *buffers) const {
sp<AMessage> msg = new AMessage(kWhatGetBuffers, this);
msg->setInt32("portIndex", kPortIndexInput);
msg->setPointer("buffers", buffers);
sp<AMessage> response;
return PostAndAwaitResponse(msg, &response);
}
case kWhatGetBuffers:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));
// 如果不是 executing 状态 或者 是异步的状态直接返回error
if (!isExecuting() || (mFlags & kFlagIsAsync)) {
PostReplyWithError(replyID, INVALID_OPERATION);
break;
} else if (mFlags & kFlagStickyError) {
PostReplyWithError(replyID, getStickyError());
break;
}
int32_t portIndex;
CHECK(msg->findInt32("portIndex", &portIndex));
Vector<sp<MediaCodecBuffer> > *dstBuffers;
CHECK(msg->findPointer("buffers", (void **)&dstBuffers));
dstBuffers->clear();
// If we're using input surface (either non-persistent created by
// createInputSurface(), or persistent set by setInputSurface()),
// give the client an empty input buffers array.
if (portIndex != kPortIndexInput || !mHaveInputSurface) {
if (portIndex == kPortIndexInput) {
mBufferChannel->getInputBufferArray(dstBuffers);
} else {
mBufferChannel->getOutputBufferArray(dstBuffers);
}
}
(new AMessage)->postReply(replyID);
break;
}
处理 getInputBuffers 消息之前会先判断当前的状态是否是 executing?在之前的学习中我们了解到start 之后,buffer 才会全部分配完成,所以这个方法的调用需要在start之后。另外还会判断MediaCodec是否在异步模式下运行,如果是则会直接报错,意味着异步模式是不允许上层获取到所有buffer的。
getInputBuffer 获取到的 buffer 数组是直接从 ACodecBufferChannel 中获得的,并不会从 MediaCodec 存储的内容中获得。
1.1、getInputBuffer
getInputBuffer 和 getOutputBuffer 以及 getOutputFormat 的实现方式相同,只不过函数调用回传的内容不一样:
status_t MediaCodec::getInputBuffer(size_t index, sp<MediaCodecBuffer> *buffer) {
sp<AMessage> format;
return getBufferAndFormat(kPortIndexInput, index, buffer, &format);
}
内部实现 getBufferAndFormat 并没有使用 AMessage 机制,直接使用锁来进行同步:
status_t MediaCodec::getBufferAndFormat(
size_t portIndex, size_t index,
sp<MediaCodecBuffer> *buffer, sp<AMessage> *format) {
// 检查传出参数是否为 null
if (buffer == NULL) {
ALOGE("getBufferAndFormat - null MediaCodecBuffer");
return INVALID_OPERATION;
}
// 检查传出参数是否为 null
if (format == NULL) {
ALOGE("getBufferAndFormat - null AMessage");
return INVALID_OPERATION;
}
// 清除 返回值 中的内容
buffer->clear();
format->clear();
// 调用必须检查状态是否为 isExecuting
if (!isExecuting()) {
ALOGE("getBufferAndFormat - not executing");
return INVALID_OPERATION;
}
// we do not want mPortBuffers to change during this section
// we also don't want mOwnedByClient to change during this
Mutex::Autolock al(mBufferLock);
std::vector<BufferInfo> &buffers = mPortBuffers[portIndex];
if (index >= buffers.size()) {
ALOGE("getBufferAndFormat - trying to get buffer with "
"bad index (index=%zu buffer_size=%zu)", index, buffers.size());
return INVALID_OPERATION;
}
const BufferInfo &info = buffers[index];
if (!info.mOwnedByClient) {
ALOGE("getBufferAndFormat - invalid operation "
"(the index %zu is not owned by client)", index);
return INVALID_OPERATION;
}
*buffer = info.mData;
*format = info.mData->format();
return OK;
}
mBufferLock
这个锁是用来管理 MediaCodec 持有的 mPortBuffers 的,getInputBuffer 是直接从 mPortBuffers 中获取 buffer,所以需要加锁。至于为什么这里不用异步消息机制来写,还要再考究,个人感觉是差不多的,用异步消息机制可以省略锁的使用。
2、写入数据
上层拿到 input buffer(MediaCodecBuffer),向 buffer 中写入数据之后,需要通知 ACodec 数据已经写完了,ACodec 再紧接着通知 OMX Node 读取数据。我们这里看第一个步骤,如何通知 ACodec 数据已经写入完毕了呢?
看 MediaCodec 的头文件我们发现有两个相关的接口,一个是 queueInputBuffer
,另一个是 queueSecureInputBuffer
,这两个方法使用同一个消息,只不过传递的参数会不一样。
case kWhatQueueInputBuffer:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));
if (!isExecuting()) {
PostReplyWithError(replyID, INVALID_OPERATION);
break;
} else if (mFlags & kFlagStickyError) {
PostReplyWithError(replyID, getStickyError());
break;
}
status_t err = UNKNOWN_ERROR;
if (!mLeftover.empty()) {
mLeftover.push_back(msg);
size_t index;
msg->findSize("index", &index);
err = handleLeftover(index);
} else {
err = onQueueInputBuffer(msg);
}
PostReplyWithError(replyID, err);
break;
}
处理 kWhatQueueInputBuffer 时同样会先判断当前状态是否是executing的状态,接下来的过程会有一些 CCodec 相关的流程,我们这里暂时跳过,直接看 onQueueInputBuffer。
onQueueInputBuffer 的代码非常长,主要是考虑了 ACodec 以及 CCodec,普通流以及加密流这四种情况的组合,同样的我们忽略 CCodec 相关的部分:
status_t MediaCodec::onQueueInputBuffer(const sp<AMessage> &msg) {
size_t index;
size_t offset;
size_t size;
int64_t timeUs;
uint32_t flags;
CHECK(msg->findSize("index", &index));
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("flags", (int32_t *)&flags));
std::shared_ptr<C2Buffer> c2Buffer;
sp<hardware::HidlMemory> memory;
sp<RefBase> obj;
// ......
else {
CHECK(msg->findSize("offset", &offset));
}
const CryptoPlugin::SubSample *subSamples;
size_t numSubSamples;
const uint8_t *key = NULL;
const uint8_t *iv = NULL;
CryptoPlugin::Mode mode = CryptoPlugin::kMode_Unencrypted;
// We allow the simpler queueInputBuffer API to be used even in
// secure mode, by fabricating a single unencrypted subSample.
CryptoPlugin::SubSample ss;
CryptoPlugin::Pattern pattern;
if (msg->findSize("size", &size)) {
if (hasCryptoOrDescrambler()) {
ss.mNumBytesOfClearData = size;
ss.mNumBytesOfEncryptedData = 0;
subSamples = &ss;
numSubSamples = 1;
pattern.mEncryptBlocks = 0;
pattern.mSkipBlocks = 0;
}
} else if (!c2Buffer) {
// 获取解密或者解扰需要的信息
if (!hasCryptoOrDescrambler()) {
ALOGE("[%s] queuing secure buffer without mCrypto or mDescrambler!",
mComponentName.c_str());
return -EINVAL;
}
CHECK(msg->findPointer("subSamples", (void **)&subSamples));
CHECK(msg->findSize("numSubSamples", &numSubSamples));
CHECK(msg->findPointer("key", (void **)&key));
CHECK(msg->findPointer("iv", (void **)&iv));
CHECK(msg->findInt32("encryptBlocks", (int32_t *)&pattern.mEncryptBlocks));
CHECK(msg->findInt32("skipBlocks", (int32_t *)&pattern.mSkipBlocks));
int32_t tmp;
CHECK(msg->findInt32("mode", &tmp));
mode = (CryptoPlugin::Mode)tmp;
size = 0;
for (size_t i = 0; i < numSubSamples; ++i) {
size += subSamples[i].mNumBytesOfClearData;
size += subSamples[i].mNumBytesOfEncryptedData;
}
}
if (index >= mPortBuffers[kPortIndexInput].size()) {
return -ERANGE;
}
BufferInfo *info = &mPortBuffers[kPortIndexInput][index];
sp<MediaCodecBuffer> buffer = info->mData;
// ......
if (buffer == nullptr || !info->mOwnedByClient) {
return -EACCES;
}
// 检查 buffer 相关的信息
if (offset + size > buffer->capacity()) {
return -EINVAL;
}
// 将信息整合至 MediaCodecBuffer 中
buffer->setRange(offset, size);
buffer->meta()->setInt64("timeUs", timeUs);
if (flags & BUFFER_FLAG_EOS) {
buffer->meta()->setInt32("eos", true);
}
if (flags & BUFFER_FLAG_CODECCONFIG) {
buffer->meta()->setInt32("csd", true);
}
if (mTunneled) {
TunnelPeekState previousState = mTunnelPeekState;
switch(mTunnelPeekState){
case TunnelPeekState::kEnabledNoBuffer:
buffer->meta()->setInt32("tunnel-first-frame", 1);
mTunnelPeekState = TunnelPeekState::kEnabledQueued;
ALOGV("TunnelPeekState: %s -> %s",
asString(previousState),
asString(mTunnelPeekState));
break;
case TunnelPeekState::kDisabledNoBuffer:
buffer->meta()->setInt32("tunnel-first-frame", 1);
mTunnelPeekState = TunnelPeekState::kDisabledQueued;
ALOGV("TunnelPeekState: %s -> %s",
asString(previousState),
asString(mTunnelPeekState));
break;
default:
break;
}
}
status_t err = OK;
// 如果是加密的流,并且不是 CCodec,调用 queueSecureInputBuffer
if (hasCryptoOrDescrambler() && !c2Buffer && !memory) {
AString *errorDetailMsg;
CHECK(msg->findPointer("errorDetailMsg", (void **)&errorDetailMsg));
// Notify mCrypto of video resolution changes
if (mTunneled && mCrypto != NULL) {
int32_t width, height;
if (mInputFormat->findInt32("width", &width) &&
mInputFormat->findInt32("height", &height) && width > 0 && height > 0) {
if (width != mTunneledInputWidth || height != mTunneledInputHeight) {
mTunneledInputWidth = width;
mTunneledInputHeight = height;
mCrypto->notifyResolution(width, height);
}
}
}
err = mBufferChannel->queueSecureInputBuffer(
buffer,
(mFlags & kFlagIsSecure),
key,
iv,
mode,
pattern,
subSamples,
numSubSamples,
errorDetailMsg);
if (err != OK) {
mediametrics_setInt32(mMetricsHandle, kCodecQueueSecureInputBufferError, err);
ALOGW("Log queueSecureInputBuffer error: %d", err);
}
} else {
// 否则调用 queueInputBuffer
err = mBufferChannel->queueInputBuffer(buffer);
if (err != OK) {
mediametrics_setInt32(mMetricsHandle, kCodecQueueInputBufferError, err);
ALOGW("Log queueInputBuffer error: %d", err);
}
}
if (err == OK) {
// synchronization boundary for getBufferAndFormat
Mutex::Autolock al(mBufferLock);
info->mOwnedByClient = false;
info->mData.clear();
statsBufferSent(timeUs, buffer);
}
return err;
}
删除掉 CCodec 的内容后,整体的内容变得简单很多,前面的部分是检查传入参数的正确性,中间的部分是将传入参数整合进 MediaCodecBuffer 中,后面的部分是通知 ACodec 数据已经写完。
如果码流结束,那么需要写入flag BUFFER_FLAG_EOS
,这个 flag 写入有两种情况,一种是随着数据写入flag,另一种是单独写一个flag。
如果是要传 csd buffer,那么需要写入 flag BUFFER_FLAG_CODECCONFIG
。csd buffer写入有两种,一种是在configure时传入csd 信息,input buffer到达后会自动帮我们写入 csd buffer;另一种是configure时不写,我们自己在第一个buffer到达时向内部写入csd信息,并且填入flag。
接下来讲一讲对 queueSecureInputBuffer 和 queueInputBuffer 的理解:
从queueSecureInputBuffer的名字来看,它是安全的流程中使用的,联想到之前我们会创建 secure component,很容易就会把这两个关联起来(创建secure组件后向下写入数据就要调用queueSecureInputBuffer),但是这个理解是不对的。queueSecureInputBuffer 这里的 secure 指的应该是码流本身是否是加密的,是否需要解密的意思。如果写入的是加密/加扰的码流,那么传递给decoder之前我们需要先做解密/解扰的动作,这个动作会在ACodecBufferChannel中完成,因此MediaCodec和ACodecBufferChannel都有queueSecureInputBuffer
方法,用于处理解密/解扰的流程。用于存储加密/加扰数据的buffer其实是普通buffer,解密后的数据会存储到buffer handle中被保护起来
secure组件可以使用queueInputBuffer吗?当然可以了,这种情况下上层的buffer使用的就是底层创建的buffer handle,我们需要用单独的api才能完成数据拷贝/移动,整个流程数据都是被保护的。
再抛出一个问题,当使用queueSecureInput buffer时,一定要使用secure组件吗?答案不是的哦,如果使用的是secure组件,那么解密出来的清流就是受保护的。如果使用的是non-secure组件,那么清流是不受保护的,之前的加密也就没有意义了。
如图所示,前面两列整个流程中传递的都是清流,MediaCodecBuffer都是指向同一个缓冲区。最后一列上层写给MediaCodec的是加密流,进入到ACodecBufferChannel后会进行解密,把buffer写到mCodecBuffer中。