分布式锁的应用场景与分布式锁实现(二):基于Redis实现分布式锁

news2025/1/22 17:02:18

分布式锁的应用场景与分布式锁实现(一):传统锁处理并发及传统锁的问题

基于Redis实现分布式锁

所有代码已同步到GitCode:https://gitcode.net/ruozhuliufeng/distributed-project.git

基本实现

​ 借助Redis中的命令setnx(key,value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他客户端返回0(false)。在这里插入图片描述

  • 多个客户端同时获取锁(setnx)
  • 获取成功,执行业务逻辑,执行完成释放锁(del)
  • 其他客户端等待重试

​ 改造StockService方法:

    /**
     * 减库存
     */
    @Override
    public void checkAndLock() {
         // 加锁setnx
        Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "1");
        // 重试:递归调用
        if (!lock){
            try {
                Thread.sleep(50);
                this.deduct();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            try {
                // 1. 查询库存信息
                String stock = redisTemplate.opsForValue().get("stock").toString();

                // 2. 判断库存是否充足
                if (stock != null && stock.length() != 0) {
                    Integer st = Integer.valueOf(stock);
                    if (st > 0) {
                        // 3.扣减库存
                        redisTemplate.opsForValue().set("stock", String.valueOf(--st));
                    }
                }
            } finally {
                // 解锁
                this.redisTemplate.delete("lock");
            }
        }
    }

​ 其中,加锁也可以使用循环:

// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "1")){
    try {
        Thread.sleep(40);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

​ 解锁:

      this.redisTemplate.delete("lock");

​ 重置缓存值,使用Jmeter进行压力测试,并获取库存余量:0

防死锁

代码均已上传至GitCode,可根据提交信息获取文件的更改内容

]

​ 问题:setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法释放(死锁)

​ 解决:给锁设置过期时间,自动释放锁

​ 设置过期时间的两种方式:

  • 通过expire设置过期时间(缺乏原子性:如果setnx与expire之间出现异常,锁也无法释放)
  • 通过set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

防误删

​ 问题:可能会释放其他服务器的锁

​ 场景:如果业务逻辑的执行时间是7s。执行流程如下:

  • index1业务逻辑没执行完,3秒后锁被自动释放
  • index2业务获取到锁,执行业务逻辑,3秒后锁被自动释放
  • index3业务获取到锁,执行业务逻辑
  • index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放
  • 最终等于没锁的情况,仍会导致超卖现象发生

​ 解决:setnx获取到锁时,设置一个指定的唯一值(例如:uuid),释放前获取这个值,判断是否是自己的锁。

(img-Be8UMpg2-1685597872695)

​ 实现如下:

img-u3wlagwy-1685597872697

​ 问题:删除操作缺乏原子性

​ 场景:

  • index1执行删除时,查询到的lock值确实和uuid相同
  • index1执行删除前,lock刚好过期时间已到,被redis自动释放
  • index2获取到了lock
  • index1执行了删除,此时会把index2的lock删除

​ 解决方案:没有一个命令可以同时做到判断+删除,所以只能通过其他方式实现(lua脚本)

Redis中的lua脚本

lua脚本可以一次性发送多个指令给redis,由于Redis是单线程的,执行指令遵守one-by-one规则

现实问题

​ Redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。例如:

(img-DRy9Navd-1685597872698)

​ 在串行场景下:A和B的值肯定都是3

​ 在并发场景下:A和B的值可能在0-6之间。

​ 极限情况下1:

(img-i701hDvz-1685597872700)

​ 则A的结果是0,B的结果是3

​ 极限情况下2:

(img-msTewSuq-1685597872701)

​ 则A和B的结果都是6。

​ 如果Redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行。这和使用MULTI/EXEC包围的事务很类似。

​ 但是MULTI/EXEC方法来使用事务功能,将一组命令打包执行,无法进行业务逻辑的操作。这期间有某一条命令执行报错(例如给字符串自增),其他的命令还是会执行,并不会回滚。

lua介绍

​ lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

设计目的

​ 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

lua特性

  • 轻量级:它用标准C语言编写并以源代码形式开放,编译后仅仅一百余K,可以很方便的嵌入到别的程序里。
  • 可扩展:Lua提供了非常易于使用的扩展接口和机制:由于宿主语言(通常是C或C++)提供这些功能,Lua可以使用它们,就像是本来就内置的功能一样。
  • 其他特性:
    • 支持面向过程(procedure-oriented)编程和函数式变成(functional programming);
    • 自动内存管理;只提供了一种通用类型的表(table),用它可以实现数组、哈希表、集合、对象;
    • 语言内置模式匹配;闭包(closure);函数也可以看做一个值;提供多线程(协同进程,并非操作系统所支持的线程)支持;
    • 通过闭包和table可以很方便的支持面向对象变成所需要的一些关键机制,比如数据抽象、虚函数、继承和重载等。
lua基本语言

​ 这里不做深究,感兴趣可以到官方教程或菜鸟教程,这里以Redis中可能会用到的部分语法作介绍。

变量:

a = 5             -- 全局变量
local b = 10      -- 局部变量,redis只支持局部变量
a,b = 10,2*x      -- 等价于   a = 10;  b = 2*x

流程控制:

if(布尔表达式 1)
then 
    -- [ 在布尔表达式 1 为true时执行改语句块 ]
elseif(布尔表达式 2)
then 
    -- [ 在布尔表达式 2 为true时执行改语句块 ]
else 
    -- [ 在以上表达式都不为true时执行改语句块 ]
end
Redis执行lua脚本-EVAL指令

​ 在Redis中需要通过eval命令执行lua脚本。

​ 格式:

EVAL script numkeys key [key ...] arg [arg ...]
script: lua脚本字符串,这段lua脚本不需要(也不应该)定义函数
numkeys: lua脚本中keys数组的大小
key [key ...]: KEYS数组中的元素
arg [arg ...]: ARGV数组中的元素
  • 案例1:基本案例
EVAL "return 10" 0
# 输出:10
  • 案例2:动态传参
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 输出:10 20 60 70 
# 下标从1开始
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20 
# 输出:0
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10 
# 输出:1

​ 传入了两个参数 10 和 20,KEYS的长度是1,所以KEYS中有一个元素10,剩下的一个20就是ARGV数组中的元素

​ redis.call()中的redis是redis中提供的lua脚本类库,仅在redis环境中使用该类库。

  • 案例3:执行redis类库方法
set a 10 -- 设置一个a 值为10 
EVAL "return redis.call('get','a')" 0
# 通过return把call方法返回给redis客户端,打印:10

​ 注意:**脚本里使用的所有键都应该由KEYS数组来传递。**但并不是强制性的,代价是这样写出的脚本不能被Redis集群所兼容。

  • 案例4:给Redis类库方法动态传参
EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 b 20

(img-NNiGUiA1-1685597872706)

​ 以上案例基本可以应付Redis分布式锁所需要的脚本知识了。

  • 案例5:pcall函数的使用(了解)
-- 当call()在执行命令的过程中发生错误时,脚本会停止运行,并返回一个脚本错误,输出错误信息
EVAL "return redis.call('sets', KEYS[1], ARGV[1]),redis.call('set', KEYS[2], ARGV[2])" 2 c d 20 30 
- pcall函数不影响后续指令的执行
EVAL "return redis.pcall('sets',KEYS[1],ARGV[1]),redis.pcall('set' ,KEYS[2],ARGV[2])" 2 c d 20 30 

注意:set方法写成了sets,肯定会报错。

(img-wvrw7Da1-1685597872707)

使用lua保证删除原子性

​ 删除lua脚本:

if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end

​ 更新代码:

package tech.msop.distributed.lock.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 库存服务实现类 <br/>
 */
@Service
@Slf4j
public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity>
        implements IStockService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 减库存
     */
    @Override
    public void checkAndLock() {
        String uuid = UUID.randomUUID().toString();
        // 加锁 setnx
        while (!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) {
            // 重试循环
            try {
                Thread.sleep(30);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        try {
            // 1. 查询库存信息
            String stock = redisTemplate.opsForValue().get("stock");
            // 2. 判断库存是否充足
            if (stock != null && stock.length() != 0) {
                Integer st = Integer.valueOf(stock);
                if (st > 0) {
                    // 3.更新到数据库
                    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
                }
            }
        } finally {
            String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
                    "then " +
                    "     return redis.call('del',KEYS[1]) " +
                    "else " +
                    "     return 0 " +
                    "end";
            redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"),uuid);
        }
    }
}

​ 进行压力测试并查询库存余量:0

(img-Cg9cLIMS-1685597872709)

可重入锁

​ 以上加锁命令存在一个问题,由于加锁命令使用了SETNX,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。

​ 用一段Java代码解释可重入:

public synchronized void a(){
    b();
}
public synchronized void b(){
   // pass
}

​ 假设X线程在a方法获取锁之后,继续执行b方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。

​ 锁明明是X线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己。

​ 而可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加1,然后在执行方法逻辑。退出加锁方法之后,加锁此处再减1,当加锁次数为0时,锁才被真正的释放。

​ 可以看到可重入锁的最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

​ 通过阅读JDK中的可重入锁源码,可知可重入锁ReentrantLock的加锁流程:ReentrantLock.lock) —> NonfairSync.lock() —> AQS.acquire(1) —> NonfairSync.tryAcquire(1) —> Sync.nonfairTryAcquire(1)

  • CAS获取锁,如果没有线程占用锁 (state == 0),加锁成功并记录当前线程为有锁线程(两次)
  • 如果state的值不为0,说明锁已经占用,则判断当前线程是否为有锁线程,若是,则重入(state + 1)
  • 若否,加锁失败,入队等待

​ 可重入锁的解锁流程:Reentrant.unlock() —> AQS.release(1) —> SyncRelease(1)

  • 判断当前线程是否为有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,返回false

​ 确定解决方案:Redis+hash

加锁脚本

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁:hash + lua脚本

​ Redis提供了Hash(哈希表)这种可以存储键值对数据结构。所以我们可以使用Redis Hash存储锁的重入次数,然后利用lua脚本判断逻辑。通过JDK的可重入锁分析,继续分析Redis中的加锁流程:

  • 判断锁是否存在(exists),不存在则直接获取锁 hset key field value
  • 如果锁存在则判断是否是自己的锁(hexists),如果是自己的锁则重入:hincrby key field increment
  • 否则重试:递归/循环
  • 锁lua脚本
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
    redis.call('hincrby', KEYS[1], ARGV[1], 1);
    redis.call('expire', KEYS[1], ARGV[2]);
    return 1;
else
	return 0;
end

-- key: lock
-- arg: uuid  30

​ 假设值为:KEYS[lock],ARGV[uuid,expire]

​ 如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁此处加1

解锁脚本

​ 分析Redis的解锁流程:

  • 判断自己的锁是否存在(hexists),不存在则返回nil
  • 如果自己的锁存在,则减1(hincrby - 1),判断减一后的值是否为0,为0,则返回1
  • 不为0,返回0
  • 解锁lua脚本
-- 判断hash set 可重入key的值是否等于0
-- 如果为 nil 代表 自己的锁不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then 
    return nil; 
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    return 1; 
end

-- key: lock
-- arg: uuid
代码实现

​ 由于后续会有基于Zookeeper和基于MySQL实现的分布式锁,我们可以通过工厂类,获取不同类型的分布式锁。

​ DistributedLockClient工厂类具体实现:

@Component
public class DistributedLockClient {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private String uuid;

    public DistributedLockClient() {
        this.uuid = UUID.randomUUID().toString();
    }

    public DistributedRedisLock getRedisLock(String lockName){
        return new DistributedRedisLock(redisTemplate, lockName, uuid);
    }
}

​ DistributedRedisLock实现如下:

public class DistributedRedisLock implements Lock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private long expire = 30;

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = uuid;
    }

    @Override
    public void lock() {
        this.tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            return this.tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 加锁方法
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time != -1){
            this.expire = unit.toSeconds(time);
        }
        String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){
            Thread.sleep(50);
        }
        return true;
    }

    /**
     * 解锁方法
     */
    @Override
    public void unlock() {
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "then " +
                "   return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());
        if (flag == null){
            throw new IllegalMonitorStateException("this lock doesn't belong to you!");
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    /**
     * 给线程拼接唯一标识
     * @return
     */
    String getId(){
        return uuid + ":" + Thread.currentThread().getId();
    }
}
使用及测试

​ 在业务代码中使用:

package tech.msop.distributed.lock.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.lock.DistributedLockClient;
import tech.msop.distributed.lock.lock.DistributedRedisLock;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;

/**
 * 库存服务实现类 <br/>
 */
@Service
@Slf4j
public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity>
        implements IStockService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private DistributedLockClient distributedLockClient;
    /**
     * 减库存
     */
    @Override
    public void checkAndLock() {
        DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");
        redisLock.lock();
        try {
            // 1. 查询库存信息
            String stock = redisTemplate.opsForValue().get("stock").toString();
            // 2. 判断库存是否充足
            if (stock != null && stock.length() != 0) {
                Integer st = Integer.valueOf(stock);
                if (st > 0) {
                    // 3.扣减库存
                    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
                }
            }
        } finally {
            redisLock.unlock();
        }
    }
}

