MessageQueue 深入理解Android卷2 学习笔记

news2025/1/23 1:00:19

MessageQueue类封装了与消息队列有关的操作,一个以消息驱动的系统中,最重要部分就是消息队列和消息处理循环。
MessageQueue的核心代码在native层,可以处理java和native的事件

1.1MessageQueue创建

构造方法,调用nativeInit
frameworks/base/core/java/android/os/MessageQueue.java

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}

nativeInit是一个native方法,方法中调用构造初始化了native层的MessageQueue
frameworks/base/core/jni/android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

frameworks/base/core/jni/android_os_MessageQueue.cpp

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

如果mLooper为空,创建mLooper(是一种以线程为单位的单例模式)。通过setForThread与线程关联
一个线程会有一个Looper来循环处理消息队列的消息,下列一行代码是获取线程的本地存储空间中的
mLooper = Looper::getForThread();

1.2 消息提取

next()方法用于从消息队列中获取下一条消息,并返回该消息对象
frameworks/base/core/java/android/os/MessageQueue.java

    @UnsupportedAppUsage
    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

首先,方法会检查消息循环是否已经退出并被销毁,如果是,则直接返回null。这种情况可能发生在应用程序在退出后尝试重新启动消息循环,但这是不被支持的。

接下来,方法会初始化两个变量pendingIdleHandlerCount和nextPollTimeoutMillis。pendingIdleHandlerCount用于记录待运行的空闲处理器(IdleHandler)的数量,初始值为-1,表示在第一次迭代时。nextPollTimeoutMillis用于设置下一次轮询的超时时间,初始值为0。

然后,方法进入一个无限循环,不断尝试从消息队列中获取下一条消息。

在每次循环中,首先检查nextPollTimeoutMillis的值,如果不为0,则调用Binder.flushPendingCommands()方法来刷新待处理的Binder命令。

接下来,调用nativePollOnce()方法来从底层获取下一条消息。该方法会阻塞当前线程,直到有消息到达或超时。

然后,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。

在同步块中,首先尝试从消息队列中获取下一条消息。如果找到了消息,则判断该消息是否已经准备好处理。如果还未准备好,则计算出下一次轮询的超时时间,并更新nextPollTimeoutMillis的值。如果消息已经准备好处理,则将mBlocked标志设置为false,将该消息从消息队列中移除,并返回该消息。

如果没有找到消息,则将nextPollTimeoutMillis的值设置为-1,表示没有更多的消息。

接下来,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作,并返回null。

然后,检查是否是第一次空闲处理。如果是第一次空闲处理,并且消息队列为空或者下一条消息的时间还未到达,则获取待运行的空闲处理器的数量。

如果待运行的空闲处理器数量小于等于0,则将mBlocked标志设置为true,继续下一次循环。

如果待运行的空闲处理器数量大于0,则将空闲处理器列表转换为数组,并存储在mPendingIdleHandlers中。

接下来,运行空闲处理器。在第一次迭代中,会遍历所有待运行的空闲处理器,并调用它们的queueIdle()方法。如果queueIdle()方法返回false,则表示该空闲处理器不再需要运行,需要将其从空闲处理器列表中移除。

最后,将pendingIdleHandlerCount重置为0,以确保下一次循环不再运行空闲处理器。然后,将nextPollTimeoutMillis的值设置为0,以便立即进行下一次轮询。

总结起来,这段代码的作用是从消息队列中获取下一条消息,并返回该消息对象。它会根据消息的准备状态和时间戳来确定是否需要等待,同时还会处理空闲处理器的运行。这样,消息循环可以不断地处理消息,并在空闲时运行空闲处理器。

