剑指JUC原理-16.读写锁

news2024/11/29 2:35:38
  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

    • ReentrantReadWriteLock
    • 应用之缓存
      • 缓存更新策略
        • 先清缓存
      • 读写锁实现一致性缓存
    • 读写锁原理
      • t1 w.lock,t2 r.lock
      • t3 r.lock,t4 w.lock
      • t1 w.unlock
      • t2 r.unlock,t3 r.unlock
    • StampedLock

ReentrantReadWriteLock

其定义就是支持冲入的读写锁,本质上也就是基于 ReentrantLock 实现的

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。

类似于数据库中的 select … from … lock in share mode

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

class DataContainer {
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
    public Object read() {
        log.debug("获取读锁...");
        r.lock();
        try {
            log.debug("读取");
            sleep(1);
            return data;
        } finally {
            log.debug("释放读锁...");
            r.unlock();
        }
    }
    public void write() {
        log.debug("获取写锁...");
        w.lock();
        try {
            log.debug("写入");
            sleep(1);
        } finally {
            log.debug("释放写锁...");
            w.unlock();
        }
    }
}

测试 读锁-读锁 可以并发

		DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();
        new Thread(() -> {
            dataContainer.read();
        }, "t2").start();

输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响

14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁... 
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁... 
14:05:15.386 c.DataContainer [t1] - 释放读锁... 

测试 读锁-写锁 相互阻塞

		DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();
        Thread.sleep(100);
        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();

输出结果

14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁... 
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁... 
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁... 

写锁-写锁 也是相互阻塞的,这里就不测试了

注意事项

  • 读锁不支持条件变量

ReentrantReadWriteLock 中的读锁不支持条件变量,主要是因为读锁在 ReentrantReadWriteLock 中是共享的,多个线程可以同时持有读锁来访问共享资源。条件变量通常用于在多线程环境下实现线程间的协调和通信,而读锁的共享特性可能导致条件变量的信号在多个线程之间产生歧义或不确定性。

  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
		r.lock();
        try {
            // ...
            w.lock();
            try {
                // ...
            } finally{
                w.unlock();
            }
        } finally{
            r.unlock();
        }
  • 重入时降级支持:即持有写锁的情况下去获取读锁
// 下面以ReentrantReadWriteLock 的 CachedData 类来说明,这段代码主要是使用读写锁来实现对缓存数据的并发访问,以提高并发读取操作的性能。


class CachedData {
    Object data;
    // 是否有效,如果失效,需要重新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
        // 先加读锁,判断缓存是否失效,如果没有失效,那么可以直接返回即可。使用完了将读锁解开即可
        rwl.readLock().lock();
        if (!cacheValid) {
            // 如果失效了,释放读锁,然后获得写锁,重新对其进行计算
            // 获取写锁前必须释放读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                
                /*
                在上述代码中,两次检查 cacheValid 的作用是为了在获取写锁之前和获取写锁后再次确认缓存的有效性。让我来详细解释一下:

第一次检查 cacheValid 发生在首次获取写锁之前。这是为了避免出现竞态条件(race condition)的情况,即在当前线程释放读锁后,有可能其他线程已经获取了写锁并更新了缓存。如果没有进行第一次检查,当前线程获取写锁后可能会重复更新缓存,造成不必要的计算和数据更新。

第二次检查 cacheValid 发生在获取写锁之后。虽然在第一次检查时缓存无效,但在当前线程获取写锁之前,可能有其他线程已经更新了缓存并将 cacheValid 设置为有效。因此,在获取写锁后再次检查 cacheValid 可以避免重复更新缓存,确保只有一个线程更新缓存数据。

通过这样的双重检查机制,可以有效地避免多个线程同时更新缓存数据,确保在并发环境下对缓存的更新操作是正确且高效的。此外,结合读写锁的降级操作,可以使得在缓存有效时多个线程能够同时读取数据,从而提高系统的并发性能。
                
                */
                
                
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                /*
                锁的降级,写锁释放开,但是我还想同时持有它的读锁,这是为了释放开的那个瞬间,其他线程的读取权限就ok了

				加读锁的目的也是为了你在读的的时候不受其他的写的线程的干扰
                */
                
                // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock();
            }
        }
        // 自己用完数据, 释放读锁 
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

应用之缓存

缓存更新策略