​ 使用Jmet测试并查询库存余量:0

img-aWnBhXQ9-1685597872710

​ 测试可重入性:

在这里插入图片描述

自动续期

​ 借助Timer定时器+lua脚本实现自动续期:

if (redis.call('hexists',KEYS[1],ARGV[1]) == 1)
then 
    return redis.call('expire',KEYS[1],ARGV[2])
else 
    return 0
end

-- key: lock
-- arg: uuid  30

​ 修改Redis分布式锁,实现锁自动续期:

package tech.msop.distributed.lock.lock;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 基于Redis实现分布式锁
 */
public class DistributedRedisLock implements Lock {
    private final StringRedisTemplate redisTemplate;
    private final String lockName;
    private final String uuid;
    private long expire = 30;

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = uuid + ":" + Thread.currentThread().getId();
    }

    @Override
    public void lock() {
        this.tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            return this.tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 加锁方法
     *
     * @param time 超时时间
     * @param unit 时间单位
     * @return 加锁是否成功
     * @throws InterruptedException 异常
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time != -1) {
            this.expire = unit.toSeconds(time);
        }
        String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
            Thread.sleep(50);
        }
        // 在加锁成功返回之前,开启定时器,自动续期
        renewExpire();
        return true;
    }

    /**
     * 解锁方法
     */
    @Override
    public void unlock() {
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "then " +
                "   return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
        if (flag == null) {
            throw new IllegalMonitorStateException("this lock doesn't belong to you!");
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    /**
     * 给线程拼接唯一标识
     *
     * @return 唯一标识
     */
//    String getId() {
//        return uuid + ":" + Thread.currentThread().getId();
//    }

    private void renewExpire(){
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   return redis.call('expire', KEYS[1], ARGV[2]) " +
                "else " +
                "   return 0 " +
                "end";
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,String.valueOf(expire))){
                    renewExpire();
                }
            }
        },this.expire * 1000 /3);
    }

}

