分布式锁实现原理与最佳实践

news2024/11/15 8:38:16

作者:秦泽涛 阿里云教育基座团队

在单体的应用开发场景中涉及并发同步时,大家往往采用Synchronized(同步)或同一个JVM内Lock机制来解决多线程间的同步问题。而在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的锁就是分布式锁。接下来本文将为大家分享分布式锁的最佳实践。

一、超卖问题复现

1.1 现象

存在如下的几张表:

  • 商品表

  • 订单表

  • 订单item表

商品的库存为1,但是并发高的时候有多笔订单。

错误案例一:数据库update相互覆盖

直接在内存中判断是否有库存,计算扣减之后的值更新数据库,并发的情况下会导致相互覆盖发生:

@Transactional(rollbackFor = Exception.class)
public Long createOrder() throws Exception {
    Product product = productMapper.selectByPrimaryKey(purchaseProductId);
    // ... 忽略校验逻辑
​
    //商品当前库存
    Integer currentCount = product.getCount();
    //校验库存
    if (purchaseProductNum > currentCount) {
        throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
    }
    // 计算剩余库存
    Integer leftCount = currentCount - purchaseProductNum;
    // 更新库存
    product.setCount(leftCount);
    product.setGmtModified(new Date());
    productMapper.updateByPrimaryKeySelective(product);
​
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
​
    OrderItem orderItem = new OrderItem();
    orderItem.setOrderId(order.getId());
    // ... 省略 Set
    return order.getId();
}

错误案例二:扣减串行执行,但是库存被扣减为负数

在 SQL 中加入运算避免值的相互覆盖,但是库存的数量变为负数,因为校验库存是否足够还是在内存中执行的,并发情况下都会读到有库存:

@Transactional(rollbackFor = Exception.class)
public Long createOrder() throws Exception {
    Product product = productMapper.selectByPrimaryKey(purchaseProductId);
    // ... 忽略校验逻辑
​
    //商品当前库存
    Integer currentCount = product.getCount();
    //校验库存
    if (purchaseProductNum > currentCount) {
        throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
    }
    // 使用 set count =  count - #{purchaseProductNum,jdbcType=INTEGER}, 更新库存
    productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
​
    OrderItem orderItem = new OrderItem();
    orderItem.setOrderId(order.getId());
    // ... 省略 Set
    return order.getId();
}

错误案例三:使用 synchronized 实现内存中串行校验,但是依旧扣减为负数

因为我们使用的是事务的注解,synchronized加在方法上,方法执行结束的时候锁就会释放,此时的事务还没有提交,另一个线程拿到这把锁之后就会有一次扣减,导致负数。

@Transactional(rollbackFor = Exception.class)
public synchronized Long createOrder() throws Exception {
    Product product = productMapper.selectByPrimaryKey(purchaseProductId);
    // ... 忽略校验逻辑
​
    //商品当前库存
    Integer currentCount = product.getCount();
    //校验库存
    if (purchaseProductNum > currentCount) {
        throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
    }
    // 使用 set count =  count - #{purchaseProductNum,jdbcType=INTEGER}, 更新库存
    productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
​
    OrderItem orderItem = new OrderItem();
    orderItem.setOrderId(order.getId());
    // ... 省略 Set
    return order.getId();
}

1.2 解决办法

从上面造成问题的原因来看,只要是扣减库存的动作,不是原子性的。多个线程同时操作就会有问题。

  • 单体应用:使用本地锁 + 数据库中的行锁解决

  • 分布式应用:

  • 使用数据库中的乐观锁,加一个 version 字段,利用CAS来实现,会导致大量的 update 失败

  • 使用数据库维护一张锁的表 + 悲观锁 select,使用 select for update 实现

  • 使用Redis 的 setNX实现分布式锁

  • 使用zookeeper的watcher + 有序临时节点来实现可阻塞的分布式锁

  • 使用Redisson框架内的分布式锁来实现

  • 使用curator 框架内的分布式锁来实现

二、单体应用解决超卖的问题

正确示例:将事务包含在锁的控制范围内

保证在锁释放之前,事务已经提交。

