JUC并发编程
- 八、ReentrantReadWriteLock 读写锁
- 8.1、概述
- 8.2、案例
- 8.3、读写锁的降级
- 九、BlockingQueue阻塞队列
- 9.1、阻塞队列概述
- 9.2、阻塞队列分类
- 9.2.1、ArrayBlockingQueue(常用)
- 9.2.2、LinkedBlockingQueue(常用)
- 9.2.3、 DelayQueue
- 9.2.4、 PriorityBlockingQueue
- 9.2.5、 SynchronousQueue
- 9.3、阻塞队列方法案例
- 十、ThreadPool线程池
- 10.1、概述
- 10.2、使用
- 10.2.1、一池N线程
- 10.2.2、一个任务一个任务执行,一池一线程线程池
- 10.2.3、根据需求创建线程,可扩容,遇强则强
- 10.3、参数说明
- 10.4、线程池底层流程
- 10.5、自定义线程池
- 十一、分支合并Fork/Join
- 十二、异步回调
八、ReentrantReadWriteLock 读写锁
8.1、概述
读锁:共享锁
写锁:独占锁
都会发生死锁
读读不互斥,读写互斥,写写互斥
8.2、案例
不带锁:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
class MyCache{
//创建map集合
private volatile Map<String,Object> map = new HashMap<>();
//放数据
public void put(String k,Object v) throws InterruptedException {
System.out.println(Thread.currentThread( ).getName()+"正在放数据"+k);
TimeUnit.MILLISECONDS.sleep(300);
map.put(k,v);
System.out.println(Thread.currentThread( ).getName()+"写完数据"+k);
}
//取数据
public void get(String k) throws InterruptedException {
Object res=null;
System.out.println(Thread.currentThread( ).getName()+"正在取数据"+k);
TimeUnit.MILLISECONDS.sleep(300);
res=map.get(k);
System.out.println(Thread.currentThread( ).getName()+"取完数据"+k);
}
}
public class ReentrantReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache=new MyCache();
for (int i = 1; i < 5; i++) {
final int num=i;
new Thread(()->{
try {
myCache.put(num+"",num);
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
for (int i = 1; i < 5; i++) {
final int num=i;
new Thread(()->{
try {
myCache.get(num+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
会出现逻辑错误
加上读写锁
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache{
//创建map集合
private volatile Map<String,Object> map = new HashMap<>();
//创建读写锁对象
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
//放数据
public void put(String k,Object v) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread( ).getName()+"正在放数据"+k);
TimeUnit.MILLISECONDS.sleep(300);
map.put(k,v);
System.out.println(Thread.currentThread( ).getName()+"写完数据"+k);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
//取数据
public void get(String k){
readWriteLock.readLock().lock();
try {
Object res=null;
System.out.println(Thread.currentThread( ).getName()+"正在取数据"+k);
TimeUnit.MILLISECONDS.sleep(300);
res=map.get(k);
System.out.println(Thread.currentThread( ).getName()+"取完数据"+k);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
public class ReentrantReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache=new MyCache();
for (int i = 1; i < 5; i++) {
final int num=i;
new Thread(()->{
myCache.put(num+"",num);
},String.valueOf(i)).start();
}
for (int i = 1; i < 5; i++) {
final int num=i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
8.3、读写锁的降级
读时候,不能写,只有读完成之后,才可以写,写操作的同时可以读。容易造成锁饥饿,一直读,没有写操作。
锁降级的步骤
获取写锁 -->>获取读锁 -->>释放写锁 -->>释放读锁
public static void main( String[] args) {
//可重入读写锁对象
ReentrantReadwriteLock rwLock = new ReentrantReadwriteLock();
ReentrantReadwriteLock.ReadLock readLock = rwLock.readLock();//读锁
ReentrantReadwriteLock.writeLock writeLock = rwLock.writeLock();//写锁
//锁降级
//1获取写锁
writeLock.lock();
system.out.println("123");
//2获取读锁
readLock.lock();
system.out.println("456");
//3释放写锁
writeLock.unlock();
//4释放读锁
readLock.unlock();
}
九、BlockingQueue阻塞队列
9.1、阻塞队列概述
阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
- 当队列是空的,从队列中获取元素的操作将会被阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
- 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办
9.2、阻塞队列分类
9.2.1、ArrayBlockingQueue(常用)
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。”
由数组结构组成的有界阻基队列。
9.2.2、LinkedBlockingQueue(常用)
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。·
一句话总结:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列
9.2.3、 DelayQueue
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用优先级队列实现的延迟无界阻塞队列
9.2.4、 PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。
支持优先级排序的无界阻塞队列
9.2.5、 SynchronousQueue
不存储元素的阻塞队列,也即单个元素的队列
9.3、阻塞队列方法案例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockQueue=new ArrayBlockingQueue<>(3);
blockQueue.put("a");
blockQueue.put("b");
blockQueue.put("c");
//blockQueue.put("d");会阻塞
System.out.println(blockQueue.take());
System.out.println(blockQueue.take());
System.out.println(blockQueue.take());
//System.out.println(blockQueue.take());会阻塞
}
}
十、ThreadPool线程池
10.1、概述
线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗
- 提高响应速度:当任务到达时。任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
10.2、使用
10.2.1、一池N线程
- 线程池中的线程处于一定的量,可以很好的控制线程的并发量
- 线程可以重复被使用,在显示关闭之前,都将一直存在
- 超出—定量的线程被提交时候需在队列中等待.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//一池5线程
ExecutorService executorService = Executors.newFixedThreadPool(5);
//10个任务
try {
for (int i = 1; i <=10 ; i++) {
//执行
executorService.execute(()->{
System.out.println(Thread.currentThread() .getName()+"处理任务ing...");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭线程池
executorService.shutdown();
}
}
}
10.2.2、一个任务一个任务执行,一池一线程线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo2 {
public static void main(String[] args) {
//一池1线程
ExecutorService executorService = Executors.newSingleThreadExecutor();
//10个任务
try {
for (int i = 1; i <=10 ; i++) {
//执行
executorService.execute(()->{
System.out.println(Thread.currentThread() .getName()+"处理任务ing...");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭线程池
executorService.shutdown();
}
}
}
10.2.3、根据需求创建线程,可扩容,遇强则强
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo3 {
public static void main(String[] args) {
//根据需求创建线程,可扩容
ExecutorService executorService = Executors.newCachedThreadPool();
//10个任务
try {
for (int i = 1; i <=100 ; i++) {
//执行
executorService.execute(()->{
System.out.println(Thread.currentThread() .getName()+"处理任务ing...");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭线程池
executorService.shutdown();
}
}
}
10.3、参数说明
newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
底层都是new了ThreadPoolExecutor
ThreadPoolExecutor
有七个参数
参数 | 意义 |
---|---|
int corePoolsize | 核心线程数量 |
int maximumPoolsize, | 最大线程数量 |
long keepAliveTime | 线程存活时间的值 |
TimeUnit unit | 线程存活时间的单位 |
BlockingQueue workQueue | 阻塞队列 |
ThreadFactory threadFactory | 线程工厂 用于创建线程 |
RejectedExecutionHandler handler | 拒绝策略 |
10.4、线程池底层流程
- 程序在执行execute()时才会真正创建线程
- 当任务数量到达corePool【个数:2】时并不会再创建新线程 而是会到阻塞队列里【序号2】
- 当阻塞队列快满了【个数:3 最大:4】但运行的线程没有达到最大数【个数:2 最大5】时又来了新的任务会创建新的线程执行刚来的任务【序号3】
- 阻塞放不下了,运行的线程也达到最大值 此时再来新任务就会执行拒绝策略【序号4】
拒绝策略:
- AbortPolicy(默认)︰直接抛出RejectedExecutionException异常阻止系统正常运行
- callerRunsPolicy :“调用者运行“—种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardoldestPolicy :抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
- DiscardPolicy :该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
10.5、自定义线程池
import java.util.concurrent.*;
public class ThreadPoolDemo4 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 2L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
//10个任务
try {
for (int i = 1; i <=10 ; i++) {
//执行
executorService.execute(()->{
System.out.println(Thread.currentThread() .getName()+"处理任务ing...");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭线程池
executorService.shutdown();
}
}
}
十一、分支合并Fork/Join
Fork:把一个复杂任务进行分拆
Join:把分拆任务的结果进行合并
计算1+2+…100 其中查分值不能超过10(二分法)
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class MyThread extends RecursiveTask<Integer> {
private static final Integer VALUE = 10;//拆分差值
private int begin;
private int end;
private int result;
public MyThread(int begin,int end) {
this.begin=begin;
this.end=end;
}
@Override
protected Integer compute() {
//判断相加两个数值是否大于10
if((end-begin)<=VALUE) {
//相加操作
for (int i = begin; i <=end; i++) {
result = result+i;
}
} else {//进一步拆分
//获取中间值
int middle =(end + begin)/2;//拆分
MyThread thread01 = new MyThread(begin,middle);
MyThread thread02 = new MyThread(middle+1,end);
thread01.fork();
thread02.fork();
result=thread01.join()+thread02.join();
}
return result;
}
}
public class demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread thread = new MyThread(1,100);
//创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
//加入任务
ForkJoinTask<Integer> submit = forkJoinPool.submit(thread);
//获取结果
Integer result = submit.get();
System.out.println(result);
forkJoinPool.shutdown();
}
}
十二、异步回调
package com.ynx.exarejuc;
import org.apache.coyote.http11.filters.VoidInputFilter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Completable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步调用没有返回值
CompletableFuture<Void> completableFuture1=CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+"completableFuture1");
});
completableFuture1.get();
//异步调用有返回值
CompletableFuture<Integer> completableFuture2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"completableFuture2");
//int a=1/0;
return 1024;
});
completableFuture2.whenComplete((t,u)->{
System.out.println("t:::"+t);//返回值
System.out.println("u:::"+u);//异常的信息
}).get();
}
}