​ 在tryLock方法中使用:

(img-rQ2oZf5e-1685597872720)
构造方法作如下修改:

(img-rQ2oZf5e-1685597872720)
解锁方法作如下修改:

(img-gW6NoWxY-1685597872728)

手写分布式锁小结

特征
  • 独占排他使用 setnx

  • 防止死锁:设置锁的过期时间

    • 如果Redis客户端程序从Redis服务中获取到锁突然宕机,无法执行后续操作并释放锁,则其他程序也无法获取到锁,并导致服务阻塞
    • 解决:设置锁的过期时间,即使未手动释放锁,也会在一定时间后自动过期释放
    • 不可重入:需要可重入
  • 原子性

    • 获取锁和设置过期时间必须具有原子性:set key value ex 3 nx
    • 判断和释放锁之间,也需要原子性:借助lua脚本实现
  • 防误删:解铃还须系铃人

    • 先判断是否是自己的锁,再删除
  • 可重入性:hash(key field value) + lua脚本

  • 自动续期:Timer定时器 + lua脚本

    • 程序执行时间过长,超过锁的过期时间,为了防止锁机制失效,需要判断程序是否执行完成,未完成则需要续期锁的过期时间
  • 在集群情况下,导致锁机制失效:

    • 客户端程序C1,从主服务器中获取锁
    • 从服务器还没来得及同步数据,主服务器宕机
    • 于是从服务器升级为主服务器
    • 客户端程序C2就从新主服务器中获取到锁,导致锁机制失效
