题目
最近要参加一个秒杀商品系统比赛
【题目】设计并演示一款商品秒杀系统
【要求】设计并实现程序,模拟该商品秒杀系统的基本功能包括但不限于:
1.商品管理:每个商品都有唯一的ID、名称、库存数量和秒杀价格。
2.用户管理:每个用户都有唯一的ID、用户名和账户余额。
3.秒杀活动处理:在指定的时间段内,允许用户购买特定的商品且只能购买一次。
在处理购买请求时,需要考虑实现以下情况:
① 商品是否存在;
② 商品是否还有库存;
③ 用户账户余额是否足够支付商品价格;
④ 用户是否已经购买过商品;
⑤ 是否在秒杀活动的时间段内。
4.秒杀结果返回:返回用户购买结果,购买结果应包括成功或失败的状态以及相应的提示信息。
5.并发处理和性能优化:由于秒杀活动可能会引发大量用户同时购买同一商品的情况,
要求程序能够在高并发环境下正确处理购买请求,避免超卖和重复购买的问题。
请考虑并实现适当的并发控制措施。
限制:压测环境以 2C4U 环境,数据库为2C4U。
只让部署java程序 ,不让使用redis等工具
ps:评比 请求量\响应时间\QPS\CPU利用率\订单生成量\库存余额扣除 等。。
系统压测结果,比赛用的服务器压测所以比我们文章最后的自己本机服务器压测要好很多
5分钟
请求量400万次
响应时间50ms
成功率100%
cpu利用率60%
订单生成量:100% (因为我们余额有限,其实在一分钟10w订单已经生成了,后面都是余额不足)
文章最后也附加了我自己压测10分钟因为是本机订单量在10分钟300w条和qps报告。
1.思路
看完题目之后,也就是我们只能写一份程序部署到2C4U服务器,这个时候我脑子的第一方案是
1.请求入口肯定要限流,在不使用网关或者nginx的情况下,我考虑了三个工具去做Spring Cloud Gateway/RequestRateLimiter/bucket4j。
2.接口是重复提交限制机制(基于IP/用户参数/用户设备等条件)。
3.接口参数校验。
4.接口代码实现,我第一反应肯定是异步去下单/扣减余额/扣减库存/生成订单
5.操作数据库去更新余额\库存肯定要根据扣减之前的数字 乐观锁版本锁保证安全。
(但是异步下单,扣减库存余额 不能重复购买肯定要放到缓存去判断,如何保证安全性?)
6.异步下单,异步接口如何保证安全性,如何更快生成订单/扣减库存。
考虑到我们既要扣减库存\也要扣减余额,其实和现实生活中的先扣库存,
在去付款两个步骤不一样,我们是要同时扣除生成订单,所以用策略分通道执行,
保证同一个userid或者goodid 在不同的通道 这样不会产生多线程下不安全的问题 就行不通了。
所以只能采取单线程。
7.也经过我大量压测和打印发现,最耗时的是修改用户表的余额和商品表的库存、插入订单最耗时。
2.最终方案:
因为我们的比赛,只有一场秒杀,所以我们写代码基于一场设计的,
就不需要说考虑多场,包括添加秒杀商品等操作,而且因为要看订单量最后,
包括对与普通商品可以重复下单,秒杀商品只能购买一次等因素,
所以在入口我取消了重复校验机制。这里说明一下,不是没考虑,
是为了贴近比赛,大家可以参考我这个设计思想。
- 入口限流:固定令牌桶(固定令牌500/每秒生成3000)差不多每秒最多有3000请求。
- 接口参数非空校验
- 接口采取基于内存的校验扣减库存、余额、重复购买等操作和判断AtomicInteger去扣减,保证一定性的安全
private Map<Integer, AtomicInteger> goodsStockMap = new ConcurrentHashMap<>();
private Map<Integer, AtomicReference> userBalanceMap = new ConcurrentHashMap<>(); - 采取异步下单,下单存入队列BlockingQueue。
- 判断队列数量,根据数量让单线程去执行一条/还是批量取数据。
- 批量操作的话,更新余额\库存\订单,采取批量更新、批量插入。保证事物很重要。
- 失败机制,如果处理失败判断异常,如果非余额不足、库存不足等问题,则重新假如队列,重试机制。(因为是比赛考虑成功率大,这里重试了一次)
- 部署考虑2C4U的服务器,指定内存-Xms2800m -Xmx2800m.
- 压测在不同的服务器,发现如果请求响应快,导致会重新进行下一轮线程继续请求,大量请求导致CPU升高,导致读写降低,在接口入口处,加上限制,判断未处理队列数量,大于多少,则返回友情提示,其实采取批处理,速度很快,3w/1s的处理量,如果我们CPU升至99%,我们只需要挡掉1s请求,给1s让队列处理数据,处理之后,就可以继续接受请求,保证系统无论如何压,不会崩溃。
上面是简单的设计思路。下面我们开始编码和为什么选取这样的方式:
创建工程
- 创建SpringBoot工程
- 做一些基本配置
- 全局异常处理和返回类型封装
- 错误异常
public enum ResponseCode {
UNKNOWN(-1, "未知错误"),
SUCCESS(20000, "成功"),
FAILURE(20002, "失败"),
PARAM_IS_BLANK(10002, "参数为空"),
USER_LOGIN_ERROR(20013, "用户不存在"),
GOODS_ERROR(20015, "商品不存在"),
SYSTEM_ERROR(40000, "系统繁忙,请稍后重试!"),
SYSTEM_INNER_ERROR(40001, "系统繁忙,请稍后重试"),
REPEATED_ERROR(40002, "请勿频繁点击提交订单!"),
ACTIVE_DATA_NONE(50003, "活动不存在"),
ACTIVE_START(50004, "秒杀未开始"),
ACTIVE_END(50005, "秒杀已结束"),
NOT_ENOUGH_STOCK(50006, "商品库存不足"),
NOT_ENOUGH_STOCK_MSG(50007, "商品库存扣减失败"),
NOT_ENOUGH_BALANCE(50008, "账户余额不足"),
NOT_ENOUGH_BALANCE_MSG(50009, "账户余额扣减失败"),
REPEAT_ORDER(50010, "您已购买此商品,请勿重复下单"),
REPEAT_GOODS(50011, "系统繁忙,请重新提交!"),
DATA_IS_WRONG(50002, "数据有误"),
INTERFACE_EXCEED_LOAD(60006, "接口负载过高"),
PERMISSION_NO_ACCESS(70001, "无访问权限");
private Integer code;
private String message;
private ResponseCode(Integer code, String message) {
this.code = code;
this.message = message;
}
public Integer code() {
return this.code;
}
public String message() {
return this.message;
}
public static String getMessage(String name) {
ResponseCode[] var1 = values();
int var2 = var1.length;
for(int var3 = 0; var3 < var2; ++var3) {
ResponseCode item = var1[var3];
if (item.name().equals(name)) {
return item.message;
}
}
return name;
}
public static Integer getCode(String name) {
ResponseCode[] var1 = values();
int var2 = var1.length;
for(int var3 = 0; var3 < var2; ++var3) {
ResponseCode item = var1[var3];
if (item.name().equals(name)) {
return item.code;
}
}
return null;
}
public String toString() {
return this.name();
}
}
入口限流
-
Spring Cloud Gateway:它是一个基于Spring Boot 2.x的API网关,用于提供统一的路由访问,以及非功能性需求的处理,如安全性、监控、限流等。其优点包括:
与Spring生态系统的无缝集成。
提供了丰富的路由规则配置。
可以进行动态路由配置,支持热部署。
提供了全局过滤器和路由过滤器,可以进行全局的请求处理。 -
RequestRateLimiter:这是Spring Cloud Gateway中的一个过滤器,用于限制客户端的请求速率,防止服务因过多请求而过载。其优点包括:
可以根据需要动态配置限流规则。
可以防止服务过载,保证服务的稳定性和可用性。 -
Bucket4j:这是一个基于令牌桶算法的Java库,用于实现强大的限流功能。其优点包括:
提供了灵活的API,可以根据需要定制限流策略。
采用了高效的令牌桶算法,可以进行精确的速率限制。
支持分布式环境,可以在微服务架构中使用。
提供了丰富的扩展点,可以根据需要进行扩展。
我们这里最后选取了Bucket4j,因为我们服务器2C4U,而且内存还得加载一些缓存,所以就不考虑上面两种,也想固定令牌桶所以选取了这个。代码如下 就很简单:
@Configuration
public class BucketConfiguration {
@Bean
public Bucket bucket() {
Refill refill = Refill.greedy(2500, Duration.ofSeconds(1)); //每小时添加1000个令牌
Bandwidth limit = Bandwidth.classic(500, refill); //桶的容量为1000个令牌
return Bucket4j.builder()
.addLimit(limit)
.build(); //构建Bucket实例
}
}
因为我们中间说,没有秒杀按照普通价格购买,可以购买商品,所以把秒杀设置注释掉了。
@Component
public class BucketInterceptor implements HandlerInterceptor {
@Autowired
private Bucket bucket;
private static final String BUSY_MSG = JSONObject.toJSONString(new ResponseResult(ResponseCode.SYSTEM_INNER_ERROR, ResponseCode.SYSTEM_INNER_ERROR.message()));
// private static final String REPEATED_MSG = JSONObject.toJSONString(new ResponseResult(ResponseCode.REPEATED_ERROR, ResponseCode.REPEATED_ERROR.message()));
// private static final String ACTIVE_NO_MSG = JSONObject.toJSONString(new ResponseResult(ResponseCode.ACTIVE_DATA_NONE, ResponseCode.ACTIVE_DATA_NONE.message()));
// private static final String ACTIVE_START = JSONObject.toJSONString(new ResponseResult(ResponseCode.ACTIVE_START, ResponseCode.ACTIVE_START.message()));
// private static final String ACTIVE_END = JSONObject.toJSONString(new ResponseResult(ResponseCode.ACTIVE_END, ResponseCode.ACTIVE_END.message()));
// private static final String PATH = "/seckill/order/buy";
// private static final Integer ACTIVE_ID= 1;
// private final Cache<String, String> cache = CacheBuilder.newBuilder()
// .expireAfterWrite(2, TimeUnit.SECONDS)
// .build();
public BucketInterceptor() {
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// long remainingTokens = bucket.getAvailableTokens();
// System.out.println("Remaining tokens: " + remainingTokens);
if (bucket.tryConsume(1)) {
// String requestURI = request.getRequestURI();
// if (PATH.equals(requestURI)) {
// //可以优化的获取 TODO
// Active details = activeService.details(ACTIVE_ID);
// if (Objects.isNull(details)) {
// responseMsg(response, ACTIVE_NO_MSG);
// return false;
// } else {
// if (LocalDateTime.now().isBefore(details.getStartTime())) {
// responseMsg(response, ACTIVE_START);
// return false;
// }
// if (LocalDateTime.now().isAfter(details.getEndTime())) {
// responseMsg(response, ACTIVE_END);
// return false;
// }
// String key = request.getParameter("userId")+request.getParameter("goodsId");
// if (cache.getIfPresent(key) != null) {
// // 在4S内已经提交过,所以不接受这次请求
// responseMsg(response, REPEATED_MSG);
// return false;
// } else {
// // 没有提交过或者已经过了4S,所以接受这次请求,并在缓存中记录这次提交
// cache.put(key, "");
// return true;
// }
// }
// }
return true;
} else {
responseMsg(response, BUSY_MSG);
return false;
}
}
private void responseMsg(HttpServletResponse response, String msg) throws IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
response.getWriter().print(msg);
}
2. 接口参数非空校验
本来是get请求,但是因为要用JMeter压测,测试规定改成了POST请求。
3. 接口内余额库存判断和预扣减
private Map<Integer, BigDecimal> priceMap = new ConcurrentHashMap<>();
private Map<Integer, String> isKillMap = new ConcurrentHashMap<>();
private Map<Integer, AtomicInteger> goodsStockMap = new ConcurrentHashMap<>();
private Map<Integer, AtomicReference<BigDecimal>> userBalanceMap = new ConcurrentHashMap<>();
private Map<String, String> purchasedMap = new ConcurrentHashMap<>();
private Active active = null;
private final int isKill = 0;
private final Integer activeId = 1;
private final int status = 0;
public static final ResponseResult NOT_ENOUGH_STOCK = new ResponseResult(ResponseCode.NOT_ENOUGH_STOCK);
public static final ResponseResult NOT_ENOUGH_BALANCE = new ResponseResult(ResponseCode.NOT_ENOUGH_BALANCE);
public static final ResponseResult USER_LOGIN_ERROR = new ResponseResult(ResponseCode.USER_LOGIN_ERROR);
public static final ResponseResult GOODS_ERROR = new ResponseResult(ResponseCode.GOODS_ERROR);
public static final ResponseResult REPEAT_ORDER = new ResponseResult(ResponseCode.REPEAT_ORDER);
public static final ResponseResult SUCCESS = new ResponseResult(ResponseCode.SUCCESS);
private Set<Integer> emptySet = new ConcurrentHashSet<>();
@Override
public ResponseResult<String> buy(Integer goodsId, Integer userId) throws SeckillException {
//重复提交 和 活动是否开始了 在拦截器判断
//商品是否存在
if (!goodsStockMap.containsKey(goodsId)) {
return GOODS_ERROR;
}
//用户是否存在
if (!userBalanceMap.containsKey(userId)) {
return USER_LOGIN_ERROR;
}
if (emptySet.contains(goodsId)){
return NOT_ENOUGH_STOCK;
}
//判断是否已经秒杀到了 且开启 限制用户请求频率防止并发重复下单 缓存判断
Boolean flag = false;
if (active != null) {
if (LocalDateTime.now().isAfter(active.getStartTime()) && LocalDateTime.now().isBefore(active.getEndTime()) && status == active.getStatus()) {
if (isKillMap.containsKey(goodsId)) {
flag = true;
}
}
}
//如果是秒杀商品且在秒杀活动期间 判断是否超卖
String key = "";
if (flag) {
key = userId + "-" + goodsId;
if (purchasedMap.containsKey(key)) {
return REPEAT_ORDER;
}
}
//商品是否售罄 缓存判断 有可能漏网之鱼
//余额是否充足 ??????????????????????==========漏网之鱼
//预减库存 加锁库存减扣==========
//预减余额 加锁余额减减扣========== 如果不足 就库存 +1
AtomicInteger stockAtomic = goodsStockMap.get(goodsId);
int stock = stockAtomic.get();
if (stock <= 0) {
emptySet.add(goodsId);
return NOT_ENOUGH_STOCK;
}
stockAtomic.decrementAndGet();
// if (!stockAtomic.compareAndSet(stock, stock - 1)) { 这种方式安全 但是容易遗漏请求
// return REPEAT_GOODS;
// }
//因为有做同一个用户短期无法重复下单 所以这个地方减少了并发 但是还是不安全 所以真正下单时候才是真正的扣减(减少但是不保障)
BigDecimal price;
//判断是不是秒杀商品
if (flag) {
price = active.getSeckillPrice();
} else {
price = priceMap.get(goodsId);
}
AtomicReference<BigDecimal> userAtomic = userBalanceMap.get(userId);
if (userAtomic.get().compareTo(price) < 0) {
stockAtomic.incrementAndGet();
return NOT_ENOUGH_BALANCE;
}
BigDecimal finalPrice = price;
userAtomic.updateAndGet(balance -> balance.subtract(finalPrice));
//缓存添加已下单 ==============purchasedMap
if (flag) {
purchasedMap.put(key, "");
}
//异步下单 失败加库存 如果是库存已不足 则失败就行 缓存+库存 余额+余额 purchasedMap删除已下单
if (flag) {
asyncOrderService.doAsyncOrder(userId, goodsId, activeId, price,LocalDateTime.now());
} else {
asyncOrderService.doAsyncOrder(userId, goodsId, null, price,LocalDateTime.now());
}
//返回信息封装
return SUCCESS;
}
上面代码,其实我们余额和库存都是基于ConcurrentHashMap去操作的,复杂度O1,保证了速度和安全,AtomicInteger利用原子锁去扣减。 当然代码没贴全,我们在添加商品用户时候要往map去增删改查,包括项目启动我们要把数据库加载到内存中
唯一有问题的是我们的扣减方式举个例子:
// if (!stockAtomic.compareAndSet(stock, stock - 1)) { 这种方式安全 但是容易遗漏请求
// return REPEAT_GOODS;
// }
这个其实是正确的扣减方式,安全可靠,但是高并发成功率有点低,所以我没考虑,余额也是一样的道理,我这里虽然有可能高并发情况下,余额扣减同一时间点,A扣了1 另一个线程A也扣了1,这样最多会造成我们缓存的库存超卖,但是我们在异步队列去处理下单时候会再次判断,而且这种扣减方式不会错太多 ,差在1~5之间可以忽略,我们是有二次校验的,而且校验之后,我们会把缓存库存置0。
4. 采取异步下单 下单存入队列BlockingQueue。
//异步下单 失败加库存 如果是库存已不足 则失败就行 缓存+库存 余额+余额 purchasedMap删除已下单
if (flag) {
asyncOrderService.doAsyncOrder(userId, goodsId, activeId, price,LocalDateTime.now());
} else {
asyncOrderService.doAsyncOrder(userId, goodsId, null, price,LocalDateTime.now());
}
我这里主要是是为了区分是否秒杀商品 因为我分表了。把秒杀订单和普通订单分开是因为,我们秒杀订单,判断是否已经购买,肯定要基于userid和goodid去查询,而数据量越大查询越慢,所以分开了。
private BlockingQueue<Orders> queue = new LinkedBlockingQueue<>();
public int queueSize(){
return queue.size();
}
@Async
public void doAsyncOrder(Integer userId, Integer goodsId, Integer activeId, BigDecimal price, LocalDateTime dateTime) {
try {
Orders orders = new Orders(userId, goodsId, price, activeId,dateTime);
//放入队列
queue.offer(orders);
if (flag) {
start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
5. 判断队列数量,根据数量让单线程去执行一条/还是批量取数据。处理事务保证回滚
@PostConstruct
public void start() {
flag = false;
new Thread(() -> {
while (true) {
try {
if (queue.size() > maxSize) {
List<Orders> array = new ArrayList<>();
for (int i = 0; i < maxListSize; i++) {
Orders orders = queue.take();
array.add(orders);
if (Objects.isNull(queue.peek())) {
break;
}
}
CreateOrderArray(array);
} else {
Orders orders = queue.take();
if (Objects.nonNull(orders)) {
CreateOrder(orders);
}
}
} catch (Exception e) {
flag = true;
e.printStackTrace();
}
}
}).start();
}
这里代码太多我就不一一粘贴了 ,粘贴一些核心处理 具体代码 我会把工程上传到我的资源,大家可以去下载。
这里主要是是先批量统计用户和订单,把用户修改前的余额和修改后的余额存起来用于修改,库存也是相同逻辑。
核心主要是批量修改和批量插入
//多线程修改 只修改改变的值 //TODO可以修改成批量修改
if (CollUtil.isNotEmpty(updateGoodsSet)){
List<UpdateGoodsDto> updateGoods = updateGoodsSet.stream().map(id -> {
Goods goods = goodsMap.get(id);
if (Objects.isNull(goods)){
return null;
}
return new UpdateGoodsDto(id, goods.getStock(), stockMap.get(id), goods.getSold());
}).filter(Objects::nonNull).collect(Collectors.toList());
if (CollUtil.isNotEmpty(updateGoods)){
List<List<UpdateGoodsDto>> partitions = Lists.partition(updateGoods, insertSize);
for (List<UpdateGoodsDto> updateBatch : partitions) {
goodsMapper.updateBatch(updateBatch);
}
}
}
// updateGoodsSet.parallelStream().forEach(id -> {
// Goods goods = goodsMap.get(id);
// goodsMapper.updateStock(id, stockMap.get(id), goods.getStock(), goods.getSold());
// });
if (CollUtil.isNotEmpty(updateUserSet)){
List<UpdateUserDto> updateUser = updateUserSet.stream().map(id -> {
User user = userMap.get(id);
if (Objects.isNull(user)){
return null;
}
return new UpdateUserDto(id, balanceMap.get(id),user.getBalance());
}).filter(Objects::nonNull).collect(Collectors.toList());
if (CollUtil.isNotEmpty(updateUser)){
List<List<UpdateUserDto>> partitions = Lists.partition(updateUser, insertSize);
for (List<UpdateUserDto> updateBatch : partitions) {
userMapper.updateBatch(updateBatch);
}
}
}
// updateUserSet.parallelStream().forEach(id -> {
// User user = userMap.get(id);
// userMapper.updateBalance(id, balanceMap.get(id), user.getBalance());
// });
//批量插入多少条性能最优 TODO
if (CollUtil.isNotEmpty(ordersList)) {
List<List<Orders>> partitions = Lists.partition(ordersList, insertSize);
for (List<Orders> addList : partitions) {
ordersMapper.addBatch(addList);
}
// partitions.parallelStream().forEach(addList->{
// ordersMapper.addBatch(addList);
// });
}
if (CollUtil.isNotEmpty(seckillOrdersList)) {
List<List<SeckillOrders>> partitions = Lists.partition(seckillOrdersList, insertSize);
for (List<SeckillOrders> addList : partitions) {
seckillOrdersMapper.addBatch(addList);
}
// partitions.parallelStream().forEach(addList->{
// seckillOrdersMapper.addBatch(addList);
// });
}
transactionManager.commit(transaction);
这里我发现如果用多线程去批量插入,事物会失效,考虑到数据库插入和批量修改性能,我这里采取了分组批量插入
值得一提的是要在数据库的连接上加上&allowMultiQueries=true支持开启批量修改
批量之后我们考虑上面一些的扣除的回滚缓存
7. 失败机制,如果处理失败判断异常,如果非余额不足、库存不足等问题,则重新假如队列,重试机制。(因为是比赛考虑成功率大,这里重试了一次)
8 部署到2C4U的服务器
因为是比赛只能要求是这一台服务器,所以就不按照docker\jenkins等工具去方便部署了,之间采取了上传服务器java -jar的方式启动,指定运行内存。想学习docker和jenkins包括k8s小伙伴可以看我其他博客。
本次常用的命令
scp -r D:\xxxxxxxt\seckill\target\seckill.jar root@172.18.7.42:/usr/local/seckill
cd /usr/local/seckil
nohup java -jar -Xms2800m -Xmx2800m -Dspring.profiles.active=prod seckill.jar &
ps aux | grep seckill.jar
netstat -tunlp | grep 8080
kill -9
sudo reboot
9. 防止cpu百分之百读写满限制
在控制器层加上队列数量校验,当cpu上去,挡住1s,给队列处理数据,可以忽略不记,也可以防止内存爆掉。
10.看一下我们的压测结果
我发现如果库存和余额充足,我们插入订单和修改用户余额库存,每分钟可以处理30w数据。本机压测,本机笔记本性能没服务器好。
小结
令牌桶 + 异步队列下单 +队列BlockingQueue保证有序+
单线程消费队列保证安全 + 批量从队列取数据+批量操作=减少了几张表更新的次数
批量更新(where条件是id和老的余额或者库存)+批量插入订单(考虑了性能一次批量五百条)
+事物控制如果失败保证回滚+失败重试机制(这里考虑比赛时间短成功率高 所以直接就一次采取了flag重试一次)
+另外部署内存4G考虑指定2.8+
有可能请求太多造成CPU高考虑在接口层加了 队列数量限制。