前文了解了线程通信方式中的Object.wait/Object.notify以及Semaphore,接下来我们继续了解其他的线程间通信方式。
CountDownLatch
CountDownLatch
利用一个指定的计数初始化,由于调用了countDown方法,await方法会阻塞直到当前技术为0,之后所有等待的线程都会被释放,任何后续的await都会立即返回。
CountDownLatch
是一个通用的同步工具,可以用于许多目的。如果CountDownLatch
初始化的计数为1,则可以用作简单的开/关锁存器或门,也就意味着所有调用await的线程都在门处等待,直到由调用countDown()的线程打开它。一个初始化为N的CountDownLatch
可以用来让一个线程等待,直到N个线程完成了某个动作,或者某个动作完成了N次。
CountDownLatch
核心接口说明如下:
方法名称 | 描述 | 备注 |
---|---|---|
await() | 使当前线程进入等待状态,直到计数减为0或者当前线程被中断,否则不会唤醒 | / |
await(long timeout, TimeUnit unit) | 等待timeout时间后,计数还没减成0,则等待线程唤醒继续执行,不再等待 | / |
countDown() | 对计数减1,如果减到了0,就会唤醒等待的线程 | / |
getCount() | 获取当前计数的值 | / |
举个简单例子,某主任务中一共要发起5个子任务,等待所有任务完成后主任务继续,此时在主任务执行线程以计数取值为5初始化CountDownLatch
,调用await等待,在每个子任务完成时,调用countDown方法使计数减1,等到5个子任务全部完成后,此时计数减为0,主任务唤醒,继续执行。
通过例子和描述可以看出CountDownLatch
属于一次性操作,计数无法重置。
以主任务中发起5个子任务为例,使用CountDownLatch
的代码实现如下:
public static void main(String[] args) {
Thread mainThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("MainThread start working");
CountDownLatch countDownLatch = new CountDownLatch(5);
try {
for (int i=0;i<5;i++) {
Thread subThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" start working");
System.out.println(Thread.currentThread().getName()+" completed");
countDownLatch.countDown();
}
});
subThread.setName("SubThread"+(i+1));
subThread.start();
}
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("MainThread completed");
}
});
mainThread.setName("MainThread");
mainThread.start();
}
运行结果如下:
对于CountDownLatch
而言,如果在计数为0前,有子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临,所以使用CountDownLatch
一定要注意线程异常的处理。
CyclicBarrier
CyclicBarrier
同样是一个多线程同步的工具类,译为循环栅栏,其允许一组线程互相等待到达一个同步的屏障点,CyclicBarrier
在一组固定大小的线程中存在相互等待的场景下十分有用,之所以称为循环栅栏,是因为其在其他线程释放后可以重新使用。
CyclicBarrier
使用一个指定计数和Runnable进行初始化,初始化完成后,当CyclicBarrier.await调用次数等于计数,也就是等待线程数等于计数时,则会触发初始化传入Runnable运行,该Runnable运行在最后一个进入等待的线程中,随后所有等待线程唤醒继续执行。
7位选手参加田径比赛,所有人在起点就位,准备完成后,发射信号枪起跑。用CyclicBarrier
来实现比赛逻辑,则以计数7和运行发射信号枪的Runnable初始化CyclicBarrier
,每一个选手起点准备完成后,都调用一次CyclicBarrier.await,当所有选手都准备完成,发射信号枪的Runnable运行,比赛开始,实现代码如下:
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, new Runnable() {
@Override
public void run() {
System.out.println("所有选手准备完成,发射信号枪,比赛开始,运行在:"+Thread.currentThread().getName());
}
});
for (int i=0;i<7;i++) {
int finalI = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println((finalI +1)+"号选手准备完成,运行在:"+Thread.currentThread().getName());
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println((finalI +1)+"号选手准备达到准点,运行在:"+Thread.currentThread().getName());
}
});
thread.setName("选手"+(i+1));
thread.start();
}
}
输出如下:
对于CyclicBarrier
而言,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出。
Lock,Condition与ReentrantLock
Lock通常译为锁,其是一种控制多个线程对共享资源访问的工具,与同步方法和语句相比,以接口定义,在其内部提供了更广泛的锁操作,其允许的关联结构更灵活,比如不同的属性,多个关联的Condition对象等。通常情况下,可以通过锁控制一次只有一个线程可以访问共享资源,所有线程在访问共享资源时都需要获取锁。部分锁也支持多共享资源进行并发访问,比如ReadWriteLock。
Lock接口核心函数如下表所示:
函数名称 | 描述 | 备注 |
---|---|---|
lock() | 获取对象锁,如果锁不可用,则当前线程被阻塞,在获取锁前处于休眠状态 | / |
unlock() | 释放锁,锁释放操作应处于finally块内,确保在任何情况下都能释放,以免造成死锁 | / |
tryLock() | 锁是否可用,true-可用,false-不可用 | / |
Condition通常译为条件,其将Object的wait,notify,notifyAll提取到不同的对象中,通过将这些对象与任一锁对象的使用结合起来,从而达到单个对象具有多个等待集的效果,在Lock+Condition的模式中,Lock代替了前文中Object.wait+synchronized中的synchronized,Condition代替了Object.wait/notify/notifyAll。
Condition核心函数如下表所示:
函数名称 | 描述 | 备注 |
---|---|---|
await() | 当前线程进入等待状态,直到被通知或中断,线程从await方法返回进入运行状态的情况包括:1.其他线程调用了该Condition的signal或signalAll方法。2.其他线程中断当前线程。 | / |
signal() | 唤醒一个等待在该Condition上的线程 | / |
signalAll() | 唤醒所有等待在该Condition上的线程 | / |
在实现上Lock和Condition均为接口,所以我们一般使用Lock的实现类ReentrantLock来实现锁机制,借助ReentrantLock对象内部的newCondition获得Condition的实现对象,进而搭配完成线程间通信,接下来我们使用ReentrantLock实现消费者-生产者模式,代码如下:
private static int count = 0;
public static void main(String[] args) {
// 创建Lock对象
ReentrantLock lock = new ReentrantLock();
// 创建Condition对象
Condition condition = lock.newCondition();
// 创建盘子
Counter counter = new Counter();
Thread incThread = new Thread(new Runnable() {
@Override
public void run() {
while (count < 4) {
lock.lock();
try {
if (counter.getCount() != 0) {
condition.await();
}
counter.incCount();
count++;
System.out.println("Inc Thread ++,current count is:" + counter.getCount());
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
});
incThread.setName("Inc Thread");
incThread.start();
Thread decThread = new Thread(new Runnable() {
@Override
public void run() {
while (count < 4) {
lock.lock();
try {
if (counter.getCount() == 0) {
condition.await();
}
counter.decCount();
System.out.println("Dec Thread --,current count is:" + counter.getCount());
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
});
decThread.setName("Dec Thread");
decThread.start();
}
运行结果如下:
前面提到过,Condition将Object的wait,notify,notifyAll提取到不同的对象中,也就意味着我们可以更灵活的控制锁获取,我们新建两个Condition对象,一个控制生产者,一个控制消费者,代码如下:
private static int count = 0;
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
// 控制生产者
Condition incCondition = lock.newCondition();
// 控制消费者
Condition decCondition = lock.newCondition();
Counter counter = new Counter();
Thread incThread = new Thread(new Runnable() {
@Override
public void run() {
while (count < 4) {
lock.lock();
try {
if (counter.getCount() != 0) {
// 阻塞当前生产线程
incCondition.await();
}
counter.incCount();
count++;
System.out.println("Inc Thread ++,current count is:" + counter.getCount());
// 唤醒消费者
decCondition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
});
incThread.setName("Inc Thread");
incThread.start();
Thread decThread = new Thread(new Runnable() {
@Override
public void run() {
while (count < 4) {
lock.lock();
try {
if (counter.getCount() == 0) {
// 阻塞当前消费线程
decCondition.await();
}
counter.decCount();
System.out.println("Dec Thread --,current count is:" + counter.getCount());
// 唤醒生产线程
incCondition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
});
decThread.setName("Dec Thread");
decThread.start();
}
运行结果如下:
当然ReentrantLock也可以不搭配Condition独立使用的,通过lock函数获取锁,通过unlock解锁,代码如下所示:
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
ExecutorService testSynchronizedBlock = Executors.newCachedThreadPool();
testSynchronizedBlock.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" enter lock first");
reentrantLock.lock();
System.out.println(Thread.currentThread().getName()+" enter lock");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+" enter lock again");
reentrantLock.lock();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+" exit lock again");
reentrantLock.unlock();
System.out.println(Thread.currentThread().getName()+" exit lock");
reentrantLock.unlock();
}
});
testSynchronizedBlock.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" enter lock before");
reentrantLock.lock();
System.out.println(Thread.currentThread().getName()+" enter lock");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+" exit lock");
reentrantLock.unlock();
}
});
}
上述代码,在第一个代码中演示了ReentrantLock的可重入性,在使用并发锁相关工具类时,一定要注意获取锁和释放锁必须一一配对,在任何情况下都要确保能释放锁,以免造成死锁。