复习概念
Sleep和Wait的区别
- Sleep是Thread的静态方法,wait是Object的方法,任何对象实例都可以使用
- sleep不会释放锁,他也不需要占用锁,暂停。wait会释放锁,但是调用他的前提是线程占有锁
- 他们都可以被Interrupted方法中断。在哪里停,在哪里起
并发并行
并发:同一时刻,多个线程访问同一资源
并行:多项工作一起执行,之后汇总
线程的状态
Thread.status
管程
monitor监视器 所说的锁
是一种同步机制,保证同一时间,只有一个线程 访问被保护的数据或者代码
用户线程和守护线程
用户线程:自定义线程,平时用到的具体逻辑
守护线程:垃圾回收线程
主线程已经结束了,用户线程还在运行,jvm存活、
如果程序中没有用户线程了,守护线程也会结束,在执行start之前设置是否是守护线程。
卖票程序:
package com.policy.thread;
class Ticket{
private int ticket =30;
public synchronized void sale(){
if(ticket>0){
System.out.println(Thread.currentThread().getName()+"卖出:"+(ticket--)+"剩余:"+ticket);
}
}
}
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"a").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"b").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"c").start();
}
}
可重入锁
Lock.lock()
Lock.unlock()
lock不是java内置的,synchronized是java关键字
sync不需要手动释放锁,lock上锁之后需要手动释放锁
package com.policy.thread;
import java.util.concurrent.locks.ReentrantLock;
class LTicket{
private int num=30;
//创建可重入锁
private final ReentrantLock lock = new ReentrantLock();
//卖票方法
public void sale(){
//上锁
lock.lock();
try {
if(num>0){
System.out.println(Thread.currentThread().getName()+"卖出"+(num--)+"剩余"+num);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//解锁
lock.unlock();
}
}
}
public class LTicketSale {
public static void main(String[] args) {
LTicket lTicket = new LTicket();
//创建线程 表达式
new Thread(()->{
for (int i = 0; i < 40; i++) {
lTicket.sale();
}
},"a").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
lTicket.sale();
}
},"b").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
lTicket.sale();
}
},"c").start();
}
}
Lock 与Sync总结
1、Lock是一个接口,sync是java的关键字,是一种内置的实现语言
2、当sync发生异常时,会自动释放占有的锁,因此不会导致死锁的发生;
Lock在发生异常时,如果没有主动通过unLock释放锁,则很可能会造成死锁现象,因此在Finally中释放锁是很必要的。
3、Lock会让等待锁的线程响应中断,而sync却不会中断等待线程,使用sync会一直等待下去,不能够响应中断。
4、通过lock可以知道有没有成功获取锁,而Sync却无法办到。
5、Lock可以提高多个线程读操作的效率。
线程通信
第一步:创建资源类,在资源类中创建属性和方法
第二步:方法中
判断、干活、通知
第三步:创建多个线程,调用资源类的操作方法
第四步:防止虚假唤醒问题
package com.policy.thread.Thread202308;
class Share{
//线程等待和唤醒
private int num=0;
public synchronized void incr() throws InterruptedException {
//加一操作
if(num!=0){
//不等于0说明不需要加一 等待
this.wait();
}
//加一
num++;
System.out.println(Thread.currentThread().getName()+"::"+num);
//唤醒其他线程 可以减一了
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
if(num!=1){
//等待
this.wait();
}
//减一
num--;
System.out.println(Thread.currentThread().getName()+"::"+num);
this.notifyAll();
}
}
public class ThreadDemo01 {
public static void main(String[] args) {
Share share=new Share();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"aa").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"bb").start();
}
}
此方法 如果多了线程之后 增加CCDD线程执行操作会有虚假唤醒的问题,问题的原因是出现在判断条件中
A释放之后 BCD都有机会抢到,C抢到了C也会等待,然后再去让其他线程抢,继续唤醒A线程,WAIT有一个问题,就是在哪等待,唤醒之后在哪执行,那么会执行if下面的语句,第二次执行A可能会继续执行+1
解决:把判断放到while循环中,无论什么时候唤醒,都会在执行循环的判断条件。
package com.policy.thread.Thread202308;
class Share{
//线程等待和唤醒
private int num=0;
public synchronized void incr() throws InterruptedException {
//加一操作
while(num!=0){
//不等于0说明不需要加一 等待
this.wait();
}
//加一
num++;
System.out.println(Thread.currentThread().getName()+"::"+num);
//唤醒其他线程 可以减一了
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
while(num!=1){
//等待
this.wait();
}
//减一
num--;
System.out.println(Thread.currentThread().getName()+"::"+num);
this.notifyAll();
}
}
public class ThreadDemo01 {
public static void main(String[] args) {
Share share=new Share();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"aa").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"bb").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"cc").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"dd").start();
}
}
Lock实现上锁和线程通信
lock.lock();上锁
lock.unlock();解锁
condition.await();等待
condition.signalAll();唤醒其他线程
package com.policy.thread.Thread202308;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Share02{
//创建锁
private Lock =new ReentrantLock();
Condition condition = lock.newCondition();
private int num = 0;
public void incr() throws InterruptedException {
//上锁
lock.lock();
try {
while (num!=0){
//通信等待
condition.await();
}
//干活
num++;
System.out.println(Thread.currentThread().getName()+"::"+num);
condition.signalAll();
} finally {
//解锁
lock.unlock();
}
//判断 干活 通信
}
public void decr() throws InterruptedException {
//上锁
lock.lock();
try {
while (num!=1){
//通信等待
condition.await();
}
//干活
num--;
System.out.println(Thread.currentThread().getName()+"::"+num);
condition.signalAll();
} finally {
lock.unlock();
}
//判断 干活 通信
}
}
public class ThradDemo02 {
public static void main(String[] args) {
Share02 share02 = new Share02();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
share02.incr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"A").start();
new Thread(()->{
try {
share02.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
share02.decr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"C").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
share02.decr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"D").start();
}
}
线程定制化通信
以上不管使用那种方式上锁或者进行通信 但是都不能将数据进行有顺序的固定化的通信
例如 指定三个线程,按照要求执行
AA打印5次 BB打印10次 CC打印15次
进行十轮
package com.policy.thread.Thread202308;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource{
private int falg =1;//1表示A的 2-B 3C
//创建锁
private Lock lock =new ReentrantLock();
//创建三个condition
private Condition c1= lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3=lock.newCondition();
public void print5(int loop) throws InterruptedException {
lock.lock();
try {
while (falg!=1){
//等待 不打印A
c1.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"::"+i+"当前是第轮:"+loop);
}
//通知
falg=2;//修改flag标示位的值
c2.signal();//此时在A线程中 唤醒的是C2
} finally {
lock.unlock();
}
}
public void print10(int loop) throws InterruptedException {
lock.lock();//上锁
try {
//等待
while (falg!=2){
c2.await();
}
//干活
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+"::"+i+"当前是第轮:"+loop);
}
//通知
falg=3;
c3.signal();
} finally {
lock.unlock();
}
}
public void print15(int loop) throws InterruptedException {
lock.lock();//上锁
try {
//等待
while (falg!=3){
c3.await();
}
//干活
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName()+"::"+i+"当前是第轮:"+loop);
}
//通知
falg=1;
c1.signal();
} finally {
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"C").start();
}
}
集合的线程安全
集合不安全的情况
List线程不安全的例子
在一边向list中add 一边需要输出list时 有可能会报错:并发修改异常
Exception in thread "2" Exception in thread "5" Exception in thread "1" Exception in thread "4" java.util.ConcurrentModificationException
package com.policy.thread.Thread202308;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class ThreadDemo04 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
解决方案:
list之所以会报错并发修改异常 原因是list并没有加上synchronized关键字修饰
1、Vcetor 少用
Vector 的方法上 都加上了synchronized
package com.policy.thread.Thread202308;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
public class ThreadDemo04 {
public static void main(String[] args) {
List<String> list = new Vector<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
2、Collections
List<String> list = Collections.synchronizedList(new ArrayList<>());
通过调用Collections中的synchronized方法 即可将new出的List变成线程安全的
一般也少用
3、CopyOnWriteArrayList
List<String> list =new CopyOnWriteArrayList<>();
写时复制技术
可以并发的读取list的内容,但是需要向list写入的时候,需要复制一份写入新内容,然后将两份合并起来,读取合并后的新内容。兼顾了并发读 和写的操作
HashSet不安全的情况
Set<String> set =new HashSet<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
},String.valueOf(i)).start();
}
Exception in thread "5" Exception in thread "17" Exception in thread "21" Exception in thread "23" Exception in thread "24" Exception in thread "28" java.util.ConcurrentModificationException
解决方法
CopyOnWriteArraySet
Set<String> set = new CopyOnWriteArraySet<>();
HashMap不安全的情况
Map<String,String> map =new HashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
map.put(UUID.randomUUID().toString().substring(0,8),"1111");
System.out.println(map);
},String.valueOf(i)).start();
}
解决方案
Map<String,String > map =new ConcurrentHashMap<>();
多线程锁
synchronized实现同步方法的基础:java每个对象都可以作为锁,具体的表现有三种形式。
对于普通的同步方法,锁的是当前实例的对象
对于静态同步方法,锁的是当前类的Class
对于同步方法块,锁的是Synchronized括号里面的配置对象
公平锁和非公平锁
非公平锁:线程会饿死,但是执行效率高
公平锁:阳光普照,效率相对低
可重入锁
synchronized 和Lock都是可重入锁
syn是一种隐式的可重入锁,Lock是显示的可重入锁 可重入锁也叫递归锁
死锁
两个或者两个以上的进程在执行过程中,因为争夺资源而造成的一种互相等待的现象,如果没有外力干涉他们就无法执行下去的现象、
产生死锁的原因:
1、系统资源不足
2、进程运行的推进顺序不合适
3、资源分配不当
package com.policy.thread.Thread202308;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
public class DeadLock {
//创建两个对象
static Object a = new Object();
static Object b = new Object();
public static void main(String[] args) {
new Thread(()->{
synchronized (a){
System.out.println(Thread.currentThread().getName()+"持有A,试图获取B");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (b){
System.out.println(Thread.currentThread().getName()+"获取B");
}
}
},"A").start();
new Thread(()->{
synchronized (b){
System.out.println(Thread.currentThread().getName()+"持有B,试图获取A");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (a){
System.out.println(Thread.currentThread().getName()+"获取A");
}
}
},"B").start();
}
}
验证死锁:
1、jps 类似于:linux ps -ef
2、jstack :jvm自带的堆栈跟踪工具
Callable接口
Callable接口可以提供返回值
Runnable和Callable 区别
run call
无返回值 有返回值
无异常抛出 有异常抛出
package com.policy.thread.Thread202308.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
//实现runnable接口
class Mythread1 implements Runnable{
@Override
public void run() {
}
}
class Mythread2 implements Callable{
@Override
public Object call() throws Exception {
return 200;
}
}
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//调用runnable接口
Mythread1 mythread1 = new Mythread1();
Thread thread = new Thread(mythread1);
thread.setName("a");
thread.start();
//调用callable
//FutureTask是runnable的实现接口
//future 构造方法可以传递Callable
FutureTask<Integer> futureTask = new FutureTask<>(new Mythread2());
//以上的代码 可以用lambda表达式做简化
FutureTask<Integer> futureTask1 = new FutureTask<>(()->{
return 1024;
});
//FutureTask 原理 为什么叫未来任务
new Thread(futureTask1).start();
System.out.println(futureTask1.get());
}
}
Future优缺点
优点:future结合线程池异步多任务配合,能显著的提高程序的执行效率。
package com.policy.thread.JUC;
import java.util.concurrent.*;
public class FutureThreadPoolDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
//线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//开启多个异步任务 处理
FutureTask<String> task1 = new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return "task1 over";
});
FutureTask<String> task2 = new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(300);
return "task2 over";
});
FutureTask<String> task3 = new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(300);
return "task3 over";
});
threadPool.submit(task1);
threadPool.submit(task2);
threadPool.submit(task3);
//获取结果
System.out.println(task1.get());
System.out.println(task2.get());
System.out.println(task3.get());
threadPool.shutdown();
long end = System.currentTimeMillis();
System.out.println(end-start+"毫秒");
System.out.println(Thread.currentThread().getName()+"执行结束");
}
private static void m1() throws InterruptedException {
long start = System.currentTimeMillis();
TimeUnit.MILLISECONDS.sleep(500);
TimeUnit.MILLISECONDS.sleep(300);
TimeUnit.MILLISECONDS.sleep(300);
long end = System.currentTimeMillis();
System.out.println(end-start+"毫秒");
System.out.println(Thread.currentThread().getName()+"执行结束");
}
}
缺点 FutureTask的缺点、
get方法会阻塞:
package com.policy.thread.JUC;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> stringFutureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + "come in!");
TimeUnit.SECONDS.sleep(5);
return "task over";
});
Thread thread = new Thread(stringFutureTask);
thread.start();
System.out.println(stringFutureTask.get());//非要等到结果 才会执行
//由于会在上面get不到结果 需要等五秒之后才能get到 所以 线程会阻塞五秒 才能执行下面的输出
System.out.println("main over");
}
}
1、get容易阻塞,一般放到最后才执行
2、如果不愿放到最后,我希望过时不候,可以自动离开
System.out.println(stringFutureTask.get(3,TimeUnit.SECONDS));
最后超过时间会抛出异常
JUC的辅助类
1、减少计数的CountDownLatch
CountDownLatch主要有两个方法,当一个或者多个线程调用 await方法时,这个线程会阻塞。
调用CountDown方法会将计数器减一。当计数器变为0时,因await方法阻塞的线程会被唤醒,继续执行。
2、循环栅栏CyclicBarrier
3、信号灯Semaphore
读写锁
悲观锁
不支持并发操作
乐观锁
表锁
操作表中的一行的记录或者数据 整张表都上锁
行锁
操作表中第一行的数据,只对第一行上锁,其他线程可以操作其他行的数据
读锁
共享锁
写锁
独占锁
读写锁都有可能发生死锁的状况
读锁发生死锁的状况
读写锁的例子,如果不加上读写锁,那么在还没有写完时就会执行读取的操作,
加锁之后,写完才可以读,并且可以看到结果中写操作是独占的,读操作是共享的。
读写锁:一个资源可以被多个读操作线程访问,一个资源只能被一个写操作线程访问,一个资源不能同时存在读写线程访问,读写互斥,写写互斥,读读共享。
无锁时,多个线程共 全的情况发生。
添加锁synchronized或者Lock这样就变成了线程独占的了,资源不共享,变成单线程的了
添加读写锁ReentrantReadWriteLock:相较与上面的锁,读读操作变成了共享的了,提升性能,多线程可共同操作(读)一个资源
缺点:造成 锁饥饿,一直读没有写操作、 读的时候不能写,读完了才可以写,写的时候可以读。=会造成锁降级的过程
锁降级
写入锁降级为读锁的过程
jdk8 中锁降级的过程:
阻塞队列
BlockingQueue阻塞队列
当队列为空的时候,从对列中获取元素,只能阻塞
当队列是满的时候,向队列中添加元素,阻塞
好处是,一系列的判断是否为空,满的操作,都是自动完成的。我们不需要关心什么时候需要挂起线程,什么时候需要唤醒线程。
这些都是阻塞队列封装好的。
阻塞队列的架构
ArrayBlockingQueue:基于数组实现的阻塞队列,内部维护了一个定长的数组,以便缓存队列的数据对象。
LinkedBlockingQueue:由链表组成的阻塞队列
DelayQueue:使用优先级队列 实现的延迟无界的队列。
PriorityBlockQueue:支持优先级排序的队列
SynchronousQueue:无缓冲的阻塞队列,不存储元素的的阻塞队列,只存单个元素‘
LinkedTransferQueue:链表组成的无界阻塞队列
LinkedBlockDeque:有链表组成的双向阻塞队列
add、remove这一组的话,在长度范围内正常操作,add会返回true或者false 超过之后会抛出异常
offer和poll offer满了之后添加不会抛异常,会返回false 队列空了之后,执行poll,会进行返回null 不会抛异常
put 当队列满了之后会一直阻塞在那里,take也是,队列空了之后不会结束也不会完成 会停留在那里
第四组的offer poll设置超时时间,在时间范围内阻塞 超时就会退出 offer会返回false
ThreadPool线程池
一种线程使用模式,线程过多会带来调度的开销,进而影响缓存局部的和整体的性能。线程池维护多个线程,等待着监督管理者分配可并发执行的任务。
优点:降低资源消耗,提高响应速度。提搞可管理性
线程池的分类:
Executors.newFixedThreadPool(int)一个线程池,里面多个线程
Executors.newSingleThreadExecutor()一个线程池一个线程
Executors.newCachedThreadPool()缓存线程池,理论上可无限扩容
举例
newFixedThreadPool
package com.policy.thread.Thread202308.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//fixed
ExecutorService threadpool1 = null;//5个窗口
try {
threadpool1 = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
//执行
threadpool1.execute(()->{
System.out.println(Thread.currentThread().getName()+"办理业务");
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
threadpool1.shutdown();
}
}
}
newSingleExecutor()
package com.policy.thread.Thread202308.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//fixed
ExecutorService threadpool1 = Executors.newFixedThreadPool(5);//5个窗口
ExecutorService threadpool2 = Executors.newSingleThreadExecutor();//一个线程
try {
for (int i = 0; i < 10; i++) {
//执行
threadpool2.execute(()->{
System.out.println(Thread.currentThread().getName()+"办理业务");
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
threadpool2.shutdown();
}
}
}
相当于开启了一个线程 只输出一个线程名
newCacheThreadPool()
package com.policy.thread.Thread202308.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//fixed
ExecutorService threadpool1 = Executors.newFixedThreadPool(5);//5个窗口
ExecutorService threadpool2 = Executors.newSingleThreadExecutor();//一个线程
ExecutorService threadpool3 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
//执行
threadpool3.execute(()->{
System.out.println(Thread.currentThread().getName()+"办理业务");
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
threadpool3.shutdown();
}
}
}
执行结果:
可以看到此处执行最多创建了7个线程,来执行一个循环任务,相当于开了七个窗口,理论上可扩充更灵活
底层实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
三个线程池的底层都是new ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
7个线程池参数:
int corePoolSize:常驻的核心线程的数量
int maximumPoolSize :最大的线程数量
long keepAliveTime, TimeUnit unit,存活时间 及存活时间的单位,非核心线程的空闲的存活时间,超出时间就会关闭掉
BlockingQueue<Runnable> workQueue,常驻的线程数量用完之后,进入阻塞队列
ThreadFactory threadFactory,线程工厂,用于创建线程
RejectedExecutionHandler handler拒绝策略
线程池的工作流程以及7个参数的使用
大致的流程图
执行了线程池的execute之后,线程池才会创建,
以上的图中核心线程数2 最大线程数5 阻塞队列数3
当有一个线程进到线程池之后,会最先进入核心线程中执行,如果核心线程此时已满,会加到阻塞队列中,如果阻塞队列长度也满了,那么会创建核心线程池之外的线程进行执行。如果最大线程池也满了,会根据线程策略进行操作
拒绝策略jdk内置的拒绝策略
AbortPolicy(默认) :直接抛出RejectedExecutionException异常,阻止系统正常运行。
CallerRunsPolicy:调用者运行,一种调节机制,该策略不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,降低任务流量
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入到队列中尝试再次提交当前任务
DiscardPolicy:该策略默默的丢弃无法处理的任务,不予以任何处理也不抛出异常。如果允许丢失任务,这是最好的策略
自定义线程池
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor传参数的方式进行创建
原因1允许阻塞队列的请求长度是Integer.MAX_VALUE,可能会造成大量的线程堆积。导致OOM
实际中自己自定义线程池进行操作
Fork/join分支合并框架
可以将一个大任务,拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果进行输出。
FORK:把一个任务进行拆分,大事化小
Join:把拆分任务结果进行合并