锁操作
加锁
  • setnx:独占排他、死锁、不可重入、原子性
  • set k v ex 30 nx:独占排他、死锁 不可重入
  • hash + lua脚本:可重入锁
    • 判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby),并设置过期时间(expire)
    • 如果锁被占用,则判断是否当前线程占用的(hexists),如果是则重入(hincrby)并重置过期时间(expire)
    • 否则获取锁失败,在代码中重试
  • Time定时器 + lua脚本:实现锁的自动续期
    • 判断锁是否是自己的锁(hexists==1),如果是自己的锁,则执行expire重置过期时间
解锁
  • del:可能导致误删
  • 先判断在删除同时保证原子性:lua脚本
  • hash + lua脚本:可重入
    • 判断当前线程的锁是否存在,不存在则返回nil,将来在代码中排除异常
    • 存在则直接减1(hincrby - 1),判断减1后的值是否为0,为0则释放锁(del),并返回1
    • 不为0,则返回0
获取锁重试
  • 递归获取锁
  • 循环获取锁

红锁算法

​ Redis集群状态下的问题:

  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕机了
  • slave节点被晋级到master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另一个锁

安全失效!

​ 解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock

​ 在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。在前面已经描述了如何在单个实例中安全地获取和释放锁,在分布式锁算法中,将使用相同的方法在单个实例中获取和释放锁。 将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主从服务器,确保它们以独立的方式发生故障。