更新时,是先清缓存还是先更新数据库

先清缓存

读操作的速度是大于写操作的

在这里插入图片描述

先更新数据库

在这里插入图片描述

补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

在这里插入图片描述

这种情况的出现几率非常小

读写锁实现一致性缓存

使用读写锁实现一个简单的按需加载缓存(核心:写操作 加 写锁;读操作 加 读锁

class GenericCachedDao<T> {
    // HashMap 作为缓存非线程安全, 需要保护
    HashMap<SqlPair, T> map = new HashMap<>();

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    GenericDao genericDao = new GenericDao();
    public int update(String sql, Object... params) {
        SqlPair key = new SqlPair(sql, params);
        // 加写锁, 防止其它线程对缓存读取和更改
        lock.writeLock().lock();
        try {
            int rows = genericDao.update(sql, params);
            map.clear();
            return rows;
        } finally {
            lock.writeLock().unlock();
        }
    }
    public T queryOne(Class<T> beanClass, String sql, Object... params) {
        SqlPair key = new SqlPair(sql, params);
        // 加读锁, 防止其它线程对缓存更改
        lock.readLock().lock();
        try {
            T value = map.get(key);
            if (value != null) {
                return value;
            }
        } finally {
            lock.readLock().unlock();
        }
        // 加写锁, 防止其它线程对缓存读取和更改
        lock.writeLock().lock();
        try {
            // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
            // 为防止重复查询数据库, 再次验证
            T value = map.get(key);
            if (value == null) {
                // 如果没有, 查询数据库
                value = genericDao.queryOne(beanClass, sql, params);
                map.put(key, value);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }
    // 作为 key 保证其是不可变的
    class SqlPair {
        private String sql;
        private Object[] params;
        public SqlPair(String sql, Object[] params) {
            this.sql = sql;
            this.params = params;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return sql.equals(sqlPair.sql) &&
                    Arrays.equals(params, sqlPair.params);
        }
        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(params);
            return result;
        }
    }
}

注意

以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑:

  • 适合读多写少,如果写操作比较频繁,以上实现性能低
  • 没有考虑缓存容量
  • 没有考虑缓存过期
  • 只适合单机
  • 并发性还是低,目前只会用一把锁
  • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)

读写锁原理

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

t1 w.lock,t2 r.lock

在这里插入图片描述

其实该流程和 ReentrantLock 几乎是一样的,但是还是有一些区别的,比如state不太一样,因为state既要给读锁用,也要给写锁用,所以要将state分成两部分。

t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁
使用的是 state 的高 16 位。

其实t1肯定是能加上锁,接下来分析一下源码:

ctrl + f12 找到 writelock 里面的lock方法:

public void lock() {
   sync.acquire(1);
}

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
/*
	首先会调用tryAcquire 尝试加锁,如果成功了那么后续的代码就不执行了,如果加锁失败了,才会进入这个队列

*/
protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
    // 首先拿到整个state状态
            int c = getState();
            int w = exclusiveCount(c);
    // 如果不等于0,意味着既有可能其他线程加了读锁,也有可能是其他线程加了写锁
    // 因为高16位不等于0或者低16位不等于0都有可能导致 不等于0
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // w == 0 代表 加的读锁部分   而 往后执行代表着 可能加的写锁,但是这个写锁是不是自己加的呢?比如先加的写锁,发生了重入,又加了一次写锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 如果写锁部分 再加 1超过写锁的最大范围了 65535 2的16次方
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 可以理解为发生了可重入
                setState(c + acquires);
                return true;
            }
    // 如果能往下走,说明c是等于0的,代表别的线程都没有加锁,首先判断写锁是否需要阻塞,其实就意味着公平非公平,如果是非公平锁 就 总会返回false,公平锁会检查这个队列。然后就接着往后面走 是否能compareAndSetState
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
    
    // 将对应线程设置成 owner
            setExclusiveOwnerThread(current);
            return true;
        }

接下来看 加 读锁的lock方法

public void lock() {
            sync.acquireShared(1);
        }