//@Transactional(rollbackFor = Exception.class)
public synchronized Long createOrder() throws Exception {
    TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
    Product product = productMapper.selectByPrimaryKey(purchaseProductId);
    if (product == null) {
        platformTransactionManager.rollback(transaction1);
        throw new Exception("购买商品:" + purchaseProductId + "不存在");
    }
    
    //商品当前库存
    Integer currentCount = product.getCount();
    //校验库存
    if (purchaseProductNum > currentCount) {
        platformTransactionManager.rollback(transaction1);
        throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
    }
​
    productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());
​
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
​
    OrderItem orderItem = new OrderItem();
    orderItem.setOrderId(order.getId());
    // ... 省略 Set
    return order.getId();
    platformTransactionManager.commit(transaction1);
}

正确示例:使用synchronized的代码块

public Long createOrder() throws Exception {
    Product product = null;
    //synchronized (this) {
    //synchronized (object) {
    synchronized (DBOrderService2.class) {
        TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
        product = productMapper.selectByPrimaryKey(purchaseProductId);
        if (product == null) {
            platformTransactionManager.rollback(transaction1);
            throw new Exception("购买商品:" + purchaseProductId + "不存在");
        }
​
        //商品当前库存
        Integer currentCount = product.getCount();
        System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
        //校验库存
        if (purchaseProductNum > currentCount) {
            platformTransactionManager.rollback(transaction1);
            throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
        }
​
        productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());
        platformTransactionManager.commit(transaction1);
    }
​
    TransactionStatus transaction2 = platformTransactionManager.getTransaction(transactionDefinition);
​
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
​
    OrderItem orderItem = new OrderItem();
    // ... 省略 Set
    orderItemMapper.insertSelective(orderItem);
    platformTransactionManager.commit(transaction2);
    return order.getId();

正确示例:使用Lock

private Lock lock = new ReentrantLock();
​
public Long createOrder() throws Exception{  
    Product product = null;
​
    lock.lock();
​
    TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
    try {
        product = productMapper.selectByPrimaryKey(purchaseProductId);
        if (product==null){
            throw new Exception("购买商品:"+purchaseProductId+"不存在");
        }
​
        //商品当前库存
        Integer currentCount = product.getCount();
        System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount);
        //校验库存
        if (purchaseProductNum > currentCount){
            throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
        }
​
        productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
        platformTransactionManager.commit(transaction1);
    } catch (Exception e) {
        platformTransactionManager.rollback(transaction1);
    } finally {
        // 注意抛异常的时候锁释放不掉,分布式锁也一样,都要在这里删掉
        lock.unlock();
    }
​
    TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
    Order order = new Order();
    // ... 省略 Set
    orderMapper.insertSelective(order);
​
    OrderItem orderItem = new OrderItem();
    // ... 省略 Set
    orderItemMapper.insertSelective(orderItem);
    platformTransactionManager.commit(transaction);
    return order.getId();
}

三、常见分布式锁的使用

上面使用的方法只能解决单体项目,当部署多台机器的时候就会失效,因为锁本身就是单机的锁,所以需要使用分布式锁来实现。

3.1 数据库乐观锁

数据库中的乐观锁,加一个version字段,利用CAS来实现,乐观锁的方式支持多台机器并发安全。但是并发量大的时候会导致大量的update失败

3.2 数据库分布式锁

db操作性能较差,并且有锁表的风险,一般不考虑。

3.2.1 简单的数据库锁

select for update

直接在数据库新建一张表:

锁的code预先写到数据库中,抢锁的时候,使用select for update查询锁对应的key,也就是这里的code,阻塞就说明别人在使用锁。

// 加上事务就是为了 for update 的锁可以一直生效到事务执行结束
// 默认回滚的是 RunTimeException
@Transactional(rollbackFor = Exception.class)
public String singleLock() throws Exception {
    log.info("我进入了方法!");
    DistributeLock distributeLock = distributeLockMapper.
        selectDistributeLock("demo");
    if (distributeLock==null) {
        throw new Exception("分布式锁找不到");
    }
    log.info("我进入了锁!");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "我已经执行完成!";
}

<select id="selectDistributeLock" resultType="com.deltaqin.distribute.model.DistributeLock">
  select * from distribute_lock
  where businessCode = #{businessCode,jdbcType=VARCHAR}
  for update
</select>

使用唯一键作为限制,插入一条数据,其他待执行的SQL就会失败,当数据删除之后再去获取锁 ,这是利用了唯一索引的排他性。

insert lock

直接维护一张锁表:

@Autowired
private MethodlockMapper methodlockMapper;
​
@Override
public boolean tryLock() {
    try {
        //插入一条数据   insert into
        methodlockMapper.insert(new Methodlock("lock"));
    }catch (Exception e){
        //插入失败
        return false;
    }
    return true;
}
​
@Override
public void waitLock() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
​
@Override
public void unlock() {
    //删除数据   delete
    methodlockMapper.deleteByMethodlock("lock");
    System.out.println("-------释放锁------");
}

3.3 Redis setNx

Redis 原生支持的,保证只有一个会话可以设置成功,因为Redis自己就是单线程串行执行的。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

spring.redis.host=localhost

封装一个锁对象:

@Slf4j
public class RedisLock implements AutoCloseable {
​
    private RedisTemplate redisTemplate;
    private String key;
    private String value;
    //单位:秒
    private int expireTime;
​
    /**
     * 没有传递 value,因为直接使用的是随机值
     */
    public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expireTime=expireTime;
        this.value = UUID.randomUUID().toString();
    }
​
    /**
     * JDK 1.7 之后的自动关闭的功能
     */
    @Override
    public void close() throws Exception {
        unLock();
    }
​
    /**
     * 获取分布式锁
     * SET resource_name my_random_value NX PX 30000
     * 每一个线程对应的随机值 my_random_value 不一样,用于释放锁的时候校验
     * NX 表示 key 不存在的时候成功,key 存在的时候设置不成功,Redis 自己是单线程,串行执行的,第一个执行的才可以设置成功
     * PX 表示过期时间,没有设置的话,忘记删除,就会永远不过期
     */
    public boolean getLock(){
        RedisCallback<Boolean> redisCallback = connection -> {
            //设置NX
            RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
            //设置过期时间
            Expiration expiration = Expiration.seconds(expireTime);
            //序列化key
            byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
            //序列化value
            byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
            //执行setnx操作
            Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
            return result;
        };
​
        //获取分布式锁
        Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
        return lock;
    }
​
    /**
     * 释放锁的时候随机数相同的时候才可以释放,避免释放了别人设置的锁(自己的已经过期了所以别人才可以设置成功)
     * 释放的时候采用 LUA 脚本,因为 delete 没有原生支持删除的时候校验值,证明是当前线程设置进去的值
     * 脚本是在官方文档里面有的
     */
    public boolean unLock() {
        // key 是自己才可以释放,不是就不能释放别人的锁
        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
        List<String> keys = Arrays.asList(key);
​
        // 执行脚本的时候传递的 value 就是对应的值
        Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
        log.info("释放锁的结果:"+result);
        return result;
    }
}

每次获取的时候,自己线程需要new对应的RedisLock:

public String redisLock(){
    log.info("我进入了方法!");
    try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
        if (redisLock.getLock()) {
            log.info("我进入了锁!!");
            Thread.sleep(15000);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    log.info("方法执行完成");
    return "方法执行完成";
}

3.4 zookeeper 瞬时znode节点 + watcher监听机制

临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:

  • 多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;

  • 其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;

  • 下一个序号的线程得到通知,继续执行;

  • 以此类推,创建节点的时候,就确认了线程执行的顺序。

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.14</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

zk的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。existscreategetChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:

当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:

/**
 * 自己本身就是一个 watcher,可以得到通知
 * AutoCloseable 实现自动关闭,资源不使用的时候
 */
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
​
    private ZooKeeper zooKeeper;
​
    /**
     * 记录当前锁的名字
     */
    private String znode;
​
    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost:2181",
                10000,this);
    }
​
    public boolean getLock(String businessCode) {
        try {
            //创建业务 根节点
            Stat stat = zooKeeper.exists("/" + businessCode, false);
            if (stat==null){
                zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
​
            //创建瞬时有序节点  /order/order_00000001
            znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
​
            //获取业务节点下 所有的子节点
            List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
            //获取序号最小的(第一个)子节点
            Collections.sort(childrenNodes);
            String firstNode = childrenNodes.get(0);
            //如果创建的节点是第一个子节点,则获得锁
            if (znode.endsWith(firstNode)){
                return true;
            }
            //如果不是第一个子节点,则监听前一个节点
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (znode.endsWith(node)){
                    zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
​
    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode,-1);
        zooKeeper.close();
        log.info("我已经释放了锁!");
    }
​
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
}

3.5 zookeeper curator

在实际的开发中,不建议去自己“重复造轮子”,而建议直接使用Curator客户端中的各种官方实现的分布式锁,例如其中的InterProcessMutex可重入锁。

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.2.0</version>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>

@Bean(initMethod="start",destroyMethod = "close")
public CuratorFramework getCuratorFramework() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.
        newClient("localhost:2181", retryPolicy);
    return client;
}

