先看下官网文档关于并发线程数的解释:链接地址
public class FlowThreadDemo {
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static AtomicInteger activeThread = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
private static volatile int methodBRunningTime = 2000;
public static void main(String[] args) throws Exception {
System.out.println(
"MethodA will call methodB. After running for a while, methodB becomes fast, "
+ "which make methodA also become fast ");
tick();
initFlowRule();
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry methodA = null;
try {
//TimeUnit.MILLISECONDS.sleep(5);
methodA = SphU.entry("methodA");
activeThread.incrementAndGet();
Entry methodB = SphU.entry("methodB");
TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
methodB.exit();
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (methodA != null) {
methodA.exit();
activeThread.decrementAndGet();
}
}
}
}
});
entryThread.setName("working thread");
entryThread.start();
}
}
private static void initFlowRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource("methodA");
// set limit concurrent thread for 'methodA' to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " total qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock
+ " activeThread:" + activeThread.get());
if (seconds-- <= 0) {
stop = true;
}
if (seconds == 40) {
System.out.println("method B is running much faster; more requests are allowed to pass");
methodBRunningTime = 20;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
}
运行结果:
官方给出的demo的大概意思就是:
设置流控规则为并发线程数==20
模拟100个并发线程执行methodA调用methodB,期间模拟了methodB的方法执行耗时。
在前60秒,methodB方法耗时2秒,所以在前60s内的每2s内会在20个线程执行一遍,所以会出现前一秒pass=20,后一秒pass=0的情况
在后20秒,methodB方法耗时20ms,所以20线程在每一秒里执行了大概25遍(wihle循环)
从demo打印的日志来看得出如下结论:
并发线程数控制的是方法执行的线程数量,对于应用而言,每秒处理的数据约等于 并发线程数量*单位时间内能够处理的次数。
带着结论自己测试一遍
并发线程数==2
分别测试并发100,500,1000,2000
并发线程数==10
从上述数据来看,并发线程数控制着处理处理入口流量的线程数,从给出的官方demo意图在于表述在面对大量请求时,能够很好的保护耗时操作不被大量的qps拖垮,从自身的测试看出,在处理数据rt近似平均的时候,越大的并发数能够处理更多的流量。
在面对突然的大流量时,如果机器的性能足够好,可以适当增大线程数,增加处理量,如果机器的性能一般时,为防止被突如其来的大量线程数拖垮时,可以起到限制作用,保护应用。