​ 为了获取锁,客户端执行以下操作:

  • 1、客户端以毫秒为单位获取当前时间的时间戳,作为起始时间
  • 2、客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间,客户端应该设置一个远小于总锁定时间的超时时间。例如,如果自动释放锁时间为10秒,则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信:如果某个示例不可用,尽快尝试与下一个实例进行通信。
  • 3、客户端获取当前时间 减去 步骤1 中获取的起始时间,来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁时,并且获取锁所花费的总时间小于锁有效时间,则认为已获取锁。
  • 4、如果获取了锁,则将锁有效时间减去获取锁花费的时间,如步骤3中所计算
  • 5、如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而未能获得该锁,它将尝试解锁所有实例(即使没有锁定成功的实例)。

​ 每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机产生很小的时钟漂移。只有在拥有锁的客户端将在锁的有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移。

​ 当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此在理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。

​ 值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。

Redisson中的分布式锁

(img-41TdSDuY-1685597872733)

​ Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet、Set、Multimap、SortedSet、Map、List、Queue、BlockingQueue、Semaphore、Lock、AtomicLong、CountDownLatch、Publish/Subscribe、Bloom filter、Remote Service、Spring cache、Executor Service、Live Object Service、Scheduler Service)。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

(img-V6ljWwWY-1685597872737)

​ 官方文档地址:https://github.com/redisson/redisson/wiki

可重入锁(Reentrant Lock)

​ 基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。

​ 众所周知,如果负责存储这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson示例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过吸怪Config.lockWatchdogTimeout来另行指定。

RLock对象完全符合Java的Lock规范。也就是说只有锁的进程才能解锁,其他进程则会抛出IllegalMonitorStateException错误。

​ 另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间锁便自动解开了。

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
  • 1、引入Redisson依赖
<!-- Redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.21.3</version>
</dependency>
  • 2、添加配置
package tech.msop.distributed.lock.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson 配置
 */
@Configuration
public class RedissonConfig {