Java层投递Message
enqueueMessage()方法用于将消息添加到消息队列中。
frameworks/base/core/java/android/os/MessageQueue.java

    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }

        synchronized (this) {
            if (msg.isInUse()) {
                throw new IllegalStateException(msg + " This message is already in use.");
            }

            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

首先,方法会检查消息的target是否为null,如果是,则抛出IllegalArgumentException异常,表示消息必须具有目标。

接下来,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。

在同步块中,首先检查消息是否已经在使用中,如果是,则抛出IllegalStateException异常,表示该消息已经在使用中。

然后,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作。在这种情况下,会抛出IllegalStateException异常,并将消息回收后返回false。

接下来,将消息标记为正在使用,并设置消息的时间戳为when。

然后,获取消息队列中的第一条消息p。

接下来,根据消息的时间戳when和当前消息队列中的消息的时间戳进行比较,确定消息的插入位置。

如果消息队列为空,或者when为0,或者when小于第一条消息的时间戳,则将消息作为新的头部插入到消息队列中,并将needWake标志设置为mBlocked的值。

如果消息需要插入到队列的中间位置,通常情况下不需要唤醒事件队列,除非队列头部有一个障碍(barrier)并且消息是队列中最早的异步消息。在这种情况下,将needWake标志设置为mBlocked的值,并遍历消息队列,找到合适的插入位置。

最后,将消息插入到队列中,并更新前后消息的关联关系。

最后,如果需要唤醒事件队列,则调用nativeWake()方法唤醒事件队列。

总结起来,这段代码的作用是将消息添加到消息队列中。它会根据消息的时间戳和队列中已有消息的时间戳来确定插入位置,并根据需要唤醒事件队列。这样,消息循环可以按照一定的顺序处理消息,并在需要时唤醒事件队列。

nativeWake
nativeWake是一个native函数,用于唤醒队列
frameworks/base/core/jni/android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

通过reinterpret_cast获取NativeMessageQueue对象,然后调用wake函数,wake函数调用Looper的wake

void NativeMessageQueue::wake() {
    mLooper->wake();
}

wake
system/core/libutils/Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                             mWakeEventFd.get(), nWrite, strerror(errno));
        }
    }
}

向文件描述符写入一个inc的值来实现唤醒操作

1.3 nativePollOnce分析

nativePollOnce()方法用来从底层获取下一条消息,该方法会阻塞当前线程,直到有消息到达或超时
frameworks/base/core/jni/android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

调用了pollOnce方法
frameworks/base/core/jni/android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

调用了Looper中的pollOnce方法
system/core/libutils/Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != nullptr) *outFd = fd;
                if (outEvents != nullptr) *outEvents = events;
                if (outData != nullptr) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

首先,定义了一个int类型的变量result,并将其初始化为0。

然后,使用一个无限循环来进行轮询操作。在每次循环中,首先通过while循环遍历mResponses中的响应对象。

在遍历过程中,获取当前响应对象response,并从中提取出标识符ident、文件描述符fd、事件events和数据data。

如果标识符ident大于等于0,则表示该响应对象是一个有效的事件。在这种情况下,将文件描述符、事件和数据分别赋值给outFd、outEvents和outData指向的变量,并返回标识符ident。

如果遍历完所有响应对象后仍未找到有效的事件,则检查result的值。如果result不等于0,则表示在之前的轮询操作中发生了错误,直接返回result。

如果以上两种情况都不满足,则调用pollInner()方法进行实际的轮询操作,并将返回的结果赋值给result。

总结起来,这段代码的作用是在事件循环中进行一次轮询操作,等待并处理事件。它通过遍历响应对象来查找有效的事件,并将相关信息返回。如果没有找到有效的事件,则返回之前的轮询结果。

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)

该方法接受四个参数:
timeoutMillis:超时等待事件,如果为-1,表示无线等到,直到有事件发生。为0表示无需等待立即返回
outFd:发生事件的那个文件描述符
outEvents:在该文件描述符上发生了哪些事件,包括可读、可写、错误和终端四个事件。四个事件都是从epoll转化而来
outData:存储上下文数据,由用户在添加监听句柄时传递,用于传递用户自定义的数据

该方法的返回值也有特殊意义:
返回值为ALOOPER_POLL_WAKE:这次返回由wake函数触发,也就是管道写端的那次写事件触发
ALOOPER_POLL_TIMEOUT:等待超时
ALOOPER_POLL_ERROR:等待过程中发生错误
ALOOPER_POLL_CALLBACK:表示某个被监听的句柄因某种原因被触发
pollInner
pollinner方法很长,截取关键部分

int Looper::pollInner(int timeoutMillis) {
...
 // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
...
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;
...
}

在这段代码中,首先调用epoll_wait函数等待事件的发生,然后根据事件的类型进行相应的处理。