public final void acquireShared(int arg) {
    // 尝试去获取这个读锁
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写
锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒(0或者1会在后面的信号量章节介绍)
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
    // 检查写锁部分是否不为0 此时t1已经将其变为1了,去检查加写锁的是不是当前线程呢?
    // 这种情况其实就是 t2 已经加了写锁,然后又加了读锁,这里是应该成功的,因为这是锁降级的过程
    // 最终我们当前的情况来看,t2 其实就是返回-1了。
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在这里插入图片描述

返回-1,这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

在这里插入图片描述

private void doAcquireShared(int arg) {
    // 唤醒的时候,判断的逻辑稍有不同
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 死循环,去找t2 有没有前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是 head,那么说明其是 第二个节点,是有资格争抢锁的
                if (p == head) {
                    // 调用tryAcquireShared  返回-1表示失败,返回0或者1表示成功
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果返回-1,说明并没有释放锁,那么就会走到这个逻辑,和ReentrantLock逻辑一致,走park	shouldParkAfterFailedAcquire将其前驱节点设置成-1,然后 重新for循环,设置为park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在这里插入图片描述

t3 r.lock,t4 w.lock

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

在这里插入图片描述

t2 t3都是读锁,所以状态都是 SHARED,而t4是写锁,所以状态是EXCLUSIVE

t1 w.unlock

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

public void unlock() {
            sync.release(1);
        }
        
public final boolean release(int arg) {
    // 如果return true呢,就会执行后续的逻辑
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
    // 首先在原来的基础上减1
            int nextc = getState() - releases;
    // 然后去查看写锁部分是不是减成0了
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
    // 如果没有减成0 代表着这事一次锁的重入
            setState(nextc);
            return free;
        }

在这里插入图片描述

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内
parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        
                        // 此时进入到这个条件中
                        
                        // 替换头结点
                        
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 此时这里唤醒了,就可以继续运行了,然后再循环
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


// 返回-1表示失败,返回0或者1表示成功
protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
    // 返回-1的之前分析过了,就不再做分析了
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    // 获取读锁,也就是高16位
            int r = sharedCount(c);
    // 不应该被阻塞住
            if (!readerShouldBlock() &&
                // 没有超过最大基数
                r < MAX_COUNT &&
                // 对于高位来讲,不是加1,是加了65536
                compareAndSetState(c, c + SHARED_UNIT)) {
                
                // 如果都符合,代表后续读锁加锁成功了
                
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在这里插入图片描述

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

在这里插入图片描述

// 替换头结点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 拿到当前节点的下一个 
            Node s = node.next;
            // 如果节点的状态是 shared的话,
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用
doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

private void doReleaseShared() {
        
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 在这里会将节点的状态从 -1 改成 0

                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 对头结点的后继结点又要去唤醒,这就接上了 此时就唤醒t3的park
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

在这里插入图片描述

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 又再次来到了 tryAcquireShared方法,和前面的流程是一样的
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 此时t3 被唤醒了,和之前的流程是一样的
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
    // 返回-1的之前分析过了,就不再做分析了
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    // 获取读锁,也就是高16位
            int r = sharedCount(c);
    // 不应该被阻塞住
            if (!readerShouldBlock() &&
                // 没有超过最大基数
                r < MAX_COUNT &&
                // 对于高位来讲,不是加1,是加了65536
                compareAndSetState(c, c + SHARED_UNIT)) {
                
                // 如果都符合,代表后续读锁加锁成功了
                
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

在这里插入图片描述

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

在这里插入图片描述

// 替换头结点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 拿到当前节点的下一个 
            Node s = node.next;
            // 如果节点的状态是 shared的话,
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

t2 r.unlock,t3 r.unlock

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

在这里插入图片描述

public void unlock() {
            sync.releaseShared(1);
        }

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                // 获取状态,减去高位的1
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入
doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

在这里插入图片描述

private void doReleaseShared() {
        
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 本质上唤醒t4
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他
竞争,tryAcquire(1) 成功,修改头结点,流程结束

在这里插入图片描述

StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

这里对优化性能部分做了专门的解读:

之前学读写锁,其实已经看到了读读可以并发,已经很快了,但是还不够快,读读并发的时候,底层还是用cas的方式去修改它的状态,读锁的高16位去修改它的状态,它还是性能上比不上不加锁,如果希望读取的性能达到这种极致,那么就可以使用StampedLock。

加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通
过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

// 这里面没有加任何的锁
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
 	// 锁升级
}

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();
    public DataContainerStamped(int data) {
        this.data = data;
    }
    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        }
        // 锁升级 - 读锁
        log.debug("updating to read lock... {}", stamp);
        try {
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }
    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            sleep(2);
            this.data = newData;
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}

测试 读-读 可以优化

public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.read(0);
        }, "t2").start();
    }

输出结果,可以看到实际没有加读锁

15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.write(100);
        }, "t2").start();
    }

