一、服务端的自我保护(实现限流)
为什么需要限流器?
我们先看服务端,举个例子,假如我们要发布一个 Rrpc 服务,作为服务端接收调用端发送过来的请求,这时服务端的某个节点负载压力过高了,我们该如何保护这个节点?
这个问题还是很好解决的,既然负载压力高,那就不让它再接收太多的请求就好了,等接收和处理的请求数量下来后,这个节点的负载压力自然就下来了。
在 rrpc 调用中服务端的自我保护策略就是限流,
限流是一个比较通用的功能,我们可以在 rrpc 框架中集成限流的功能,让使用方自己去配置限流阈值;我们还可以在服务端添加限流逻辑,当调用端发送请求过来时,服务端在执行业务逻辑之前先执行限流逻辑,如果发现访问量过大并且超出了限流的阈值,就让服务端直接抛回给调用端一个限流异常,否则就执行正常的业务逻辑。
我们可以假设这样一个场景:我发布了一个服务,提供给多个应用的调用方去调用,这时有一个应用的调用方发送过来的请求流量要比其它的应用大很多,这时我们就应该对这个应用下的调用端发送过来的请求流量进行限流。所以说我们在做限流的时候要考虑应用级别的维度,甚至是 IP 级别的维度,这样做不仅可以让我们对一个应用下的调用端发送过来的请求流量做限流,还可以对一个 IP 发送过来的请求流量做限流。
在服务端实现限流,配置的限流阈值是作用在每个服务节点上的。比如说我配置的阈值是每秒 1000 次请求,那么就是指一台机器每秒处理 1000 次请求;如果我的服务集群拥有 10 个服务节点,那么我提供的服务限流阈值在最理想的情况下就是每秒 10000 次。
我们可以提供一个专门的限流服务,让每个节点都依赖一个限流服务,当请求流量打过来时,服务节点触发限流逻辑,调用这个限流服务来判断是否到达了限流阈值。我们甚至可以将限流逻辑放在调用端,调用端在发出请求时先触发限流逻辑,调用限流服务,如果请求量已经到达了限流阈值,请求都不需要发出去,直接返回给动态代理一个限流异常即可。
实现限流的方式
方式有很多,比如最简单的计数器,还有可以做到平滑限流的滑动窗口、漏斗算法以及令牌桶算法等等。其中令牌桶算法最为常用。我们主要实现的是IP维度通过令牌桶算法的限流。
令牌桶算法的实现步骤
令牌算法是以固定速度rate往一个桶内增加令牌tokens,当桶内令牌满了后(总容量capacity),就停止增加令牌。上游请求时,先从桶里拿一个令牌,后端只服务有令牌的请求,所以后端处理速度不一定是匀速的。当有突发请求过来时,如果令牌桶是满的,则会瞬间消耗桶中存量的令牌。如果令牌还不够,那么再等待发放令牌(固定速度),这样就导致处理请求的速度超过发放令牌的速度。
限流代码
public class TokenBuketRateLimiter implements RateLimiter{
//代表令牌的数量 , >0 说明有令牌, 能放行,放行就-1,==0 无令牌
private int tokens;
//限流的本质就是 令牌数
//总容量
private final int capacity;
//每秒加500个令牌,不能超过总数容量
//使用定时任务去加 --> 启动定时任务 每秒执行一次 token+500
//加令牌的速率
private final int rate;
//上一次放令牌的时间
private Long lastTokenTime = System.currentTimeMillis();
public TokenBuketRateLimiter(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
lastTokenTime = System.currentTimeMillis();
tokens = capacity;
}
/**
* 多线程同时使用该方法,要保证线程安全synchronized
* 判断请求是否可以放行
* @return true 放行
*/
public synchronized boolean allowRequest(){
//1.给令牌同添加令牌 计算从现在到上一次的时间间隔需要添加的令牌数
Long currentTime = System.currentTimeMillis();
long timeInterval = currentTime - lastTokenTime;
//距离上一次请求 超过1秒 才会加令牌
if (timeInterval >= 1000/rate){
int needAddTokens = (int) (timeInterval * rate / 1000);
System.out.println("needAddTokens="+ needAddTokens);
//给令牌桶添加令牌 不能超过容量
tokens = Math.min(needAddTokens + tokens,capacity);
System.out.println("tokens="+ tokens);
//保存最后一次时间
this.lastTokenTime = System.currentTimeMillis();
}
//2.自己获取令牌
if (tokens > 0){
tokens --;
System.out.println("请求被放行------------------------");
return true;
}else {
System.out.println("请求被拦截------------------------");
return false;
}
}
public static void main(String[] args) {
TokenBuketRateLimiter rateLimiter = new TokenBuketRateLimiter(10,10);
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
boolean allowRequest = rateLimiter.allowRequest();
System.out.println("allowRequest = " + allowRequest);
}
}
}
当进行方法调用时令牌桶内没令牌时,就会限流
二、调用端的自我保护(简单熔断器的实现)
为什么需要熔断器?
举个例子,假如我要发布一个服务 B,而服务 B 又依赖服务 C,当一个服务 A 来调用服务 B 时,服务 B 的业务逻辑调用服务 C,而这时服务 C 响应超时了,由于服务 B 依赖服务 C,C 超时直接导致 B 的业务逻辑一直等待,而这个时候服务 A 在频繁地调用服务 B,服务 B 就可能会因为堆积大量的请求而导致服务宕机。
熔断器的机制
熔断器的工作机制主要是关闭、打开和半打开这三个状态之间的切换。
1. 在正常情况下,熔断器是关闭的。
2. 当调用端调用下游服务出现异常时,熔断器会收集异常指标信息进行计算,当达到熔断条件时熔断器打开,
这时调用端再发起请求是会直接被熔断器拦截,并快速地执行失败逻辑;
3. 当熔断器打开一段时间后,会转为半打开状态,这时熔断器允许调用端发送一个请求给服务端,如果这次请
求能够正常地得到服务端的响应,则将状态置为关闭状态,否则设置为打开。
熔断器代码
理论上:标准的断路器应有3种状态 open close half_open,我们只选取两种,定义三种需要枚举
public class CircuitBreaker {
//开关 需要保证线程安全
private volatile boolean isOpen = false;
//需要收集指标 异常的数量 比例
//总的请求数
private AtomicInteger requestCount = new AtomicInteger(0);
//异常的请求数
private AtomicInteger errorRequest = new AtomicInteger(0);
//允许异常的阈值
private int maxErrorRequest;
//异常的比例
private float maxErrorRate;
public CircuitBreaker(int maxErrorRequest, float maxErrorRate) {
this.maxErrorRequest = maxErrorRequest;
this.maxErrorRate = maxErrorRate;
}
//断路器的核心方法,判断是否开启
public boolean isBreak() {
//优先返回,如果已经打开了直接返回true
if (isOpen) {
return true;
}
//需要判断数据指标,是否满足当前阈值
if (errorRequest.get() > maxErrorRequest) {
this.isOpen = true;
return true;
}
//判断异常率
if (errorRequest.get() > 0 && requestCount.get() > 0 &&
errorRequest.get() / (float) requestCount.get() > maxErrorRate) {
this.isOpen = true;
return true;
}
return false;
}
//每次发生请求或者异常 应该进行记录
public void recordRequest() {
this.requestCount.getAndIncrement();
}
public void recordErrorRequest() {
this.errorRequest.getAndIncrement();
}
/**
* 重置熔断器
*/
public void reset() {
this.isOpen = false;
this.requestCount.set(0);
this.errorRequest.set(0);
}
public static void main(String[] args) {
CircuitBreaker circuitBreaker = new CircuitBreaker(3, 0.2f);
new Thread(()->{
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
circuitBreaker.recordRequest();
int num = new Random().nextInt(100);
if (num > 70) {
circuitBreaker.recordErrorRequest();
}
boolean aBreak = circuitBreaker.isBreak();
String result = aBreak ? "断路器阻塞了请求" : "断路器放行了请求";
System.out.println(result);
}
}).start();
new Thread(()->{
for (;;) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("--------------------------------");
circuitBreaker.reset();
}
}).start();
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}