源码基于:Android U
0. 前言
最近在研究 Android 自带的系统数据指标采集功能,框架依旧很严谨、完美,这里做个分享。
1. Android S 之后变化
stats 的代码从 framework 或 system/core 中转移到了 packages/modules/StatsD 目录中。
2. 框架图
大的框架分两层:
-
system_server;
-
statsd;
Java 层创建两个 SystemService:
-
StatsCompanion.Lifecycle:会启动两个Service 用以与 native daemon 通信;
-
StatsPullAtomService:SystemServer 中用来采集数据;
StatsPullAtomService 用以采集数据,并将数据填充到参数中,在StatsManager.PullAtomCallbackInternal 中会转换成 Parcel 格式传入到 native。
frameworks/base/services/core/java/com/android/server/stats/pull/StatsPullAtomService.java
// Excerpt (abridged, Android U) from StatsPullAtomService.java.
// Single pull callback shared by every atom this service registers; statsd
// invokes it with the atom id, and the implementation fills the
// caller-supplied `data` list with the collected StatsEvents.
private class StatsPullAtomCallbackImpl implements StatsManager.StatsPullAtomCallback {
@Override
// Called when statsd needs a fresh sample for `atomTag`; returns a
// pull-status int (success/failure) after appending events to `data`.
public int onPullAtom(int atomTag, List<StatsEvent> data) {
...
try {
// Dispatch on the atom id; only the memory-related cases are quoted here.
switch (atomTag) {
...
case FrameworkStatsLog.PROCESS_MEMORY_STATE:
case FrameworkStatsLog.PROCESS_MEMORY_HIGH_WATER_MARK:
case FrameworkStatsLog.PROCESS_MEMORY_SNAPSHOT:
case FrameworkStatsLog.SYSTEM_ION_HEAP_SIZE:
case FrameworkStatsLog.ION_HEAP_SIZE:
case FrameworkStatsLog.PROCESS_SYSTEM_ION_HEAP_SIZE:
case FrameworkStatsLog.PROCESS_DMABUF_MEMORY:
case FrameworkStatsLog.SYSTEM_MEMORY:
case FrameworkStatsLog.VMSTAT:
...
}
} finally {}
}
该函数是针对不同类型的 puller 的回调处理,在此之前会调用 registerPullers
函数将所有的puller 注册到 StatsManager 中:
// Excerpt (Android U) from StatsPullAtomService.java.
// Registers the shared callback (mStatsCallbackImpl) for the
// PROCESS_MEMORY_HIGH_WATER_MARK atom; every registerXxx() helper follows
// this same pattern, differing only in the tag id.
private void registerProcessMemoryHighWaterMark() {
int tagId = FrameworkStatsLog.PROCESS_MEMORY_HIGH_WATER_MARK;
mStatsManager.setPullAtomCallback(
tagId,
null, // use default PullAtomMetadata values
DIRECT_EXECUTOR,
mStatsCallbackImpl
);
}
StatsCallbackPuller::PullInternal 中会回调StatsManager.PullAtomCallbackInternal 的onPullAtom
,该函数中会回调StatsCallbackPuller::PullInternal 中定义的PullResultReceiver
的pullFinished
函数,并将 StatsPullAtomService
端采集的数据存入StatsCallbackPuller::PullInternal
的入参中;
packages/modules/StatsD/framework/java/android/app/StatsManager.java
// Excerpt (Android U) from StatsManager.java (PullAtomCallbackInternal).
// Bridge between the native daemon and the app/system-server callback:
// runs the registered callback on mExecutor, marshals its StatsEvents into
// StatsEventParcels, and reports the result back through `resultReceiver`.
public void onPullAtom(int atomTag, IPullAtomResultReceiver resultReceiver) {
// Drop the binder caller's identity so the pull runs with this
// process's own identity/permissions.
final long token = Binder.clearCallingIdentity();
try {
mExecutor.execute(() -> {
List<StatsEvent> data = new ArrayList<>(); // collected events are accumulated into `data`
int successInt = mCallback.onPullAtom(atomTag, data); // invoke the registered callback to perform the pull
boolean success = successInt == PULL_SUCCESS;
StatsEventParcel[] parcels = new StatsEventParcel[data.size()];
for (int i = 0; i < data.size(); i++) {
parcels[i] = new StatsEventParcel();
parcels[i].buffer = data.get(i).getBytes(); // serialize each event into a parcel buffer for native
}
try {
resultReceiver.pullFinished(atomTag, success, parcels); // hand results back to the native-side receiver
} catch (RemoteException e) {
...
}
});
} finally {
Binder.restoreCallingIdentity(token);
}
}
packages/modules/StatsD/statsd/src/external/StatsCallbackPuller.cpp
// Excerpt (abridged, Android U) from StatsCallbackPuller.cpp.
// Performs one pull: issues the (usually oneway) binder call to the Java
// side, then blocks on a condition variable until the PullResultReceiver
// lambda below is invoked with the serialized events, or the timeout hits.
// On success, the parsed events are moved into the out-parameter `data`.
PullErrorCode StatsCallbackPuller::PullInternal(vector<shared_ptr<LogEvent>>* data) {
...
shared_ptr<vector<shared_ptr<LogEvent>>> sharedData =
make_shared<vector<shared_ptr<LogEvent>>>();
shared_ptr<PullResultReceiver> resultReceiver = SharedRefBase::make<PullResultReceiver>(
[cv_mutex, cv, pullFinish, pullSuccess, sharedData]( // receiver callback, fired from pullFinished()
int32_t atomTag, bool success, const vector<StatsEventParcel>& output) {
{
// Guard sharedData / pullSuccess / pullFinish against the
// waiting thread below.
lock_guard<mutex> lk(*cv_mutex);
for (const StatsEventParcel& parcel: output) {
shared_ptr<LogEvent> event = make_shared<LogEvent>(/*uid=*/-1, /*pid=*/-1);
bool valid = event->parseBuffer((uint8_t*)parcel.buffer.data(),
parcel.buffer.size());
if (valid) {
sharedData->push_back(event); // parse the Java-side bytes into LogEvents, stash in sharedData
} else {
StatsdStats::getInstance().noteAtomError(event->GetTagId(),
/*pull=*/true);
}
}
*pullSuccess = success;
*pullFinish = true;
}
// Wake the thread blocked in wait_for() below.
cv->notify_one();
});
// Initiate the pull. This is a oneway call to a different process, except
// in unit tests. In process calls are not oneway.
Status status = mCallback->onPullAtom(mTagId, resultReceiver); // start the pull; completion arrives via the receiver above
...
{
unique_lock<mutex> unique_lk(*cv_mutex);
// Wait until the pull finishes, or until the pull timeout.
cv->wait_for(unique_lk, chrono::nanoseconds(mPullTimeoutNs),
[pullFinish] { return *pullFinish; });
if (!*pullFinish) {
...
} else {
if (*pullSuccess) {
*data = std::move(*sharedData); // results finally land in the caller's out-parameter
}
...
}
}
StatsPullerManager::OnAlarmFired 最终会调用 StatsCallbackPuller::PullInternal
,参数也是这里传入的。所以采集的数据最终会回到 StatsPullerManager::OnAlarmFired
中,最终会将这些数据通过注册在StatsPullerManager中的 mReceivers
的回调函数 onDataPulled
处理。
packages/modules/StatsD/statsd/src/external/StatsPullerManager.cpp
// Excerpt (Android U) from StatsPullerManager.cpp.
// Alarm handler: scans mReceivers for pullers whose next pull time has
// arrived, performs the pulls via PullLocked, stamps the events with the
// alarm's elapsed/wall-clock time, fans them out to each receiver's
// onDataPulled, and finally reschedules the next alarm.
void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
std::lock_guard<std::mutex> _l(mLock);
int64_t wallClockNs = getWallClockNs();
int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
for (auto& pair : mReceivers) { // all pulled-data consumers are registered in mReceivers
vector<ReceiverInfo*> receivers;
if (pair.second.size() != 0) {
for (ReceiverInfo& receiverInfo : pair.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
const bool pullNecessary = receiverPtr != nullptr && receiverPtr->isPullNeeded();
if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && pullNecessary) {
receivers.push_back(&receiverInfo);
} else {
// Pull was due but not needed (or receiver gone): tell the
// receiver and advance its schedule past any missed buckets.
// NOTE(review): receiverPtr can be null here when promote()
// failed, yet onDataPulled is called unconditionally — looks
// like a potential null deref; verify against AOSP.
if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
receiverInfo.intervalNs;
receiverInfo.nextPullTimeNs +=
(numBucketsAhead + 1) * receiverInfo.intervalNs;
}
minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
}
}
if (receivers.size() > 0) {
needToPull.push_back(make_pair(&pair.first, receivers));
}
}
}
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data; // pulled events are first collected into this local temporary
PullResult pullResult =
PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data) // PullLocked drives the actual pull (e.g. StatsCallbackPuller::PullInternal)
? PullResult::PULL_RESULT_SUCCESS
: PullResult::PULL_RESULT_FAIL;
if (pullResult == PullResult::PULL_RESULT_FAIL) {
VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
}
// Convention is to mark pull atom timestamp at request time.
// If we pull at t0, puller starts at t1, finishes at t2, and send back
// at t3, we mark t0 as its timestamp, which should correspond to its
// triggering event, such as condition change at t0.
// Here the triggering event is alarm fired from AlarmManager.
// In ValueMetricProducer and GaugeMetricProducer we do same thing
// when pull on condition change, etc.
for (auto& event : data) {
event->setElapsedTimestampNs(elapsedTimeNs);
event->setLogdWallClockTimestampNs(wallClockNs);
}
for (const auto& receiverInfo : pullInfo.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
if (receiverPtr != nullptr) {
receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs); // hand the events to the receiver (onDataPulled)
// We may have just come out of a coma, compute next pull time.
int numBucketsAhead =
(elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
} else {
VLOG("receiver already gone.");
}
}
}
VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
(long long)minNextPullTimeNs);
mNextPullTimeNs = minNextPullTimeNs; // record the earliest upcoming pull time
updateAlarmLocked(); // reschedule via the Java-side AlarmManagerService
}