一、背景
上一系列文章,我们说了积分的数模设计及接口设计,接下里,我们将梳理一下具体的代码实现。
使用的语言的java,基本框架是spring-boot,持久化框架则是Jpa。
使用到的技术点有:
- 分布式锁(积分发放和消耗,在分布式场景下,防止网络重试带来的重复请求问题)
- 乐观锁(更新积分账户表和积分订单表)
- 事件驱动机制(积分账户遇有变更,及时通知用户,发送消息提醒)
限于篇幅,我们将分文两篇来讲:
- 积分账户及收支记录
- 积分订单的退款和结算
二、积分的收入
用户能够通过三种途径获得积分,发放的核心逻辑是一致的,但三者的前置校验不同。
所以,我们定义三个不同的方法,各自校验完成,统一调用发放积分的方法。
因为涉及多次操作数据库,需要开启事务,并且修改事务的隔离级别为Isolation.READ_COMMITTED,见下代码。(原因见后文的乐观锁实现)
- 购买虚拟货币/积分
@Lock(name = POINTS_DISTRIBUTE_LOCK_PRE, key = "'buy:orderNo:' + #orderNo")
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED)
public Long grantByBuy(Integer schoolId, Long userId, String pointsType, Integer points,
String orderNo, String remark, String token) {
// 查询是否已经发放过,防止重复
boolean syncSuccess = this.syncPointsOrder(orderNo, schoolId, userId, pointsType, points);
if (!syncSuccess) {
return null;
}
return this.grant(schoolId, userId, PointsChannelEnum.BUY.getCode(), PointsChannelEnum.BUY.getName(),
pointsType, points,
orderNo, null, remark, token);
}
- 手动发放积分
@Lock(name = POINTS_DISTRIBUTE_LOCK_PRE, key = "'artificial:userId:' + #userId")
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED)
public Long grantByArtificial(Integer schoolId, Long userId,
String pointsType, Integer points,
String optUserId, String remark, String token) {
return this.grant(schoolId, userId, PointsChannelEnum.GRANT_BY_HAND.getCode(),
PointsChannelEnum.GRANT_BY_HAND.getName(), pointsType, points,
null, optUserId, remark, token);
}
- 做任务,获得积分奖励
@Lock(name = POINTS_DISTRIBUTE_LOCK_PRE, key = "'grant:userId:' + #userId + ':channelCode:' + #channelCode")
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED)
public Long grantByChannel(Integer schoolId, Long userId,
String channelCode, String pointsType,
String remark, String token) {
// 校验channelCode
PointsChannel pointsChannel = pointsChannelRepository.findByCodeAndPointsType(channelCode, pointsType);
Precondition.notNull(pointsChannel, "积分渠道[%s]未配置", channelCode);
Precondition.isTrue(pointsChannel.getRewardPoints() > 0, "积分渠道配置的积分数必须大于0");
return this.grant(schoolId, userId,
channelCode, pointsChannel.getName(), pointsType, pointsChannel.getRewardPoints(),
null, null, remark, token);
}
1、发放积分
除了校验token不能重复使用外,第一步是增加账户的余额,第二步是保存账户的收支记录,第三步是异步通知操作(提醒用户,积分账户有变更,因为非主流程,所以异步,这里采用事件驱动机制)。
private Long grant(Integer schoolId, Long userId,
String channelCode, String channelName,
String pointsType, Integer rewardPoints,
String orderNo, String optUserId, String remark, String token) {
if (log.isInfoEnabled()) {
log.info("开始发放积分, 入参列表:[schoolId={}, userId={}, channelCode={}, pointsType={}, rewardPoints={}, " +
"orderNo={}, optUserId={}, remark={}, token={}]",
schoolId, userId, channelCode, pointsType, rewardPoints, orderNo, optUserId, remark, token);
}
// 校验token不能重复使用(略)
//1.账户增加余额
PointsAccount pointsAccount = pointsAccountService.findPointsAccount(schoolId, userId, pointsType);
if (null == pointsAccount) {
// 创建账户
pointsAccountService.save(schoolId, userId, pointsType, rewardPoints);
} else {
boolean updateSuccess = this.optimisticUpdateAccount(GRANT_POINTS_ACCOUNT,
pointsAccount.getId(), pointsAccount.getPoints(),
rewardPoints, pointsAccount.getVersion());
if (!updateSuccess) {
if (log.isWarnEnabled()) {
log.warn("发放积分出现错误, [schoolId={}, userId={}, channelCode={}, pointsType={}, rewardPoints={}]",
schoolId, userId, channelCode, pointsType, rewardPoints);
}
Precondition.isTrue(false, "发放积分给用户%d出现错误", userId);
}
}
//2.保存账户变更记录
PointsAccountFlow flow = pointsAccountFlowService.savePointsAccountFlow(FlowTypeEnum.INCREASE,
schoolId, userId, pointsType,
rewardPoints, channelCode, channelName,
orderNo, optUserId, remark);
// 3. 发布异步事件,提醒用户其账户有变更。(略)
if (log.isInfoEnabled()) {
log.info("完成发放积分, [schoolId={}, userId={}, channelCode={}, pointsType={}, rewardPoints={}, " +
"orderNo={}, optUserId={}, remark={}, token={}]",
schoolId, userId, channelCode, pointsType, rewardPoints, orderNo, optUserId, remark, token);
}
return flow.getId();
}
2、乐观锁实现账户的变更
乐观更新账户, 重试N次,如果更新失败,则再次查询DB最新数据。
我们使用的是mysql数据库,其默认隔离级别是可重复读,所以上文需要指定方法的隔离级别是Isolation.READ_COMMITTED,否则在同一个事务中,读取不到其他事务提交的最新数据。
这是关于数据库的隔离级别,第二点,因为我们使用的jpa持久化框架,它有着著名的一级缓存和二级缓存;所以我们需要手动清除其一级缓存。
@Autowired
private EntityManager entityManager;
//清除jpa一级缓存
entityManager.clear();
第三点,我们在update行记录的时候,判断version是否一致。
- optimisticUpdateAccount()
private boolean optimisticUpdateAccount(int optType, long accountId, int points, int thisPoints, long version) {
int time = 0;
boolean success = false;
while (time < MAX_RETRY_TIME) {
int result = 0;
switch (optType) {
// 这两种情况是增加余额
// 发放积分
case GRANT_POINTS_ACCOUNT:
// 回退积分
case ROLLBACK_POINTS_ACCOUNT:
result = pointsAccountService.updateAccountPoints(accountId,
points + thisPoints,
version);
break;
// 这两种情况是减少余额
// 使用积分
case USE_POINTS_ACCOUNT:
// 积分订单的退款
case REFUND_POINTS_ACCOUNT:
result = pointsAccountService.updateAccountPoints(accountId,
points - thisPoints,
version);
break;
default:
break;
}
if (result == 1) {
success = true;
break;
}
//清除jpa一级缓存
entityManager.clear();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("乐观锁更新账户余额中的sleep出现异常", e);
}
PointsAccount pointsAccount = pointsAccountService.findPointsAccount(accountId);
Precondition.notNull(pointsAccount, "积分账户不存在");
version = pointsAccount.getVersion();
points = pointsAccount.getPoints();
time++;
}
return success;
}
- modifyAccountPoints()
@Modifying
@Query(value = "update PointsAccount set points = :points, version = version + 1, modifiedDate = now() " +
" where id = :id and version = :oldVersion ")
int modifyAccountPoints(@Param("id") long id,
@Param("points") int points,
@Param("oldVersion") long oldVersion);
三、消耗积分
分为四步:
- 1、更新账户的余额,保证此次消耗的积分是小于等于账户的余额
- 2、保存账户变更记录
- 3、发布异步事件,通知用户其账户变更
- 4、更新积分订单表:已使用积分数、可用积分数、可结算积分数
关于积分订单表的更新,见下一篇文章。
@Lock(name = POINTS_DISTRIBUTE_LOCK_PRE, key = "'use:orderNo:' + #orderNo")
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED)
public void use(Integer schoolId, Long userId, String orderNo, String pointsType, Integer points, String remark) {
if (log.isInfoEnabled()) {
log.info("开始消费积分, 入参列表:[schoolId={}, userId={}, orderNo={}, pointsType={}, points={}, remark={}]",
schoolId, userId, orderNo, pointsType, points, remark);
}
//1.更新账户的余额
this.updateAccount(USE_POINTS_ACCOUNT, schoolId, userId, pointsType, points);
//2.保存账户变更记录
pointsAccountFlowService.savePointsAccountFlow(FlowTypeEnum.DECREASE,
schoolId, userId,
pointsType, points,
PointsChannelEnum.USE.getCode(), PointsChannelEnum.USE.getName(),
orderNo, null,
remark);
//3.发布异步事件,通知用户其账户变更(略)
//4.更新积分订单表中的已使用积分数和可用积分数以及可结算积分数
// 根据userId/schoolId/pointsType查询可用的的积分,按时间先后顺序扣减订单的可用积分数
this.updatePointsOrderByUse(schoolId, userId, pointsType, points);
if (log.isInfoEnabled()) {
log.info("完成消费积分, [schoolId={}, userId={}, orderNo={}, pointsType={}, points={}, remark={}]",
schoolId, userId, orderNo, pointsType, points, remark);
}
}
四、积分的回退
当商品的定价是纯积分方式,或者积分+现金的组合方式,这类商品发生退款后,我们需要把用户消耗的积分回退其账户。
所谓积分的回退,相当于给用户再次发放等量的积分。
@Lock(name = POINTS_DISTRIBUTE_LOCK_PRE, key = "'rollback:orderNo:' + #orderNo")
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED)
public void rollback(Integer schoolId, Long userId, String orderNo, String pointsType, Integer points) {
if (log.isInfoEnabled()) {
log.info("开始回退积分, 入参列表:[schoolId={}, userId={}, orderNo={}, pointsType={}, points={}]",
schoolId, userId, orderNo, pointsType, points);
}
// 仅检查账户是否存在
PointsAccount pointsAccount = pointsAccountService.findPointsAccount(schoolId, userId, pointsType);
Precondition.isTrue(null != pointsAccount, "积分账户[%d]不存在", userId);
//1.把扣除的积分回退到用户的账户余额里
boolean updateSuccess = this.optimisticUpdateAccount(ROLLBACK_POINTS_ACCOUNT,
pointsAccount.getId(), pointsAccount.getPoints(),
points, pointsAccount.getVersion());
if (!updateSuccess) {
if (log.isWarnEnabled()) {
log.warn("回退积分出现错误, [schoolId={}, userId={}, orderNo={}, pointsType={}, points={}]",
schoolId, userId, orderNo, pointsType, points);
}
Precondition.isTrue(false, "回退用户[%d]的积分出现错误", userId);
}
//2.保存账户变更记录
pointsAccountFlowService.savePointsAccountFlow(FlowTypeEnum.INCREASE,
schoolId, userId, pointsType,
points, PointsChannelEnum.CANCEL_ORDER.getCode(), PointsChannelEnum.CANCEL_ORDER.getName(),
orderNo, null, "订单号[" + orderNo + "]取消");
//3.发布异步事件,通知用户其账户变更(略)
if (log.isInfoEnabled()) {
log.info("完成回退积分, [schoolId={}, userId={}, orderNo={}]", schoolId, userId, orderNo);
}
}
五、总结
本文详细介绍了积分操作的五个方法,总体的实现逻辑都是更新账户的余额、保存账户的收支记录、最后通知用户其账户余额有变更。
无非是他们的校验逻辑不一样罢了,所以逻辑实现的方法必须复用。
消耗积分和积分的回退,区别有两点:
- 1、是否更新积分订单表
- 2、前者是减少账户的余额,后者是增加账户的余额。
后文,我们将梳理积分订单的实现。