输出结果

15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513 

注意

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1192241.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

炮炮面试——简历中专业技能

自我介绍: 面试官,您好! 我叫XXX,今年25岁,目前是XXX大学电子信息学院研三的一名学生,面试岗位是前端研发工程师。下面我将从以下几个方面来介绍自己: (学习能力)在校期间通过了英语四六级、以及一些计算机专业证书,成绩一直是专业前5%,每年都获得学…

Elastic Stack 8.11:引入一种新的强大查询语言 ES|QL

作者&#xff1a;Tyler Perkins, Ninoslav Miskovic, Gilad Gal, Teresa Soler, Shani Sagiv, Jason Burns Elastic Stack 8.11 引入了数据流生命周期、一种配置数据流保留和降采样&#xff08;downsampling&#xff09; 的简单方法&#xff08;技术预览版&#xff09;&#xf…

LiveMedia视频监控汇聚管理平台视频接入方案(二)

上一篇文章中我们介绍了LiveMedia视频监控汇聚管理平台技术方案的架构。今天我们来介绍下LiveMedia视频监控汇聚管理平台的视频接入方案。 视频集控平台建设充分考虑利旧的建设原则&#xff0c;同时根据各个现有视频监控建设情况&#xff0c;考虑统一规划、分布实施的建设方式。…

电脑怎么录制视频,录制的视频怎么剪辑?

在现今数字化的时代&#xff0c;视频成为了人们日常生活中不可或缺的一部分。因此&#xff0c;对于一些需要制作视频教程、录制游戏或者是进行视频演示的人来说&#xff0c;电脑录屏已经成为了一个必不可少的工具。那么&#xff0c;对于这些人来说&#xff0c;如何选择一个好用…

Mysql视图应用

现在&#xff0c;我们将创建一个视图&#xff0c;将员工的姓名、部门和工资信息组合在一起。 CREATE VIEW EmployeeSalaryView AS SELECT e.FirstName, e.LastName, e.Department, s.MonthlySalary FROM Employees e JOIN Salary s ON e.EmployeeID s.EmployeeID;通过这个视图…

MySQL大表数据导入到MongoDB

修改参数 &#xff0c;开启into outfile的功能 secure_file_priv/home/backups/mysql_outfile 重启数据库是参数生效 按条件导出MySQL数据 select * from receipt_receive_log where gmt_create > 2020-04-13 00:00:00 and gmt_create< 2020-07-13 00:00:00 INTO O…

Azure 机器学习 - 有关为 Azure 机器学习配置 Kubernetes 群集的参考

目录 受支持的 Kubernetes 版本和区域建议的资源计划ARO 或 OCP 群集的先决条件禁用安全增强型 Linux (SELinux)ARO 和 OCP 的特权设置 收集的日志详细信息Azure 机器学习作业与自定义数据存储连接支持的 Azure 机器学习排斥和容许最佳实践 通过 HTTP 或 HTTPS 将其他入口控制器…

面试题:在 Java 中 new 一个对象的流程是怎样的?彻底被问懵了。。

文章目录 前言JVM内存JVM生成.class文件类加载器加载.class文件知识扩展&#xff1a;Class对象首先搞清楚 newInstance 两种方法区别&#xff1a; 连接和初始化创建实例 前言 对象怎么创建&#xff0c;这个太熟悉了&#xff0c;new一下(其实还有很多途径&#xff0c;比如反射、…

EAS 去除重复的 职位 组织树信息

--备份 SELECT * into T_PM_ORGRANGE_bak1110 FROM T_PM_ORGRANGE --检查是否备份成功 select count(1) from T_PM_ORGRANGE_bak1110 --查询是否有重复脏数据 SELECT FORGID,FUSERID,FTYPE FROM T_PM_ORGRANGE group by FORGID,FUSERID,FTYPE having count(1)>1 --删除脏数据…

推送效率低?MobPush带着APP消息推送一站式解决方案来了

