QPS秒级限流
- 一、Aop
- 二、自定义注解
- 三、测试类
- 实例
QPS简介:
QPS(Query Per Second),QPS 其实是衡量吞吐量(Throughput)的一个常用指标,就是说服务器在一秒的时间内处理了多少个请求 —— 我们通常是指 HTTP 请求,显然数字越大代表服务器的负荷越高、处理能力越强。作为参考,一个有着简单业务逻辑(包括数据库访问)的程序在单核心运行时可以提供 50 - 100 左右的 QPS,即每秒可以处理 50 - 100 个请求。
pom.xml:
<!-- RateLimiter -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-android</version>
<scope>compile</scope>
</dependency>
<!-- AOP -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>
<!-- 访问http-API -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
一、Aop
package cn.edu.shanghaitech.kysjutil.aop;
import cn.edu.shanghaitech.kysjutil.qps.QPS;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
@Slf4j
@Aspect
@Component
public class RateLimitAop {
// RateLimiter限流桶 默认值10
private RateLimiter rateLimiter = RateLimiter.create(10);
/**
* 定义切点
* 1、通过扫包切入
* 2、带有指定注解切入
*/
@Pointcut("@annotation(cn.edu.shanghaitech.kysjutil.qps.QPS)")
public void pointcut() {
}
// 环绕通知
@ResponseBody
@Around(value = "pointcut()")
public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
log.info("拦截到了{}方法...", pjp.getSignature().getName());
Signature signature = pjp.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
//获取目标方法
Method targetMethod = methodSignature.getMethod();
if (targetMethod.isAnnotationPresent(QPS.class)) {
//获取目标方法的@QPS注解
QPS qps = targetMethod.getAnnotation(QPS.class);
// 判断注解设置值是否大于限流桶的默认值
long limitNum = Long.parseLong(qps.limitNum());
if(limitNum > Math.round(rateLimiter.getRate())){
rateLimiter.setRate(limitNum);
}
if (!rateLimiter.tryAcquire()){
log.error("访问频繁...请稍后重试!!!");
return "访问频繁...请稍后重试!!!";
}
}
log.info("执行成功!!!...做一些业务处理");
return pjp.proceed();
}
}
二、自定义注解
@Documented 注解
功能:指明修饰的注解,可以被例如javadoc此类的工具文档化,只负责标记,没有成员取值。
@Retention 注解
功能:指明修饰的注解的生存周期,即会保留到哪个阶段。
RetentionPolicy的取值包含以下三种:
SOURCE:源码级别保留,编译后即丢弃。
CLASS: 编译级别保留,编译后的class文件中存在,在jvm运行时丢弃,这是默认值。
RUNTIME: 运行级别保留,编译后的class文件中存在,在jvm运行时保留,可以被反射调用。
@Target 注解
功能:指明了修饰的这个注解的使用范围,即被描述的注解可以用在哪里。
ElementType的取值包含以下几种:
TYPE: 类,接口或者枚举
FIELD: 域,包含枚举常量
METHOD: 方法
PARAMETER: 参数
CONSTRUCTOR: 构造方法
LOCAL_VARIABLE: 局部变量
ANNOTATION_TYPE: 注解类型
PACKAGE: 包
package cn.edu.shanghaitech.kysjutil.qps;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface QPS {
/**
* 每秒向桶中放入令牌的数量 默认最大即不做限流
* @return
*/
String limitNum() default "0";
/**
* 获取令牌的等待时间 默认0
* @return
*/
int timeOut() default 0;
/**
* 超时时间单位
* @return
*/
TimeUnit timeOutUnit() default TimeUnit.SECONDS;
}
三、测试类
package cn.edu.shanghaitech.kysjutil.test;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.concurrent.*;
public class Test {
static int num = 0; //访问失败次数
static int fwnum = 0; // 总访问次数
int ThreadNum = 60; //定义线程数
// static List<Map<String,Object>> infences = new ArrayList<>();
//发令枪
CountDownLatch countDownLatch = new CountDownLatch(ThreadNum);
public void runThread() {
//定义线程池
ExecutorService executorService = Executors.newFixedThreadPool(ThreadNum);
for (int i = 0; i < ThreadNum; i++) {
executorService.submit(buildThread());
}
}
public Thread buildThread() {
//创建线程
Thread thread = new Thread(new Runnable() {
public void run() {
synchronized (countDownLatch) { //这一步不知道有没有必要,但是我还是加了
//发令枪减1
countDownLatch.countDown();
}
try {
System.out.println("线程:" + Thread.currentThread().getName() + "准备");
//线程阻塞
countDownLatch.await();
//这一步是调用线上的接口,发送HTTP请求 127.0.0.1可以换成你的ip地址 post是我的请求方式
Object appectContext = appectContext("http://localhost:8099/wuzhi/getOrder?Num=123456", "post");
fwnum++; //访问的总次数
if (appectContext == null) {
num++; //访问失败的次数
}
System.out.println("接受的值" + appectContext);
System.out.println("错误次数" + num);
System.out.println("总次数" + fwnum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
return thread;
}
public static void main(String[] args) {
Test h = new Test();
h.runThread();
}
public Object appectContext(String url, String states) {
// 配置请求信息(请求时间)
RequestConfig rc = RequestConfig.custom().setSocketTimeout(5000)
.setConnectTimeout(5000).build();
// 获取使用DefaultHttpClient对象
CloseableHttpClient httpclient = HttpClients.createDefault();
// 返回结果
String result = null;
if (url != null) { //请求路径不是null
if (states.equals("post")) { //post请求
//System.out.println("post请求");
HttpPost httpPost = new HttpPost(url); //这些post请求和get请求我就不多说了 json格式的参数自己在网上搜一搜
httpPost.setConfig(rc);
try {
CloseableHttpResponse response = httpclient.execute(httpPost);
HttpEntity entity = response.getEntity();
result = EntityUtils.toString(entity, "UTF-8");
return result;
} catch (IOException e) {
e.printStackTrace();
}
} else if (states.equals("get")) { //get请求
//System.out.println("get请求");
HttpGet httpGet = new HttpGet(url);
httpGet.setConfig(rc);
try {
CloseableHttpResponse response = httpclient.execute(httpGet);
return response;
} catch (IOException e) {
e.printStackTrace();
}
} else {
System.out.println("请求失败");
}
} else { //传的路径是null
System.out.println("路径是null");
}
return null;
}
}
实例
API加入@QPS注解设置limitNum次数,通过测试类进行访问调试就好了。
打印访问API返回结果。