在事件处理过程中,代码会检查是否需要重新构建epoll集合。如果需要重新构建,会调用rebuildEpollLocked方法,并跳转到Done标签所在的位置。

接下来,代码会检查事件的数量。如果事件数量小于0,表示发生了错误,会输出相应的错误日志,并将结果设置为POLL_ERROR,然后跳转到Done标签。

如果事件数量为0,表示发生了超时,会将结果设置为POLL_TIMEOUT,然后跳转到Done标签。

如果事件数量大于0,表示有事件发生,代码会遍历所有的事件,并根据事件的类型进行相应的处理。如果事件是唤醒事件,会调用awoken方法进行处理。如果事件是注册的文件描述符上的事件,会将事件类型和相关的请求信息添加到响应队列中。

最后,代码会跳转到Done标签所在的位置,继续执行标签后面的代码。
根据事件类型进行相应的处理后,就该处理事件了,下面是Done之后的代码

int Looper::pollInner(int timeoutMillis) {
...
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}

在这段代码中,首先会设置mNextMessageUptime为一个很大的值,以确保在没有新消息到达时不会触发下一次唤醒。
然后,代码进入一个循环,遍历消息队列中的消息。对于每个消息,代码会检查消息的到达时间是否小于等于当前时间。如果是,表示该消息已经到达,需要处理该消息。

在处理消息之前,代码会获取消息的处理器(handler)和消息内容,并从消息队列中移除该消息。然后,代码会释放锁,并调用处理器的handleMessage方法来处理该消息。(处理Native的Message,调用Native Handler的handleMessage处理该Message)

处理完消息后,代码会重新获取锁,并将mSendingMessage设置为false,表示消息处理完成。然后,将结果设置为POLL_CALLBACK,表示有回调发生。

如果消息的到达时间大于当前时间,表示还有未到达的消息,代码会将下一次唤醒时间设置为该消息的到达时间,并跳出循环。

接下来,代码会释放锁,并遍历所有的响应(response)。对于每个响应,代码会检查响应的标识是否为POLL_CALLBACK,如果是,表示需要调用回调函数。

代码会获取响应中的文件描述符(fd)、事件(events)和数据(data),然后调用回调函数的handleEvent方法进行处理。如果回调函数返回值为0,表示需要移除该文件描述符。

最后,代码会清除响应结构中的回调引用,并将结果设置为POLL_CALLBACK。然后,返回结果。

总的来说,这段代码实现了处理消息和响应的逻辑。它会遍历消息队列中的消息,处理已到达的消息,并根据响应的标识调用相应的回调函数。处理完消息和响应后,返回相应的结果。
添加监控请求
添加监控请求就是调用epoll_ctl添加文件句柄,以Nativce的Activity的代码来分析mRequests
frameworks/base/core/jni/android_app_NativeActivity.cpp

loadNativeCode_native(JNIEnv* env, jobject clazz, jstring path, jstring funcName,
        jobject messageQueue, jstring internalDataDir, jstring obbDir,
        jstring externalDataDir, jint sdkVersion, jobject jAssetMgr,
        jbyteArray savedState, jobject classLoader, jstring libraryPath){
...        
        
    code->messageQueue->getLooper()->addFd(
            code->mainWorkRead, 0, ALOOPER_EVENT_INPUT, mainWorkCallback, code.get());     
...     
        
}

调用Looper的addFd函数,第一个参数表示监听的fd,第二个参数0表示ident,第三个参数表示需要监听的事件,这里只监听可读事件,第四个参数为回调函数,当该fd发生指定事件,looper会回调该函数,第五个参数为回调函数的参数

system/core/libutils/Looper.cpp

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
            events, callback.get(), data);
#endif

    if (!callback.get()) {
        if (! mAllowNonCallbacks) {
            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
            return -1;
        }

        if (ident < 0) {
            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
            return -1;
        }
    } else {
        ident = POLL_CALLBACK;
    }

    { // acquire lock
        AutoMutex _l(mLock);

        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                return -1;
            }
            mRequests.add(fd, request);
        } else {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    // Tolerate ENOENT because it means that an older file descriptor was
                    // closed before its callback was unregistered and meanwhile a new
                    // file descriptor with the same number has been created and is now
                    // being registered for the first time.  This error may occur naturally
                    // when a callback has the side-effect of closing the file descriptor
                    // before returning and unregistering itself.  Callback sequence number
                    // checks further ensure that the race is benign.
                    //
                    // Unfortunately due to kernel limitations we need to rebuild the epoll
                    // set from scratch because it may contain an old file handle that we are
                    // now unable to remove since its file descriptor is no longer valid.
                    // No such problem would have occurred if we were using the poll system
                    // call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
                    ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
                            "being recycled, falling back on EPOLL_CTL_ADD: %s",
                            this, strerror(errno));