随着移动应用竞争的日趋激烈&#xff0c;如何拉新促活&#xff0c;保持用户粘性成为各大APP的运营的焦点和核心。作为一种有效的营销和用户保留工具。APP消息推送可以有效提高用户参与度&#xff0c;增强用户忠诚度&#xff0c;并最终提高业务效益。然而随着各大APP推送的高度同…

企业电子期刊怎么做,用这个平台就对啦!

企业期刊虽然只是一个小小的刊物&#xff0c;但对于企业的文化建设有着重要的作用。近年来&#xff0c;越来越多的企业开始使用企业电子期刊来为公司文化建设规划服务&#xff0c;也越来越重视企业期刊的质量和水平。 那怎么制作企业电子期刊呢&#xff1f;有句话说得好&#…

【Python3】【力扣题】258. 各位相加

【力扣题】题目描述&#xff1a; 【Python3】代码&#xff1a; 1、解题思路&#xff1a;将整数转为字符串&#xff0c;遍历字符串中的数字&#xff0c;求和。 知识点&#xff1a;str(...)&#xff1a;转为字符串。为了遍历每个数字。 int(...)&#xff1a;转为整数。为了数字…

网络缓冲区

windows下的体系&#xff0c;我不是特别了解。以下所有的内容都是在Linux下的理解&#xff0c;如果不对的地方&#xff0c;评论区欢迎留言。 Linux收发数据 接收数据 大体的流程如上图所示&#xff0c;接下来我们对图中一些名词进行解释。 -----------------------------------…

【博士每天一篇文献-算法】Learning without forgetting

阅读时间&#xff1a;2023-10-29 1 介绍 年份&#xff1a;2016 作者&#xff1a;李志忠; 德里克霍伊姆&#xff0c;伊利诺伊大学 期刊&#xff1a;IEEE transactions on pattern analysis and machine intelligence 引用量&#xff1a;3353 提出一种名为"无忘记学习&quo…

Linux下的环境变量【详解】

Linux下的环境变量 一&#xff0c;环境变量的概念1 概述2 环境变量的分类3 常见的环境变量4 查看环境变量4.1 shell变量4.2 查看环境变量 5 添加和删除环境变量5.1 添加环境变量5.2 删除环境变量 6. 通过代码如何获取环境变量6.1 命令行的第三个参数6.2 通过第三方变量environ获…

手把手教你如何扩展(破解)mybatisplus的sql生成 | 京东云技术团队

mybatisplus 的常用CRUD方法 众所周知&#xff0c;mybatisplus提供了强大的代码生成能力&#xff0c;他默认生成的常用的CRUD方法&#xff08;例如插入、更新、删除、查询等&#xff09;的定义&#xff0c;能够帮助我们节省很多体力劳动。 他的BaseMapper中定义了这些常用的C…

3线硬件SPI+DMA驱动 HX8347 TFT屏

3线硬件SPIDMA驱动 HX8347 TFT屏&#xff0c;实现用DMA清屏。 参考&#xff1a;基于stm32 标准库spi驱动st7789彩屏TFT(使用DMA)-技术天地-深圳市修德电子有限公司 一、源码 HX8347.h #ifndef USER_HX8347_H_ #define USER_HX8347_H_#define SPI_hardware #define SPI_hardw…

详解JS的四种异步解决方案:回调函数、Promise、Generator、async/await

同步&异步的概念 在讲这四种异步方案之前&#xff0c;我们先来明确一下同步和异步的概念&#xff1a; 所谓同步(synchronization)&#xff0c;简单来说&#xff0c;就是顺序执行&#xff0c;指的是同一时间只能做一件事情&#xff0c;只有目前正在执行的事情做完之后&am…

AI:80-基于深度学习的医学图像分割与病变识别

🚀 本文选自专栏:人工智能领域200例教程专栏 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的代码,详细讲解供大家学习,希望可以帮到大家。欢迎订阅支持,正在不断更新中,…

强大好用的shell:shell的工作原理是什么

Shell的工作原理可以简要概括为以下几个步骤&#xff1a; 1.命令行输入&#xff1a;用户在命令行界面输入命令。 2.命令解析&#xff1a;Shell接收用户的输入&#xff0c;并对命令进行解析。这个过程包括解析命令名、参数、选项等&#xff0c;将其转换成计算机可以理解的形式。…