由于网上已经有许多优秀的博文讲解了Android的异步消息机制(ALooper/AHandler/AMessage那一套),而且流程也不是很复杂,所以这里将不会去讲代码流程。本篇将会记录学习过程中的疑问以及自己的解答,希望可以帮助有同样疑问的小伙伴们,如果理解有不对或者偏差,欢迎大家一起讨论。
本文中的代码参考自 http://aospxref.com/
1 总览
下图是按照我的理解绘制出的Android异步消息处理流程。过程很简单,总结起来:创建一条消息并指定消息处理对象,将消息放入队列中等待线程处理,线程找到处理对象并处理消息。
以下是异步消息机制相关UML类图:
2 ALooper
2.1 start
ALooper
的启动有RunningLocally和异步处理两种模式,来看代码:
status_t ALooper::start(
bool runOnCallingThread, bool canCallJava, int32_t priority) {
if (runOnCallingThread) {
mRunningLocally = true;
do {
} while (loop());
return OK;
}
mThread = new LooperThread(this, canCallJava);
status_t err = mThread->run(
mName.empty() ? "ALooper" : mName.c_str(), priority);
}
当start第一个参数为true时,会阻塞调用线程,这种用法见的比较少,可能会在main函数中使用,阻塞等待任务执行完成;第一个参数如果为false,则会开启一个线程执行loop函数,例如MediaCodec
中有如下使用:
mCodecLooper = new ALooper;
mCodecLooper->setName("CodecLooper");
err = mCodecLooper->start(false, false, ANDROID_PRIORITY_AUDIO);
mCodecLooper->registerHandler(mCodec);
2.2 registerHandler
从代码上来看,AHandler
和ALooper
是没有特别大的联系的,ALooper
执行AHandler
中的onMessageReceived是通过AMessage
中存储的AHandler wp
获取的。那为什么这边要多调用一个registerHandler呢?
从上面的流程图中我们可以看到,ALooper
中保存有一个类型为ALoopRoster
静态成员。调用registerHandler时会给AHandler
赋予一个id,并将其和ALooper
成对储存。如果发现AHandler
已经有id了,则说明该AHandler
已经和其他ALooper
绑定过了,不能再与当前ALooper
进行绑定。
调用registerHandler的目的是为了线程同步,假想有两个ALooper
在同时处理一个AHandler
的事务,那么该Handler中的状态很有可能就发生错乱了,如果用锁来管理,则会变得异常复杂。
ALooper::handler_id ALooperRoster::registerHandler(
const sp<ALooper> &looper, const sp<AHandler> &handler) {
Mutex::Autolock autoLock(mLock);
if (handler->id() != 0) {
CHECK(!"A handler must only be registered once.");
return INVALID_OPERATION;
}
HandlerInfo info;
info.mLooper = looper;
info.mHandler = handler;
ALooper::handler_id handlerID = mNextHandlerID++;
mHandlers.add(handlerID, info);
handler->setID(handlerID, looper);
return handlerID;
}
既然有registerHandler,那对应的也要有unregisterHandler,如果不执行,那这个AHandler
就不能再处理事件了。
2.3 stop
这里我们要注意的是stop之前我们要先执行unregisterHandler。如果是程序结束,我们可以不用去单独执行stop,因为析构函数里面会自动帮助我们执行。
另外我们顺便看下stop中Thread
的用法:
status_t ALooper::stop() {
thread->requestExit();
thread->requestExitAndWait();
}
requestExit是设置flag让线程结束,但是线程并不一定会立即结束;requestExitAndWait是会阻塞等待线程结束。
3 AMessage
3.1 AMessage存储类型
AMessage
通过Union的特性实现存储多种类型的数据:
struct AMessage : public RefBase {
private:
struct Item {
union {
int32_t int32Value;
int64_t int64Value;
size_t sizeValue;
float floatValue;
double doubleValue;
void *ptrValue;
RefBase *refValue;
AString *stringValue;
Rect rectValue;
} u;
const char *mName;
size_t mNameLength;
Type mType;
void setName(const char *name, size_t len);
Item() : mName(nullptr), mNameLength(0), mType(kTypeInt32) { }
Item(const char *name, size_t length);
};
std::vector<Item> mItems;
}
3.2 dup
AMessage
给我们提供了一个深拷贝的方法dup,这个方法经常会在Callback中使用到,例如MediaCodec
中有如下例子:
void BufferCallback::onInputBufferAvailable(
size_t index, const sp<MediaCodecBuffer> &buffer) {
sp<AMessage> notify(mNotify->dup());
notify->setInt32("what", kWhatFillThisBuffer);
notify->setSize("index", index);
notify->setObject("buffer", buffer);
notify->post();
}
3.3 postAndAwaitResponse
这是让AMessage
变成同步消息的方法,方法中会创建一个replyID,也称为Token。消息处理过程和直接调用post方法类似,不同的是执行完post将AMessage
加入到ALooper
的队列中之后,会阻塞等待。
status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {
sp<ALooper> looper = mLooper.promote();
if (looper == NULL) {
return -ENOENT;
}
sp<AReplyToken> token = looper->createReplyToken();
if (token == NULL) {
return -ENOMEM;
}
setObject("replyID", token);
looper->post(this, 0 /* delayUs */);
return looper->awaitResponse(token, response);
}
为什么要创建这个AReplyToken
对象呢?
我们在线程中处理完消息之后,有结果要返回给ALooper
(awaitResponse阻塞等待,结果返回给ALooper
就可以返回给调用者),这里有两个问题:1)如何找到处理消息的ALooper
? 2)什么时候返回结果,结束阻塞?
第一个问题很好解决,AMessage
中就存储有ALooper
的弱引用,可以通过promote来找到ALooper
;当然通过createReplyToken方法创建的AReplyToken
对象中也存储有ALooper
的弱引用,同样也可以拿到ALooper
对象。
第二个问题就需要AReplyToken
来处理了,ALooper
会阻塞等待AReplyToken
被填充数据。
status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
Mutex::Autolock autoLock(mRepliesLock);
CHECK(replyToken != NULL);
while (!replyToken->retrieveReply(response)) {
{
Mutex::Autolock autoLock(mLock);
if (mThread == NULL) {
return -ENOENT;
}
}
mRepliesCondition.wait(mRepliesLock);
}
return OK;
}
status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
Mutex::Autolock autoLock(mRepliesLock);
status_t err = replyToken->setReply(reply);
if (err == OK) {
mRepliesCondition.broadcast();
}
return err;
}
AReplyToken
是如何被填充数据的?参考MediaCodec
,先调用senderAwaitsResponse从被处理的AMessage
中拿到AReplyToken
,接着创建一个新的AMessage
用于存储返回结果,当然如果没有结果返回,可以不填充任何东西,接着call AMessage
的postReply方法,这里会层层调用将结果填到AReplyToken
当中,最后结束阻塞返回结果,完成同步调用。
void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatSetCallback:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));
sp<AMessage> response = new AMessage;
response->postReply(replyID);
break;
}
}
}
到这儿Android异步消息处理机制中就学习结束了。
我这边还提供了一个学习demo可供下载 AMessageDemo
下载之后放到源码目录下编译之后,将生成的bin文件push到/system/bin下面,执行即可看到结果。
最后还有两点要注意:
- 尝试在PlayerDemo的构造函数中执行registerHandler方法,这里会出现空指针的错误,需要等到构造函数执行完成才能够registerHandler。
- 尝试把main函数中的registerHandler注释掉,可以看到消息仍旧可以正常运行。这是如预期的,因为处理消息时并没有检查
AHandler
是否已经注册。