    /**
     * Redisson 客户端配置
     *
     * @return Redisson客户端
     */
    @Bean
    public RedissonClient redissonClient() {
        // 初始化配置对象
        Config config = new Config();
        // 单机Redis服务
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379") // redis服务地址,必须 redis://ip:port
//                .setDatabase(0)  // 指定Redis数据库编号
//                .setUsername("") // redis 用户名
//                .setPassword("")// redis 密码
//                .setConnectionMinimumIdleSize(10)// 连接池最小空闲连接数
//                .setConnectionPoolSize(50) // 连接池最大线程数
//                .setIdleConnectionTimeout(60000) // 线程超时时间
//                .setConnectTimeout() // 客户端获取redis 链接的超时时间
//                .setTimeout()// 响应超时时间
        ;

        return Redisson.create(config);
    }
}

  • 3、代码中使用
package tech.msop.distributed.lock.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.lock.DistributedLockClient;
import tech.msop.distributed.lock.lock.DistributedRedisLock;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;

/**
 * 库存服务实现类 <br/>
 */
@Service
@Slf4j
public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity>
        implements IStockService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private DistributedLockClient distributedLockClient;
    @Autowired
    private RedissonClient redissonClient;
    /**
     * 减库存
     */
    @Override
    public void checkAndLock() {
        RLock lock = redissonClient.getLock("lock");
        lock.lock();
        try {
            // 1. 查询库存信息
            String stock = redisTemplate.opsForValue().get("stock").toString();
            // 2. 判断库存是否充足
            if (stock != null && stock.length() != 0) {
                Integer st = Integer.valueOf(stock);
                if (st > 0) {
                    // 3.扣减库存
                    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
                }
            }
        } finally {
            lock.unlock();
        }
    }
}

  • 4、使用jmeter进行压力测试并获取库存余量:0
公平锁(Fair Lock)

​ 基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会在等待5秒后继续下一个线程,也就是说如果前面5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redissonClient.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();
联锁(MultiLock)(了解)

​ 基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
红锁(RedLock)(了解)

​ 基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
读写锁(ReadWriteLock)

​ 基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。

​ 分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

​ 添加StockController方法:

@GetMapping("test/read")
public String testRead(){
    String msg = stockService.testRead();

    return "测试读";
}

@GetMapping("test/write")
public String testWrite(){
    String msg = stockService.testWrite();

    return "测试写";
}

​ 添加StockService方法:

public String testRead() {
    RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");
    rwLock.readLock().lock(10, TimeUnit.SECONDS);

    System.out.println("测试读锁。。。。");
    // rwLock.readLock().unlock();

    return null;
}

public String testWrite() {
    RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");
    rwLock.writeLock().lock(10, TimeUnit.SECONDS);

    System.out.println("测试写锁。。。。");
    // rwLock.writeLock().unlock();

    return null;
}

打开开两个浏览器窗口测试:

  • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
  • 同时访问读:不用等待
  • 先写后读:读要等待(约10s)写完成
  • 先读后写:写要等待(约10s)读完成
信号量(Semaphore)

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();

在StockController添加方法:

@GetMapping("test/semaphore")
public String testSemaphore(){
    this.stockService.testSemaphore();

    return "测试信号量";
}

在StockService添加方法:

public void testSemaphore() {
    RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");
    semaphore.trySetPermits(3);
    try {
        semaphore.acquire();

        TimeUnit.SECONDS.sleep(5);
        System.out.println(System.currentTimeMillis());

        semaphore.release();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

​ 添加测试用例:并发10次,循环一次

(img-brg5FRge-1685597872741)

控制台效果:

控制台1:
1606960790234
1606960800337
1606960800443
1606960805248

控制台2:
1606960790328
1606960795332
1606960800245

控制台3:
1606960790433
1606960795238
1606960795437

由此可知:

1606960790秒有3次请求进来:每个控制台各1次

1606960795秒有3次请求进来:控制台2有1次,控制台3有2次

1606960800秒有3次请求进来:控制台1有2次,控制台2有1次

1606960805秒有1次请求进来:控制台1有1次

闭锁(CountDownLatch)

​ 基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

​ 需要两个方法:一个等待,一个计数countDown

​ 给StockController添加测试方法:

@GetMapping("test/latch")
public String testLatch(){
    stockService.testLatch();

    return "班长锁门。。。";
}

@GetMapping("test/countdown")
public String testCountDown(){
    stockService.testCountDown();

    return "出来了一位同学";
}

​ 给StockService添加测试方法:

public void testLatch() {
    RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");
    cdl.trySetCount(6);

    try {
        cdl.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public void testCountDown() {
    RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");
    cdl.countDown();
}

​ 重启测试,打开两个页面:当第二个请求执行6次之后,第一个请求才会执行。

(img-1TvjlLOJ-1685597872743)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/597042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ES6-ES13学习笔记(5.0)

ES2022的函数 //findLast findLastIndex() ES2022 发现在电脑自带的联想浏览器不支持此函数&#xff0c;还报错了 对于ECMA的支持还和浏览器有关以及浏览器版本有关&#xff0c;然后我使用Google浏览器就可以正常使用&#xff0c; 1.扩展运算符&#xff1a;三个点... ..…

第一行代码 第十三章 高级技巧

第13章 高级技巧 全局获取Context的技巧 回想这么久以来我们所学的内容&#xff0c;你会发现有很多地方都需要用到Context&#xff0c;弹出Toast的时候需要&#xff0c;启动活动的时候需要&#xff0c;发送广播的时候需要&#xff0c;操作数据库的时候需要&#xff0c;使用通…

图解LeetCode链表题(中等)剖析

文章目录 &#x1f490;文章导读&#x1f490;1.合并零之间的结点解题思路 &#x1f490;2.链表中最大孪生和解题思路 &#x1f490;3.链表的随机节点解题思路 &#x1f490;4.复杂链表的复制解题思路 &#x1f490;5.两辆交换两表中的节点解题思路 &#x1f490;文章导读 &…

关于python pycharm中输出的内容不全的解决办法

控制台输出&#xff1a; 解决方案&#xff1a; pandas 库 # 显示所有列 pd.set_option(display.max_columns, None) # 显示所有行 pd.set_option(display.max_rows, None) # 设置value的显示长度 pd.set_option(max_colwidth, 100) # 设置1000列时才换行 pd.set_option…

DataFrame/字典/列表之间的相互转换

DataFrame —> 字典 参考&#xff1a;pandas关于to_dict的使用_pandas to_dict_曼珠沙华Devil的博客-CSDN博客 pandas提供了 DataFrame.to_dict() 函数&#xff0c;将DataFrame类型转化为字典类型 DataFrame.to_dict(orientdict) # orient 可省略 对于写入的orient不同…

闭包基本知识汇总

闭包基本知识汇总 一、什么是闭包&#xff1f; 闭包是指有权限访问另一个函数作用域中的变量的函数&#xff0c;在Javascript中&#xff0c;只有函数内部的子函数才能读取局部变量&#xff0c;因此可以把闭包简单理解成 “定义在一个函数内部的函数” 。所以&#xff0c;在本…

AI当道,元宇宙赛道是风口还是噱头?

一个新概念的诞生往往要经过无数次的锤炼&#xff0c;宛如一场漫长、深刻的头脑风暴。而发展到今天&#xff0c;处在风口之上&#xff0c;各行各业都急切往元宇宙概念靠拢&#xff0c;元宇宙已经与资本市场共舞。 伴随着全球多家行业巨头的布局以及元宇宙在游戏领域的率先落地…

c#快速入门(上)

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析2 目录 &#x1f449;&#x1f3fb; c#和c不同之处&#x1f449;&#x1f3fb;程序文件的…

华为OD机试真题 Java 实现【简单的解压缩算法】【2023Q1 200分】,附详细解题思路

一、题目描述 现需要实现一种算法&#xff0c;能将一组压缩字符串还原成原始字符串&#xff0c;还原规则如下&#xff1a; 1、字符后面加数字N&#xff0c;表示重复字符N次。例如&#xff1a;压缩内容为A3&#xff0c;表示原始字符串为AAA。 2、花括号中的字符串加数字N&…

tcpdump 抓包工具详细图文教程(上)

目录 一、tcpdump 抓包工具的基本介绍和学习基础 1.1 常用的抓包工具 1.2 tcpdump 抓包工具介绍 二、tcpdump 抓包工具使用环境和初体验 2.1 编译安装 tcpdump 2.2 抓包 三、讲解 TCP 协议报文报头 四、tcpdump 抓包工具常规过滤规则 4.1 tcpdump 的 host 和 net 过…

Flutter 又一元老离职,感谢 Tim 这些年的付出

前天在 insiders 收到 Tim Sneath 的离职邮件时感觉很震惊&#xff0c;因为他绝对是 Flutter 团队的元老级人物&#xff0c;几乎每次一次 Flutter 版本发布和社区活动都有他的身影&#xff0c;可以说他是我的 Flutter 领路人之一。 Tim 是在 2017 加入 Flutter 团队&#xff0…

SpringCloud微服务踩坑系列:UnknownContentTypeException

错误信息如下&#xff1a; org.springframework.web.client.UnknownContentTypeException: Could not extract response: no suitable HttpMessageConverter found for response type [class com.cyf.internalCommon.dto.ResponseResult] and content type [text/plain;charset…

使用WordPress提高企业敏捷性

喜欢WordPress的原因有很多&#xff1a;该平台非常适合内容管理以及控制预算。此外&#xff0c; 在 提高开发效率和简化项目管理方面&#xff0c;WordPress可以通过多种方式提供帮助。 对于任何企业业务&#xff0c;目标始终是在不影响质量的情况下更快地启动项目、发布修复和…

day3 -- select语句学习

文章目录 数据库和表的准备selectselect order byselect whereselect 高级过滤操作使用通配符进行过滤使用正则表达式进行搜索 数据库和表的准备 下载《mysql必知必会》提供的脚本用于创建样例表 cd /mnt/d/unix_dir wget https://forta.com/wp-content/uploads/books/067…

【Linux从入门到精通】进程的状态

当我们了解到进程是什么东西后&#xff0c;我们再来看看进程都会有那些状态。本篇文章会对进程的不同状态进行详解&#xff0c;希望会对你的理解有所帮助&#xff01; 文章目录 一、了解进程的不同状态 二、详解进程的不同状态 2、1 R运行状态&#xff08;running&#xff09; …

开源赋能 普惠未来|中软国际寄语 2023 开放原子全球开源峰会

中软国际作为行业领先的全球化软件与信息技术服务企业及数字化转型服务商&#xff0c;近年来积极布局开源生态&#xff08;OpenHarmony、openEuler&#xff09;、智能云、ERP、AIGC、教育科技、智能车六大赛道&#xff0c;加速业务转型创新。 中软国际为开放原子开源基金会白金…

文字环绕图片效果实现

书接上回&#xff0c;我们来讲讲如何实现“文字环绕图片”的效果吧。整体预计实现的效果如下&#xff1a; 日常杂谈 我喜欢看动漫&#xff0c;接下来的所有博客都会和我日常生活结合在一起写&#xff0c;这样感觉会让自己的博客会有温度&#xff0c;我感觉每个人都应该有自己的…

最新喜讯|易知微入选2023数字孪生解决方案提供商TOP50

近日&#xff0c;互联网周刊发布《2023数字孪生解决方案提供商TOP50》&#xff0c;杭州易知微科技有限公司在榜。 中国科学院主管的《互联网周刊》&#xff0c;创刊于1998年&#xff0c;是中国最具公信力的杂志之一&#xff0c;其颁布的榜单极具权威性与专业度&#xff0c;对产…

基于非局部图注意力网络的鲁棒三维形状分类

文章目录 Robust 3D Shape Classification via Non-local Graph Attention Network摘要本文方法Global Structure Network (GSN)Global Relationship Network (GRN)Local Feature Learning based on MLP-STNetwork Channel Fusion ModuleGlobal descriptor 实验结果 Robust 3D …

分布式锁的应用场景与分布式锁实现(四):基于MySQL实现分布式锁与分布式锁总结

分布式锁的应用场景与分布式锁实现&#xff08;三&#xff09;&#xff1a;基于Zookeeper实现分布式锁 基于MySQL实现分布式锁 ​ 不管是JVM锁还是MySQL锁&#xff0c;为了保证线程的并发安全&#xff0c;都提供了悲观独占排他锁。所以独占排他也是分布式锁的基本要求。 ​ …