一、ReentrantReadWriteLock(P247)
当读操作远远高于写操作时,这时候使用 【 读写锁】 让 【 读 - 读】 可以并发,提高性能。 类似于数据库中的 select ... from ... lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
@Slf4j(topic = "c.TestReadWriteLock") public class TestReadWriteLock { public static void main(String[] args) throws InterruptedException { DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); new Thread(() -> { dataContainer.read(); }, "t2").start(); } } @Slf4j(topic = "c.DataContainer") class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read() { log.debug("获取读锁..."); r.lock(); try { log.debug("读取"); sleep(1); return data; } finally { log.debug("释放读锁..."); r.unlock(); } } public void write() { log.debug("获取写锁..."); w.lock(); try { log.debug("写入"); sleep(1); } finally { log.debug("释放写锁..."); w.unlock(); } } }
注意事项(1)读锁不支持条件变量。(2)重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待(3) 重入时降级支持:即持有写锁的情况下去获取读锁。
二、 * 应用之缓存
1. 缓存更新策略
更新时,是先清缓存还是先更新数据库
2. 读写锁实现一致性缓存
public class TestGenericDao { public static void main(String[] args) { GenericDao dao = new GenericDaoCached(); System.out.println("============> 查询"); String sql = "select * from emp where empno = ?"; int empno = 7369; Emp emp = dao.queryOne(Emp.class, sql, empno); System.out.println(emp); emp = dao.queryOne(Emp.class, sql, empno); System.out.println(emp); emp = dao.queryOne(Emp.class, sql, empno); System.out.println(emp); System.out.println("============> 更新"); dao.update("update emp set sal = ? where empno = ?", 800, empno); emp = dao.queryOne(Emp.class, sql, empno); System.out.println(emp); } } class GenericDaoCached extends GenericDao { private GenericDao dao = new GenericDao(); private Map<SqlPair, Object> map = new HashMap<>(); private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); @Override public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) { return dao.queryList(beanClass, sql, args); } @Override public <T> T queryOne(Class<T> beanClass, String sql, Object... args) { // 先从缓存中找,找到直接返回 SqlPair key = new SqlPair(sql, args);; rw.readLock().lock(); try { T value = (T) map.get(key); if(value != null) { return value; } } finally { rw.readLock().unlock(); } rw.writeLock().lock(); try { // 多个线程 T value = (T) map.get(key); if(value == null) { // 缓存中没有,查询数据库 value = dao.queryOne(beanClass, sql, args); map.put(key, value); } return value; } finally { rw.writeLock().unlock(); } } @Override public int update(String sql, Object... args) { rw.writeLock().lock(); try { // 先更新库 int update = dao.update(sql, args); // 清空缓存 map.clear(); return update; } finally { rw.writeLock().unlock(); } } class SqlPair { private String sql; private Object[] args; public SqlPair(String sql, Object[] args) { this.sql = sql; this.args = args; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } SqlPair sqlPair = (SqlPair) o; return Objects.equals(sql, sqlPair.sql) && Arrays.equals(args, sqlPair.args); } @Override public int hashCode() { int result = Objects.hash(sql); result = 31 * result + Arrays.hashCode(args); return result; } } }
注意
以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑1️⃣适合读多写少,如果写操作比较频繁,以上实现性能低2️⃣没有考虑缓存容量3️⃣没有考虑缓存过期4️⃣只适合单机5️⃣并发性还是低,目前只会用一把锁6️⃣更新方法太过简单粗暴,清空了所有 key (考虑按类型分区或重新设计 key )
三、* 读写锁原理
1. 图解流程
读写锁用的是同一个 Sycn 同步器,因此等待队列、 state 等也是同一个
1.1 t1 w.lock,t2 r.lock
(1)t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
(2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
1️⃣-1 表示失败
2️⃣0 表示成功,但后继节点不会继续唤醒
3️⃣正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
(3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
(4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
(5)如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
1.2 t3 r.lock,t4 w.lock
这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
1.3 t1 w.unlock
2. 源码分析
四、StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁
long stamp = lock.readLock(); lock.unlockRead(stamp);
加解写锁long stamp = lock.writeLock(); lock.unlockWrite(stamp);
乐观读, StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。long stamp = lock.tryOptimisticRead(); // 验戳 if(!lock.validate(stamp)){ // 锁升级 }
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法@Slf4j(topic = "c.TestStampedLock") public class TestStampedLock { public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.read(0); }, "t2").start(); } } @Slf4j(topic = "c.DataContainerStamped") class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public DataContainerStamped(int data) { this.data = data; } public int read(int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}", stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}", stamp, data); return data; } // 锁升级 - 读锁 log.debug("updating to read lock... {}", stamp); try { stamp = lock.readLock(); log.debug("read lock {}", stamp); sleep(readTime); log.debug("read finish...{}, data:{}", stamp, data); return data; } finally { log.debug("read unlock {}", stamp); lock.unlockRead(stamp); } } public void write(int newData) { long stamp = lock.writeLock(); log.debug("write lock {}", stamp); try { sleep(2); this.data = newData; } finally { log.debug("write unlock {}", stamp); lock.unlockWrite(stamp); } } }
注意StampedLock 不支持条件变量StampedLock 不支持可重入