框架已经实现了分布式锁。zk的Java客户端升级版。使用的时候直接指定重试的策略就可以。

官网中分布式锁的实现是在curator-recipes依赖中,不要引用错了。

@Autowired
private CuratorFramework client;
​
@Test
public void testCuratorLock(){
    InterProcessMutex lock = new InterProcessMutex(client, "/order");
    try {
        if ( lock.acquire(30, TimeUnit.SECONDS) ) {
            try  {
                log.info("我获得了锁!!!");
            }
            finally  {
                lock.release();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    client.close();
}

3.6 Redission

重新实现了Java并发包下处理并发的类,让其可以跨JVM使用,例如CHM等。

3.6.1 非SpringBoot项目引入

https://redisson.org/

引入Redisson的依赖,然后配置对应的XML即可:

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.11.2</version>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>

编写相应的redisson.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:redisson="http://redisson.org/schema/redisson"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://redisson.org/schema/redisson
       http://redisson.org/schema/redisson/redisson.xsd
">
​
    <redisson:client>
        <redisson:single-server address="redis://127.0.0.1:6379"/>
    </redisson:client>
</beans>

配置对应@ImportResource("classpath*:redisson.xml")资源文件。

3.6.2 SpringBoot项目引入

或者直接使用springBoot的starter即可。

https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.19.1</version>
</dependency>

修改application.properties即可:#spring.redis.host=

3.6.3 设置配置类

@Bean
public RedissonClient getRedissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    return Redisson.create(config);
}

3.6.4 使用

@Test
public void testRedissonLock() {
    RLock rLock = redisson.getLock("order");
    try {
        rLock.lock(30, TimeUnit.SECONDS);
        log.info("我获得了锁!!!");
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        log.info("我释放了锁!!");
        rLock.unlock();
    }
}

3.7 Etcd

参考以下文章,普通项目不会为了一把锁引入etcd,此处不再赘述:

https://time.geekbang.org/column/article/350285

四、常见分布式锁的原理

4.1 Redisson

Redis 2.6之后才可以执行lua脚本,比起管道而言,这是原子性的,模拟一个商品减库存的原子操作:

//lua脚本命令执行方式:redis-cli --eval /tmp/test.lua , 10
jedis.set("product_stock_10016", "15");  
//初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +
        " local a = tonumber(count) " +
        " local b = tonumber(ARGV[1]) " +
        " if a >= b then " +
        "   redis.call('set', KEYS[1], a-b) " +
        "   return 1 " +
        " end " +
        " return 0 ";
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), 
                        Arrays.asList("10"));
System.out.println(obj);

4.1.1 尝试加锁的逻辑

上面的org.redisson.RedissonLock#lock()通过调用自己方法内部的lock方法的org.redisson.RedissonLock#tryAcquire方法。之后调用 org.redisson.RedissonLock#tryAcquireAsync

首先调用内部的org.redisson.RedissonLock#tryLockInnerAsync:设置对应的分布式锁

到这里获取锁的逻辑就结束了,如果这里没有获取到,在Future的回调里面就会直接return,会在外层有一个while true的循环,订阅释放锁的消息准备被唤醒。如果说加锁成功,就开始执行锁续命逻辑。

4.1.2 锁续命逻辑

lua脚本最后是以毫秒为单位返回key的剩余过期时间。成功加锁之后org.redisson.RedissonLock#scheduleExpirationRenewal中将会调用org.redisson.RedissonLock#renewExpiration,这个方法内部就有锁续命的逻辑,是一个定时任务,等10s执行。

执行的时候尝试执行的续命逻辑使用的是Lua脚本,当前的锁有值,就续命,没有就直接返回0:

返回0之后外层会判断,延时成功就会再次调用自己,否则延时调用结束,不再为当前的锁续命。所以这里的续命不是一个真正的定时,而是循环调用自己的延时任务。

4.1.3 循环间隔抢锁机制

如果一开始就加锁成功就直接返回。

