前言
本篇进行matrix框架的网络流量监控模块的代码分析。你可能想,为什么需要对流量进行监控呢?我们平常进行的网络接口请求都是一些必要的操作,监控它的意义何在?首先我们要明确流量监控的对象是什么,是上行(发请求消耗的流量)和下行(接收到服务器返回的数据流量)这两块消耗的用户流量。通过这个监控,我们可以清晰的看到每个接口在每次调用时所消耗的流量的具体值,有了这个数据之后,我们可以从两个维度来分析流量问题。第一,明确单次接口请求是否存在过多消耗流量的情况,从而促进网络数据包的体积优化;第二,从多次请求的维度来看,也能帮助我们定位是否存在单个接口请求数量异常的问题,从而定位代码存在的业务逻辑问题。笔者在万能钥匙的时候,就曾遇到过类似的情况,某接口在应用启动阶段频繁调用,导致用户流量消耗过多被用户投诉的问题。试想假如当时做了流量监控,那么对开发团队来说,就可以更高效的定位到问题所在。
言归正传,我们进入今天的代码分析,分析的对象时matrix中的TrafficPlugin,我们从它的几个关键方法入手。
- 静态代码块
- start
- stop
静态代码块
static {
System.loadLibrary("matrix-traffic");
}
根据加载的名称matrix-traffic找到MatrixTraffic.cc这个c++的class,loadLibrary方法执行的时候会进入到JNI_OnLoad方法,JNI_OnLoad方法通常用来做一些准备性的工作,用于后边c++层和Java层的一个互调,下面是一部分关键代码,可以看到,这里将Java层的TrafficPlugin保存为全局引用,并获取了它的setStackTrace方法备用,并动态注册了一些Java层到native层方法的映射关系。
JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *) {
jclass trafficCollectorCls = env->FindClass("com/tencent/matrix/traffic/TrafficPlugin");
if (!trafficCollectorCls)
return -1;
//保存TrafficPlugin的jclass对象为全局引用
gJ.TrafficPlugin = static_cast<jclass>(env->NewGlobalRef(trafficCollectorCls));
//保存TrafficPlugin的setStackTrace方法为全局引用
gJ.TrafficPlugin_setFdStackTrace =
env->GetStaticMethodID(trafficCollectorCls, "setStackTrace", "(Ljava/lang/String;Ljava/lang/String;)V");
//动态注册一些Java层方法到native方法的映射关系
if (env->RegisterNatives(
trafficCollectorCls, TRAFFIC_METHODS, static_cast<jint>(NELEM(TRAFFIC_METHODS))) != 0)
return -1;
return JNI_VERSION_1_6;
}
start
通过nativeInitMatrixTraffic方法调用进入native层,根据上边JNI_OnLoad动态注册的映射关系找到MatrixTraffic.cc中的nativeInitMatrixTraffic方法。
@Override
public void start() {
//这里可以设置需要过滤的so
String[] ignoreSoFiles = trafficConfig.getIgnoreSoFiles();
//进入native层
nativeInitMatrixTraffic(trafficConfig.isRxCollectorEnable(), trafficConfig.isTxCollectorEnable(), trafficConfig.willDumpStackTrace(), trafficConfig.willDumpNativeBackTrace(), trafficConfig.willLookupIpAddress(), ignoreSoFiles);
}
MatrixTraffic.cc中的nativeInitMatrixTraffic方法。
static void nativeInitMatrixTraffic(JNIEnv *env, jclass, jboolean rxEnable, jboolean txEnable, jboolean dumpStackTrace, jboolean dumpNativeBackTrace, jboolean lookupIpAddress, jobjectArray ignoreSoFiles) {
//启动loop循环线程
TrafficCollector::startLoop(dumpStackTrace == JNI_TRUE, lookupIpAddress == JNI_TRUE);
//是否dump native堆栈
sDumpNativeBackTrace = (dumpNativeBackTrace == JNI_TRUE);
//需要过滤的so
ignoreSo(env, ignoreSoFiles);
//通过hook socket实现对网络请求的拦截
hookSocket(rxEnable == JNI_TRUE, txEnable == JNI_TRUE);
}
startLoop
首先startLoop启动循环线程
void TrafficCollector::startLoop(bool dumpStackTrace, bool lookupIpAddress) {
thread loopThread(loop);
loopThread.detach();
}
loop循环线程是作为一个消费者线程出现的,我们先跳过这个方法的具体实现,先把生产者生产数据的过程看一下。
void loop() {
while (loopRunning) {
if (msgQueue.empty()) {
queueMutex.lock();
} else {
...
}
}
}
hookSocket
网络请求最终都是通过底层socket进行发起的,所以通过hook socket的方式可以拦截到所有的网络请求,这里是用了plt hook的方式,什么是plt hook可以参考 爱奇艺的xhook框架介绍 。为了使代码更简洁,用…省略了部分代码。
static void hookSocket(bool rxHook, bool txHook) {
//连接和关闭
xhook_grouped_register(..., ".*\.so$", "connect",(void *) my_connect, (void **) (&original_connect));
xhook_grouped_register(..., ".*\.so$", "close",(void *) my_close, (void **) (&original_close));
//接收的数据监控
if (rxHook) {
xhook_grouped_register(..., ".*\.so$", "read",(void *) my_read, (void **) (&original_read));
xhook_grouped_register(..., ".*\.so$", "recv",(void *) my_recv, (void **) (&original_recv));
xhook_grouped_register(..., ".*\.so$", "recvfrom",(void *) my_recvfrom, (void **) (&original_recvfrom));
xhook_grouped_register(..., ".*\.so$", "recvmsg",(void *) my_recvmsg, (void **) (&original_recvmsg));
}
//上传的数据监控
if (txHook) {
xhook_grouped_register(..., ".*\.so$", "write",(void *) my_write, (void **) (&original_write));
xhook_grouped_register(..., ".*\.so$", "send",(void *) my_send, (void **) (&original_send));
xhook_grouped_register(.., ".*\.so$", "sendto",(void *) my_sendto, (void **) (&original_sendto));
xhook_grouped_register(.., ".*\.so$", "sendmsg",(void *) my_sendmsg, (void **) (&original_sendmsg));
}
}
可以看到这里hook了socket的一些关键方法,这些方法被hook之后,当方法再次被调用的时候,我们就可以拦截到它的执行,从而做一些额外的处理。
- connect
- close
- read
- recv
- recvfrom
- recvmsg
- write
- send
- sendto
- sendmsg
connect
int my_connect(int fd, sockaddr *addr, socklen_t addr_length) {
TrafficCollector::enQueueConnect(fd, addr, addr_length);
return original_connect(fd, addr, addr_length);
}
通过调用TrafficCollector的enQueueConnect方法记录本次socket连接的信息,将MSG_TYPE_CONNECT类型,文件描述符,socket地址,调用栈等信息封装成TrafficMsg存入msgQueue队列。
void TrafficCollector::enQueueConnect(int fd, sockaddr *addr, socklen_t addr_length) {
//将MSG_TYPE_CONNECT类型,文件描述符,socket地址,调用栈等信息封装成TrafficMsg存入msgQueue队列
shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(MSG_TYPE_CONNECT, fd, addr->sa_family, getKeyAndSaveStack(fd), 0);
msgQueue.push(msg);
queueMutex.unlock();
}
close
int my_close(int fd) {
TrafficCollector::enQueueClose(fd);
return original_close(fd);
}
通过调用TrafficCollector的enQueueClose方法记录本次socket关闭的信息,将MSG_TYPE_CLOSE类型,文件描述符封装成TrafficMsg存入msgQueue队列。
void TrafficCollector::enQueueClose(int fd) {
shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(MSG_TYPE_CLOSE, fd, 0, "", 0);
msgQueue.push(msg);
queueMutex.unlock();
}
read
ssize_t my_read(int fd, void *buf, size_t count) {
ssize_t ret = original_read(fd, buf, count);
TrafficCollector::enQueueRx(MSG_TYPE_READ, fd, ret);
return ret;
}
type为MSG_TYPE_READ,调用TrafficCollector的enQueueRx方法。
void enQueueMsg(int type, int fd, size_t len) {
shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(type, fd, 0, getKeyAndSaveStack(fd), len);
msgQueue.push(msg);
queueMutex.unlock();
}
recv
ssize_t my_recv(int sockfd, void *buf, size_t len, int flags) {
ssize_t ret = original_recv(sockfd, buf, len, flags);
TrafficCollector::enQueueRx(MSG_TYPE_RECV, sockfd, ret);
return ret;
}
type为MSG_TYPE_RECV,调用TrafficCollector的enQueueRx方法。
void enQueueMsg(int type, int fd, size_t len) {
shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(type, fd, 0, getKeyAndSaveStack(fd), len);
msgQueue.push(msg);
queueMutex.unlock();
}
recvfrom
ssize_t my_recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen) {
ssize_t ret = original_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
TrafficCollector::enQueueRx(MSG_TYPE_RECVFROM, sockfd, ret);
return ret;
}
type为MSG_TYPE_RECVFROM,调用TrafficCollector的enQueueRx方法。
recvmsg
ssize_t my_write(int fd, const void *buf, size_t count) {
ssize_t ret = original_write(fd, buf, count);
TrafficCollector::enQueueTx(MSG_TYPE_WRITE, fd, ret);
return ret;
}
type为MSG_TYPE_RECVMSG,调用TrafficCollector的enQueueRx方法。
write
ssize_t my_send(int sockfd, const void *buf, size_t len, int flags) {
ssize_t ret = original_send(sockfd, buf, len, flags);
TrafficCollector::enQueueTx(MSG_TYPE_SEND, sockfd, ret);
return ret;
}
type为MSG_TYPE_WRITE,调用TrafficCollector的enQueueRx方法。
send
ssize_t my_send(int sockfd, const void *buf, size_t len, int flags) {
ssize_t ret = original_send(sockfd, buf, len, flags);
TrafficCollector::enQueueTx(MSG_TYPE_SEND, sockfd, ret);
return ret;
}
type为MSG_TYPE_SEND,调用TrafficCollector的enQueueRx方法。
sendto
ssize_t my_sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen) {
ssize_t ret = original_sendto(sockfd, buf, len, flags, dest_addr, addrlen);
TrafficCollector::enQueueTx(MSG_TYPE_SENDTO, sockfd, ret);
return ret;
}
type为MSG_TYPE_SENDTO,调用TrafficCollector的enQueueRx方法。
sendmsg
ssize_t my_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
ssize_t ret = original_sendmsg(sockfd, msg, flags);
TrafficCollector::enQueueTx(MSG_TYPE_SENDMSG, sockfd, ret);
return ret;
}
type为MSG_TYPE_SENDMSG,调用TrafficCollector的enQueueRx方法。
enQueueRx
可以看到上边除了connect和close方法外,其他都调用到了TrafficCollector的enQueueRx方法,我们看看这个方法做了什么。
void TrafficCollector::enQueueTx(int type, int fd, size_t len) {
enQueueMsg(type, fd, len);
}
void enQueueMsg(int type, int fd, size_t len) {
shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(type, fd, 0, getKeyAndSaveStack(fd), len);
msgQueue.push(msg);
queueMutex.unlock();
}
enQueueRx方法在不断的封装TrafficMsg对象并存入队列msgQueue,每个方法间区别在于type的定义不同。所以到这里我们可以有一个简单的结论了: 被hook的这些方法,执行时会不断的获取socket此时的信息,封装成TrafficMsg对象存入队列供消费者线程进行消费,所以这里扮演的角色就是生产者线程。此时我们回头再去看loop线程。
loop
loop线程就是上边提到的消费者线程,消费线程不断的循环,当msgQueue中有数据时,就开始做进一步的处理。
void loop() {
while (loopRunning) {
if (msgQueue.empty()) {
queueMutex.lock();
} else {
shared_ptr<TrafficMsg> msg = msgQueue.front();
if (msg->type == MSG_TYPE_CONNECT) {
//socket开始连接,以文件描述符为key, 地址为value存入fdFamilyMap中
fdFamilyMap[msg->fd] = msg->sa_family;
} else if (msg->type == MSG_TYPE_READ) {
//接收数据,开始read,假如fdFamilyMap存在,这个fd,说明已连接过
if (fdFamilyMap.count(msg->fd) > 0) {
appendRxTraffic(msg->threadName, msg->len);
}
} else if (msg->type >= MSG_TYPE_RECV && msg->type <= MSG_TYPE_RECVMSG) {
//接收数据
if (fdFamilyMap[msg->fd] != AF_LOCAL) {
appendRxTraffic(msg->threadName, msg->len);
}
} else if (msg->type == MSG_TYPE_WRITE) {
//写入数据
if (fdFamilyMap.count(msg->fd) > 0) {
appendTxTraffic(msg->threadName, msg->len);
}
} else if (msg->type >= MSG_TYPE_SEND && msg->type <= MSG_TYPE_SENDMSG) {
//写入数据
if (fdFamilyMap[msg->fd] != AF_LOCAL) {
appendTxTraffic(msg->threadName, msg->len);
}
} else if (msg->type == MSG_TYPE_CLOSE) {
//关闭
fdThreadNameMapLock.lock();
fdThreadNameMap.erase(msg->fd);
fdThreadNameMapLock.unlock();
fdFamilyMap.erase(msg->fd);
}
msgQueue.pop();
}
}
}
从上边代码的注释可以看到,关键的两个方法是上传数据时调用的appendTxTraffic用来记录上行的数据流量,接收数据时调用的appendRxTraffic方法用来记录下行的数据流量,所以读或写的过程也就是不断的实时记录流量的过程。
appendTxTraffic
以线程名为key, 流量值为value存入txTrafficInfoMap中。
void appendTxTraffic(const string& threadName, long len) {
txTrafficInfoMapLock.lock();
txTrafficInfoMap[threadName] += len;
txTrafficInfoMapLock.unlock();
}
appendRxTraffic
以线程名为key, 流量值为value存入rxTrafficInfoMap中。
void appendRxTraffic(const string& threadName, long len) {
rxTrafficInfoMapLock.lock();
rxTrafficInfoMap[threadName] += len;
rxTrafficInfoMapLock.unlock();
}
看到这里感觉有点奇怪了,怎么只是将数据记录到对应的map中,什么时候取的数据?在TrafficPlugin.java中我们可以找到这个方法getTrafficInfoMap,它的返回值是HashMap,其实就是上边提到的存储起来的流量信息。
getTrafficInfoMap
public HashMap<String, String> getTrafficInfoMap(int type) {
//进入native层的方法
return nativeGetTrafficInfoMap(type);
}
来到TrafficCollector的getTrafficInfoMap方法。
static jobject nativeGetTrafficInfoMap(JNIEnv *env, jclass, jint type) {
return TrafficCollector::getTrafficInfoMap(type);
}
可以看到,下面的逻辑很清晰,通过构造一个Java层的HashMap对象,并将指定类型的信息从c++层的map对象中转移到HashMap中,这样一来,就实时的拿到了当前流量消耗的数据,数据包含线程名和流量值,拿到线程名后可以通过getStackTraceMap拿到线程名和堆栈信息的映射关系,从而获取到实时的调用堆栈信息。
jobject TrafficCollector::getTrafficInfoMap(int type) {
...
if (type == TYPE_GET_TRAFFIC_RX) {
//接收的数据
for (auto & it : rxTrafficInfoMap) {
//线程名
jstring threadName = env->NewStringUTF(it.first.c_str());
//流量长度,是一个数值型
jstring traffic = env->NewStringUTF(to_string(it.second).c_str());
env->CallObjectMethod(jHashMap, mapPut, threadName, traffic);
}
} else if (type == TYPE_GET_TRAFFIC_TX) {
//上传的数据
for (auto & it : txTrafficInfoMap) {
jstring threadName = env->NewStringUTF(it.first.c_str());
jstring traffic = env->NewStringUTF(to_string(it.second).c_str());
env->CallObjectMethod(jHashMap, mapPut, threadName, traffic);
}
}
return jHashMap;
}
看一个效果图
stop
stop方法做的就是清理资源的工作了,因为核心功能都在native层,所以清理的工作还是会进入native层做处理,第一跳出循环线程,第二清理内存中的map映射表,至此,流量监控的代码分析完成。
@Override
public void stop() {
nativeReleaseMatrixTraffic();
}
static void nativeReleaseMatrixTraffic(JNIEnv *env, jclass) {
//停止循环线程
TrafficCollector::stopLoop();
//清理所有的映射表
TrafficCollector::clearTrafficInfo();
}
总结
流量监控的实现方式是通过hook c++层socket的发起和接收相关的方法,拦截到对应方法从而对过程中涉及到的流量信息进行采集,采集到的数据就可以实时的获取到,以做进一步的分析。
Android 学习笔录
Android 性能优化篇:https://qr18.cn/FVlo89
Android Framework底层原理篇:https://qr18.cn/AQpN4J
Android 车载篇:https://qr18.cn/F05ZCM
Android 逆向安全学习笔记:https://qr18.cn/CQ5TcL
Android 音视频篇:https://qr18.cn/Ei3VPD
Jetpack全家桶篇(内含Compose):https://qr18.cn/A0gajp
OkHttp 源码解析笔记:https://qr18.cn/Cw0pBD
Kotlin 篇:https://qr18.cn/CdjtAF
Gradle 篇:https://qr18.cn/DzrmMB
Flutter 篇:https://qr18.cn/DIvKma
Android 八大知识体:https://qr18.cn/CyxarU
Android 核心笔记:https://qr21.cn/CaZQLo
Android 往年面试题锦:https://qr18.cn/CKV8OZ
2023年最新Android 面试题集:https://qr18.cn/CgxrRy
Android 车载开发岗位面试习题:https://qr18.cn/FTlyCJ
音视频面试题锦:https://qr18.cn/AcV6Ap