#endif
                    epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
                    if (epollResult < 0) {
                        ALOGE("Error modifying or adding epoll events for fd %d: %s",
                                fd, strerror(errno));
                        return -1;
                    }
                    scheduleEpollRebuildLocked();
                } else {
                    ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
                    return -1;
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}

addFd方法接受一个sp对象作为回调函数。在这段代码中,首先会检查回调函数是否为空。如果为空,会根据mAllowNonCallbacks的值来判断是否允许设置空回调函数。如果不允许,则会返回错误。如果允许,并且ident小于0,则会返回错误。

如果回调函数不为空,则会将ident设置为POLL_CALLBACK,表示该文件描述符是一个回调文件描述符。

然后,代码会获取锁,并创建一个Request对象,设置相应的属性。接着,代码会初始化一个epoll_event结构体,并调用epoll_ctl函数将文件描述符添加到epoll事件集合中。

如果添加成功,则会将Request对象添加到mRequests集合中。如果添加失败,则会根据错误类型进行处理。如果错误类型是ENOENT,表示文件描述符在移除之前已经关闭,并且一个新的文件描述符与相同的编号已经创建并且正在首次注册。在这种情况下,代码会重新将文件描述符添加到epoll事件集合中,并调用scheduleEpollRebuildLocked方法重新构建epoll事件集合。如果是其他错误类型,则表示添加失败,会返回错误。

最后,代码会释放锁,并返回成功。

总的来说,这段代码实现了向Looper对象添加文件描述符和回调函数的逻辑。它会检查回调函数是否为空,并根据情况进行处理。在添加文件描述符时,会将文件描述符添加到epoll事件集合中,并将相应的信息保存到mRequests集合中。如果添加失败,则会根据错误类型进行处理。
处理监控请求
在pollInner中,当某个监控fd发生事件后,会调用对应的Request,也就是pollInner调用的pushResponse方法
system/core/libutils/Looper.cpp

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}

该方法将一个Response添加到响应队列中,用于后续处理
poolInner不是单独处理request,而是先收集request,等到NativeMessage消息处理完后再处理。这表明在处理逻辑上,NativeMessage的优先级高于监控fd的优先级

添加Native的Message
system/core/libutils/Looper.cpp

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
            this, uptime, handler.get(), message.what);
#endif

    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);

        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    } // release lock

    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        wake();
    }
}

sendMessageAtTime方法用于向消息队列中添加一个延迟发送的消息。在这段代码中,首先会打印调试信息(如果定义了DEBUG_CALLBACKS宏),包括uptime、handler和message.what的值。

然后,代码会获取锁,并根据uptime的值找到消息队列中合适的位置插入新的消息。具体地,代码会遍历消息队列,找到第一个uptime大于等于给定uptime的位置,并将新的消息插入到该位置。

在插入消息后,代码会进行一个优化判断。如果当前Looper正在发送消息(mSendingMessage为true),则可以跳过调用wake()方法唤醒轮询循环,因为在处理完消息后,Looper会决定下一次唤醒的时间。实际上,这段代码是否在Looper线程上运行并不重要。

最后,如果新插入的消息是队列中的第一个消息(即i == 0),则调用wake()方法唤醒轮询循环。

总的来说,这段代码实现了向消息队列中添加延迟发送的消息的逻辑。它会根据给定的uptime值找到合适的位置插入消息,并在必要时唤醒轮询循环。

1.4 MessageQueue总结

在这里插入图片描述

· Java层提供了Looper类和MessageQueue类,其中Looper类提供循环处理消息的机制,MessageQueue类提供一个消息队列,以及插入、删除和提取消息的函数接口。另外,Handler也是在Java层常用的与消息处理相关的类。

· MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。

· NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。

· Java层有Message类和Handler类,而Native层对应也有Message类和MessageHandler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessageHandler类。

MessageQueue处理流程总结
MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
1.MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。

2.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。

3.NativeMessageQueue还处理通过addFd添加的Request。在后面分析输入系统时,还会大量碰到这种方式。

4.从处理逻辑上看,先是Native的Message,然后是Native的Request,最后才是Java的Message。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1139546.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

外网访问|SD-WAN跨境网络专线助力企业摆脱网络困境

在如今国际市场的大趋势下&#xff0c;跨境访问和沟通对于外贸企业来说至关重要&#xff0c;国际市场的竞争越来越激烈&#xff0c;外贸企业需要与全球各地的合作伙伴、客户和供应商保持紧密的跨境访问和沟通。而在解决跨境网络困境方面&#xff0c;MPLS、VPN和SD-WAN是常见的选…

CVE漏洞复现-CVE-2023-38831 WinRAR代码执行漏洞

CVE-2023-38831 WinRAR代码执行漏洞 WinRAR介绍 WinRAR 是一款功能强大的压缩包管理器&#xff0c;它是档案工具RAR在 Windows环境下的图形界面。 该软件可用于备份数据&#xff0c;缩减电子邮件附件的大小&#xff0c;解压缩从 Internet 上下载的RAR、ZIP及其它类型文件&…

装了固态硬盘Win10开机很慢的解决方法

在Win10电脑中&#xff0c;用户反映自己装了固态硬盘后&#xff0c;电脑开机变得很慢&#xff0c;想知道解决此问题的方法。接下来小编给大家详细介绍关于解决装了固态硬盘Win10电脑开机很慢的问题&#xff0c;解决后Win10电脑开机速度就能变得更快。 装了固态硬盘Win10开机很慢…

【速看】如何通过合理的封装,让你的自动化脚本更上一层楼!

1. 前言 上一篇推文利用一个在图片范围内实现随机坐标点击的例子&#xff0c;去教会大家如何将自己想要的效果实现出来&#xff0c;受到大家的热情反响&#xff0c;在我们官方讨论群中&#xff0c;还有大佬对我们的示例代码进行优化改进&#xff0c;做了很多合理的函数封装&…

B链圆桌派 — 创新的去中心化存储网络 BNB GREENFIELD 主网上线

B链圆桌派 主題: BNB GREENFIELD主网上线 - 创新的去中心化资料储存网路 日期: 10/19, 8 pm utc8 頻道&#xff1a; BNB Chain 华语电报群 ○ AMA环节 ○ BNB GREENFIELD主网上线 一、回复主持人问题 嘉宾回答主持人提出的问题。本环节请大家保持安静&#xff0c;专注嘉宾…

Flink CDC 2.0 主要是借鉴 DBLog 算法

DBLog 算法原理 DBLog 这个算法的原理分成两个部分&#xff0c;第一部分是分 chunk&#xff0c;第二部分是读 chunk。分 chunk 就是把一张表分为多个 chunk&#xff08;桶/片&#xff09;。我可以把这些 chunk 分发给不同的并发的 task 去做。例如&#xff1a;有 reader1 和 re…

Windows详细安装和彻底删除RabbitMQ图文流程

RabbiitMQ简介 RabbitMQ是实现了高级消息队列协议&#xff08;AMQP&#xff1a;Advanced Message Queue Protocol&#xff09;的开源消息代理软件&#xff08;亦称面向消息的中间件&#xff09;。RabbitMQ服务器是用Erlang语言编写的&#xff0c;而聚类和故障转移是构建在开放…

Redis -- 基础知识3 数据类型及指令

FLUSHALL:清空所有键值对操作(最好别搞,删库要被绳之以法的) 1.string类型 1.介绍 1.redis的字符串,直接按照二进制进行存储,所以可以存储任何数据,取出时不需要转码 2.redis的string类型,限制大小最大为512M,因为为单线程模型为了操作短平快 2.操作 1.set与get set key value …

财务数字化转型是什么?_光点科技

