目录
代码示例
接口
代理
接口实现
限流工厂
限流处理器接口
直接交换处理器
限流处理器
限流配置
滑动窗口限流
通过代理模式+滑动窗口,限流请求第三方平台,避免出现第三方平台抛出限流异常,影响正常业务流程,从出口出发进行限流请求。
代码示例
接口
/**
* 第三方请求
*/
public interface ThirdApi {
/**
* 发送消息
*
* @param userId 用户id
* @param message 消息
* @return 发送是否成功
*/
boolean sendMessage(String userId, String message);
}
代理
/**
* 第三方请求代理
*/
@Component
public class ProxyThirdApi implements ThirdApi {
@Resource
private ThirdApiServiceClient thirdApiServiceClient;
@Resource
private LimitProcessorFactory limitProcessorFactory;
@Resource
private YmlConstant ymlConstant;
private ThirdApi thirdApi;
@PostConstruct
public void initThirdApi() {
thirdApi = new ThirdApiImpl(thirdApiServiceClient, ymlConstant);
}
@Override
@SneakyThrows
public boolean sendMessage(String userId, String message) {
// 限流
String bizLimit = "MSG_SEND_LIMIT";
Object result = limitProcessorFactory.getProcessor(bizLimit).process(
() -> thirdApi.sendMessage(userId, message)
);
if (result instanceof Boolean) {
return (Boolean) result;
} else {
return false;
}
}
}
接口实现
/**
* 第三方请求实现
*
*/
@Slf4j
@AllArgsConstructor
public class ThirdApiImpl implements ThirdApi {
private final ThirdApiServiceClient thirdApiServiceClient;
private final YmlConstant ymlConstant;
@Override
public boolean sendMessage(String userId, String message) {
MessageReq messageReq = new MessageReq();
messageReq.setContent(message);
messageReq.setReceiveId(userId);
log.info("[ThirdApiImpl][sendMessage] {}", JSON.toJSONString(messageReq));
HttpResponse<SendMessagesResp> sendResp = thirdApiServiceClient.sendMessage(messageReq);
if (sendResp.isOk()) {
return true;
} else {
log.error("[ThirdApiImpl][sendMessage] 消息发送失败,返回信息:{}", JSON.toJSONString(sendResp));
return false;
}
}
}
限流工厂
/**
* 限流工厂
*
*/
@Component
public class LimitProcessorFactory {
@Resource
private LimitProperties properties;
@Getter
private Map<String, LimitProperties.LimitData> propertiesMap;
private final Map<String, LimiterProcessor> processorMap = new ConcurrentHashMap<>(10);
@PostConstruct
public void initPropertiesMap() {
List<LimitProperties.LimitData> props = properties.getProps();
if (CollectionUtils.isEmpty(props)) {
propertiesMap = Collections.emptyMap();
} else {
propertiesMap = props.stream().collect(
Collectors.toMap(LimitProperties.LimitData::getName, Function.identity())
);
}
}
/**
* 获取限流处理器
*
* @param name 业务名称
* @return 限流处理器
*/
public LimiterProcessor getProcessor(String name) {
LimitProperties.LimitData props = propertiesMap.get(name);
if (Objects.isNull(props)) {
throw new BusinessException(String.format("无法找到[%s]的处理器配置", name));
}
if (props.getEnabled()) {
return processorMap.computeIfAbsent(props.getName(), name -> {
TimeUnit timeUnit = props.getTimeUnit();
// 使用窗口滑动算法进行限流
RateLimiter limiter = new SlidingWindowRateLimiter(props.getInterval(), props.getLimit(), timeUnit);
return new LimiterProcessor(name, timeUnit.toMillis(props.getWaitTime()), limiter);
});
} else {
return new SynchronousProcessor();
}
}
}
限流处理器接口
/**
* 限流处理器接口
*/
public interface LimiterProcessor {
/**
* 限流
*
* @param callback 回调
* @return 执行结果
* @throws Throwable Throwable
*/
Object process(LimiterCallback callback) throws Throwable;
}
直接交换处理器
/**
* 直接交换处理器
*
* @author zhimajiang
*/
@Slf4j
public class SynchronousProcessor implements LimiterProcessor {
@Override
public Object process(LimiterCallback callback) throws Throwable {
return callback.process();
}
}
限流处理器
/**
* 限流处理器
*
*/
@Slf4j
@AllArgsConstructor
public class Processor implements LimiterProcessor {
private final String name;
private final long waitTime;
private final RateLimiter rateLimiter;
@Override
public Object process(LimiterCallback callback) throws Throwable {
while (true) {
if (rateLimiter.tryAcquire()) {
// 未被限流,则尝试唤醒其他被限流的任务
Object proceed = callback.process();
synchronized (this) {
this.notifyAll();
}
return proceed;
} else {
// 已被限流则进入阻塞
log.info("LimiterProcessor][process] {}-限流", name);
synchronized (this) {
try {
this.wait(waitTime);
} catch (InterruptedException ignored) {
}
}
}
}
}
}
限流配置
/**
* 限流配置
*
*/
@Data
@Configuration
@ConfigurationProperties("limit")
public class LimitProperties {
/**
* 限流配置
*/
private List<LimitProperties.LimitData> props;
@Data
public static class LimitData {
/**
* 名称
*/
private String name;
/**
* 是否启用
*/
private Boolean enabled = false;
/**
* 时间间隔
*/
private int interval;
/**
* 限制阈值
*/
private int limit;
/**
* 阻塞等待时间
*/
private int waitTime = 1000;
/**
* 时间单位
*/
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
}
}
滑动窗口限流
/**
* 滑动窗口限流
*
*/
public class SlidingWindowRateLimiter implements RateLimiter {
/**
* 子窗口数量
*/
private final int slotNum;
/**
* 子窗口大小
*/
private final long slotSize;
/**
* 限流阈值
*/
private final int limit;
/**
* 上一次的窗口结束时间
*/
private long lastTime;
/**
* 子窗口流量计数
*/
private final AtomicInteger[] counters;
/**
* 滑动窗口限流
*
* @param windowSize 时间窗口大小
* @param slotNum 子窗口数量
* @param limit 限流阈值
* @param timeUnit 时间单位
*/
public SlidingWindowRateLimiter(int windowSize, int slotNum, int limit, TimeUnit timeUnit) {
long windowSizeMills = timeUnit.toMillis(windowSize);
this.slotNum = slotNum;
this.slotSize = windowSizeMills / slotNum;
this.limit = limit;
this.lastTime = System.currentTimeMillis();
this.counters = new AtomicInteger[slotNum];
resetCounters();
}
/**
* 滑动窗口限流
*
* @param windowSize 时间窗口大小
* @param limit 限流阈值
* @param timeUnit 时间单位
*/
public SlidingWindowRateLimiter(int windowSize, int limit, TimeUnit timeUnit) {
this(windowSize, 5, limit, timeUnit);
}
/**
* 滑动窗口限流
*
* @param windowSize 时间窗口大小(毫秒)
* @param limit 限流阈值
*/
public SlidingWindowRateLimiter(int windowSize, int limit) {
this(windowSize, 5, limit, TimeUnit.MILLISECONDS);
}
/**
* 重置子窗口流量计数
*/
private void resetCounters() {
for (int i = 0; i < this.slotNum; i++) {
this.counters[i] = new AtomicInteger(0);
}
}
/**
* 限流请求
*
* @return true-允许执行 false-触发限流
*/
@Override
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 小窗口移动格数
int slideNum = (int) Math.floor((double) (currentTime - this.lastTime) / this.slotSize);
slideWindow(slideNum);
// 窗口时间内的请求总数
int sum = Arrays.stream(this.counters).mapToInt(AtomicInteger::get).sum();
this.lastTime = this.lastTime + slideNum * slotSize;
if (sum >= limit) {
return false;
} else {
this.counters[this.slotNum - 1].incrementAndGet();
return true;
}
}
/**
* 将计数器内全部元素向左移动num个位置
*
* @param num 移动位置个数
*/
private void slideWindow(int num) {
if (num == 0) {
return;
}
if (num >= this.slotNum) {
// 如果移动步数大于子窗口个数,则计数全部清零
resetCounters();
return;
}
// 对于a[0]~a[num-1]来说,移动元素则代表删除元素,所以直接从a[num]开始移动
for (int index = num; index < this.slotNum; index++) {
// 移动元素
int newIndex = index - num;
this.counters[newIndex] = this.counters[index];
this.counters[index].getAndSet(0);
}
}
}