※Redis的事务、乐观锁和悲观锁

news2025/1/12 21:08:12

1.是神魔

 在高并发的环境下,多个线程去竞争同一个资源, 
 比较常见的有高铁抢票系统,商品秒杀系统等,
 我们需要保证数据正确,同时系统的吞吐也要尽可能高。

2.解决方案

1.  一般多线程同步我们就会想到加锁,用synchornized关键字给并发代码块加锁,
2. 但是在我们的业务场景中,比如高铁抢票,有很多张不同的票,但是synchornized锁住     了秒杀那个代码块,
3. 所有的票全都上了这一把锁,这么看锁的力度还是太大了,其实我们只需要分别锁住每张票,比如根据票id来上锁
4. 这样锁刚刚好。我们可以通过Redis来实现分布式锁,下面分别介绍悲观锁和乐观锁。

3.reids 乐观锁

乐观锁顾名思义,就是很乐观,我看到了一件商品库存是1,我觉得这件商品很少人抢,我尝试去下单,当我提交订单的时候,如果商品的库存还是1,那就说明在我之前没人下单,我就可以成功下单,否则的话就下单失败。

在写操作较少的情况下,乐观锁是优于悲观锁的,吞吐量会大大提高,但是写操作比较频繁的话就会导致一直重试,如果重试代价较高则不推荐用乐观锁。

Redis的乐观锁主要是通过watch()来实现的,watch()的作用是监视键值对,首先是用multi()开启事务,exec()提交事务,提交事务的时候如果发现键值对的值发生变化则会取消事务,下面是一个测试例子:

/**
 * 初始化商品
 */
private static void initProduct() {
    // 商品个数
    int prdNum = 3;
    String key = "prdNum";
    String customers = "customers";
    Jedis jedis = RedisUtil.getInstance().getJedis();
    if(jedis.exists(key)){
        jedis.del(key);
    }
    if(jedis.exists(customers) != null){
        jedis.del(customers);
    }
    jedis.set(key, String.valueOf(prdNum));
    RedisUtil.returnResource(jedis);
}

/**
 * 顾客抢购商品(秒杀操作)
 */
private static void initClient() throws InterruptedException {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    int num = 10;
    for(int i=1; i<=num; i++){
        cachedThreadPool.execute(new Customer(i)); //启动与顾客数目相等的线程
    }
    cachedThreadPool.shutdown();
    while(true){
        if(cachedThreadPool.isTerminated()){
            System.out.println("所有的消费组线程均结束了!");
            break;
        }
        Thread.sleep(100);
    }

}

/**
 * 打印结果
 */
private static void printResult() {
    Jedis jedis = RedisUtil.getInstance().getJedis();
    Set<String> customers = jedis.smembers("customers");
    int i = 1;
    for (String customer : customers) {
        System.out.println("第"+(i++)+"个抢到商品的顾客:"+customer);
    }
    RedisUtil.returnResource(jedis);
}

static class Customer implements Runnable{
    private String name;
    private Jedis jedis = null;
    private String key = "prdNum";
    private String cust = "customers";

    public Customer(int name) {
        this.name = "编号=" + name;
    }