如果一开始加锁失败,没抢到锁的线程就会在while循环中尝试加锁,加锁成功就结束循环,否则等待当前锁的超时时间之后再次尝试加锁。所以实现逻辑默认是非公平锁:

里面有一个subscribe的逻辑,会监听对应加锁的key,当锁释放之后publish对应的消息,此时如果没有到达对应的锁的超时时间,也会尝试获取锁,避免时间浪费。

4.1.4 释放锁和唤醒其他线程的逻辑

前面没有抢到锁的线程会监听对应的queue,后面抢到锁的线程释放锁的时候会发送一个消息。

订阅的时候指定收到消息时候的逻辑:会唤醒阻塞之后执行while循环

4.1.5 重入锁的逻辑

存在对应的锁,就对对应的hash结构的value直接+1,和Java重入锁的逻辑是一致的。

4.2 RedLock解决非单体项目的Redis主从架构的锁失效

https://redis.io/docs/manual/patterns/distributed-locks/

查看Redis官方文档,对于单节点的Redis ,使用setnx和lua del删除分布式锁是足够的,但是主从架构的场景下:锁先加在一个master节点上,默认是异步同步到从节点,此时master挂了会选择slave为master,此时又可以加锁,就会导致超卖。但是如果使用zookeeper来实现的话,由于zk是CP的,所以CP不存在这样的问题。

Redis文档中给出了RedLock的解决办法,使用redLock真的可以解决吗?

4.2.1 RedLock 原理

基于客户端的实现,是基于多个独立的Redis Master节点的一种实现(一般为5)。client依次向各个节点申请锁,若能从多数个节点中申请锁成功并满足一些条件限制,那么client就能获取锁成功。它通过独立的N个Master节点,避免了使用主备异步复制协议的缺陷,只要多数Redis节点正常就能正常工作,显著提升了分布式锁的安全性、可用性。

注意图中所有的节点都是master节点。加锁超过半数成功,就认为是成功。具体流程:

  • 获取锁
    • 获取当前时间T1,作为后续的计时依据;
    • 按顺序地,依次向5个独立的节点来尝试获取锁 SET resource_name my_random_value NX PX 30000;
    • 计算获取锁总共花了多少时间,判断获取锁成功与否;
    • 时间:T2-T1;
    • 多数节点的锁(N/2+1);
    • 当获取锁成功后的有效时间,要从初始的时间减去第三步算出来的消耗时间;
    • 如果没能获取锁成功,尽快释放掉锁。
  • 释放锁
    • 向所有节点发起释放锁的操作,不管这些节点有没有成功设置过。

public String redlock() {
    String lockKey = "product_001";
    //这里需要自己实例化不同redis实例的redisson客户端连接,这里只是伪代码用一个redisson客户端简化了
    RLock lock1 = redisson.getLock(lockKey);
    RLock lock2 = redisson.getLock(lockKey);
    RLock lock3 = redisson.getLock(lockKey);
​
    /**
     * 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
     */
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    try {
        /**
         * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
         * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
         */
        boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
        if (res) {
            //成功获得锁,在这里处理业务
        }
    } catch (Exception e) {
        throw new RuntimeException("lock fail");
    } finally {
        //无论如何, 最后都要解锁
        redLock.unlock();
    }
​
    return "end";
}

但是,它的实现建立在一个不安全的系统模型上的,它依赖系统时间,当时钟发生跳跃时,也可能会出现安全性问题。分布式存储专家Martin对RedLock的分析文章,Redis作者的也专门写了一篇文章进行了反驳。

Martin Kleppmann:How to do distributed locking

https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

Antirez:Is Redlock safe?

http://antirez.com/news/101

4.2.2 RedLock 问题一:持久化机制导致重复加锁

如果是上面的架构图,一般生产都不会配置AOF的每一条命令都落磁盘,一般会设置一些间隔时间,比如1s,如果ABC节点加锁成功,有一个节点C恰好是在1s内加锁,还没有落盘,此时挂了,就会导致其他客户端通过CDE又会加锁成功。

4.2.3 RedLock 问题二:主从下重复加锁

除非多部署一些节点,但是这样会导致加锁时间变长,这样比较下来效果就不如zk了。

4.2.4 RedLock 问题三:时钟跳跃导致重复加锁

C节点发生了时钟跳跃,导致加上的锁没有到达实际的超时时间,就被误以为超时而释放,此时其他客户端就可以重复加锁了。

