ActivityManagerService 分发广播
简述
上一节我们看了发送广播流程,主要是将广播信息封装至BroadcastRecord,然后通过BroadcastQueueImpl/BroadcastQueueModernImpl的enqueueBroadcastLocked来发送广播。
这一节我们来看一下AMS是怎么分发广播的流程,BroadcastQueueImpl/BroadcastQueueModernImpl是两套策略,只是分发的模式不同,我们主要关心流程,策略细节就不去关注了,我们这里就挑BroadcastQueueImpl来看一下分发的流程。
广播分发
前面提到过BroadcastQueueImpl模式下,会有四个队列来发送广播,流程上四个队列都是一样的,只是有不同的优先级。
我们从BroadcastQueueImpl.enqueueBroadcastLocked开始看。
1.1 BroadcastQueueImpl.enqueueBroadcastLocked
区分有序广播和无序广播,如果有序广播,可能还需要替换队列中之前的BroadcastRecord。
有序和无序广播会添加到BroadcastDispatcher不同的队列中。
最终都会通过调用scheduleBroadcastsLocked来触发广播的分发。
public void enqueueBroadcastLocked(BroadcastRecord r) {
r.applySingletonPolicy(mService);
// flag如果有FLAG_RECEIVER_REPLACE_PENDING,则说明新的广播需要替代之前的
final boolean replacePending = (r.intent.getFlags()
& Intent.FLAG_RECEIVER_REPLACE_PENDING) != 0;
// 有序广播需要顺序分发
boolean serialDispatch = r.ordered;
if (!serialDispatch) {
final int N = (r.receivers != null) ? r.receivers.size() : 0;
for (int i = 0; i < N; i++) {
if (r.receivers.get(i) instanceof ResolveInfo) {
serialDispatch = true;
break;
}
}
}
if (serialDispatch) {
// BroadcastDispatcher里面有一个mOrderedBroadcasts,如果之前已经发送过一样的广播,需要replace mOrderedBroadcasts里面之前的。
final BroadcastRecord oldRecord =
replacePending ? replaceOrderedBroadcastLocked(r) : null;
if (oldRecord != null) {
// 如果有替换,调用performReceiveLocked
if (oldRecord.resultTo != null) {
try {
oldRecord.mIsReceiverAppRunning = true;
performReceiveLocked(oldRecord, oldRecord.resultToApp, oldRecord.resultTo,
oldRecord.intent,
Activity.RESULT_CANCELED, null, null,
false, false, oldRecord.shareIdentity, oldRecord.userId,
oldRecord.callingUid, r.callingUid, r.callerPackage,
SystemClock.uptimeMillis() - oldRecord.enqueueTime, 0, 0,
oldRecord.resultToApp != null
? oldRecord.resultToApp.mState.getCurProcState()
: ActivityManager.PROCESS_STATE_UNKNOWN);
} catch (RemoteException e) {
// ...
}
}
} else {
// 否则添加到BroadcastDispatcher的队列中去。
enqueueOrderedBroadcastLocked(r);
// 最终需要调用scheduleBroadcastsLocked来发送广播,详见1.2
scheduleBroadcastsLocked();
}
} else {
final boolean replaced = replacePending
&& (replaceParallelBroadcastLocked(r) != null);
// 无序广播
if (!replaced) {
// 添加到BroadcastDispatcher的队列中去
enqueueParallelBroadcastLocked(r);
// 最终需要调用scheduleBroadcastsLocked来发送广播,详见1.2
scheduleBroadcastsLocked();
}
}
}
1.2 BroadcastQueueImpl.scheduleBroadcastsLocked
发送一个BROADCAST_INTENT_MSG消息
public void scheduleBroadcastsLocked() {
// ...log
// 防重入
if (mBroadcastsScheduled) {
return;
}
// 发送BROADCAST_INTENT_MSG消息
mHandler.sendMessage(mHandler.obtainMessage(BROADCAST_INTENT_MSG, this));
mBroadcastsScheduled = true;
}
1.3 BroadcastHandler.handleMessage
调用processNextBroadcast
private final class BroadcastHandler extends Handler {
@Override
public void handleMessage(Message msg) {
switch (msg.what) {
case BROADCAST_INTENT_MSG: {
if (DEBUG_BROADCAST) Slog.v(
TAG_BROADCAST, "Received BROADCAST_INTENT_MSG ["
+ mQueueName + "]");
// 调用 processNextBroadcast,详见1.4
processNextBroadcast(true);
} break;
case BROADCAST_TIMEOUT_MSG: {
synchronized (mService) {
// 这里是广播超时,触发ANR的地方,我们不是在说稳定性,这里就不关注了
broadcastTimeoutLocked(true);
}
} break;
}
}
}
1.4 BroadcastQueueImpl.processNextBroadcast
加了AMS大锁,然后调用processNextBroadcastLocked
private void processNextBroadcast(boolean fromMsg) {
synchronized (mService) {
processNextBroadcastLocked(fromMsg, false);
}
}
1.5 BroadcastQueueImpl.processNextBroadcastLocked
首先先通过deliverToRegisteredReceiverLocked分发无序广播,然后再来处理有序广播。
有序广播需要一个一个receiver来分发,所以先要找到下一个receiver。这里有一个延迟策略,有一些广播app侧处理起来比较耗时,例如需要启动进程。
这种情况下就会将广播进行拆分,新建一个BroadcastRecord,它的receiver只有刚刚需要延迟的那一个,然后在原来的BroadcastRecord中删除这个receiver。
找到下一个receiver后,先设置广播的Timeout超时逻辑,这里相当于ANR埋雷,然后获取receiver的进程信息,检查是否需要skip该receiver,如果不需要则检查进程是否已经运行,如果已经运行就调用processCurBroadcastLocked来分发广播,否则会先启动目标进程,启动进程的逻辑和之前Activity启动进程逻辑类似,就不说了。
public void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
BroadcastRecord r;
// ...
// 先分发无序广播
while (mParallelBroadcasts.size() > 0) {
r = mParallelBroadcasts.remove(0);
r.dispatchTime = SystemClock.uptimeMillis();
r.dispatchRealTime = SystemClock.elapsedRealtime();
r.dispatchClockTime = System.currentTimeMillis();
r.mIsReceiverAppRunning = true;
// ...
final int N = r.receivers.size();
if (DEBUG_BROADCAST_LIGHT) Slog.v(TAG_BROADCAST, "Processing parallel broadcast ["
+ mQueueName + "] " + r);
for (int i=0; i<N; i++) {
Object target = r.receivers.get(i);
// 分发无序广播
deliverToRegisteredReceiverLocked(r,
(BroadcastFilter) target, false, i);
}
addBroadcastToHistoryLocked(r);
}
// 如果有mPendingBroadcast,正在发送的广播,什么都不做,只是检查一下对应进程是否还活着,如果活着就直接return。
// 否则需要继续发送广播的流程
if (mPendingBroadcast != null) {
boolean isDead;
if (mPendingBroadcast.curApp.getPid() > 0) {
synchronized (mService.mPidsSelfLocked) {
ProcessRecord proc = mService.mPidsSelfLocked.get(
mPendingBroadcast.curApp.getPid());
isDead = proc == null || proc.mErrorState.isCrashing();
}
} else {
final ProcessRecord proc = mService.mProcessList.getProcessNamesLOSP().get(
mPendingBroadcast.curApp.processName, mPendingBroadcast.curApp.uid);
isDead = proc == null || !proc.isPendingStart();
}
if (!isDead) {
// 进程还活着,什么都不做直接return
return;
} else {
mPendingBroadcast.state = BroadcastRecord.IDLE;
mPendingBroadcast.nextReceiver = mPendingBroadcastRecvIndex;
mPendingBroadcast = null;
}
}
boolean looped = false;
// 循环处理广播,寻找下一个我们需要发送的广播以及receiver
do {
final long now = SystemClock.uptimeMillis();
r = mDispatcher.getNextBroadcastLocked(now);
if (r == null) {
// ...
// 没有广播需要分发,直接return
return;
}
boolean forceReceive = false;
//
int numReceivers = (r.receivers != null) ? r.receivers.size() : 0;
if (mService.mProcessesReady && !r.timeoutExempt && r.dispatchTime > 0) {
if ((numReceivers > 0) &&
(now > r.dispatchTime + (2 * mConstants.TIMEOUT * numReceivers))) {
// ... 如果广播处理太慢超时,强制停止广播
broadcastTimeoutLocked(false); // forcibly finish this broadcast
forceReceive = true;
r.state = BroadcastRecord.IDLE;
}
}
if (r.state != BroadcastRecord.IDLE) {
// ...如果当前广播不是IDLE状态,则直接返回
return;
}
// 检查广播发送是否已经完成
if (r.receivers == null || r.nextReceiver >= numReceivers
|| r.resultAbort || forceReceive) {
if (r.resultTo != null) {
boolean sendResult = true;
// ...
if (sendResult) {
// ...
// 如果需要发送结果
performReceiveLocked(r, r.resultToApp, r.resultTo,
new Intent(r.intent), r.resultCode,
r.resultData, r.resultExtras, false, false, r.shareIdentity,
r.userId, r.callingUid, r.callingUid, r.callerPackage,
r.dispatchTime - r.enqueueTime,
now - r.dispatchTime, 0,
r.resultToApp != null
? r.resultToApp.mState.getCurProcState()
: ActivityManager.PROCESS_STATE_UNKNOWN);
// ...
}
}
// ...清除广播超时计时器
cancelBroadcastTimeoutLocked();
// ...
mDispatcher.retireBroadcastLocked(r);
r = null;
looped = true;
// 当前广播已经完成,则进入下一次循环
continue;
}
// 检查下一个receiver是否在defer策略下。
if (!r.deferred) {
final int receiverUid = r.getReceiverUid(r.receivers.get(r.nextReceiver));
if (mDispatcher.isDeferringLocked(receiverUid)) {
// 如果这个是最后一个receiver,就没有必要做拆分操作了,就正常推迟即可。
BroadcastRecord defer;
if (r.nextReceiver + 1 == numReceivers) {
defer = r;
mDispatcher.retireBroadcastLocked(r);
} else {
// 进行分割操作,这里会新构建一个BroadcastRecord,接收者只有这一个uid,单独来发送
defer = r.splitRecipientsLocked(receiverUid, r.nextReceiver);
// ...
// Track completion refcount as well if relevant
if (r.resultTo != null) {
int token = r.splitToken;
// 分割的计数。
if (token == 0) {
r.splitToken = defer.splitToken = nextSplitTokenLocked();
mSplitRefcounts.put(r.splitToken, 2);
} else {
final int curCount = mSplitRefcounts.get(token);
mSplitRefcounts.put(token, curCount + 1);
}
}
}
// 添加分割后需要延迟的广播
mDispatcher.addDeferredBroadcast(receiverUid, defer);
r = null;
looped = true;
continue;
}
}
} while (r == null);
int recIdx = r.nextReceiver++;
r.receiverTime = SystemClock.uptimeMillis();
r.scheduledTime[recIdx] = r.receiverTime;
if (recIdx == 0) {
r.dispatchTime = r.receiverTime;
r.dispatchRealTime = SystemClock.elapsedRealtime();
r.dispatchClockTime = System.currentTimeMillis();
// ... trace/log
}
if (! mPendingBroadcastTimeoutMessage) {
long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
// 设置广播超时
setBroadcastTimeoutLocked(timeoutTime);
}
final BroadcastOptions brOptions = r.options;
final Object nextReceiver = r.receivers.get(recIdx);
//...
// ...获取receiver进程信息
// 判断是否需要跳过当前的receiver
boolean skip = mSkipPolicy.shouldSkip(r, info);
Bundle filteredExtras = null;
if (!skip && r.filterExtrasForReceiver != null) {
final Bundle extras = r.intent.getExtras();
if (extras != null) {
filteredExtras = r.filterExtrasForReceiver.apply(receiverUid, extras);
if (filteredExtras == null) {
skip = true;
}
}
}
// 如果需要跳过,就调用scheduleBroadcastsLocked进入下一次广播分发逻辑
if (skip) {
r.delivery[recIdx] = BroadcastRecord.DELIVERY_SKIPPED;
r.curFilter = null;
r.state = BroadcastRecord.IDLE;
r.manifestSkipCount++;
scheduleBroadcastsLocked();
return;
}
r.manifestCount++;
r.delivery[recIdx] = BroadcastRecord.DELIVERY_DELIVERED;
r.state = BroadcastRecord.APP_RECEIVE;
r.curComponent = component;
r.curReceiver = info.activityInfo;
r.curFilteredExtras = filteredExtras;
final boolean isActivityCapable =
(brOptions != null && brOptions.getTemporaryAppAllowlistDuration() > 0);
maybeScheduleTempAllowlistLocked(receiverUid, r, brOptions);
// ...
// 检查receiver目标app是否已经启动
if (app != null && app.getThread() != null && !app.isKilled()) {
try {
app.addPackage(info.activityInfo.packageName,
info.activityInfo.applicationInfo.longVersionCode, mService.mProcessStats);
maybeAddBackgroundStartPrivileges(app, r);
r.mIsReceiverAppRunning = true;
// 发送广播,详见1.6
processCurBroadcastLocked(r, app);
return;
}
// ...
}
r.mWasReceiverAppStopped =
(info.activityInfo.applicationInfo.flags & ApplicationInfo.FLAG_STOPPED) != 0;
// 如果目标进程没有启动,这里需要先启动进程。
r.curApp = mService.startProcessLocked(targetProcess,
info.activityInfo.applicationInfo, true,
r.intent.getFlags() | Intent.FLAG_FROM_BACKGROUND,
new HostingRecord(HostingRecord.HOSTING_TYPE_BROADCAST, r.curComponent,
r.intent.getAction(), r.getHostingRecordTriggerType()),
isActivityCapable ? ZYGOTE_POLICY_FLAG_LATENCY_SENSITIVE : ZYGOTE_POLICY_FLAG_EMPTY,
(r.intent.getFlags() & Intent.FLAG_RECEIVER_BOOT_UPGRADE) != 0, false);
r.curAppLastProcessState = ActivityManager.PROCESS_STATE_NONEXISTENT;
if (r.curApp == null) {
// 如果启动进程失败,结束当前的广播
logBroadcastReceiverDiscardLocked(r);
finishReceiverLocked(r, r.resultCode, r.resultData,
r.resultExtras, r.resultAbort, false);
// 重新调用scheduleBroadcastsLocked发送广播
scheduleBroadcastsLocked();
r.state = BroadcastRecord.IDLE;
return;
}
maybeAddBackgroundStartPrivileges(r.curApp, r);
mPendingBroadcast = r;
mPendingBroadcastRecvIndex = recIdx;
}
1.6 BroadcastQueueImpl.processCurBroadcastLocked
通过ActivityThread.scheduleReceiver binder回调回app侧执行广播。
private final void processCurBroadcastLocked(BroadcastRecord r,
ProcessRecord app) throws RemoteException {
// ...
// 跟新adj
mService.enqueueOomAdjTargetLocked(app);
mService.updateOomAdjPendingTargetsLocked(OOM_ADJ_REASON_START_RECEIVER);
// ...
boolean started = false;
try {
mService.notifyPackageUse(r.intent.getComponent().getPackageName(),
PackageManager.NOTIFY_PACKAGE_USE_BROADCAST_RECEIVER);
final boolean assumeDelivered = false;
// 通过binder回调App侧,分发广播,详见1.7
thread.scheduleReceiver(
prepareReceiverIntent(r.intent, r.curFilteredExtras),
r.curReceiver, null /* compatInfo (unused but need to keep method signature) */,
r.resultCode, r.resultData, r.resultExtras, r.ordered, assumeDelivered,
r.userId, r.shareIdentity ? r.callingUid : Process.INVALID_UID,
app.mState.getReportedProcState(),
r.shareIdentity ? r.callerPackage : null);
started = true;
} finally {
// ...
}
if (app.isKilled()) {
throw new RemoteException("app gets killed during broadcasting");
}
}
1.7 ActivityThread.scheduleReceiver
将广播封装到ReceiverData,发送H.RECEIVER消息
public final void scheduleReceiver(Intent intent, ActivityInfo info,
CompatibilityInfo compatInfo, int resultCode, String data, Bundle extras,
boolean ordered, boolean assumeDelivered, int sendingUser, int processState,
int sendingUid, String sendingPackage) {
updateProcessState(processState, false);
// 封装ReceiverData
ReceiverData r = new ReceiverData(intent, resultCode, data, extras,
ordered, false, assumeDelivered, mAppThread.asBinder(), sendingUser,
sendingUid, sendingPackage);
r.info = info;
// 发送H.RECEIVER消息,详见1.8
sendMessage(H.RECEIVER, r);
}
1.8 H.handleMessage
调用handleReceiver
public void handleMessage(Message msg) {
// ...
case RECEIVER:
if (Trace.isTagEnabled(Trace.TRACE_TAG_ACTIVITY_MANAGER)) {
ReceiverData rec = (ReceiverData) msg.obj;
// ...
// 详见1.9
handleReceiver((ReceiverData)msg.obj);
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
break;
// ...
}
1.9 ActivityThread.handleReceiver
这里通过反射构造出对应BroadcastReceiver,然后调用了Receiver的onReceive,后续就是我们应用开发时候定义广播接收器,复写onReceive自己定义的逻辑了。
private void handleReceiver(ReceiverData data) {
// ...
try {
// ...
java.lang.ClassLoader cl = context.getClassLoader();
data.intent.setExtrasClassLoader(cl);
data.intent.prepareToEnterProcess(
isProtectedComponent(data.info) || isProtectedBroadcast(data.intent),
context.getAttributionSource());
data.setExtrasClassLoader(cl);
// 通过反射构造对应是BroadcastReceiver
receiver = packageInfo.getAppFactory()
.instantiateReceiver(cl, data.info.name, data.intent);
} catch (Exception e) {
// ...
}
try {
// ...
// 调用广播的onReceive
sCurrentBroadcastIntent.set(data.intent);
receiver.setPendingResult(data);
receiver.onReceive(context.getReceiverRestrictedContext(),
data.intent);
} catch (Exception e) {
// ...
} finally {
sCurrentBroadcastIntent.set(null);
}
if (receiver.getPendingResult() != null) {
data.finish();
}
}
小结
本章主要介绍了一下广播分发的流程,虽然有一些方法比较长,但是核心的逻辑是比较简单的,主要是一些分发的策略逻辑比较多。
发送广播时会把广播数据放到队列中,然后分发广播会不断调用scheduleBroadcastsLocked来处理广播的分发。
广播分发分为无序广播和有序广播,无序广播分发逻辑比较简单,直接分发即可。而有序广播有一些额外策略,例如如果有处理广播时间较长的情况,如目标进程未启动需要现在启动进程,会将广播单独拆分开,防止单个应用处理广播太慢广播影响其他应用。
分发到应用通过ActivityThread.scheduleReceiver的binder调用通知到应用,而应用会通过反射构造对应的BroadcastReceiver,然后调用它的onReceiver,之后就是app侧复写onReceive的逻辑了,这里的其实和Activity的onCreate操作很像。