    /**
     * multi:开启redis事务,置客户端为事务态
     * exec:提交事务,执行从multi到此命令之前的命令队列,置客户端为非事务态
     * discard:取消事务,置客户端为非事务态
     * watch:监视键值对,exec提交事务时发现监视的键值发生变化则取消事务
     */
    @Override
    public void run() {

        // 随机睡眠一下
        try {
            Thread.sleep((int)Math.random()*5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        while(true){
            System.out.println("顾客:" + name + "开始抢购商品!");
            jedis = RedisUtil.getInstance().getJedis();
            try{
                // 监视键值对
                jedis.watch(key);
                int prdNum = Integer.parseInt(jedis.get(key));
                if(prdNum > 0){
                    // 开启事务
                    Transaction transaction = jedis.multi();
                    transaction.set(key, String.valueOf(prdNum - 1));
                    // 提交事务
                    List<Object> result = transaction.exec();
                    if(result == null || result.isEmpty()){
                        System.out.println("很抱歉,顾客" + name + "没有抢购到商品!");
                    }else{
                        // 抢到商品的话记录一下
                        jedis.sadd(cust, name);
                        System.out.println("恭喜顾客" + name + "抢到商品!");
                        break;
                    }
                }else{
                    System.out.println("很抱歉,库存为0,顾客:" + name + "没有抢到商品");
                    break;
                }
            }catch (Exception e){

            }finally {
                jedis.unwatch();
                RedisUtil.returnResource(jedis);
            }
        }

    }
}
    public static void main(String[] args) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    initProduct();
    initClient();
    printResult();
    long endTime = System.currentTimeMillis();
    long time = endTime - startTime;
    System.out.println("程序运行时间:" + (int )time + "ms");
}

在这里插入图片描述
在这里插入图片描述

Redis悲观锁

所谓悲观锁,就是比如有很多人抢同一件商品,这件商品的id是101,我就把商品id作为key加一把锁,锁住对商品库存进行写操作的代码块,如果有人来抢,首先需要尝试获取这把锁,才能对商品库存进行写的操作。

当商品库存减少成功之后,再把锁释放,这样下一个人就再进来扣除这件商品的库存了。

当然如果有人一直持有锁不放怎么办呢,就是所谓的“站着茅坑不拉shi”,我们可以设定锁的过期时间。所以悲观锁中的锁就是一个唯一标识的锁和该锁的过期时间,Redis实现悲观锁是用setNX操作实现的阻塞式分布式锁。

setNX

Redis的setNX就是设置一个key的value时,如果该key已经有value,则返回1,否则返回0。

redis> EXISTS job # job 不存在 (integer) 0

redis> SETNX job “programmer” # job 设置成功 (integer) 1

redis> SETNX job “code-farmer” # 尝试覆盖 job ,失败 (integer) 0

redis> GET job # 没有被覆盖 “programmer”

我们可以用它来实现redis分布式锁,把锁过期时间作为value:

SETNX lock.foo <current Unix time + lock timeout + 1>

当有多个线程来set“lock.foo”的值时,如果返回1说明获取到了锁,键值是锁的过期时间(当前时间+锁的有效时间),返回0则可以设定一个循环不断setNX来获取锁,而释放锁则是直接把这个key删除即可。在一个进程持有锁期间,其他进程还会不断调用“GET lock.foo”命令来检测锁是否超时。

下面是Redis悲观锁实现示例:

AbstractLock

public abstract class AbstractLock implements Lock {
/**
 * <pre>
 * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,
 * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性
 * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.
 * </pre>
 */
protected volatile boolean locked;

/**
 * 当前jvm内持有该锁的线程(if have one)
 */
private Thread exclusiveOwnerThread;

public void lock() {
    try {
        lock(false, 0, null, false);
    } catch (InterruptedException e) {
        // TODO ignore
    }
}

public void lockInterruptibly() throws InterruptedException {
    lock(false, 0, null, true);
}

@Override
public boolean tryLock(long time, TimeUnit unit) {
    try {
        System.out.println("ghggggggggggggg");
        return lock(true, time, unit, false);
    } catch (InterruptedException e) {
        e.printStackTrace();
        System.out.println("" + e);
    }
    return false;
}

public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
    return lock(true, time, unit, true);
}

public void unlock() {
    // TODO 检查当前线程是否持有锁
    if (Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException("current thread does not hold the lock");
    }

    unlock0();
    setExclusiveOwnerThread(null);
}

protected void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

protected abstract void unlock0();

/**
 * 阻塞式获取锁的实现
 *
 * @param useTimeout 是否超时
 * @param time 获取锁的等待时间
 * @param unit 时间单位
 * @param interrupt
 *            是否响应中断
 * @return
 * @throws InterruptedException
 */
protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)
        throws InterruptedException;
		}

RedisBasedDistributedLock

