一、代码
1、先上实现代码,如下
package cn.jt.emqxspringbootdesignpattern.emqx.controller;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class QpsCounter {
/**
* 多少毫秒作为一个window 窗口(bucket)
*/
private static final int SEGMENT_DURATION_MS = 250;
/**
* 间隔秒数
*/
private static final int REPORT_INTERVAL_SECONDS = 1;
// 几个 window 窗口,bucket ,这里默认4个,即250ms一个区间,1s内4个
private static final int MAX_LIVED_WINDOW_SIZE = REPORT_INTERVAL_SECONDS * 1000 / SEGMENT_DURATION_MS;
/**
* 全局计数,每次都增加,在每隔一秒打印qps的时候,置空即可
*/
private final AtomicInteger globalCounter;
/**
* 定时任务打印qps
*/
private final ScheduledExecutorService scheduler;
/**
* 用于存放 区间标识
*/
private final Queue<Integer> segmentQueue;
public QpsCounter() {
globalCounter = new AtomicInteger(0);
scheduler = Executors.newScheduledThreadPool(1);
// 一个线程安全的队列实现
segmentQueue = new ConcurrentLinkedQueue<>();
}
public void start() {
scheduler.scheduleAtFixedRate(this::reportQps, REPORT_INTERVAL_SECONDS, REPORT_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
public void stop() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
}
}
public void requestReceived() {
// 用 当前毫秒数据 / SEGMENT_DURATION_MS(250),计算出当前window窗口是那个
int currentSegment = (int) (System.currentTimeMillis() / SEGMENT_DURATION_MS);
globalCounter.incrementAndGet();
segmentQueue.offer(currentSegment);
removeOldestWindowIfNeeded();
}
private void removeOldestWindowIfNeeded() {
while (segmentQueue.size() > MAX_LIVED_WINDOW_SIZE) {
segmentQueue.poll();
}
}
private void reportQps() {
int totalQps = globalCounter.getAndSet(0);
System.out.println("QPS in the last " + (MAX_LIVED_WINDOW_SIZE * SEGMENT_DURATION_MS) + " milliseconds: " + totalQps);
}
public static void main(String[] args) {
QpsCounter qpsCounter = new QpsCounter();
qpsCounter.start();
for (int i = 0; i < 100; i++) {
qpsCounter.requestReceived();
try {
// 这里的 睡眠 0.1s 就是模拟业务处理
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
qpsCounter.stop();
}
}
2、运行上面的 main
方法,可以得到控制台输出,大约每秒10个QPS
3、如何在自己的业务代码里面使用,我这里就是统计的 mqttPlatformClient.publish(xxxx)
的qps,换成你自己的业务代码即可
其实就是在自己的业务代码里面,开启即可,每次请求之后,我们执行
qpsCounter.requestReceived();
计数即可
二、基本理论:
下面的的理论内容,原文是 Snetinel的滑动时间窗口是如何统计QPS的 ,说的挺好的,通俗易懂
1、首先我们要明确:任意时刻的 QPS 是指过去一秒内产生的请求个数。所以可以推导出,如果要获得时间轴上 X 位置上的 QPS,应该统计的是在 [X-1000ms,X) 这个时间范围通过的请求数。
2、所以,我们应该这么做(毫秒为维度):
3、每一次请求过来,我使用当前时间毫秒数在内存中查找是否存在一个统计请求个数的原子计数器,如果没有则创建并且计数器 + 1。当在 X 时刻,需要统计 QPS 时,我可以获得 [X-1000ms,X) 这个区间内所有存在的计数器做累加,就可以获得 X 时刻的 QPS 了。
4、这样做的问题在于:粒度太细了,在系统本身流量比较大的情况下,每秒可能就会产生几百个计数器,因为我们要限制 QPS,所以每次请求都需要做统计去和设置的阈值进行比较来判断本次请求应该被拦截或者放行。
5、所以我们做下优化:不用每一个毫秒刻度对应一个计数器,可以把时间窗口分成很多个段(sentinel 里面叫 Bucket),每个分段一个计数器,如下图所示:
6、我把 [X-1000,X) 这一秒的时间窗口分为了四段,也就是说每段为 250ms。当请求 Request-1 过来的时候,可以计算得到它处于 A 段,和前面的做法一样,我们先判断 A 段的计数器是否存在,不存在则创建并且计数器 + 1,这个时候我需要统计 QPS 的话,只需要往前再拿三个段的计数器,加上本身所在这个段的计数器求和就行了(也就是 A,B,C,D 这四个段,Request-2 过来的时候,就统计 B,C,D,E 这四个段)。这样对资源的消耗可以大大的减少。它的缺点就在于,如果段越宽,粒度就越粗,统计的 QPS 就会偏差越大(sentinel 默认为两段,每段 500ms)。
7、时间是永恒的,如果把每一秒的统计数据都放在内存,内存只会被无限消耗。并且,时刻 X 的 QPS 只需要前一秒的数据就好了,至于一秒之前的数据,则可以丢弃不用或者归档用于监控等。所以我们需要清理过期的段,清理的方法很多,这里介绍一下 sentinel 是怎么做的。
8、sentinel 把一秒钟的长度设计成一个圆环形,类似钟表,钟表只能显示 24 小时,而 sentinel 的圆环表示了一秒钟。
9、当 Request 进来时,首先确定这个 request 应该处于哪个段(比如我们可以通过当前时间戳的后三位 [0,999] 来确定)。
10、我们找到段 A 后,首先判断当前段的实例,是否存在,如果不存在,则创建,并记录当前段的开始时间 X。
11、如果存在,则判断当前段是否已经过期(因为不仅是 X+100 会落到 A 段,X-1000+100 也会落到 A 段),过期则重新创建并更新当前段的开始时间。
12、统计时也需要判断段是否过期,过期的则不要统计。