MessageQueue类封装了与消息队列有关的操作,一个以消息驱动的系统中,最重要部分就是消息队列和消息处理循环。
MessageQueue的核心代码在native层,可以处理java和native的事件
1.1MessageQueue创建
构造方法,调用nativeInit
frameworks/base/core/java/android/os/MessageQueue.java
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
nativeInit是一个native方法,方法中调用构造初始化了native层的MessageQueue
frameworks/base/core/jni/android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
frameworks/base/core/jni/android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
如果mLooper为空,创建mLooper(是一种以线程为单位的单例模式)。通过setForThread与线程关联
一个线程会有一个Looper来循环处理消息队列的消息,下列一行代码是获取线程的本地存储空间中的
mLooper = Looper::getForThread();
1.2 消息提取
next()方法用于从消息队列中获取下一条消息,并返回该消息对象
frameworks/base/core/java/android/os/MessageQueue.java
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
首先,方法会检查消息循环是否已经退出并被销毁,如果是,则直接返回null。这种情况可能发生在应用程序在退出后尝试重新启动消息循环,但这是不被支持的。
接下来,方法会初始化两个变量pendingIdleHandlerCount和nextPollTimeoutMillis。pendingIdleHandlerCount用于记录待运行的空闲处理器(IdleHandler)的数量,初始值为-1,表示在第一次迭代时。nextPollTimeoutMillis用于设置下一次轮询的超时时间,初始值为0。
然后,方法进入一个无限循环,不断尝试从消息队列中获取下一条消息。
在每次循环中,首先检查nextPollTimeoutMillis的值,如果不为0,则调用Binder.flushPendingCommands()方法来刷新待处理的Binder命令。
接下来,调用nativePollOnce()方法来从底层获取下一条消息。该方法会阻塞当前线程,直到有消息到达或超时。
然后,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。
在同步块中,首先尝试从消息队列中获取下一条消息。如果找到了消息,则判断该消息是否已经准备好处理。如果还未准备好,则计算出下一次轮询的超时时间,并更新nextPollTimeoutMillis的值。如果消息已经准备好处理,则将mBlocked标志设置为false,将该消息从消息队列中移除,并返回该消息。
如果没有找到消息,则将nextPollTimeoutMillis的值设置为-1,表示没有更多的消息。
接下来,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作,并返回null。
然后,检查是否是第一次空闲处理。如果是第一次空闲处理,并且消息队列为空或者下一条消息的时间还未到达,则获取待运行的空闲处理器的数量。
如果待运行的空闲处理器数量小于等于0,则将mBlocked标志设置为true,继续下一次循环。
如果待运行的空闲处理器数量大于0,则将空闲处理器列表转换为数组,并存储在mPendingIdleHandlers中。
接下来,运行空闲处理器。在第一次迭代中,会遍历所有待运行的空闲处理器,并调用它们的queueIdle()方法。如果queueIdle()方法返回false,则表示该空闲处理器不再需要运行,需要将其从空闲处理器列表中移除。
最后,将pendingIdleHandlerCount重置为0,以确保下一次循环不再运行空闲处理器。然后,将nextPollTimeoutMillis的值设置为0,以便立即进行下一次轮询。
总结起来,这段代码的作用是从消息队列中获取下一条消息,并返回该消息对象。它会根据消息的准备状态和时间戳来确定是否需要等待,同时还会处理空闲处理器的运行。这样,消息循环可以不断地处理消息,并在空闲时运行空闲处理器。
Java层投递Message
enqueueMessage()方法用于将消息添加到消息队列中。
frameworks/base/core/java/android/os/MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
首先,方法会检查消息的target是否为null,如果是,则抛出IllegalArgumentException异常,表示消息必须具有目标。
接下来,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。
在同步块中,首先检查消息是否已经在使用中,如果是,则抛出IllegalStateException异常,表示该消息已经在使用中。
然后,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作。在这种情况下,会抛出IllegalStateException异常,并将消息回收后返回false。
接下来,将消息标记为正在使用,并设置消息的时间戳为when。
然后,获取消息队列中的第一条消息p。
接下来,根据消息的时间戳when和当前消息队列中的消息的时间戳进行比较,确定消息的插入位置。
如果消息队列为空,或者when为0,或者when小于第一条消息的时间戳,则将消息作为新的头部插入到消息队列中,并将needWake标志设置为mBlocked的值。
如果消息需要插入到队列的中间位置,通常情况下不需要唤醒事件队列,除非队列头部有一个障碍(barrier)并且消息是队列中最早的异步消息。在这种情况下,将needWake标志设置为mBlocked的值,并遍历消息队列,找到合适的插入位置。
最后,将消息插入到队列中,并更新前后消息的关联关系。
最后,如果需要唤醒事件队列,则调用nativeWake()方法唤醒事件队列。
总结起来,这段代码的作用是将消息添加到消息队列中。它会根据消息的时间戳和队列中已有消息的时间戳来确定插入位置,并根据需要唤醒事件队列。这样,消息循环可以按照一定的顺序处理消息,并在需要时唤醒事件队列。
nativeWake
nativeWake是一个native函数,用于唤醒队列
frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
通过reinterpret_cast获取NativeMessageQueue对象,然后调用wake函数,wake函数调用Looper的wake
void NativeMessageQueue::wake() {
mLooper->wake();
}
wake
system/core/libutils/Looper.cpp
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
向文件描述符写入一个inc的值来实现唤醒操作
1.3 nativePollOnce分析
nativePollOnce()方法用来从底层获取下一条消息,该方法会阻塞当前线程,直到有消息到达或超时
frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
调用了pollOnce方法
frameworks/base/core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
调用了Looper中的pollOnce方法
system/core/libutils/Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
result = pollInner(timeoutMillis);
}
}
首先,定义了一个int类型的变量result,并将其初始化为0。
然后,使用一个无限循环来进行轮询操作。在每次循环中,首先通过while循环遍历mResponses中的响应对象。
在遍历过程中,获取当前响应对象response,并从中提取出标识符ident、文件描述符fd、事件events和数据data。
如果标识符ident大于等于0,则表示该响应对象是一个有效的事件。在这种情况下,将文件描述符、事件和数据分别赋值给outFd、outEvents和outData指向的变量,并返回标识符ident。
如果遍历完所有响应对象后仍未找到有效的事件,则检查result的值。如果result不等于0,则表示在之前的轮询操作中发生了错误,直接返回result。
如果以上两种情况都不满足,则调用pollInner()方法进行实际的轮询操作,并将返回的结果赋值给result。
总结起来,这段代码的作用是在事件循环中进行一次轮询操作,等待并处理事件。它通过遍历响应对象来查找有效的事件,并将相关信息返回。如果没有找到有效的事件,则返回之前的轮询结果。
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)
该方法接受四个参数:
timeoutMillis:超时等待事件,如果为-1,表示无线等到,直到有事件发生。为0表示无需等待立即返回
outFd:发生事件的那个文件描述符
outEvents:在该文件描述符上发生了哪些事件,包括可读、可写、错误和终端四个事件。四个事件都是从epoll转化而来
outData:存储上下文数据,由用户在添加监听句柄时传递,用于传递用户自定义的数据
该方法的返回值也有特殊意义:
返回值为ALOOPER_POLL_WAKE:这次返回由wake函数触发,也就是管道写端的那次写事件触发
ALOOPER_POLL_TIMEOUT:等待超时
ALOOPER_POLL_ERROR:等待过程中发生错误
ALOOPER_POLL_CALLBACK:表示某个被监听的句柄因某种原因被触发
pollInner
pollinner方法很长,截取关键部分
int Looper::pollInner(int timeoutMillis) {
...
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
...
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
...
}
在这段代码中,首先调用epoll_wait函数等待事件的发生,然后根据事件的类型进行相应的处理。
在事件处理过程中,代码会检查是否需要重新构建epoll集合。如果需要重新构建,会调用rebuildEpollLocked方法,并跳转到Done标签所在的位置。
接下来,代码会检查事件的数量。如果事件数量小于0,表示发生了错误,会输出相应的错误日志,并将结果设置为POLL_ERROR,然后跳转到Done标签。
如果事件数量为0,表示发生了超时,会将结果设置为POLL_TIMEOUT,然后跳转到Done标签。
如果事件数量大于0,表示有事件发生,代码会遍历所有的事件,并根据事件的类型进行相应的处理。如果事件是唤醒事件,会调用awoken方法进行处理。如果事件是注册的文件描述符上的事件,会将事件类型和相关的请求信息添加到响应队列中。
最后,代码会跳转到Done标签所在的位置,继续执行标签后面的代码。
根据事件类型进行相应的处理后,就该处理事件了,下面是Done之后的代码
int Looper::pollInner(int timeoutMillis) {
...
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
在这段代码中,首先会设置mNextMessageUptime为一个很大的值,以确保在没有新消息到达时不会触发下一次唤醒。
然后,代码进入一个循环,遍历消息队列中的消息。对于每个消息,代码会检查消息的到达时间是否小于等于当前时间。如果是,表示该消息已经到达,需要处理该消息。
在处理消息之前,代码会获取消息的处理器(handler)和消息内容,并从消息队列中移除该消息。然后,代码会释放锁,并调用处理器的handleMessage方法来处理该消息。(处理Native的Message,调用Native Handler的handleMessage处理该Message)
处理完消息后,代码会重新获取锁,并将mSendingMessage设置为false,表示消息处理完成。然后,将结果设置为POLL_CALLBACK,表示有回调发生。
如果消息的到达时间大于当前时间,表示还有未到达的消息,代码会将下一次唤醒时间设置为该消息的到达时间,并跳出循环。
接下来,代码会释放锁,并遍历所有的响应(response)。对于每个响应,代码会检查响应的标识是否为POLL_CALLBACK,如果是,表示需要调用回调函数。
代码会获取响应中的文件描述符(fd)、事件(events)和数据(data),然后调用回调函数的handleEvent方法进行处理。如果回调函数返回值为0,表示需要移除该文件描述符。
最后,代码会清除响应结构中的回调引用,并将结果设置为POLL_CALLBACK。然后,返回结果。
总的来说,这段代码实现了处理消息和响应的逻辑。它会遍历消息队列中的消息,处理已到达的消息,并根据响应的标识调用相应的回调函数。处理完消息和响应后,返回相应的结果。
添加监控请求
添加监控请求就是调用epoll_ctl添加文件句柄,以Nativce的Activity的代码来分析mRequests
frameworks/base/core/jni/android_app_NativeActivity.cpp
loadNativeCode_native(JNIEnv* env, jobject clazz, jstring path, jstring funcName,
jobject messageQueue, jstring internalDataDir, jstring obbDir,
jstring externalDataDir, jint sdkVersion, jobject jAssetMgr,
jbyteArray savedState, jobject classLoader, jstring libraryPath){
...
code->messageQueue->getLooper()->addFd(
code->mainWorkRead, 0, ALOOPER_EVENT_INPUT, mainWorkCallback, code.get());
...
}
调用Looper的addFd函数,第一个参数表示监听的fd,第二个参数0表示ident,第三个参数表示需要监听的事件,这里只监听可读事件,第四个参数为回调函数,当该fd发生指定事件,looper会回调该函数,第五个参数为回调函数的参数
system/core/libutils/Looper.cpp
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
// Tolerate ENOENT because it means that an older file descriptor was
// closed before its callback was unregistered and meanwhile a new
// file descriptor with the same number has been created and is now
// being registered for the first time. This error may occur naturally
// when a callback has the side-effect of closing the file descriptor
// before returning and unregistering itself. Callback sequence number
// checks further ensure that the race is benign.
//
// Unfortunately due to kernel limitations we need to rebuild the epoll
// set from scratch because it may contain an old file handle that we are
// now unable to remove since its file descriptor is no longer valid.
// No such problem would have occurred if we were using the poll system
// call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
"being recycled, falling back on EPOLL_CTL_ADD: %s",
this, strerror(errno));
#endif
epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d: %s",
fd, strerror(errno));
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
addFd方法接受一个sp对象作为回调函数。在这段代码中,首先会检查回调函数是否为空。如果为空,会根据mAllowNonCallbacks的值来判断是否允许设置空回调函数。如果不允许,则会返回错误。如果允许,并且ident小于0,则会返回错误。
如果回调函数不为空,则会将ident设置为POLL_CALLBACK,表示该文件描述符是一个回调文件描述符。
然后,代码会获取锁,并创建一个Request对象,设置相应的属性。接着,代码会初始化一个epoll_event结构体,并调用epoll_ctl函数将文件描述符添加到epoll事件集合中。
如果添加成功,则会将Request对象添加到mRequests集合中。如果添加失败,则会根据错误类型进行处理。如果错误类型是ENOENT,表示文件描述符在移除之前已经关闭,并且一个新的文件描述符与相同的编号已经创建并且正在首次注册。在这种情况下,代码会重新将文件描述符添加到epoll事件集合中,并调用scheduleEpollRebuildLocked方法重新构建epoll事件集合。如果是其他错误类型,则表示添加失败,会返回错误。
最后,代码会释放锁,并返回成功。
总的来说,这段代码实现了向Looper对象添加文件描述符和回调函数的逻辑。它会检查回调函数是否为空,并根据情况进行处理。在添加文件描述符时,会将文件描述符添加到epoll事件集合中,并将相应的信息保存到mRequests集合中。如果添加失败,则会根据错误类型进行处理。
处理监控请求
在pollInner中,当某个监控fd发生事件后,会调用对应的Request,也就是pollInner调用的pushResponse方法
system/core/libutils/Looper.cpp
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
该方法将一个Response添加到响应队列中,用于后续处理
poolInner不是单独处理request,而是先收集request,等到NativeMessage消息处理完后再处理。这表明在处理逻辑上,NativeMessage的优先级高于监控fd的优先级
添加Native的Message
system/core/libutils/Looper.cpp
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
} // release lock
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
sendMessageAtTime方法用于向消息队列中添加一个延迟发送的消息。在这段代码中,首先会打印调试信息(如果定义了DEBUG_CALLBACKS宏),包括uptime、handler和message.what的值。
然后,代码会获取锁,并根据uptime的值找到消息队列中合适的位置插入新的消息。具体地,代码会遍历消息队列,找到第一个uptime大于等于给定uptime的位置,并将新的消息插入到该位置。
在插入消息后,代码会进行一个优化判断。如果当前Looper正在发送消息(mSendingMessage为true),则可以跳过调用wake()方法唤醒轮询循环,因为在处理完消息后,Looper会决定下一次唤醒的时间。实际上,这段代码是否在Looper线程上运行并不重要。
最后,如果新插入的消息是队列中的第一个消息(即i == 0),则调用wake()方法唤醒轮询循环。
总的来说,这段代码实现了向消息队列中添加延迟发送的消息的逻辑。它会根据给定的uptime值找到合适的位置插入消息,并在必要时唤醒轮询循环。
1.4 MessageQueue总结
· Java层提供了Looper类和MessageQueue类,其中Looper类提供循环处理消息的机制,MessageQueue类提供一个消息队列,以及插入、删除和提取消息的函数接口。另外,Handler也是在Java层常用的与消息处理相关的类。
· MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。
· NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。
· Java层有Message类和Handler类,而Native层对应也有Message类和MessageHandler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessageHandler类。
MessageQueue处理流程总结
MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
1.MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。
2.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。
3.NativeMessageQueue还处理通过addFd添加的Request。在后面分析输入系统时,还会大量碰到这种方式。
4.从处理逻辑上看,先是Native的Message,然后是Native的Request,最后才是Java的Message。