写在前面
本文一起看下jdk并发包的相关内容。
1:JUC包提供了哪些功能
先通过包结构看下JUC提供的功能:
接下来分别看下。
1.1:锁
JUC中的锁机制提供了比synchronized,wait/notify更加灵活的同步控制,在java.util.concurrent.locks
包中,提供的主要类如下:
Lock接口:顶层接口
ReentrantLock:Lock接口的实现类,重入锁
ReadWriteLock接口:分为读锁和写锁的接口
LockSupport:加锁的工具类
1.2:原子类
解决了多线程+1的问题。所在的包是java.uti.concurrent.atomic
,主要类如下:
AtomicInteger:int原子类
AomicLong:long原子类
LongAdder:分段+1的原子类,进一步提高性性能
1.3:线程池相关类
提供了线程池 功能,所在包为根目录java.util.concurrent
,主要类如下:
1:Executor
任务执行顶层接口
2:ExecutorService
任务执行接口,继承Executor,提供更加全面的任务执行相关方法
3:ThreadFactory
线程工厂接口,提供了创建线程的能力
4:ThreadPoolExecutor
线程池类
5:Execcutors
创建线程池的工厂类,提供创建ThreadPoolExecutor的便捷操作
1.4:并发工具类
所在的包是并发包根目录java.util.concurrent
,主要类如下:
Semaphor:控制并发的线程数
CountDownLatch:控制某个线程等待一组线程执行完毕后再继续执行
CyclicBarrier:控制一组线程在某个状态,待所有线程都执行到该状态时再一起开始执行
1.5:并发集合类
所在的包是并发包根目录java.util.concurrent
,主要类如下:
1:ConcurrentHashMap 并发hashmap
2:ConcurrentSkipListMap 有序的并发map
3:CopyOnWriteArrayList 基于COW的arraylist
写时可读,读写分离思想,提高读数据效率,写会增加内存消耗
2:锁
在jdk中已经提供了synchronized关键字来实现加锁和解锁,为什么在JUC中还额外实现锁机制呢?这是因为synchronized存在如下的不足:
1:解锁和解锁不能灵活控制
进入synchronized同步块加锁,同步块执行完毕解锁
2:没有锁释放的超时机制
必须同步块执行完毕,才会释放锁,降低程序性能,可能造成死锁
3:无法中断
因为此时线程是出于BLOCKED状态的,所以不会响应中断,即调用interrupt方法没有响应
对于3
,测试如下:
- Lock测试
代码:
public class ChongRuLockMain {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
// 线程1先上锁,结束,但是不释放锁,即不unlock
Thread t1 = new Thread(() -> ChongRuLockMain.m1(), "线程1号");
t1.start();
// 因为线程1没有unlock,所以线程2将被lockInterruptibly
Thread t2 = new Thread(() -> ChongRuLockMain.m1(), "线程2号");
t2.start();
// 中断被lockInterruptibly方法阻塞的线程2
t2.interrupt();
}
private static void m1() {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " is interrupted!!!");
}
}
}
运行:
线程2号 is interrupted!!!
- synchronized测试
代码:
public class SynchronizedLockMain {
private static Object lock1 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> SynchronizedLockMain.m1(), "线程1号");
t1.start();
Thread t2 = new Thread(() -> SynchronizedLockMain.m1(), "线程2号");
t2.start();
// 中断被lockInterruptibly方法阻塞的线程2
t2.interrupt();
}
private static void m1() {
synchronized (lock1) {
try {
TimeUnit.SECONDS.sleep(300000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " is interrupted!!!");
}
}
}
}
什么也不会输出,即没有响应中断,因为此时线程处于BLOCKED状态。
2.1:Lock接口
类权限定名称java.util.concurrent.locks.Lock
。
2.1.1:主要方法
1:lock()
加锁,类比synchronized(obj)
2:unlock()
解锁,类比synchronized (obj) {} 中的}
3:lockInterruptibly()
支持阻塞的线程方法,即被该方法阻塞的线程会响应中断,亦即调用thread.interrupt()方法可以中断该线程
4:tryLock
尝试加锁方法,加锁成功则返回true,反之返回false,程序通过返回值判断是否加锁成功即可
5:tryLock(long time, TimeUnit unit)
带有超时的尝试获取锁,阻塞期间线程可以响应中断
6:newCondition
创建Condition,类似obj.wait(),obj.notify机制,阻塞和唤醒机制,这里对应的方法是await(),signal()
如:
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
2.2:可重入锁
重入,即已经获取了锁的线程,可以重复获取锁,对应的实现类是java.util.concurrent.locks.ReentrantLock
,如下通过可重入锁实现多线程+1操作并保证结果正确的例子,源码:
public class LoopCounter {
public static void main(String[] args) {
int loopNum = 100_0000;
LoopCounter loopCounter = new LoopCounter();
IntStream.range(0, loopNum).parallel().forEach(i -> loopCounter.addAndGet());
System.out.println("sum 最终:" + loopCounter.getSum());
}
private int sum = 0;
// 1:可重入 ReentrantLock
// 2:公平锁 true --> 越早阻塞的线程越早获取锁,反之大家获取锁的机会平等,随机
private Lock lock = new ReentrantLock(true);
public int addAndGet() {
try {
lock.lock();
return ++sum;
} finally {
lock.unlock();
}
}
public int getSum() {
return sum;
}
}
运行:
sum 最终:1000000
2.3:读写锁
主要方法:
// 获取写锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
// 获取读锁
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
类全限定名称java.util.concurrent.locks.ReentrantReadWriteLock
,分为分为读锁和写锁,其中读锁是共享锁,写锁是排它锁,如下测试代码:
public class ReadWriteLockCounter {
public static void main(String[] args) {
int loopNum = 100_0090;
ReadWriteLockCounter loopCounter = new ReadWriteLockCounter();
IntStream.range(0, loopNum).parallel().forEach(i -> loopCounter.incrAndGet());
System.out.println("sum 最终:" + loopCounter.getSum());
}
private int sum = 0;
// 可重复 读写 公平锁
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
public int incrAndGet() {
try {
readWriteLock.writeLock().lock();
return ++sum;
} finally {
readWriteLock.writeLock().unlock();
}
}
public int getSum() {
try {
readWriteLock.readLock().lock();
return sum;
} finally {
readWriteLock.readLock().unlock();
}
}
}
运行:
sum 最终:1000090
2.4:Condition
通过Lock.newCondition方法创建,类全限定名称java.util.concurrent.locks.Condition
,主要方法如下:
1:await()
等待信号唤醒,类似Object.wait(),线程可响应中断
2:awaitUninterruptily()
等待信号唤醒,类似Object.wait(),线程不响应中断,即不可被中断
3:await(long time, TimeUnit unit)
await()方法的带有超时版本,超时返回false
4:awaitUntil(Date deadline)
await()方法的带有超时版本,超时返回false
5:signal()
给一个被await的线程发送信号,类似Object.notify
6:signalAll()
给所有被await的线程发送信号,类似Object.notifyAll
2.5:LockSupport
加锁工具类,类全限定名称java.util.concurrent.locks.LockSupport
,主要方法如下:
1:park()
线程阻塞方法
2:park(Object blocker)
线程阻塞方法
3:park(Object blocker, long nacos)
带有超时的线程阻塞方法
4:unpark(Thread thread)
解锁,一个要解锁的线程作为参数
5:getBlocker(Thread t)
2.6:锁的最佳实践
在douge lea的著作<Java并发编程:设计原则与模式>
一书中总结了锁的3条最佳实践,如下:
1:永远只在更新对象的成员变量时加锁
2:永远只在访问可变的成员变量时加锁
3:永远不在调用其他对象的方法时加锁
总结其实就是如下2条:
1:降低锁的范围
尽量降低同步代码块的范围
2:细化锁的粒度
将一个大锁分为多个小锁
3:原子类
所在的包为java.util.concurrent.atomic
,如下:
用来解决多线程环境下+1的问题。
3.1:底层原理
使用Unsafe的ComareAndSwap,依赖于CPU指令cas,如果经过cas操作发现不能写入,则会进行自旋,自旋后再次尝试写入。另外value使用volatile修改,private volatile long value;
,保证可见性。所以cas本质上是一种乐观锁的实现。
3.2:适用场景
当压力不大或者一般时,大部分可以一次cas操作成功,即写入数据,但是当压力较大时,写入不成功导致大量的自旋,会消耗很大的系统资源。因此原子类适用是压力不大或压力一般的场景。
3.3:升级类LongAdder
在前面的分析中,我们提到了如果是cas操作不成功,则会通过自旋来尝试再次写入,而自旋本身则会消耗CPU等系统资源,为了降低竞争,提供了类XxxxAdder
,当前有LongAdder,DoubleAdder,使用了分段思想
,如下:
1:将原子类的热点数据value分散到cell[]数组中,这样线程分散到cell数组的不同元素,降低冲突的概率,每个线程执行cell[i]++
2:通过将cell[]数组所有元素求和,获取结果
4:线程池相关类
参考多线程之线程池 。
5:并发工具类
为了更加方便控制线程的行为,提供了并发工具类,当然简单的我们通过wait,notify,也是可以实现的,但是比较复杂的,比如希望若干个线程在同一时间开始执行,一个线程等待其他几个线程执行完毕之后再开始执行,接下来就一起看下。
5.1:AQS
并发工具类的核心是AQS,abstract queue synchronizer,高度抽象竞争的资源为一个int变量state,抽象要竞争资源的线程为一个线程CLH队列(CLH 是三个人名字的首字母)
,是一个双向队列,如下图:
5.2:Semaphor
信号量,用来控制并发执行的线程数,超过限制的并发数state后的线程将会被阻塞,直到有其它线程结束,如下一个8个工人只允许2个工人同时操作的例子:
public class SemaphoreDemo {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(2); //机器数目
for (int i = 0; i < N; i++)
new Worker(i, semaphore).start();
}
static class Worker extends Thread {
private int num;
private Semaphore semaphore;
public Worker(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 在子线程里控制资源占用
System.out.println("工人" + this.num + "占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人" + this.num + "释放出机器");
semaphore.release(); // 在子线程里控制释放资源占用
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行(**后是我添加的注释)
:
工人2占用一个机器在生产...
工人4占用一个机器在生产... ** 2,4运行
工人2释放出机器
工人7占用一个机器在生产... ** 7,4运行
工人4释放出机器
工人0占用一个机器在生产... ** 7,0运行
工人7释放出机器
工人1占用一个机器在生产... ** 1,0运行
工人0释放出机器
工人3占用一个机器在生产... ** 1,3运行
工人1释放出机器
工人3释放出机器
工人5占用一个机器在生产...
工人6占用一个机器在生产... ** 5,6运行
工人5释放出机器
工人6释放出机器
5.3:CountDownLatch
适用于一个线程需要等待另外一组线程执行完毕之后再执行的场景,如下主线程等待5个线程执行完毕之后继续执行的例子,源码:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i=0;i<5;i++){
new Thread(new readNum(i,countDownLatch)).start();
}
countDownLatch.await(); // 注意跟CyclicBarrier不同,这里在主线程await
System.out.println("==>各个子线程执行结束。。。。");
System.out.println("==>主线程执行结束。。。。");
}
static class readNum implements Runnable{
private int id;
private CountDownLatch latch;
public readNum(int id,CountDownLatch latch){
this.id = id;
this.latch = latch;
}
@Override
public void run() {
System.out.println("id:"+id+","+Thread.currentThread().getName());
System.out.println("线程组任务"+id+"结束,其他任务继续");
latch.countDown();
}
}
}
运行:
id:0,Thread-0
id:4,Thread-4
id:3,Thread-3
id:1,Thread-1
id:2,Thread-2
线程组任务1结束,其他任务继续
线程组任务3结束,其他任务继续
线程组任务0结束,其他任务继续
线程组任务4结束,其他任务继续
线程组任务2结束,其他任务继续
==>各个子线程执行结束。。。。
==>主线程执行结束。。。。
5.4:CyclicBarrier
栅栏,使用在希望几个线程同时开始执行的场景中,另外改类还有一个和CountDownLatch的相似之处,当所有将要同时开始执行前,即所有的线程都到达了聚合点
,会回调CyclicBarrier的一个回调任务,回调执行完毕后即在构造函数public CyclicBarrier(int parties, Runnable barrierAction)
中指定的第二个参数,如下测试代码:
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("==>各个子线程都到达聚合点,开始执行一些聚合操作");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {}
}
});
for (int i = 0; i < 5; i++) {
new Thread(new readNum(i,cyclicBarrier)).start();
}
}
static class readNum implements Runnable{
private int id;
private CyclicBarrier cyc;
public readNum(int id,CyclicBarrier cyc){
this.id = id;
this.cyc = cyc;
}
@Override
public void run() {
synchronized (this){
System.out.println("id:"+id+","+Thread.currentThread().getName());
try {
cyc.await();
System.out.println("线程组任务" + id + "结束,其他任务继续");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
运行:
id:0,Thread-0
id:4,Thread-4
id:3,Thread-3
id:1,Thread-1
id:2,Thread-2
==>各个子线程都到达聚合点,开始执行一些聚合操作
线程组任务0结束,其他任务继续
线程组任务2结束,其他任务继续
线程组任务1结束,其他任务继续
线程组任务4结束,其他任务继续
线程组任务3结束,其他任务继续
底层并没有使用AQS,因为比较简单,所以使用的是Condition。
6:并发集合类
使用同常规类,只不过提供了线程安全,且并发性能由于简单的synchronized和加锁。
写在后面
参考文章列表
Lock锁------lockInterruptibly()方法 。
LockSupport详解 。