【技术应用】java接口幂等性实现方案
- 一、前言
- 二、幂等性
- 三、幂等设计思路
- 四、实现代码
- 五、总结
一、前言
最近在做一个线上的项目,与之前内网项目还是有很多差别的,尤其在安全性和并发性的处理上,要多做一些措施,第一步就是接口的幂等性
上,这也是接口并发请求安全的兜底保护,针对实际的业务场景,总结实现了一个接口幂等性的demo,在此分享一下;
二、幂等性
1、概念:
- 幂等性原本是数学上的概念,即使公式:
f(x)=f(f(x))
能够成立的数学性质。用在编程领域,则意为对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的。 - 幂等性是分布式系统设计中十分重要的概念,具有这一性质的接口在设计时总是秉持这样的一种理念:调用接口发生异常并且重复尝试时,总是会造成系统所无法承受的损失,所以必须阻止这种现象的发生。
- 实现幂等的方式很多,目前基于请求令牌机制适用范围较广。其核心思想是为每一次操作生成一个唯一性的凭证,也就是 token。一个 token在操作的每一个阶段只有一次执行权,一旦执行成功则保存执行结果。对重复的请求,返回同一个结果(报错)等。
2、接口幂等性设计原因
1)前端重复提交表单
在填写一些表格时候,用户填写完成提交,很多时候会因网络波动没有及时对用户做出提交成功响应,致使用户认为没有成功提交,然后一直点提交按钮,这时就会发生重复提交表单请求。
2)黑客恶意攻击
例如在实现用户投票这种功能时,如果黑客针对一个用户进行重复提交投票,这样会导致接口接收到用户重复提交的投票信息,这样会使投票结果与事实严重不符。
3)接口超时重复提交
大部分RPC框架[比如Dubbo],为了防止网络波动超时等造成的请求失败,都会添加重试机制,导致一个请求提交多次。
4)消息重复消费
当使用 MQ 消息中间件时候,如果Consumer消费超时或者producer发送了消息但由于网络原因未收到ACK导致消息重发,都会导致重复消费。
3、哪些接口需要幂等?
幂等性的实现与判断需要消耗一定的资源,因此不应该给每个接口都增加幂等性判断,要根据实际的业务情况和操作类型来进行区分。例如,我们在进行查询操作和删除操作时就无须进行幂等性判断。
查询操作查一次和查多次的结果都是一致的,因此我们无须进行幂等性判断。删除操作也是一样,删除一次和删除多次都是把相关的数据进行删除(这里的删除指的是条件删除而不是删除所有数据),因此也无须进行幂等性判断。
所以到底哪些接口需要幂等?关于这个问题需要从具体业务出发,但是也有规律可循如下表:
三、幂等设计思路
- 请求开始前,根据 key 查询 查到结果:报错 未查到结果:存入
key-value-expireTime
key=ip+url+args - 请求结束后,直接
删除 key
不管 key 是否存在,直接删除 是否删除,可配置 expireTime
过期时间,防止一个请求卡死,会一直阻塞
,超过过期时间,自动删除 过期时间要大于业务执行时间,需要大概评估下;- 此方案直接切的是接口请求层面。
- 过期时间需要大于业务执行时间,否则业务请求 1 进来还在执行中,前端未做遮罩,或者用户跳转页面后再回来做重复请求
2,在业务层面上看,结果依旧是不符合预期的。 - 建议
delKey = false
。即使业务执行完,也不删除 key,强制锁expireTime
的时间。预防 5 的情况发生。 - 实现思路:同一个请求 ip 和接口,相同参数的请求,在 expireTime 内多次请求,只允许成功一次。
- 页面做遮罩,数据库层面的唯一索引,先查询再添加,等处理方式应该都处理下。
- 此设计只用于幂等,不用于锁,100 个并发这种压测,会出现问题,在这种场景下也没有意义,实际中用户也不会出现 1s 或者 3s内手动发送了 50 个或者 100 个重复请求,或者弱网下有 100 个重复请求;
四、实现代码
1、自定义注解
自定义注解Idempotent
package com.sk.idempotenttest.annotation;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
/**
* @description: Idempotent annotation
*
* @author dylan
*
*/
@Inherited
@Target(ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface Idempotent {
/**
* 是否做幂等处理
* false:非幂等
* true:幂等
* @return
*/
boolean isIdempotent() default false;
/**
* 有效期
* 默认:1
* 有效期要大于程序执行时间,否则请求还是可能会进来
* @return
*/
int expireTime() default 1;
/**
* 时间单位
* 默认:s
* @return
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/**
* 提示信息,可自定义
* @return
*/
String info() default "重复请求,请稍后重试";
/**
* 是否在业务完成后删除key
* true:删除
* false:不删除
* @return
*/
boolean delKey() default false;
}
注解使用:
@Idempotent(isIdempotent = true,expireTime = 10,timeUnit = TimeUnit.SECONDS,info = "请勿重复请求",delKey = false)
@GetMapping("/test")
public String test(){
return "哈哈哈";
}
注解属性说明:
key
: 幂等操作的唯一标识,使用 spring el 表达式 用#来引用方法参数 。 可为空则取当前url + args
做请求的唯一标识expireTime
: 有效期默认:1
有效期要大于程序执行时间,否则请求还是可能会进来timeUnit
: 时间单位 默认:s (秒)
info
: 幂等失败提示信息,可自定义delKey
: 是否在业务完成后删除 key true:删除 false:不删除
2、切面实现
package com.sk.idempotenttest.aspect;
import com.sk.idempotenttest.annotation.Idempotent;
import com.sk.idempotenttest.exception.IdempotentException;
import com.sk.idempotenttest.utils.ServerTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.Redisson;
import org.redisson.api.RMapCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @description: The Idempotent Aspect
*
* @author dylan
*
*/
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class IdempotentAspect {
private ThreadLocal<Map<String,Object>> threadLocal = new ThreadLocal();
private static final String RMAPCACHE_KEY = "idempotent";
private static final String KEY = "key";
private static final String DELKEY = "delKey";
private final Redisson redisson;
@Pointcut("@annotation(com.sk.idempotenttest.annotation.Idempotent)")
public void pointCut(){}
@Before("pointCut()")
public void beforePointCut(JoinPoint joinPoint)throws Exception{
ServletRequestAttributes requestAttributes =
(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
if(!method.isAnnotationPresent(Idempotent.class)){
return;
}
Idempotent idempotent = method.getAnnotation(Idempotent.class);
boolean isIdempotent = idempotent.isIdempotent();
if(!isIdempotent){
return;
}
String ip = ServerTool.getIpAddress(request);
String url = request.getRequestURL().toString();
String argString = Arrays.asList(joinPoint.getArgs()).toString();
String key = ip+ url + argString; //key主要用于对请求者,请求客户端的请求频次做限制,也可以使用token作为key
long expireTime = idempotent.expireTime();
String info = idempotent.info();
TimeUnit timeUnit = idempotent.timeUnit();
boolean delKey = idempotent.delKey();
//do not need check null
RMapCache<String, Object> rMapCache = redisson.getMapCache(RMAPCACHE_KEY);
String value = LocalDateTime.now().toString().replace("T", " ");
Object v1;
if (null != rMapCache.get(key)){
//had stored
throw new IdempotentException(info);
}
synchronized (this){
v1 = rMapCache.putIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
if(null != v1){
throw new IdempotentException(info);
}else {
log.info("[idempotent]:has stored key={},value={},expireTime={}{},now={}",key,value,expireTime,timeUnit,LocalDateTime.now().toString());
}
}
Map<String, Object> map =
CollectionUtils.isEmpty(threadLocal.get()) ? new HashMap<>(4):threadLocal.get();
map.put(KEY,key);
map.put(DELKEY,delKey);
threadLocal.set(map);
}
@After("pointCut()")
public void afterPointCut(JoinPoint joinPoint){
Map<String,Object> map = threadLocal.get();
if(CollectionUtils.isEmpty(map)){
return;
}
RMapCache<Object, Object> mapCache = redisson.getMapCache(RMAPCACHE_KEY);
if(mapCache.size() == 0){
return;
}
String key = map.get(KEY).toString();
boolean delKey = (boolean)map.get(DELKEY);
if(delKey){
mapCache.fastRemove(key);
log.info("[idempotent]:has removed key={}",key);
}
threadLocal.remove();
}
}
3、全局异常处理
GlobalExceptionHandler
类
package com.sk.idempotenttest.exception;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
@Slf4j
@ControllerAdvice
@ResponseBody
public class GlobalExceptionHandler {
@ExceptionHandler(IdempotentException.class)
@ResponseStatus(value= HttpStatus.BAD_REQUEST)
public JsonResult handleHttpMessageNotReadableException(IdempotentException ex){
log.error("请求异常,{}",ex.getMessage());
return new JsonResult("400",ex.getMessage());
}
}
返回值JsonResult
类
package com.sk.idempotenttest.exception;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class JsonResult {
private String code;//状态码
private String msg;//请求信息
}
4、redis存储请求信息
RedissonConfig
类
package com.sk.idempotenttest.config;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description: Redisson配置类
*
* @author ITyunqing
* @since 1.0.0
*/
@Configuration
public class RedissonConfig {
@Value("${singleServerConfig.address}")
private String address;
@Value("${singleServerConfig.password}")
private String password;
@Value("${singleServerConfig.pingTimeout}")
private int pingTimeout;
@Value("${singleServerConfig.connectTimeout}")
private int connectTimeout;
@Value("${singleServerConfig.timeout}")
private int timeout;
@Value("${singleServerConfig.idleConnectionTimeout}")
private int idleConnectionTimeout;
@Value("${singleServerConfig.retryAttempts}")
private int retryAttempts;
@Value("${singleServerConfig.retryInterval}")
private int retryInterval;
@Value("${singleServerConfig.reconnectionTimeout}")
private int reconnectionTimeout;
@Value("${singleServerConfig.failedAttempts}")
private int failedAttempts;
@Value("${singleServerConfig.subscriptionsPerConnection}")
private int subscriptionsPerConnection;
@Value("${singleServerConfig.subscriptionConnectionMinimumIdleSize}")
private int subscriptionConnectionMinimumIdleSize;
@Value("${singleServerConfig.subscriptionConnectionPoolSize}")
private int subscriptionConnectionPoolSize;
@Value("${singleServerConfig.connectionMinimumIdleSize}")
private int connectionMinimumIdleSize;
@Value("${singleServerConfig.connectionPoolSize}")
private int connectionPoolSize;
@Bean(destroyMethod = "shutdown")
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress(address)
.setPassword(password)
.setIdleConnectionTimeout(idleConnectionTimeout)
.setConnectTimeout(connectTimeout)
.setTimeout(timeout)
.setRetryAttempts(retryAttempts)
.setRetryInterval(retryInterval)
.setReconnectionTimeout(reconnectionTimeout)
.setPingTimeout(pingTimeout)
.setFailedAttempts(failedAttempts)
.setSubscriptionsPerConnection(subscriptionsPerConnection)
.setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize)
.setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize)
.setConnectionMinimumIdleSize(connectionMinimumIdleSize)
.setConnectionPoolSize(connectionPoolSize);
return (Redisson) Redisson.create(config);
}
}
5、pom.xml
<!--aop-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.5.4</version>
</dependency>
五、总结
幂等性
不但可以保证程序正常执行,还可以杜绝一些垃圾数据以及无效请求对系统资源的消耗。推荐使用分布式锁
来实现,这样的解决方案更加通用。关于分布式锁的总结,后续再做介绍。
建议:
- 对 redis 中是否存在 token 以及删除的代码逻辑建议用
Lua 脚本
实现,保证原子性 - 全局唯一 ID 可以用
百度的 uid-generator
、美团的 Leaf
去生成
==如果文章对你有帮助,请帮忙点赞、收藏和给个评价=