文章目录
- 1 概述
- 2 性质
- 3 简单测试
- 4 模拟数据缓存
- 4.1 应用初始化无缓存
- 4.2 加入缓存改造
- 5 后记
1 概述
ReentrantReadWriteLock 是读写锁,和ReentrantLock会有所不同,对于读多写少的场景使用ReentrantReadWriteLock 性能会比ReentrantLock高出不少。在多线程读时互不影响,不像ReentrantLock即使是多线程读也需要每个线程获取锁。不过任何一个线程在写的时候就和ReentrantLock类似,其他线程无论读还是写都必须获取锁。需要注意的是同一个线程可以拥有 writeLock 与 readLock (但必须先获取 writeLock 再获取 readLock, 反过来进行获取会导致死锁)
ReentrantReadWriteLock 类结构图如下1-1所示:
2 性质
ReentrantReadWriteLock锁分读锁和写锁,那么ReentrantReadWriteLock是有2把锁吗?为解答这个问题,我们来看下ReentrantReadWriteLock获取读锁和写锁的源代码,如下2-1所示:
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
通过查看源代码,我们知道ReentrantReadWriteLock的读锁和写锁公用一个同步器,公用同一个锁竞争队列,公用一个锁状态。
公用一个锁竞争队列的话如何区分是读锁阻塞的还是写锁阻塞的呢?这个问题等下面我们讲解加锁和解锁原理的时候讲解。
公用一个锁状态state,怎么区分是加的读锁还是写锁,以及怎么记录锁重入的呢?我们继续查看源代码,如下:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
- SHARED_SHIFT:读锁占用state的高位位数16
- SHARED_UNIT:读锁加锁状态+1对应的数值就是加 2 16 2^{16} 216
- MAX_COUNT:最大锁重入次数 2 16 − 1 2^{16}-1 216−1
- EXCLUSIVE_MASK:计算写锁计数掩码 2 16 − 1 2^{16}-1 216−1
- sharedCount():计算读锁计数,我们知道读锁占用int state高16位,这里通过直接无符号右移16位得到读锁计数
- exclusiveCount():计数写锁计数,写锁占用int state低16位,通过&EXCLUSIVE_MASK运算,得到写锁计数
- 对位运算不熟悉的自行查询相关文档
特点:
-
ReentrantReadWriteLock 在多线程环境下的锁关系:读读共享,其他都是互斥,包括读写,写写
- 我们通过下面一个小测试来验证下。
-
重入时升级不支持,即持有读锁的情况下获取写锁,会导致回去读锁永久等待。
- 原理,在下面我们分析加锁原理的时候讲解。
-
重入时支持锁降级,即持有写锁的情况下获取读锁
-
读锁不支持条件变量,写锁支持条件变量
3 简单测试
简单做个测试,看看ReentrantReadWriteLock的读写锁如何使用,及验证下锁共享和互相关系,代码如下2-1所示:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Administrator
*/
@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
// new Thread(() -> {
// dataContainer.read();
// }, "t1").start();
//
//
// new Thread(() -> {
// dataContainer.write();
// }, "t2").start();
new Thread(() -> {
dataContainer.writeRead();
}, "t3").start();
}
}
@Slf4j(topic = "c.DataContainer")
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取。。。");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放读锁。。。");
r.unlock();
return data;
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写数据。。。");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放写锁。。。");
w.unlock();
}
}
public void readWrite() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("获取写锁...");
try {
w.lock();
log.debug("写数据。。。");
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
w.unlock();
}
} finally {
log.debug("释放读锁。。。");
r.unlock();
}
}
public Object writeRead() {
log.debug("获取写锁...");
w.lock();
try {
try {
log.debug("获取读锁...");
r.lock();
log.debug("读数据。。。");
TimeUnit.SECONDS.sleep(1);
return data;
} catch (Exception e) {
e.printStackTrace();
return null;
} finally {
log.debug("释放写锁。。。");
w.unlock();
}
} finally {
log.debug("释放读锁。。。");
r.unlock();
}
}
}
4 模拟数据缓存
场景描述:在很多应用中,我们都需要对数据库进行读写操作,获取需要的数据。在大多数情况下,数据的读取远大于数据的修改、删除等操作,如果在查询路径及条件一样的情况下,每次都从新从数据库获取数据,很影响系统的性能。这时我们考虑加入缓存,那么在多用户(多线程)环境下,如何保证缓存数据的一致性呢?考虑到读多写少的情况,我们使用ReentrantReadWriteLock读写锁加锁。
关于缓存更新,有2种策略:
- 先清空缓存在更新数据库
- 多线程环境下,先清空缓存,此时如果数据还没有更新完成,有其他线程来读取数据,读取就是旧数据,放入缓存。等数据库更新完成,数据库的数据更新了,但是应用一直读取的是旧数据。
- 先更新数据库在清空缓存
- 多线程环境下,先更新数据库。在没有更新完数据的情况下,就算有线程读取旧数据,放入缓存。等数据更新完成,情况缓存之后,之后的数据读取都是新数据。此时出现数据不一致的概率大大降低。
要想保证数据的强一致性,需要数据库加锁相关的知识,等后面讲解到Mqsql时,详细说明。这里我们选择先更新数据库在清空缓存的方式。
4.1 应用初始化无缓存
构建个GenericDao工具类实现从数据库读写操作,源代码下:
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.*;
import java.util.*;
public class GenericDao {
static String URL = "jdbc:mysql://localhost:3306/exercise";
static String USERNAME = "root";
static String PASSWORD = "root";
{
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
return queryList(sql, mapper, args);
}
public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
return queryOne(sql, mapper, args);
}
private <T> List<T> queryList(String sql, RowMapper<T> mapper, Object... args) {
try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
try (PreparedStatement psmt = conn.prepareStatement(sql)) {
if (args != null) {
for (int i = 0; i < args.length; i++) {
psmt.setObject(i + 1, args[i]);
}
}
List<T> list = new ArrayList<>();
try (ResultSet rs = psmt.executeQuery()) {
while (rs.next()) {
T obj = mapper.map(rs);
list.add(obj);
}
}
return list;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private <T> T queryOne(String sql, RowMapper<T> mapper, Object... args) {
List<T> list = queryList(sql, mapper, args);
return list.size() == 0 ? null : list.get(0);
}
public int update(String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
try (PreparedStatement psmt = conn.prepareStatement(sql)) {
if (args != null) {
for (int i = 0; i < args.length; i++) {
psmt.setObject(i + 1, args[i]);
}
}
return psmt.executeUpdate();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
interface RowMapper<T> {
T map(ResultSet rs);
}
static class BeanRowMapper<T> implements RowMapper<T> {
private Class<T> beanClass;
private Map<String, PropertyDescriptor> propertyMap = new HashMap<>();
public BeanRowMapper(Class<T> beanClass) {
this.beanClass = beanClass;
try {
BeanInfo beanInfo = Introspector.getBeanInfo(beanClass);
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor pd : propertyDescriptors) {
propertyMap.put(pd.getName().toLowerCase(), pd);
}
} catch (IntrospectionException e) {
throw new RuntimeException(e);
}
}
@Override
public T map(ResultSet rs) {
try {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
Constructor<T> constructor = beanClass.getDeclaredConstructor();
constructor.setAccessible(true);
T t = constructor.newInstance();
for (int i = 1; i <= columnCount; i++) {
String columnLabel = metaData.getColumnLabel(i);
PropertyDescriptor pd = propertyMap.get(columnLabel.toLowerCase());
if (pd != null) {
Method method = pd.getWriteMethod();
method.setAccessible(true);
method.invoke(t, rs.getObject(i));
}
}
return t;
} catch (SQLException | InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
}
}
注:基于msyql不同版本,选择合适的msyql驱动器。
- queryList():查询结果为集合
- queryOne():查询一个
- update():更新操作
- BeanRowMapper():将查询结果集转化为对应的Bean
进行二次同样的查询和一次更新,测试结果如下:
=============>查询
sql: [select * from emp where empno = ?] params:[7369]
Emp(empno=7369, ename=王五, job=销售员, sal=1000.00)
sql: [select * from emp where empno = ?] params:[7369]
Emp(empno=7369, ename=王五, job=销售员, sal=1000.00)
================>更新
sql: [update emp set sal = ? where empno = ?] params:[1500, 7369]
sql: [select * from emp where empno = ?] params:[7369]
Emp(empno=7369, ename=王五, job=销售员, sal=1500.00)
4.2 加入缓存改造
如果确定查询是一样的呢?这里我们构建map,key为SqlPair,值为对应的查询结果。SqlPari成员变量sql模板语句和对应的参数值,通过重写hashcode来保证如果查询语句和参数都相同,那么查询就是一样的。
缓存我们用HashMap模拟。因为是简单的模拟,我们把相应的类和测试放一个文件中。
改造后的带缓冲的dao源代码如下:
static class GenericDaoCached extends GenericDao {
private GenericDao dao = new GenericDao();
private Map<SqlPair, Object> map = new HashMap<>();
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
@Override
public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
return dao.queryList(beanClass, sql, args);
}
@Override
public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
// 先从缓存中找,找到直接返回
SqlPair key = new SqlPair(sql, args);
;
rw.readLock().lock();
try {
T value = (T) map.get(key);
if (value != null) {
return value;
}
} finally {
rw.readLock().unlock();
}
rw.writeLock().lock();
try {
// 多个线程
T value = (T) map.get(key);
if (value == null) {
// 缓存中没有,查询数据库
value = dao.queryOne(beanClass, sql, args);
map.put(key, value);
}
return value;
} finally {
rw.writeLock().unlock();
}
}
@Override
public int update(String sql, Object... args) {
rw.writeLock().lock();
try {
// 先更新库
int update = dao.update(sql, args);
// 清空缓存
map.clear();
return update;
} finally {
rw.writeLock().unlock();
}
}
class SqlPair {
private String sql;
private Object[] args;
public SqlPair(String sql, Object[] args) {
this.sql = sql;
this.args = args;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return Objects.equals(sql, sqlPair.sql) &&
Arrays.equals(args, sqlPair.args);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(args);
return result;
}
}
查找流程如下:
- 根据sql语句和参数构建SqlPair
- 加读锁,先从缓存中获取。
- 缓存有返回数据,读锁解锁
- 如果没有加写锁,在此判断缓存中是否没有。如果还是没有从数据库读取。
- 双重判断,多线程环境下
- 返回数据,读锁解锁
更新流程:
- 加写锁
- 更新数据库
- 清空缓存
- 写锁解锁
测试代码部分,GenericDao的实现改为GenericDaoCached,其他同上,测试结果:
=============>查询
sql: [select * from emp where empno = ?] params:[7369]
Emp(empno=7369, ename=王五, job=销售员, sal=1500.00)
Emp(empno=7369, ename=王五, job=销售员, sal=1500.00)
================>更新
sql: [update emp set sal = ? where empno = ?] params:[1600, 7369]
sql: [select * from emp where empno = ?] params:[7369]
Emp(empno=7369, ename=王五, job=销售员, sal=1600.00)
此时相同的查询只在第一次的时候从数据库获取,其他查询从缓存获取。更新后正确的获取更新后的数据。完整代码见下面代码仓库。
下面我们从源代码层面,对ReentrantReadWriteLock进行详细的分析。
5 后记
如有问题,欢迎交流讨论。
❓QQ:806797785
⭐️源代码仓库地址:https://gitee.com/gaogzhen/concurrent
参考:
[1]黑马程序员.黑马程序员深入学习Java并发编程,JUC并发编程全套教程[CP/OL].2020-01-18/2022-12-12.p247~p252.