public class RedisBasedDistributedLock extends AbstractLock {
private Jedis jedis;
protected String lockKey;//锁的名字
protected long lockExpire;//锁的有效时长(毫秒)

public RedisBasedDistributedLock(Jedis jedis,String lockKey,long lockExpire){
    this.jedis = jedis;
    this.lockKey = lockKey;
    this.lockExpire = lockExpire;
}

@Override
public boolean tryLock() {
    long lockExpireTime = System.currentTimeMillis() + lockExpire + 1;//锁超时时间
    String stringOfLockExpireTime = String.valueOf(lockExpireTime);
    if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) {//获取到锁
        //设置相关标识
        locked = true;
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    String value = jedis.get(lockKey);
    if (value != null && isTimeExpired(value)) {//锁是过期的
        //假设多个线程(非单jvm)同时走到这里
        String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime);//原子操作
        // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
        // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了
        if (oldValue != null && isTimeExpired(oldValue)) {//拿到锁
            //设置相关标识
            locked = true;
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
    }
    return false;
}

@Override
public Condition newCondition() {
    // TODO Auto-generated method stub
    return null;
}

/**
 * 锁未过期,释放锁
 */
@Override
protected void unlock0() {
    // 判断锁是否过期
    String value = jedis.get(lockKey);
    if (!isTimeExpired(value)) {
        doUnlock();
    }
}

/**
 * 释放锁
 * @date 2017-10-18
 */
public void doUnlock() {
    jedis.del(lockKey);
}

/**
 * 查询当前的锁是否被别的线程占有
 * @return 被占有true,未被占有false
 * @date 2017-10-18
 */
public boolean isLocked(){
    if (locked) {
        return true;
    }else {
        String value = jedis.get(lockKey);
        // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,
        // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断
        // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就
        // 不是同步控制, 它只是一种锁状态的报告.
        return !isTimeExpired(value);
    }
}

/**
 * 检测时间是否过期
 * @param value
 * @return
 * @date 2017-10-18
 */
public boolean isTimeExpired(String value) {
    return Long.parseLong(value) < System.currentTimeMillis();
}

/**
 * 阻塞式获取锁的实现
 * @param useTimeout 是否超时
 * @param time 获取锁的等待时间
 * @param unit 时间单位
 * @param interrupt
 *            是否响应中断
 */
protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException {
    if (interrupt) {
        checkInterruption();
    }
    long start = System.currentTimeMillis();
    long timeout = unit.toMillis(time);//
    while (useTimeout ? isTimeout(start, timeout) : true) {
        if (interrupt) {
            checkInterruption();
        }
        long lockExpireTime = System.currentTimeMillis() + lockExpire + 1;//锁的超时时间
        String stringLockExpireTime = String.valueOf(lockExpireTime);
        if (jedis.setnx(lockKey, stringLockExpireTime) == 1) {//获取到锁
            //成功获取到锁,设置相关标识
            locked = true;
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
    }
    return false;
}

/*
 * 判断是否超时(开始时间+锁等待超时时间是否大于系统当前时间)
 */
public boolean isTimeout(long start, long timeout) {
    return start + timeout > System.currentTimeMillis();
}

/*
 * 检查当前线程是否阻塞
 */
public void checkInterruption() throws InterruptedException {
    if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException();
    }
}
}

Test

static class PessCustomer implements Runnable{
    private String name;
    private Jedis jedis = null;
    private String key = "prdNum";
    private String cust = "customers";
    private RedisBasedDistributedLock redisBasedDistributedLock;

    public PessCustomer(int num) {
        this.name = "编号=" + num;
        init();
    }

