Android11 framework Handler
- 引言
- Handler工作流程
- MessageQueue主要函数
- Looper主要函数
- 思考
- 1.一个线程有几个handler,有几个looper
- 2.为什么handler会有内存泄漏
- 3.如果想要在子线程new Handler怎么做?
- 4.子线程中的loop如果消息队列中没有消息处理的时候怎么做?
- 消息睡眠和唤醒机制
- 5.多个可能存在于不同线程的Handler往MQ中添加数据,怎么确保线程安全?
- 6.使用Message应该如何创建
- 7.Looper死循环block为什么不会导致应用ANR
- 同步屏障
- HandlerThread
- IntentService——HandlerThread的应用
引言
电梯,把人从楼下运送到楼上;即连接楼层
handler,把消息从主/子线程运送到子/主线程;即连接不同线程的消息管理机制
handler真正的作用,是所有的代码都是在handler上运行的;因为app启动时,会通过ActivityThread
的main方法启动JVM,然后通过Looper.prepareMainLopper()
和Looper.loop()
启动loop死循环等待消息。
Handler工作流程
一般我们使用handler是以send*
方法开始,以执行重写的handler.handleMessage()
中的逻辑结束
即 handler -> sendMessage() -> MessageQueue.enqueueMessage()
和 Lopper.loop() -> MessageQueue.next() -> Handler.dispatchMessage() -> handler.handleMessage()
而通过翻阅源码,可以发现不管是调用send*
也好,调用post()
也好,最终都是调用到sendMessageAtTime()
在sendMessageAtTime()
中调用Handler.enqueueMessage()
,从而调用MessageQueue.enqueueMessage()
,MessageQueue.enqueueMessage()
中进行msg的入队操作,而MessageQueue.next()
中会进行出队操作,被loop()中的死循环调用,顺序取出msg,然后进行dispatchMessage()
操作进行逻辑处理
注意:假设在点击事件中sendmsg(此时在子线程),而handleMessage是在ui线程(一般为主线程)中执行,即完成了子线程和主线程的消息传递。
MessageQueue主要函数
消息队列是一个由单链表实现的优先级队列
Message中有一个Message类型的属性next
即msg->next(msg)->next(msg)->...
MessageQueue.enqueueMessage()
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
其中for循环表示以插入排序进行msg管理
即MessageQueue.enqueueMessage()
插入msg使用的是其when属性
而MessageQueue.next()
中取msg是从头部取的,满足队列的属性,为优先级队列
Looper主要函数
核心就在于其构造方法
、loop()
和ThreadLocal
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
...
/** Initialize the current thread as a looper.
* This gives you a chance to create handlers that then reference
* this looper, before actually starting the loop. Be sure to call
* {@link #loop()} after calling this method, and end it by calling
* {@link #quit()}.
*/
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
其中 ThreadLocal
是一个线程上下文的存储变量
这样一个线程只有一个looper
因为一个Thread->一个ThreadLocalMap<唯一的ThreadLocal, values>->唯一的Looper>MessageQueue
注:Looper中MessageQueue对象为final修饰,且在构造方法中初始化,并提供给Handler
思考
1.一个线程有几个handler,有几个looper
一个线程可以有很多个handler,因为可以在任何地方new;但是一个线程只能由一个looper,通过Threadlocal中注入looper泛型,而Looper.prepare()
方法在构造handler之前会判断当前ThreadLocal是否已经set过looper,保证唯一性
2.为什么handler会有内存泄漏
handler中的callback作为匿名内部类如果持有外部类对象,就会引发内存泄漏,原因如下
Handler.java
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
其中msg.target = this;
说明msg引用了handler
而msg可能会带有长延时,那么在这个延时中,msg持有handler,handler持有activity/fragment,那么ac/frag会一直不被回收
3.如果想要在子线程new Handler怎么做?
需要手动调用prepare()
和loop()
,用于初始化Looper、MQ(MessageQueue)和开启loop循环
4.子线程中的loop如果消息队列中没有消息处理的时候怎么做?
loop()中的死循环需要退出,msg
就需要为null,即queue.next()
返回null
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
}
MessageQueue.java
Message next() {
...
if (mQuitting) {
dispose();
return null;
}
...
}
即mQuitting
需要为true
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
通过removeAll...()
清除所有消息,然后通过nativeWake(mPtr)
唤醒,唤醒是为了避免next()
方法处于阻塞状态,下面会说明
Looper.java
public void quit() {
mQueue.quit(false);
}
因此,需要调用 looper中的quit()
方法, 用于终止loop死循环
消息睡眠和唤醒机制
消息机制属于生产者-消费者设计模式
既然MQ作为一个容器,肯定不能无限制的往里面放msg,假设无限制地放,那么内存会爆,这个应该比较好理解。
入队:根据时间排序,当队列满的时候,阻塞,直到消费者通过next取出消息时,通知MQ唤醒,进行消息的入队
出队:通过loop()死循环进行next循环调用,当MQ为空的时候,阻塞,直到调用MQ.enqueueMessage()的时候,通知队列可以调用next,唤醒
但是Handler机制中MQ并没有对容量做限制,即入队阻塞不存在。因为系统也会有消息需要往里面塞,如果设置上限,会导致消息进不去,因此可以无限往里面放,但是放多了内存会炸。
而针对出队这一块,有两个方面的阻塞:
第一,队首消息延时还没到;
第二,消息队列为空;
/*
nextPollTimeoutMillis用于记录当前时间和msg.when延时的差值 now-msg.when
并通过nativePollOnce(ptr, nextPollTimeoutMillis)方法进行阻塞
其中nextPollTimeoutMillis = 0不会阻塞,直接返回
nextPollTimeoutMillis = -1,一直阻塞
nextPollTimeoutMillis > 0,最大阻塞nextPollTimeoutMillis时长,一旦有消息发送给MQ即唤醒
关键逻辑:
获取当前msg
1.如果为null,则nextPollTimeoutMillis = -1,并于之后continue,到下一次for循环阻塞
2.如果不为null,但延时未到,则计算延时time,并且则nextPollTimeoutMillis = time,并continue后阻塞
3.如果不为null,且延时已到,则正常返回msg
*/
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
boolean enqueueMessage(Message msg, long when) {
synchronized (this) {
...
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr); //有消息入队时唤醒MQ
}
...
}
}
对于nativePollOnce()
和nativeWake()
两个JNI方法,调用栈如下
5.多个可能存在于不同线程的Handler往MQ中添加数据,怎么确保线程安全?
锁机制
synchronized:内置锁,由JVM自动完成
synchronized (this) {
}
//this,指messagequeue,对象里的所有方法、代码块,都会受到限制
由于一个线程只有一个looper,只有一个mq对象,又因为synchronized (this),所以同一时间只有一个消息能够被操作,可能是入队、也可能是出队。包括quit也需要加锁
6.使用Message应该如何创建
Message使用了一种设计模式——享元设计模式,即内存复用。使用obtain()
而不是构造方法初始化,使用recycleUnchecked()
回收,而Message类中维持next和sPool用于保存回收的msg
@UnsupportedAppUsage
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = UID_NONE;
workSourceUid = UID_NONE;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
这是为了防止内存抖动,避免不断new和注销对象,避免内存空洞
7.Looper死循环block为什么不会导致应用ANR
ANR的成因都是因为Message没有及时处理,比如点击事件5s,广播响应10s,services响应20s,都是指最终转换成的msg没有及时地处理,就会通过Handler发送一个ANR提醒,也就是说,就连ANR的提醒也是Handler机制的工作内容,当looper block时只是说明当前没有msg需要执行,即线程没有事情可做,理所应当交出cpu的占用权。
同步屏障
消息是入队是按照执行时间先后排序的,而出队是从队首取出来,那么,如果遇到紧急的消息应该怎么办?
插队!
消息分为同步消息和异步消息,同步消息正常排队,而当有紧急任务需要处理的时候,就会被声明为异步消息,表示需要优先处理,此时会调用postSyncBarrier()
在队列中(不一定是队首)插入一个target为空的msg,叫做同步屏障;那么同步消息什么时候被处理呢,就需要移除同步屏障,即调用removeSyncBarrier()
MessageQueue.java
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
//通过p的时间和屏障的时间,确定屏障消息插入的位置
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
//说明屏障消息不是插入消息队列的头部
msg.next = p;
prev.next = msg;
} else {
//屏障消息在消息队列的头部
msg.next = p;
mMessages = msg;
}
return token;
}
}
/**
* Removes a synchronization barrier.
*
* @param token The synchronization barrier token that was returned by
* {@link #postSyncBarrier}.
*
* @throws IllegalStateException if the barrier was not found.
*
* @hide
*/
@UnsupportedAppUsage
@TestApi
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
...
Message next() {
...
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
...
}
Message.java
public boolean isAsynchronous() {
return (flags & FLAG_ASYNCHRONOUS) != 0;
}
public void setAsynchronous(boolean async) {
if (async) {
flags |= FLAG_ASYNCHRONOUS;
} else {
flags &= ~FLAG_ASYNCHRONOUS;
}
}
通过MessageQueue.next()
源码中可以看到,当检测到同步屏障时:msg.target == null
会从屏障开始往后遍历当前消息队列,直到找到异步消息为止,并将异步消息赋值给msg往下执行,完成插队。
在Android中UI相关的一些操作、ANR等都需要使用异步消息和同步屏障进行插队
比如:ViewRootImpl.java
@UnsupportedAppUsage
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
//开启同步屏障
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
//发送异步消息,调用栈如下
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}
void unscheduleTraversals() {
if (mTraversalScheduled) {
mTraversalScheduled = false;
//移除同步屏障
mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
mChoreographer.removeCallbacks(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
}
}
mChoreographer.postCallback -> postCallbackDelayed -> postCallbackDelayedInternal
private void postCallbackDelayedInternal(int callbackType,
Object action, Object token, long delayMillis) {
if (DEBUG_FRAMES) {
Log.d(TAG, "PostCallback: type=" + callbackType
+ ", action=" + action + ", token=" + token
+ ", delayMillis=" + delayMillis);
}
synchronized (mLock) {
final long now = SystemClock.uptimeMillis();
final long dueTime = now + delayMillis;
mCallbackQueues[callbackType].addCallbackLocked(dueTime, action, token);
if (dueTime <= now) {
scheduleFrameLocked(now);
} else {
Message msg = mHandler.obtainMessage(MSG_DO_SCHEDULE_CALLBACK, action);
msg.arg1 = callbackType;
msg.setAsynchronous(true);//异步消息
mHandler.sendMessageAtTime(msg, dueTime);
}
}
}
总结:同步屏障的设置可以方便地处理那些优先级较高的异步消息,当我们调用Handler.getLooper().getQueue(). postSyncBarrier()
并设置消息的setAsynchronous(true)
时,target即为null,也就开启了同步屏障。当在消息轮询器Looper在loop()中循环处理消息时,如若开启了同步屏障,会优先处理其中的异步消息,而阻碍同步消息。
HandlerThread
继承于Thread,在子线程中使用looper时使用。
好处:
1.封装了一个looper,方便了初始化
2.保证了线程的安全
通过wait()
和notify()
对,用来保证多线程的安全性
怎么理解呢?先看看下面的代码
private void doThread() {
Thread thread = new Thread(new Runnable() {
Looper looper;
@Override
public void run() {
Looper.prepare();
looper = Looper.myLooper();
Looper.loop();
}
public Looper getLooper() {
return looper;
}
});
thread.start();
Handler handler = new Handler(thread.getLooper());
}
这一段代码运行会不会有问题?
会,但是不绝对。因为thread.start()
时会异步执行run()
方法的逻辑,此时thread.getLooper()
和run()
中的逻辑谁先被执行就不好说了,需要看当时的调度情况,一旦threaed.getLooper()
先执行,那返回的looper
就为空了。
而转过头再来看看HandlerThread.java
的代码
public class HandlerThread extends Thread {
...
...
@Override
public void run() {
mTid = Process.myTid();
Looper.prepare();
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll();
}
Process.setThreadPriority(mPriority);
onLooperPrepared();
Looper.loop();
mTid = -1;
}
/**
* This method returns the Looper associated with this thread. If this thread not been started
* or for any reason isAlive() returns false, this method will return null. If this thread
* has been started, this method will block until the looper has been initialized.
* @return The looper.
*/
public Looper getLooper() {
if (!isAlive()) {
return null;
}
// If the thread has been started, wait until the looper has been created.
synchronized (this) {
while (isAlive() && mLooper == null) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
return mLooper;
}
...
...
}
可以看到getLooper()
方法中加入了mLooper的非空判断,一旦为空,则wait()
,释放当前代码块的锁,并使当前线程等待;而执行完run()
中mLooper
初始化的逻辑后,通过notifyAll()
唤醒等待的所有线程使其等待此处逻辑结束后执行,此时返回的mLooper
一定不为空,所以说HandlerThread
能够保证多线程的安全问题
IntentService——HandlerThread的应用
用于处理后台任务,这个类看起来已经弃用了,不过尚可作为学习内容
@Deprecated
public abstract class IntentService extends Service {
private volatile Looper mServiceLooper;
@UnsupportedAppUsage
private volatile ServiceHandler mServiceHandler;
private String mName;
private boolean mRedelivery;
private final class ServiceHandler extends Handler {
public ServiceHandler(Looper looper) {
super(looper);
}
@Override
public void handleMessage(Message msg) {
onHandleIntent((Intent)msg.obj);
stopSelf(msg.arg1);
}
}
/**
* Creates an IntentService. Invoked by your subclass's constructor.
*
* @param name Used to name the worker thread, important only for debugging.
*/
public IntentService(String name) {
super();
mName = name;
}
...
...
@Override
public void onCreate() {
// TODO: It would be nice to have an option to hold a partial wakelock
// during processing, and to have a static startService(Context, Intent)
// method that would launch the service & hand off a wakelock.
super.onCreate();
HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
thread.start();
mServiceLooper = thread.getLooper();
mServiceHandler = new ServiceHandler(mServiceLooper);
}
@Override
public void onStart(@Nullable Intent intent, int startId) {
Message msg = mServiceHandler.obtainMessage();
msg.arg1 = startId;
msg.obj = intent;
mServiceHandler.sendMessage(msg);
}
/**
* This method is invoked on the worker thread with a request to process.
* Only one Intent is processed at a time, but the processing happens on a
* worker thread that runs independently from other application logic.
* So, if this code takes a long time, it will hold up other requests to
* the same IntentService, but it will not hold up anything else.
* When all requests have been handled, the IntentService stops itself,
* so you should not call {@link #stopSelf}.
*
* @param intent The value passed to {@link
* android.content.Context#startService(Intent)}.
* This may be null if the service is being restarted after
* its process has gone away; see
* {@link android.app.Service#onStartCommand}
* for details.
*/
@WorkerThread
protected abstract void onHandleIntent(@Nullable Intent intent);
...
...
}
在此类onCreate()
中初始化了一个子线程HandlerThread
,在onStart()
中发送消息进行入队操作;在handleMessage()
中调用抽象方法onHandleIntent
执行使用者编写的后台任务逻辑,执行完毕后stopSelf
停止service,进行内存释放,避免内存泄露等问题。
同时,因为IntentService
根据mName
来初始化HandlerThread
,即只要mName
相同,使用的就是同一个HandlerThread
,即同一个子线程looper
,所以相同name
的IntentService
多任务的执行顺序一定是可控制的,先调用的先执行,因为在同一个线程中,由同一个looper
轮询。