- Callable接口
- ReentrantLock
- 常用的方法
- 创建公平锁
- 创建读写锁
- 唤醒机制
- ReentrantLock与synchronized的区别
- 原子类
- 工具类
- Semaphore
- CountDownLatch
- CyclicBarrier-循环栅栏
- 线程安全的集合类
- CopyOnWriteArrayList
- 多线程环境使用队列
- 多线程环境使用哈希表
- ConcurrentHashMap
java.util.concurrent包简称JUC,是JDK1.5之后对多线程的一种实现,这个包下的类都和多线程有关。
Callable接口
前面介绍过Runnable接口,它是java.long下的接口,在创建线程时可以使用。Callable是JUC包下描述线程任务的接口。
1.Callable实现的是call方法,Runnable实现的是run方法;
2.Callable可以返回一个结果,Runnable不能返回结果;
3.Callable要配合FurtureTask使用;
public class Demo03_Callable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//先定义一个线程的任务
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 10; i++) {
sum += i;
TimeUnit.SECONDS.sleep(1);
System.out.println("等待1秒");
}
//返回sum
return sum;
}
};
//通过 FutureTaskl类来创建一个对象,这个对象持有callable
FutureTask<Integer> futureTask = new FutureTask<>(callable);
//创建线程并指定任务
Thread thread = new Thread(futureTask);
//让线程执行定义好的任务
thread.start();
//获取线程执行的结果,抛出异常
System.out.println("等待结果....");
Integer result = futureTask.get();
//打印结果
System.out.println(result);
}
}
4.Callable可以抛出异常,Runnable不可以。
public class Demo04_CallableException {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 先定义一个线程的任务
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 5; i++) {
sum += i;
TimeUnit.SECONDS.sleep(1);
throw new Exception("业务出现异常");
}
// 返回结果
return sum;
}
};
// 通过FutureTask类来创建一个对象,这个对象持有callable
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 创建线程并指定任务
Thread thread = new Thread(futureTask);
// 让线程执行定义好的任务
thread.start();
// 获取线程执行的结果
System.out.println("等待结果...");
Integer result = null;
// 捕获异常
try {
result = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("处理异常" + e.getMessage());
e.printStackTrace();
}
// 打印结果
System.out.println(result);
}
}
ReentrantLock
本身就是一个锁,是基于CAS实现的一个纯用户态的锁。
常用的方法
1.lock() : 加锁
2.tryLock() : 尝试加锁
3.unLock() :释放锁
public static void demo01_lock() throws InterruptedException {
// 创建一个ReentrantLock对象
ReentrantLock reentrantLock = new ReentrantLock();
// 加锁
reentrantLock.lock();
// 尝试加锁, 死等
reentrantLock.tryLock();
// 尝试加锁,有超时时间
reentrantLock.tryLock(1, TimeUnit.SECONDS);
// 释放锁
reentrantLock.unlock();
}
模拟业务中如果出现异常情况,如何释放锁?将释放锁的操作写在finally中,保证出现异常的时候也可以释放锁。
public static void demo02 () throws Exception {
// 创建一个ReentrantLock对象
ReentrantLock reentrantLock = new ReentrantLock();
// 加锁
reentrantLock.lock();
try {
// TODO : 业务逻辑
throw new Exception("业务出现异常");
} finally {
// 保证出现异常的时候也可以释放锁
reentrantLock.unlock();
}
}
创建公平锁
通过构造方法中传入true时为公平锁,false为非公平锁,默认为false
public static void demo03_fair () {
// 通过构造方法,传入true时为公平锁,false为非公平锁,默认为false
ReentrantLock reentrantLock = new ReentrantLock(true);
}
创建读写锁
readLock()和writeLock()。
public static void demo04_ReadWriteLock () {
// 创建
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 获取读锁, 共享锁,读与读可以同时进行
readWriteLock.readLock();
// 获取写锁,排他锁(互斥锁),读写,写读,写写不能共存
readWriteLock.writeLock();
}
唤醒机制
ReentrantLock可以根据不同的Condition去休眠或唤醒线程,同一把锁可以分为不同的休眠或唤醒条件。
public class Demo05_ReentrantLock {
/**
* ReentrantLock可以根据不同的Condition去休眠或唤醒线程
* 同一把锁可以分为不同的休眠或唤醒条件
*/
private static ReentrantLock reentrantLock = new ReentrantLock();
// 定义不同的条件
private static Condition boyCondition = reentrantLock.newCondition();
private static Condition girlCondition = reentrantLock.newCondition();
public static void demo05_Condition () throws InterruptedException {
Thread threadBoy = new Thread(() -> {
// 让处理男生任务的线程去休眠
try {
boyCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 唤醒处理女生任务的线程
girlCondition.signalAll();
});
Thread threadGirl = new Thread(() -> {
// 让处理女生任务的线程去休眠
try {
girlCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 唤醒处理男生任务的线程
boyCondition.signalAll();
});
}
ReentrantLock与synchronized的区别
①synchronized 是一个关键字,是 JVM 内部实现的(大概率是基于 C++ 实现).。ReentrantLock 是标准库的一个类,在 JVM 外实现的(基于 Java 实现)。
②synchronized 使用时不需要手动释放锁。ReentrantLock 使用时需要手动释放,使用起来更灵活, 但是也容易遗漏 unlock。
③synchronized在申请锁失败时,会死等。ReentrantLock 可以通过 trylock 的方式等待一段时间就放弃。
④synchronized是非公平锁,ReentrantLock 默认是非公平锁。可以通过构造方法传入一个 true 开启公平锁模式。
原子类
原子类内部用的是 CAS 实现,所以性能要比加锁实现 i++ 高很多。原子类有以下几个
AtomicBoolean
AtomicInteger
AtomicIntegerArray
AtomicLong
AtomicReference
AtomicStampedReference
以AtomicInteger 为例
public class Demo01_Atomic {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger();
// 相当于i++
atomicInteger.getAndIncrement();
// 相当于++i
atomicInteger.incrementAndGet();
// 相当于i--
atomicInteger.getAndDecrement();
// 相当于--i
atomicInteger.decrementAndGet();
// 相当于 i + 10
atomicInteger.getAndAdd(10);
}
}
工具类
Semaphore
信号量,用来表示 “可用资源的个数”。本质上就是一个计数器。
可以把信号量想象成是停车场的展示牌:当前有车位 100 个,表示有 100 个可用资源;
当有车开进去的时候,就相当于申请一个可用资源,可用车位就 -1 (这个称为信号量的 P 操作);
当有车开出来的时候,就相当于释放一个可用资源,可用车位就 +1 (这个称为信号量的 V 操作);
如果计数器的值已经为 0 了,还尝试申请资源,就会阻塞等待,直到有其他线程释放资源。
示例:定义20个线程模拟调用3个资源。
public class Demo02_Semaphore {
// 定义一个信号变量,指定可用资源的个数
private static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
// 定义一个任务
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "[.]申请资源.");
// 调用acquire方法,让可用资源数 减 1
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "[+]申请到了资源");
// 模拟业务处理的过程
TimeUnit.SECONDS.sleep(1);
// 释放资源
semaphore.release();
System.out.println(Thread.currentThread().getName() + "[-]释放资源");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 创建多个线程
for (int i = 0; i < 20; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
}
}
acquire()和release()方法是成对出现的。
🎯应用场景:
在遇到业务中需要指定有限资源的个数时,可以考虑使用Semaphore来处理。比如,最多可以同时支持多少个并发…
CountDownLatch
在短跑比赛中,最终的颁奖操作必须要等待所有的运动员都到达终点才能进行。CountDownLatch可以设置所有的线程都到达某一个关键点之后才进行下一步操作。
public class Demo03_CountDownLatch {
// 定义一个CountDownLatch
private static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) throws InterruptedException {
System.out.println("所有选手各就各位....");
// 创建线程模拟比赛
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "出发");
// 模拟比赛过程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 到达终点,计数减1
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "到达终点");
}, "player" + (i + 1));
// 启动线程
thread.start();
}
// 等待所有的线程执行完成
countDownLatch.await();
// 颁奖
System.out.println("开始颁奖");
}
}
countDown()方法每执行一次,构造时指定的值就会减1,直到为0;
await()方法一直会等到countDownLatch维护的值为0时,才会进行继续运行下面的操作。
🎯应用场景:
把一个大任务分为若干个小的任务,或是等待一些前置资源时,可以考虑使用CountDownLatch。
CyclicBarrier-循环栅栏
CountDownLatch的进阶版,可以实现线程间的相互等待,计数重置。
线程安全的集合类
定义一个普通集合类,用多个线程同时对这个集合进行add操作,并打印集合:
public static void main(String[] args) throws InterruptedException {
// 定义一个普通集合类
List<Integer> list = new ArrayList<>();
// 用多个线程同时对这个集合进行add操作,并打印集合
for (int i = 0; i < 10; i++) {
int finalI = i;
Thread thread = new Thread(() -> {
list.add(finalI);
System.out.println(list);
});
thread.start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println("=================");
System.out.println(list);
}
运行这段代码可能会报错;并发修改异常。
如果出现这个异常首先要考虑是不是集合类使用的不恰当,也就是说在多线程环境下使用了线程不安全的的集合类,那么在多线程环境下如何使用线程安全的集合类呢?
1.使用Vector、Hashtable之类的,JDK中提供的线程安全的类。可以解决上述问题,但是不推荐使用。
2.自己使用同步机制。使用(synchronized或者ReentrantLock)。这种方式和上面的效果差不多,也不推荐。
3.通过工具类转换Collections.synchronizedList(new Arraylist)
可以创建一个线程安全的类。
public static void main (String[] args) throws InterruptedException {
// 通过工具类来创建一个线程安全的集合类
List<Object> list = Collections.synchronizedList(new ArrayList<>());
// 用多个线程同时对这个集合进行add操作,并打印集合
for (int i = 0; i < 10; i++) {
int finalI = i;
Thread thread = new Thread(() -> {
list.add(finalI);
System.out.println(list);
});
thread.start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println("=================");
System.out.println(list);
}
它是通过在普通集合对象外层又包裹了一层synchronized完成的线程安全。也不推荐使用。
CopyOnWriteArrayList
这是JUC包下的类,使用的是一种写时复制技术来实现的。写时复制就是当要修改一个集合时,先复制一份这个集合的复本,修改复本的数据;修改完之后,用复本覆盖原始集合。
public static void main(String[] args) throws InterruptedException {
// 使用CopyOnWriteArrayList
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
// 用多个线程同时对这个集合进行add操作,并打印集合
for (int i = 0; i < 10; i++) {
int finalI = i;
Thread thread = new Thread(() -> {
list.add(finalI);
System.out.println(list);
});
thread.start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println("=================");
System.out.println(list);
}
优点:
在读多写少的场景下, 性能很高, 不需要加锁竞争;
缺点:
1、由于新复制了一份数据进行修改,所以占用内存较多;
2、新写的数据不能被第一时间读取到。
多线程环境使用队列
1)ArrayBlockingQueue:基于数组实现的阻塞队列
2)LinkedBlockingQueue:基于链表实现的阻塞队列
3)PriorityBlockingQueue:基于堆实现的带优先级的阻塞队列
4)TransferQueue:最多只包含一个元素的阻塞队列
多线程环境使用哈希表
1.Hashtable线程安全
实现方法是通过synchronized给this加锁,也就是给自己加锁,读写的时候都加锁,这样效率比较低,不推荐使用。
2.HashMap线程不安全
正常单线程环境下使用HashMap没有问题,由于本身没有加锁处理,在多线程环境下会产生线程安全问题。
ConcurrentHashMap
在多线程环境下推荐使用这种方式保证线程安全。在JDK1.8中,使用synchronized实现加锁。
既然Hashtable和 ConcurrentHashMap都是线程安全的,那它们有啥区别呢?
Hashtable是对所有操作全部加锁,必然会影响性能。一个Hashtable只有一把锁,两个线程访问Hashtable中的任意数据就会出现锁锁竞争。
1.ConcurrentHashMap优化了锁的粒度,它是对每个哈希桶加锁,意味着哈希桶的数组长度有多少,就可以支持多少个并发。只有当两个线程访问的恰好是同一个哈希桶上的数据才会出现锁冲突。
2.ConcurrentHashMap的读操作没有加锁(但是使用了 volatile 保证从内存读取结果), 只对写操作进行加锁,加锁方式使用synchronized。
3.充分利用了CAS机制。比如size的属性通过CAS来更新,避免出现重量级锁。
4.对扩容机制做了优化。对于需要扩容的操作,新建一个新的Hash桶,随后的每次操作都搬运一些元素去新的Hash桶。在扩容没有完成时,两个Hash桶同时存在,每次写入时只写入新的Hash桶,每次读取需要从新旧桶中同时读,哪个读到了就返回哪个,所有数据搬运完成后,把老的Hash桶删除。这是一个典型的以空间换时间的例子。
继续加油~