4.3 Curator

InterProcessMutex 可重入锁的分析

五、业务中使用分布式锁的注意点

获取的锁要设置有效期,假设我们未设置key自动过期时间,在Set key value NX 后,如果程序crash或者发生网络分区后无法与Redis节点通信,毫无疑问其他 client 将永远无法获得锁,这将导致死锁,服务出现中断。

SETNX和EXPIRE命令去设置key和过期时间,这也是不正确的,因为你无法保证SETNX和EXPIRE命令的原子性。

自己使用 setnx 实现Redis锁的时候,注意并发情况下不要释放掉别人的锁(业务逻辑执行时间超过锁的过期时间),导致恶性循环。一般:

1)加锁的时候需要指定value的内容是当前进程中的当前线程的唯一标记,不要使用线程ID作为当前线程的锁的标记,因为不同实例上的线程ID可能是一样的。

2)释放锁的逻辑会写在finally ,释放锁时候要判断锁对应的value,而且要使用lua脚本实现原子 del 操作。因为if逻辑判断完之后也可能失效导致删除别人的锁。

3)针对扣减库存这个逻辑,lua脚本里面实现Redis比较库存、扣减库存操作的原子性。通过判断Redis Decr命令的返回值即可。此命令会返回扣减后的最新库存,若小于0则表示超卖。

5.1 自己实现分布式锁的坑

5.1.1 setnx不关心锁的顺序导致删除别人的锁

锁失效之后,别人加锁成功,自己把别人的锁删了。

我们无法预估程序执行需要的锁的时间。

public String deductStock() {
    String lockKey = "lock:product_101";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deltaqin");
    stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
​
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    } finally {
        stringRedisTemplate.delete(lockKey);
    }
​
    return "end";
}

5.1.2 setnx关心锁的顺序还是删除了别人的锁

并发会卡在各种地方,卡住的时候过期了,就会删掉别人加的锁:

错误的原因还是因为解锁的逻辑不是原子性的,这里可以参考Redisson的解锁逻辑使用lua脚本实现。

public String deductStock() {
    String lockKey = "lock:product_101";
    String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v)
    if (!result) {
        return "error_code";
    }
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    } finally {
        if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
            // 卡在这里,锁过期了,其他线程又可以加锁,此时又把其他线程新加的锁删掉了
            stringRedisTemplate.delete(lockKey);
        }
    }
    return "end";
}

解决办法

这种问题解决的办法就是使用锁续命,比如使用一个定时任务间隔小于锁的超时时间,每隔一段时间就给锁续命,除非线程自己主动删除。这也是Redisson的实现思路。

5.2 锁优化:分段加锁逻辑

针对一个商品,要开启秒杀的时候,会将商品的库存预先加载到Redis缓存中,比如有100个库存,此时可以分为5个key,每一个key有20个库存。可以把分布式锁的性能提升5倍。

例如:

  • product_10111_stock = 100
    • product_10111_stock1 = 20
    • product_10111_stock2 = 20
    • product_10111_stock3 = 20
    • product_10111_stock4 = 20
    • product_10111_stock5 = 20

请求来了可以随机可以轮询,扣减完之后就标记不要下次再分配到这个库存。

六、分布式锁的真相与选择

6.1 分布式锁的真相

需要满足的几个特性

  • 互斥:不同线程、进程互斥。
  • 超时机制:临界区代码耗时导致,网络原因导致。可以使用额外的线程续命保证。
  • 完备的锁接口:阻塞的和非阻塞的接口都要有,lock和tryLock。
  • 可重入性:当前请求的节点+ 线程唯一标识。
  • 公平性:锁唤醒时候,按照顺序唤醒。
  • 正确性:进程内的锁不会因为报错死锁,因为崩溃的时候整个进程都会结束。但是多实例部署时死锁就很容易发生,如果粗暴使用超时机制解决死锁问题,就默认了下面这个假设:
    • 锁的超时时间 >> 获取锁的时延 + 执行临界区代码的时间 + 各种进程的暂停(比如 GC)
    • 但上述假设其实无法保证的。

将分布式锁定位为,可以容忍非常小概率互斥语义失效场景下的锁服务。一般来说,一个分布式锁服务,它的正确性要求越高,性能可能就会越低。