财务数字化转型是当今企业发展中的一项关键策略&#xff0c;旨在借助先进的数字技术&#xff0c;重新塑造和优化财务管理体系&#xff0c;以适应迅速变化的商业环境。这一转型不仅仅是技术的升级&#xff0c;更是对企业财务理念和流程的全面升级和改革。 财务数字化转型的核心在…

【Java】Java开发环境搭建

Java开发环境搭建 文章目录 Java开发环境搭建1. Java开发环境搭建1.1 什么是JDK、JRE1.2 JDK的下载1.3 JDK的安装1.4 配置path环境变量1.4.1 理解path环境变量1.4.2 JDK8配置&#xff1a;配置JAVA_HOMEpath1.4.3 JDK17配置方案&#xff1a;自动配置 1. Java开发环境搭建 1.1 什…

掌握CSS Flexbox,打造完美响应式布局,适配各种设备!

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! ​ 目录 ⭐ 专栏简介 &#x1f4d8; 文章引言 基…

磁盘监控:告警时发送邮件

1.配置邮箱 1.编辑邮箱配置文件 vim /etc/mail.rc2.在末尾输入自己的邮箱配置&#xff0c;以163邮箱为例 #开启ssl set ssl-verifyignore #证书目录&#xff0c;下方为centos系统证书默认位置&#xff0c;也自行生成证书并指定 set nss-config-dir/etc/pki/nssdb # 配置的第…

Windows 11 如何同步文件到OneDrive ?

Windows 11 中的 OneDrive OneDrive 默认内置于 Windows 11系统中。Windows 11从设置中删除了原来的两种备份方法并添加了OneDrive。更重要的是&#xff0c;微软在设置中添加了OneDrive Sync&#xff0c;以取代设置中的备份功能。 您可以使用 OneDrive 在计算机和云之间同步文…

【Leetcode】200. 岛屿数量

给你一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的的二维网格&#xff0c;请你计算网格中岛屿的数量。 岛屿总是被水包围&#xff0c;并且每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。 此外&#xff0c;你可以假设该网格的四条边…

【信创】 银河麒麟 软件目录及 常见问题汇总(持续更新)

1. 官方问题列举常见问题 常见问题国产操作系统、银河麒麟、中标麒麟、开放麒麟、星光麒麟——麒麟软件官方网站 2. 官方资源池目录 Index of /kylin/KYLIN-ALL/pool/ 如 gcc 的安装依赖 deb包目录&#xff1a; 3. 麒麟系统各版本 软件源使用方法

70 搜索插入位置

搜索插入位置 题解1 二分查找防越界写法 题解2 STL大法两行 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c; 并返回其索引。如果目标值不存在于数组中&#xff0c; 返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O ( l o g n ) O(log n…

学习paddle-detection(paddlex的使用)

首先下载paddlex&#xff08;网页&#xff09;的本地软件&#xff0c;下载链接如下&#xff1a; paddlex 下载完成后进行安装 打开后选择开发者模式&#xff0c;开发者模式主要是和VScode进行集成 本章节主要介绍在开发者模式下可以查看和编辑的文件及其作用&#xff0c;关于…

Vuex插件的安装与使用原理

Vuex插件的安装与使用原理 Vuex安装和环境搭建 安装Vuex 第一步&#xff1a;打开CMD窗口&#xff0c;通过命令转到Vue的安装路径第二步&#xff1a;输入安装Vuex的命令 vue2 安装 Vuex3 版本&#xff1a;npm i vuex3vue3 安装 Vuex4 版本&#xff1a;npm i vuex4 当在CMD窗口…

pycharm运行R语言脚本(环境安装)

文章目录 简介1. pycharm安装插件2. 安装R语言解释器2.1下载安装包2.2具体安装过程 3.编辑环境变量4.pycharm中配置安装好的R语言解释器 简介 pycharm 安装 R language for Intellij R language for Intellij 是一个插件&#xff0c;它为Intellij IDEA集成开发环境添加了对R语…

Bootstrap 中CSS媒体查询分辨率 @media(min-width)

媒体查询是非常别致的"有条件的 CSS 规则"。它只适用于一些基于某些规定条件的 CSS。如果满足那些条件&#xff0c;则应用相应的样式。 Bootstrap 中的媒体查询允许您基于视口大小移动、显示并隐藏内容。下面的媒体查询在 LESS 文件中使用&#xff0c;用来创建 Boot…