基于Redis的分布式设备库存服务设计与实现
概述
本文介绍一个基于Redis实现的分布式设备库存服务方案,通过分布式锁、重试机制和事务补偿等关键技术,保证在并发场景下库存操作的原子性和一致性。该方案适用于物联网设备管理、分布式资源调度等场景。
代码实现
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;
// 模拟设备库存服务
public class DeviceInventoryService {
private static final Logger logger = LoggerFactory.getLogger(DeviceInventoryService.class);
private final Map<String, Integer> inventoryMap = new HashMap<>();
private static final int MAX_RETRIES = 3;
private static final int LOCK_EXPIRE_TIME = 10; // 锁的过期时间,单位:秒
private final Jedis jedis;
public DeviceInventoryService(Jedis jedis) {
this.jedis = jedis;
}
// 初始化库存
public void initializeInventory(String deviceId, int quantity) {
inventoryMap.put(deviceId, quantity);
logger.info("设备 {} 初始化库存为 {}", deviceId, quantity);
}
// 尝试获取分布式锁
private boolean tryLock(String lockKey) {
SetParams setParams = SetParams.setParams().nx().ex(LOCK_EXPIRE_TIME);
String result = jedis.set(lockKey, "locked", setParams);
return "OK".equals(result);
}
// 释放分布式锁
private void releaseLock(String lockKey) {
jedis.del(lockKey);
}
// 定时更新库存
public boolean updateInventory(String deviceId, int updateQuantity) {
String lockKey = "inventory_lock:" + deviceId;
int retries = 0;
//重试次数
while (retries < MAX_RETRIES) {
if (tryLock(lockKey)) {
try {
return doUpdateInventory(deviceId, updateQuantity);
} catch (Exception e) {
logger.error("设备 {} 库存更新失败,重试第 {} 次", deviceId, retries + 1, e);
} finally {
releaseLock(lockKey);
}
}
retries++;
try {
Thread.sleep(100); // 等待一段时间后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
logger.error("设备 {} 库存更新失败,达到最大重试次数", deviceId);
return false;
}
// 实际执行库存更新操作
private boolean doUpdateInventory(String deviceId, int updateQuantity) {
int oldQuantity = inventoryMap.getOrDefault(deviceId, 0);
try {
// 记录操作日志
logger.info("设备 {} 开始更新库存,更新前库存: {}", deviceId, oldQuantity);
// 模拟更新操作
int newQuantity = oldQuantity + updateQuantity;
if (newQuantity < 0) {
throw new IllegalArgumentException("库存不能为负数");
}
inventoryMap.put(deviceId, newQuantity);
logger.info("设备 {} 库存更新成功,当前库存: {}", deviceId, newQuantity);
return true;
} catch (Exception e) {
logger.error("设备 {} 库存更新失败: {}", deviceId, e.getMessage());
// 进行事务补偿
compensateInventory(deviceId, oldQuantity);
return false;
}
}
// 事务补偿
private void compensateInventory(String deviceId, int oldQuantity) {
inventoryMap.put(deviceId, oldQuantity);
logger.info("设备 {} 库存已恢复到更新前的状态,当前库存: {}", deviceId, oldQuantity);
}
// 模拟定时任务
public static void main(String[] args) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
DeviceInventoryService service = new DeviceInventoryService(jedis);
service.initializeInventory("device001", 10);
// 模拟定时更新库存
service.updateInventory("device001", 5);
service.updateInventory("device001", -20); // 模拟更新失败
}
}
}
核心设计
分布式锁机制
private boolean tryLock(String lockKey) {
SetParams setParams = SetParams.setParams().nx().ex(LOCK_EXPIRE_TIME);
String result = jedis.set(lockKey, "locked", setParams);
return "OK".equals(result);
}
- 使用Redis的set nx ex命令实现原子性加锁
- 将锁的颗粒度设置到了设备上(根据实际业务设置)
- 设置10秒过期时间,防止死锁(根据实际业务设置过期时间)
重试机制
int retries = 0;
//重试次数
while (retries < MAX_RETRIES) {
if (tryLock(lockKey)) {
try {
return doUpdateInventory(deviceId, updateQuantity);
} catch (Exception e) {
logger.error("设备 {} 库存更新失败,重试第 {} 次", deviceId, retries + 1, e);
} finally {
releaseLock(lockKey);
}
}
retries++;
try {
Thread.sleep(100); // 等待一段时间后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
- 最大重试次数三次(MAX_RETRIES)
- 如果没有获取到锁则等待重试,超过重试次数则终止
补偿机制
private void compensateInventory(String deviceId, int oldQuantity) {
inventoryMap.put(deviceId, oldQuantity);
logger.info("设备 {} 库存已恢复到更新前的状态,当前库存: {}", deviceId, oldQuantity);
}
- 在doUpdateInventory捕获异常后自动回滚
- 基于版本号/快照的恢复机制
- 保证最终数据一致性
关键代码解析
public boolean updateInventory(String deviceId, int updateQuantity) {
String lockKey = "inventory_lock:" + deviceId;
int retries = 0;
while (retries < MAX_RETRIES) {
if (tryLock(lockKey)) {
try {
return doUpdateInventory(deviceId, updateQuantity);
} finally {
releaseLock(lockKey);
}
}
// ...重试逻辑...
}
return false;
}
- 获取设备级别的分布式锁
- 执行库存更新操作
- 无论成功失败都释放锁(finally保证)
- 达到重试上限后返回失败
核心操作方法
private boolean doUpdateInventory(String deviceId, int updateQuantity) {
int oldQuantity = inventoryMap.getOrDefault(deviceId, 0);
int newQuantity = oldQuantity + updateQuantity;
if (newQuantity < 0) {
throw new IllegalArgumentException("库存不能为负数");
}
inventoryMap.put(deviceId, newQuantity);
return true;
}
- 前置校验:库存不能为负数
- 原子性操作:库存增减计算
- 事务性更新:先计算后写入
使用示例
初始化与测试
public static void main(String[] args) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
DeviceInventoryService service = new DeviceInventoryService(jedis);
service.initializeInventory("device001", 10);
service.updateInventory("device001", 5); // 成功:库存15
service.updateInventory("device001", -20); // 失败:触发补偿
}
}
预期输出
INFO - 设备 device001 初始化库存为 10
INFO - 设备 device001 开始更新库存,更新前库存: 10
INFO - 设备 device001 库存更新成功,当前库存: 15
INFO - 设备 device001 开始更新库存,更新前库存: 15
ERROR - 设备 device001 库存更新失败: 库存不能为负数
INFO - 设备 device001 库存已恢复到更新前的状态,当前库存: 15
扩展思考
优化方向
- Redis集群支持:当前为单节点Redis,可升级为Redis Cluster
- 锁续期机制:添加看门狗线程自动续期锁
- 库存持久化:结合数据库实现库存持久化存储
- 监控体系:添加Prometheus监控指标
注意事项
- 网络分区场景下可能出现锁状态不一致
- 库存更新操作应保持幂等性
- Redis连接需要配置合理的超时参数
- 生产环境建议使用Lua脚本保证原子性
通过本文实现的库存服务,在保证线程安全的基础上,能够有效应对分布式环境下的资源竞争问题。实际部署时建议结合具体业务场景进行压力测试和参数调优。