6.2 分布式锁的选择

  • 数据库:db操作性能较差,并且有锁表的风险,一般不考虑。
    • 优点:实现简单、易于理解
    • 缺点:对数据库压力大
  • Redis:适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
    • 优点:易于理解
    • 缺点:自己实现、不支持阻塞
    • Redisson:相对于Jedis其实更多用在分布式的场景。
      • 优点:提供锁的方法,可阻塞
  • Zookeeper:适用于高可靠(高可用),而并发量不是太高的场景。
    • 优点:支持阻塞
    • 缺点:需理解Zookeeper、程序复杂
  • Curator
    • 优点:提供锁的方法
    • 缺点:Zookeeper,强一致,慢
  • Etcd:安全和可靠性上有保证,但是比较重。

不推荐自己编写的分布式锁,推荐使用Redisson和Curator实现的分布式锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/376934.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI绘画第一步,安装Stable-Diffusion-WebUI全过程 !

别玩那些小孩子的玩意儿了&#xff0c;大人应该玩这些^_^&#xff01;我是真没想到&#xff0c;AI绘画已经进化到这种程度了。实在是太&#xff01;逼&#xff01;真! 了&#xff01;不上重马赛克都不敢贴图&#xff01;看了这些图&#xff0c;有没有心痒痒的&#xff1f;今天就…

逆向-还原代码之除法 (Interl 64)

除法和32位差不多&#xff0c;毕竟背后的数学公式是一样的。区别只是32位的乘法需要两个寄存器来存放大数相乘的结果&#xff0c;而64位的不需要&#xff0c;一个寄存器就能存下。所以在64位的环境下&#xff0c;多了右移32位这条指令&#xff0c;其他指令一样。 //code #incl…

升级Android Studio Electric Eel问题汇总

1.升级以后找不到java可执行程序 问题原因&#xff1a;升级后&#xff0c;Android Studio自带的java目录不再是根目录/jre&#xff0c;调整为一个新目录 Studio根目录/jbr 修改方法&#xff1a;1&#xff09;修改系统环境变量&#xff0c; JAVA_HOME调整为Studio下对应的java…

烟厂能耗控制管理系统_烟厂能源管理信息系统

烟厂也是能耗大厂&#xff0c;为了更好的让烟厂完成资源调配、成本核算、能耗统计等&#xff0c;需要建立一套有效的能源数据管理系统&#xff0c;对能源进行监测&#xff0c;自动获取能源信息&#xff0c;方便查看厂区能源实时情况。烟厂能耗控制管理系统是利用信息化技术手段…

java多线程(七)线程等待与唤醒

一、wait()、notify()、notifyAll()等方法介绍 在Object.java中&#xff0c;定义了wait(), notify()和notifyAll()等接口。wait()的作用是让当前线程进入等待状态&#xff0c;同时&#xff0c;wait()也会让当前线程释放它所持有的锁。而notify()和notifyAll()的作用&#xff0…

阶段十:总结专题(第六章:缓存篇)

阶段十&#xff1a;总结专题&#xff08;第六章&#xff1a;缓存篇&#xff09;Day-第六章&#xff1a;缓存篇1. Redis 数据类型**String****List****Hash****Sorted Set**2. keys 命令问题3. 过期 key 的删除策略4. Redis 持久化**AOF 持久化****AOF 重写****RDB 持久化****混…

值得关注!可控生成!近期diffusion图像生成进展!

猜您喜欢&#xff1a;深入浅出stable diffusion&#xff1a;AI作画技术背后的潜在扩散模型论文解读戳我&#xff0c;查看GAN的系列专辑~&#xff01;一顿午饭外卖&#xff0c;成为CV视觉的前沿弄潮儿&#xff01;最新最全100篇汇总&#xff01;生成扩散模型Diffusion ModelsECC…

JS中三种主要的遍历对象的方法:for in、Object.keys、Object.getOwnProperty

1、for in 主要用于遍历对象的可枚举属性&#xff0c;包括自有属性、继承自原型的属性 var obj {“name”:“tom”,“sex”:“male”}&#xff1b; Object.defineProperty(obj, “age”, {value:“18”, enumerable:false});//增加不可枚举的属性age Object.prototype.pro…

基于强化学习的多模态优化问题解空间聚类进化算法

