Java 并发编程的艺术(三)
文章目录
- Java 并发编程的艺术(三)
- Java 内存模型
- Java 内存模型的基础
- Java 内存模型的抽象结构
- 从源代码到指令序列的重排序
- 重排序
- happens-before
- JMM 的设计
- happens-before 的定义
- Java 中的锁
- Lock 接口
- 代码清单
- 相关API
- 重入锁
- 公平锁和非公平锁
- 读写锁
- 读写锁的接口与实例
- Condition 接口
- Java 并发容器和框架
- ConcurrentHashMap 的实现原理与使用
- ConcurrentHashMap 的结构
- ConcurrentHashMap 的初始化
- segments 数组
- 定位 Segment
- ConcurrentHashMap 的操作
- get
- put
- 阻塞队列
- 什么是阻塞队列
- Java 里的阻塞队列
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- PriorityBlockingQueue
- DelayQueue
- 阻塞队列的实现原理
Java 内存模型
Java 内存模型的基础
Java 内存模型的抽象结构
- 线程之间的共享变量存储在主内存(
Main Memory
)中,每个线程都有一个私有的本地内存(Local Memory
),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM
的一个抽象概念,并不真实存在。
从源代码到指令序列的重排序
-
编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排
语句的执行顺序。 -
指令级并行的重排序。现代处理器采用了指令级并行技术将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
-
内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
- 从
Java
源代码到最终实际执行的指令序列,会分别经历下面3
种重排序
- 从
重排序
- 重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段
happens-before
JMM 的设计
/**
* 存在 3 个 happens-before 关系
* a happens-before c
* b happens-before c
* a happens-before b
* <p> 1 和 2 是硬性条件 计算 c;3 这个条件不会影响计算结果 </p>
*/
@Test
public void test1() {
int a = 1;
int b = 2;
int c = a + b;
}
JMM
把happens-before
要求禁止的重排序分为了下面两类 :- 会改变程序执行结果的重排序
- 不会改变程序执行结果的重排序
JMM
对这两种不同性质的重排序,采取了不同的策略:- 对于会改变程序执行结果的重排序,
JMM
要求编译器和处理器必须禁止这种重排序 - 对于不会改变程序执行结果的重排序,
JMM
对编译器和处理器不做要求(JMM
允许这种重排序)
- 对于会改变程序执行结果的重排序,
happens-before 的定义
- 如果一个操作
happens-before
另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。 - 两个操作之间存在
happens-before
关系,并不意味着Java
平台的具体实现必须要按照happens-before
关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before
关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM
允许这种重排序)。
Java 中的锁
Lock 接口
- 锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。
代码清单
Lock lock = new ReentrantLock();
lock.lock();
try {
// 具体业务
} finally {
lock.unlock();
}
相关API
- 实例代码:三个窗口共售出 100 张票
/**
* 三个窗口总共买 100 张票
*
* @throws InterruptedException interrupted exception
*/
@Test
public void test2() throws InterruptedException {
Tickets tickets = new Tickets();
new Thread(tickets, "1号窗口").start();
new Thread(tickets, "2号窗口").start();
new Thread(tickets, "3号窗口").start();
Thread.sleep(10000);
}
static class Tickets implements Runnable {
private final Logger logger = LoggerFactory.getLogger(Tickets.class);
private int tickets = 100;
private final Lock lock = new ReentrantLock(false);
@Override
public void run() {
while (true) {
lock.lock();
try {
/*上Lock锁*/
if (tickets > 0) {
logger.info("{} ======完成售票,余票为{}", Thread.currentThread().getName(), --tickets);
} else {
logger.info("{} ======余票为{}", Thread.currentThread().getName(), tickets);
break;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 释放 Lock 锁避免发生死锁
lock.unlock();
}
}
}
}
重入锁
- 支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。
公平锁和非公平锁
- 如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。 ReentrantLock 提供了一个构造函数,能够控制锁是否是公平的。
// 非公平锁
Lock lock = new ReentrantLock(false);
// 公平锁
Lock lock = new ReentrantLock(true);
读写锁
- 读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
ReadWriteLock
中的读锁是共享锁,写锁是排他锁,共享锁允许不同线程同时读,排他锁只允许一个线程写,其他线程等待。
读写锁的接口与实例
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
ReadWriteLockTest.java
public class ReadWriteLockTest {
@Test
public void test() throws InterruptedException {
// 创建读写锁
ReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁
Lock readLock = lock.readLock();
// 写锁
Lock writeLock = lock.writeLock();
Map<String, Object> map = new HashMap<>();
for (int i = 10; i > 0; i--) {
String key = String.valueOf(System.currentTimeMillis());
WriteTask writeTask = new WriteTask(writeLock, map, key);
ReadTask readTask = new ReadTask(readLock, map, key);
ThreadPoolUtils.executor(writeTask);
ThreadPoolUtils.executor(readTask);
}
Thread.sleep(10000);
}
/**
* 读任务
*/
private class ReadTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(ReadTask.class);
private Lock readLock;
private Map<String, Object> map;
private String key;
private ReadTask(Lock readLock, Map<String, Object> map, String key) {
this.readLock = readLock;
this.map = map;
this.key = key;
}
@Override
public void run() {
readLock.lock();
try {
if (Objects.nonNull(map)) {
Object object = map.get(key);
logger.info("ReadTask read value:{}", object);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 释放锁
readLock.unlock();
}
}
}
/**
* 写任务
*/
private class WriteTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(WriteTask.class);
private Lock writeLock;
private Map<String, Object> map;
private String key;
private WriteTask(Lock writeLock, Map<String, Object> map, String key) {
this.writeLock = writeLock;
this.map = map;
this.key = key;
}
@Override
public void run() {
writeLock.lock();
try {
if (Objects.isNull(map)) {
map = new HashMap<>();
}
long l = System.currentTimeMillis();
map.put(key, l);
logger.info("WriteTask write value:{}", l);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 释放锁
writeLock.unlock();
}
}
}
}
Condition 接口
- 任意一个
Java
对象,都拥有一组监视器方法(定义在java.lang.Object
上),主要包括wait()
、wait(long timeout)
、notify()
以及notifyAll()
方法,这些方法与synchronized
同步关键字配合,可以实现等待/通知模式。
public class ConditionTest {
private static final Logger logger = LoggerFactory.getLogger(ConditionTest.class);
/**
* Condition 是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),
* 只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。
*
* @throws InterruptedException interrupt exception
*/
@Test
public void test1() throws InterruptedException {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
try {
lock.tryLock();
logger.info("线程:{} 等待信号", Thread.currentThread().getId());
condition.await();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} finally {
logger.info("线程:{} 得到信号", Thread.currentThread().getId());
lock.unlock();
}
}).start();
TimeUnit.MILLISECONDS.sleep(10);
new Thread(() -> {
lock.tryLock();
logger.info("线程:{} 拿到锁", Thread.currentThread().getId());
condition.signal();
logger.info("线程:{} 发出信号", Thread.currentThread().getId());
lock.unlock();
}).start();
TimeUnit.SECONDS.sleep(2);
}
}
Java 并发容器和框架
ConcurrentHashMap 的实现原理与使用
- 在并发编程中使用
HashMap
可能导致程序死循环。而使用线程安全的HashTable
效率又非常低下,基于以上两个原因,便有了ConcurrentHashMap
的登场机会。
ConcurrentHashMap 的结构
- 类图
- 结构图
ConcurrentHashMap 的初始化
ConcurrentHashMap
初始化方法是通过initialCapacity
、loadFactor
和concurrencyLevel
等几个参数来初始化segment
数组、段偏移量 segmentShift、段掩码segmentMask
和每个segment
里的HashEntry
数组来实现的 。
segments 数组
- 初始化
segments
数组 :segments
数组的长度是 2 的 N 次方 - 初始化
segmentShift
和segmentMask
:这两个全局变量需要在定位segment
时的散列算法里使用,sshift
等于ssize
从 1 向左移位的次数 - 初始化每个
segment
:segment
的容量threshold=(int) cap*loadFactor
,默认情况下initialCapacity
等于16
,loadfactor
等于0.75
,通过运算cap
等于1
,threshold
等于零。
定位 Segment
ConcurrentHashMap
使用分段锁Segment
来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到Segment
。
ConcurrentHashMap 的操作
get
get
操作的高效之处在于整个get
过程不需要加锁,除非读到的值是空才会加锁重读
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 根据key.hashCode()计算hash: 运算后得到得到更散列的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果所要找的元素就在数组上,直接返回结果
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 条件成立:即,hash小于0 分2种情况,是树或者正在扩容,需要借助find方法寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式:
// 情况一:eh=-1 是fwd结点 -> 说明当前table正在扩容,且当前查询的这个桶位的数据已经被迁移走了,需要借助fwd结点的内部方法find去查询
// 情况二:eh=-2 是TreeBin节点 -> 需要使用TreeBin 提供的find方法查询。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 说明是当前桶位已经形成链表的这种情况: 遍历整个链表寻找元素
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
put
- 由于
put
方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。 put
方法首先定位到Segment
,然后在Segment
里进行插入操作。- 插入操作需要经历两个步骤,第一步判断是否需要对
Segment
里的HashEntry
数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry
数组里。
阻塞队列
什么是阻塞队列
-
在队列为空时等待从队列中获取元素,或者在队列已满时等待向队列中添加元素
-
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
Java 里的阻塞队列
ArrayBlockingQueue
- 有界的队列,基于数组创建,在创建的时候容量确定
// 公平的阻塞队列
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000, true);
LinkedBlockingQueue
- 可选的有界的队列,它的容量可以在创建时指定,也可以不指定,默认为
Integer.MAX_VALUE
,即无界队列。 - 与
ArrayBlockingQueue
不同,LinkedBlockingQueue
在队列已满时仍然可以继续添加元素,只有在指定了容量并且队列已满时才会阻塞添加操作
SynchronousQueue
- 一个没有存储空间的阻塞队列。每个插入操作必须等待另一个线程的移除操作。
PriorityBlockingQueue
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。- 默认情况下元素采取自然顺序升序排列。也可以自定义类实现
compareTo()
方法来指定元素排序规则,或者初始化PriorityBlockingQueue
时,指定构造参数Comparator
来对元素进行排序。 - 需要注意的是不能保证同优先级元素的顺序。
DelayQueue
-
DelayQueue
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue
来实
现。队列中的元素必须实现Delayed
接口,在创建元素时可以指定多久才能从队列中获
取当前元素。只有在延迟期满时才能从队列中提取元素。 -
DelayQueue
非常有用,可以将DelayQueue
运用在以下应用场景。- 缓存系统的设计:可以用
DelayQueue
保存缓存元素的有效期,使用一个线程循环查询DelayQueue
,一旦能从DelayQueue
中获取元素时,表示缓存有效期到了。 - 定时任务调度:使用
DelayQueue
保存当天将会执行的任务和执行时间,一旦从DelayQueue
中获取到任务就开始执行,比如TimerQueue
就是使用DelayQueue
实现的
- 缓存系统的设计:可以用
阻塞队列的实现原理
- 当队列满时,插入线程会被阻塞,直到队列有空闲位置;当队列为空时,获取线程会被阻塞,直到队列有元素可用。这种阻塞和唤醒的机制是通过条件变量来实现的。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 使用ReentrantLock提供线程安全。此锁确保一次只有一个线程可以访问队列。
lock = new ReentrantLock(fair);
// 两个条件变量notEmpty和notFull用于管理队列的阻塞行为。notEmpty条件用于在队列为空时阻塞线程,而notFull条件用于在队列已满时阻塞线程。
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
take
方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 当前获取lock的线程进入到等待队列
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// signal()唤醒一个等待线程
notFull.signal();
return x;
}