    private void init() {
        jedis = RedisUtil.getInstance().getJedis();
        redisBasedDistributedLock = new RedisBasedDistributedLock(jedis, "lock.lock", 5*1000);
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        while(true){
            if(Integer.parseInt(jedis.get(key)) < 0){
                break;
            }
            System.out.println("顾客:" + name + "开始抢购商品!");
            if(redisBasedDistributedLock.tryLock()){
                int prdNum = Integer.parseInt(jedis.get(key));
                if(prdNum > 0){
                    jedis.decr(key); // 商品数减一
                    jedis.sadd(cust, name);
                    System.out.println("恭喜顾客:" + name + "抢购到商品!");
                }else{
                    System.out.println("抱歉,库存为0,顾客:" + name + "没有抢到商品!");
                }
                redisBasedDistributedLock.unlock0(); // 释放锁
                break;
            }
        }

        // 释放资源
        redisBasedDistributedLock = null;
        RedisUtil.returnResource(jedis);

    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735816.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c语言进阶-文件的打开和读写

本节重点知识点 为什么使用文件 什么是文件 文件名的组成 操作文件的基本过程 文件的打开与关闭 文件打开函数&#xff1a; 参数介绍 打开文件的方式&#xff1a; 使用绝对路径和相对路径都可以打开文件 文件的顺序读写函数&#xff1a; 写文件模式下&#xff0c;在打开文件fo…

-XX:NewSize=20m -XX:MaxNewSize=40m,-Xmn30m,-XX:NewRatio=5

高优先级&#xff1a;-XX:NewSize -XX:MaxNewSize设置新生代初始值&#xff0c;最大值中优先级&#xff1a;-Xmn&#xff08;默认等效 -Xmn-XX:NewSize-XX:MaxNewSize?&#xff09;设置新生代初始值-XX:NewRatio设置老年代和新生代的比例&#xff1b;例如&#xff1a;-XX:NewR…

CMake+OpenMP加速运算测试

目录 写在前面代码编译运行关于加速效果参考完 写在前面 1、本文内容 cmake编译测试openmp的效果 2、平台/环境 windows/linux均可&#xff0c;cmake 3、转载请注明出处&#xff1a; https://blog.csdn.net/qq_41102371/article/details/131629705 代码 代码包含同样的for循…

Dockerfile自定义镜像 - 基于 java:8-alpine 镜像,将一个Java项目构建为镜像

目录 一、前置知识 1.镜像结构 2.Dockerfile是什么 二、自定义一个 java 项目镜像 1.创建一个空目录&#xff0c;在这个空目录中创建一个文件&#xff0c;命名为 DockerFile&#xff0c;最后将 java 项目打包成 jar 包&#xff0c;放到这个目录中 2.编写 Dockerfile 文件 …

Vue3+Vite项目引入Element-plus并配置按需自动导入

一、安装Element-plus # 选择一个你喜欢的包管理器# NPM $ npm install element-plus --save# Yarn $ yarn add element-plus# pnpm $ pnpm install element-plus我使用的是 pnpm&#xff0c;并且顺便将 element-plus/icons一起引入 pnpm install element-plus element-plus/…

Python开启Http Server

用 Python 部署了一个具有 FTP 功能的服务器&#xff0c;电脑在局域网内通过 FTP 下载想要传输的文件。 注&#xff1a;这种方法不仅在自己家的路由器上可行&#xff0c;亲测在下面两种场景也可行&#xff1a; 需要用手机验证码连接的公共 WIFI 上&#xff1b;用手机开热点&a…

Kubernetes的Pod中进行容器初始化

Kubernetes的Pod中进行容器初始化 在很多应用场景中&#xff0c;应用在启动之前都需要进行如下初始化操作&#xff1a; 等待其他关联组件正确运行(例如数据库或某个后台服务)。 基于环境变量或配置模板生成配置文件。 从远程数据库获取本地所需配置&#xff0c;或者将自身注…

将一个3x3的OpenCV旋转矩阵转换为Eigen的Euler角

代码将一个3x3的OpenCV旋转矩阵转换为Eigen的Euler角。 #include <iostream> #include <Eigen/Core> #include <Eigen/Geometry> #include <opencv2/core.hpp>using

Sharding-JDBC【Sharding-JDBC介绍、数据分片剖析实战】(一)-全面详解(学习总结---从入门到深化)

目录 Sharding-JDBC介绍 数据分片剖析实战 Sharding-JDBC介绍 背景 随着通信技术的革新&#xff0c;全新领域的应用层出不穷&#xff0c;数据存量随着应 用的探索不断增加&#xff0c;数据的存储和计算模式无时无刻不面临着创新。面向交易、大数据、关联分析、物联网等场景…

初始 Redis - 分布式,内存数据存储,缓存

目录 1. 什么是 Redis 1.1 Redis 内存数据存储 1.2 Redis 用作数据库 1.3 Redis 用作缓存 (cache) 1.4 用作消息中间件 1. 什么是 Redis The open source , in-memory data store used by millions of developers as a database, cache, streaming engine, and message br…

时间序列预测 | Matlab基于自回归移动平均模型(ARMA模型)时间序列预测

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 时间序列预测 | Matlab基于自回归移动平均模型(ARMA模型)时间序列预测,单列数据输入模型 评价指标包括:MAE、RMSE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 部分源码 %% 清空环境变量 w…

zookeper第二三课-Zookeeper经典应用场景实战

1. Zookeeper Java客户端实战 ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有&#xff1a; ZooKeeper官方的Java客户端API。第三方的Java客户端API&#xff0c;比如Curator。 ZooKeeper官方的客户端API提供了基本的操作。例…

MFS分布式文件系统

MFS分布式文件系统 应用背景 公司之前的图片服务器采用的是 NFS&#xff0c;随着业务量增加&#xff0c;多台服务器通过 NFS方式共享一个服务器的存储空间&#xff0c;使得 NFS 服务器不堪重负&#xff0c;经常出现超时问题。 而且NFS存在着单点故障问题&#xff0c;尽管可以…

C++STL:顺序容器之vector

文章目录 1. 概述2. 成员函数3. 创建 vector 容器的几种方式4. 迭代器vector容器迭代器的基本用法vector容器迭代器的独特之处 5. 访问元素5.1 访问vector容器中单个元素5.2 访问vector容器中多个元素 6. 添加元素6.1 push_back()6.2 emplace_back()6.3 emplace_back()和push_b…

[元带你学: eMMC协议 24] eMMC Packed Command CMD23读(Read) 写(write) 操作详解

依JEDEC eMMC及经验辛苦整理&#xff0c;原创保护&#xff0c;禁止转载。 专栏 《元带你学&#xff1a;eMMC协议》 内容摘要 全文 3200 字&#xff0c; 主要内容 目录 前言 1 Packed Commands 有什么用处&#xff1f; 2 Packed Commands 怎么实现&#xff1f; Packed Wri…

【爬虫】5.4 Selenium 实现用户登录

目录 任务目标 创建模拟网站 创建服务器程序 键盘输入动作 鼠标点击动作 编写爬虫程序 任务目标 Selenium 查找的 HTML 元素是一个 WebElemen t对象&#xff0c; 这个对象不但可以获取元素的属性值&#xff0c;而且还能执行一 些键盘输入send_keys()与鼠标点击click()的动…

leetcode 617. 合并二叉树

2023.7.9 这题要求合并两二叉树&#xff0c;若节点重叠则将节点值相加。 和之前不同的是需要同时对两棵树进行操作&#xff0c;我选用队列来做这题。 大致思路&#xff1a;通过遍历两棵树的对应节点&#xff0c;将节点值相加并合并到第一棵树上。如果某个节点为空&#xff0c;…

Squid 缓存服务器

Squid 缓存服务器 作为应用层的代理服务软件&#xff0c;Squid 主要提供缓存加速和应用层过滤控制的功能 ☆什么是缓存代理 当客户机通过代理来请求 Web 页面时 指定的代理服务器会先检查自己的缓存&#xff0c;如果缓存中已经有客户机需要访问的页面&#xff0c;则直接将缓…

Qt实现思维导图功能(五)

前文链接&#xff1a;Qt实现思维导图功能&#xff08;四&#xff09; 思维导图纵向分布模式&#xff1a;模式一 百度网盘体验地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1YNSBiFOUwnSSKvHsBvOT3g 提取码&#xff1a;ifyc动态演示效果 静态展示图片 前文BUG维…

Linux 网络IO管理(epoll实现)

文章目录 1、网络IO模型1.1、阻塞IO&#xff08;blocking IO&#xff09;1.2、非阻塞IO&#xff08;non-blocking IO&#xff09;1.3、多路复用 IO&#xff08;IO multiplexing&#xff09;1.4、异步 IO&#xff08;Asynchronous I/O&#xff09;1.5、信号驱动 IO&#xff08;s…