Reinforcement-Learning-Based Evolutionary Algorithm Using Solution Space Clustering For Multimodal Optimization Problems 基于强化学习的多模态优化问题解空间聚类进化算法 摘要 在进化算法中&#xff0c;如何有效地选择用于生成后代的交互式解决方案是一个具有挑战性的…

《数据库系统概论》学习笔记——第二章 : 关系数据库

教材为数据库系统概论第五版&#xff08;王珊&#xff09; 这一章前面部分基本概念比较多&#xff0c;但学会对后面的学习有很大帮助。基本出题方向就是关于关系数据库的一些概念&#xff08;比较多&#xff09;&#xff0c;然后计算题基本必考关系代数&#xff0c;一些基本的问…

UEditorPlus v2.9.0发布 文档仓库开源,修复若干问题

UEditor是由百度开发的所见即所得的开源富文本编辑器&#xff0c;基于MIT开源协议&#xff0c;该富文本编辑器帮助不少网站开发者解决富文本编辑器的难点。 UEditorPlus 是有 ModStart 团队基于 UEditor 二次开发的富文本编辑器&#xff0c;主要做了样式的定制&#xff0c;更符…

Hbase资源隔离操作指南

1.检查集群的环境配置 1.1 HBase版本号确认> 5.11.0 引入rsgroup的Patch&#xff1a; [HBASE-6721] RegionServer Group based Assignment - ASF JIRA RegionServer Group based Assignment 社区支持版本&#xff1a;2.0.0 引入rsgroup的CDH版本 5.11.0 https://www.…

高通平台开发系列讲解(Sensor篇)Gsensor基础知识

文章目录 一、什么是SENSOR?二、Sensor的分类及作用三、Gsensor的工作原理及介绍3.1、常见Gsensor3.2、Gsensor的特性沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇文章将介绍 Sensor 基础 一、什么是SENSOR? 传感器(英文名称:sensor )是一种检测装置,能感…

【Windows】U盘用完不能直接拔的原因?

小时候刚开始学习使用电脑时&#xff0c;总是被告知&#xff0c;用完U盘之后&#xff0c;一定要在电脑上先安全弹出USB设备之后才能拔掉&#xff0c;那时候就一直不明白为什么要这么做&#xff0c;而且最气的是有时候点击了安全弹出&#xff0c;结果被告知“正在使用无法弹出”…

换了固态硬盘需要重装系统吗?教你如何实现不重装系统!

电脑大家都用过嘛&#xff0c;如果您的计算机装的还是机械硬盘&#xff0c;想必阁下肯定是修身养性的高手&#xff0c;因为在这个浮躁的社会中&#xff0c;是很少有人能够忍受5分钟甚至更久的开机时间的&#xff0c;不仅开机慢&#xff0c;应用程序的响应速度也很慢&#xff0c…

STM32开发(16)----CubeMX配置DMA

CubeMX配置DMA前言一、什么是DMA&#xff1f;二、实验过程1.CubeMX配置2.代码实现3.实验结果总结前言 本章介绍使用STM32CubeMX对DMA进行配置的方法&#xff0c;DMA的原理、概念和特点&#xff0c;配置各个步骤的功能&#xff0c;并通过串口DMA传输实验方式验证。 一、什么是…

华为OD机试题,用 Java 解【最短耗时】问题

最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…

关于IB学习,大学申请需要了解什么?

越来越多的孩子选择高中上IB课程&#xff0c;然而在IB学习中&#xff0c;会遇到很多问题&#xff0c;尤其是大学申请的问题&#xff0c;很多人还不是很了解。这里小编给大家整理了详细的解答。另外&#xff0c;还罗列了关于IB考试之后的相关问题&#xff0c;希望都能帮到学习IB…

pytorch入门6--数据分析(pandas)

pandas是基于Numpy构建的&#xff0c;提供了众多比NumPy更高级、更直观的数据处理功能&#xff0c;尤其是它的DataFrame数据结构&#xff0c;可以用处理数据库或电子表格的方式来处理分析数据。 使用Pandas前&#xff0c;需导入以下内容&#xff1a; import numpy as np from …

数据结构与算法之冒泡排序(含改进版)

目录冒泡排序概念代码实现时间复杂度代码改进冒泡排序概念 冒泡排序&#xff08;Bubble Sort&#xff09;是一种简单的排序算法。它重复地遍历要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果他们的顺序错误就把他们交换过来。遍历数列的工作是重复地进行直到没有再…