内容
-
理解消费者、生产者的案例执行过程,
-
理解用队列方式做消费者、生产者的案例
-
会使用线程池运行任务,
-
理解ThreadPoolExecutor7个参数的含义(会根据需要 通过参数控制线程池的总数量)
匿名内部类里的异常处理
Thread 使用匿名内部类时 里面的异常 必须用try处理 无法使用声明
生产者消费者
生产者和消费者模式概述
-
所谓生产者消费者问题,实际上主要是包含了两类线程:
- 一类是生产者线程用于生产数据
- 一类是消费者线程用于消费数据
-
为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库
- 生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为
- 消费者只需要从共享数据区中去获取数据,并不需要关心生产者的行为
-
Object类的等待和唤醒方法
方法名 说明 void wait() 导致当前线程等待,wait会释放同步锁, 直到另一个线程调用该对象的 notify()方法或 notifyAll()方法, void notify() 唤醒正在等待对象监视器的单个线程 void notifyAll() 唤醒正在等待对象监视器的所有线程
案例
案例需求
- 桌子类(Desk):定义一个整型变量表示购买的汉堡的总数量,定义锁对象变量,定义变量 用来标记桌子上有无汉堡
- 生产者类(Cooker):实现Runnable接口,重写run()方法,设置线程任务
- 1.判断桌子是否有汉堡,决定当前线程是否执行
- 2.如果有汉堡,就进入等待状态,如果没有汉堡,继续执行,生产汉堡
- 3.生产汉堡之后,更新桌子上汉堡状态,唤醒消费者消费汉堡
- 消费者类(Foodie):实现Runnable接口,重写run()方法,设置线程任务
- 1.判断是否有汉堡,决定当前线程是否执行
- 2.如果没有汉堡,就进入等待状态,如果有汉堡,就消费汉堡
- 3.消费汉堡后,更新桌子上汉堡状态,唤醒生产者生产汉堡
- 测试类(Demo):里面有main方法,main方法中的代码步骤如下
- 创建生产者线程和消费者线程对象
- 分别开启两个线程
注意
- obj.wait() 可以让线程处于等待,并且把obj的锁释放
- obj.notifyAll()可以唤醒等待的线程继续执行
代码1 放汉堡的桌子
package com.heima.test2;
public class Desk {
public static int count = 10;//买了10个汉堡
public static final Object obj = new Object();//对象 作为锁来用
//true表示有 false表示没有
public static boolean flag = false;//标记桌子是否有汉堡
}
代码2 生产者
package com.heima.test2;
//1.判断桌子是否有汉堡,决定当前线程是否执行
// 2.如果桌子有汉堡,就进入等待状态,如果没有汉堡,继续执行,生产汉堡
//3.生产汉堡之后,更新桌子上汉堡状态,唤醒消费者消费汉堡
public class Cooker implements Runnable {
@Override
public void run() {
while (true) {
System.out.println("大厨被锁挡住了");
//当前桌子里的count变为0 循环结束
synchronized (Desk.obj) {
System.out.println("大厨进来了");
if (Desk.count == 0) {
break;
} else {
if (Desk.flag) {
//桌子有汉堡
System.out.println("桌子有汉堡,大厨在等待");
try {
Desk.obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("大厨等待结束");
} else {
//桌子没有汉堡
System.out.println("当前生产了一个统一银座的汉堡");
//生产汉堡之后,更新桌子上汉堡状态
Desk.flag = true;
//唤醒消费者消费汉堡
System.out.println("大厨发出了通知");
Desk.obj.notifyAll();
}
}
}
}
}
}
代码3 消费者
package com.heima.test2;
public class Foodie implements Runnable {
@Override
public void run() {
while (true) {
System.out.println("吃货被锁挡住了");
synchronized (Desk.obj) {
System.out.println("吃货进来了");
//判断是否有汉堡
if (Desk.count == 0) {
break;
} else {
//判断桌子上是否有汉堡
if (Desk.flag) {
//如果有汉堡 就消费
System.out.println("吃了一个美味的汉堡");
System.out.println(Desk.count);
//汉堡数量-1
Desk.count--;
//更新汉堡状态
Desk.flag = false;
//通知生产者生产
System.out.println("通知生产者生产");
Desk.obj.notifyAll();
} else {
//如果没有汉堡 就等待
System.out.println("没有汉堡 消费者等待");
try {
Desk.obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者等待结束");
}
}
}
}
}
}
代码4 测试类
public class Demo {
public static void main(String[] args) {
/*消费者步骤:
1,判断桌子上是否有汉堡包。
2,如果没有就等待。
3,如果有就开吃
4,吃完之后,桌子上的汉堡包就没有了
叫醒等待的生产者继续生产
汉堡包的总数量减一*/
/*生产者步骤:
1,判断桌子上是否有汉堡包
如果有就等待,如果没有才生产。
2,把汉堡包放在桌子上。
3,叫醒等待的消费者开吃。*/
new Thread(new Foodie()).start();
new Thread(new Cooker()).start();
}
}
代码逻辑
- 打印结果 观察过程
- 观察 锁的作用 wait的作用 和notifyall的作用
代码优化1
- 把Desk类的属性 改为成员 变量,用创建对象的方式把Desk对象传给消费者对象和生产者对象
Desk类代码
package com.heima.test3;
public class Desk {
public int count = 10;//买了10个汉堡
public final Object obj = new Object();//对象 作为锁来用
//true表示有 false表示没有
public boolean flag = false;//标记桌子是否有汉堡
public Desk(int count) {
this.count = count;
}
public Desk() {
}
}
cooker
package com.heima.test3;
//1.判断桌子是否有汉堡,决定当前线程是否执行
// 2.如果桌子有汉堡,就进入等待状态,如果没有汉堡,继续执行,生产汉堡
//3.生产汉堡之后,更新桌子上汉堡状态,唤醒消费者消费汉堡
public class Cooker implements Runnable {
private Desk desk;
public Cooker(Desk desk) {
this.desk = desk;
}
@Override
public void run() {
while (true) {
System.out.println("大厨被锁挡住了");
//当前桌子里的count变为0 循环结束
synchronized (desk.obj) {
System.out.println("大厨进来了");
if (desk.count == 0) {
break;
} else {
if (desk.flag) {
//桌子有汉堡
System.out.println("桌子有汉堡,大厨在等待");
try {
desk.obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("大厨等待结束");
} else {
//桌子没有汉堡
System.out.println("当前生产了一个统一银座的汉堡");
//生产汉堡之后,更新桌子上汉堡状态
desk.flag = true;
//唤醒消费者消费汉堡
System.out.println("大厨发出了通知");
desk.obj.notifyAll();
}
}
}
}
}
}
Foodie
package com.heima.test3;
public class Foodie implements Runnable {
private Desk desk;
public Foodie(Desk desk) {
this.desk = desk;
}
@Override
public void run() {
while (true) {
System.out.println("吃货被锁挡住了");
synchronized (desk.obj) {
System.out.println("吃货进来了");
//判断是否有汉堡
if (desk.count == 0) {
break;
} else {
//判断桌子上是否有汉堡
if (desk.flag) {
//如果有汉堡 就消费
System.out.println("吃了一个美味的汉堡");
System.out.println(desk.count);
//汉堡数量-1
desk.count--;
//更新汉堡状态
desk.flag = false;
//通知生产者生产
System.out.println("通知生产者生产");
desk.obj.notifyAll();
} else {
//如果没有汉堡 就等待
System.out.println("没有汉堡 消费者等待");
try {
desk.obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者等待结束");
}
}
}
}
}
}
测试类
package com.heima.test3;
public class Demo01 {
public static void main(String[] args) {
Desk desk01 = new Desk(5);
new Thread(new Foodie(desk01)).start();
new Thread(new Cooker(desk01)).start();
Desk desk02 = new Desk(15);
new Thread(new Foodie(desk02)).start();
new Thread(new Cooker(desk02)).start();
}
}
队列的使用 (理解 后期学习也会用的)
ArrayBlockingQueue类的使用
- put 添加数据到队列,如果队列满了 代码阻塞等待
- take 从队列获取数据,如果队列空了,代码阻塞等待
import java.util.concurrent.ArrayBlockingQueue;
public class TestQueue {
public static void main(String[] args) {
ArrayBlockingQueue q = new ArrayBlockingQueue(3);
MyRun1 m1 = new MyRun1(q);
MyRun2 m2 = new MyRun2(q);
new Thread(m1).start();
new Thread(m2).start();
}
}
class MyRun1 implements Runnable {
public ArrayBlockingQueue q;
public MyRun1(ArrayBlockingQueue q) {
this.q = q;
}
@Override
public void run() {
//向队列里添加数据
for (int i = 0; i < 100; i++) {
try {
q.put(i);//存入队列 队列满了 代码就停在这里
System.out.println("存入了" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class MyRun2 implements Runnable {
public ArrayBlockingQueue q;
public MyRun2(ArrayBlockingQueue q) {
this.q = q;
}
@Override
public void run() {
while (true) {
int peek = 0;
try {
System.out.println("从队列里取数据");
peek = (int) q.take();//从队列里取数据 如果没有数据 就停在这里
System.out.println(peek);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 无界队列 LinkedBlockingQueue 创建 的时候不需要给容量
- ArrayBlockingQueue是有界队列 创建 的时候必须给容量
队列版 消费者生产者
Cooker
public class Cooker implements Runnable {
ArrayBlockingQueue q;
public Cooker(ArrayBlockingQueue q) {
this.q = q;
}
@Override
public void run() {
while (true) {
try {
this.q.put("汉堡");
System.out.println("大厨做了一个精美的汉堡");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Foodie
public class Foodie implements Runnable {
ArrayBlockingQueue q;
public Foodie(ArrayBlockingQueue q) {
this.q = q;
}
@Override
public void run() {
while (true) {
try {
String bun = (String) this.q.take();
System.out.println("拿起一个汉堡吃");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
测试类
public class Demo01 {
public static void main(String[] args) {
ArrayBlockingQueue q = new ArrayBlockingQueue(5);
Cooker cooker = new Cooker(q);
Foodie foodie = new Foodie(q);
Thread t1 = new Thread(cooker);
Thread t2 = new Thread(foodie);
t2.start();
t1.start();
}
}
线程状态介绍
- 当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。线程对象在不同的时期有不同的状态。那么Java中的线程存在哪几种状态呢?
- Java中的线程状态被定义在了java.lang.Thread.State枚举类中,State枚举类的源码如下:
public class Thread {
public enum State {
/* 新建 */
NEW ,
/* 可运行状态 */
RUNNABLE ,
/* 阻塞状态 */
BLOCKED ,
/* 无限等待状态 */
WAITING ,
/* 计时等待 */
TIMED_WAITING ,
/* 终止 */
TERMINATED;
}
// 获取当前线程的状态
public State getState() {
return jdk.internal.misc.VM.toThreadState(threadStatus);
}
}
通过源码我们可以看到Java中的线程存在6种状态,每种线程状态的含义如下
线程状态 | 具体含义 |
---|---|
NEW | 一个尚未启动的线程的状态。也称之为初始状态、开始状态。线程刚被创建,但是并未启动。还没调用start方法。MyThread t = new MyThread()只有线程象,没有线程特征。 |
RUNNABLE | 当我们调用线程对象的start方法,那么此时线程对象进入了RUNNABLE状态。那么此时才是真正的在JVM进程中创建了一个线程,线程一经启动并不是立即得到执行,线程的运行与否要听令与CPU的调度,那么我们把这个中间状态称之为可执行状态(RUNNABLE)也就是说它具备执行的资格,但是并没有真正的执行起来而是在等待CPU的度。 |
BLOCKED | 当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;当该线程持有锁时,该线程将变成Runnable状态。 |
WAITING | 一个正在等待的线程的状态。也称之为等待状态。造成线程等待的原因有两种,分别是调用Object.wait()、join()方法。处于等待状态的线程,正在等待其他线程去执行一个特定的操作。例如:因为wait()而等待的线程正在等待另一个线程去调用notify()或notifyAll();一个因为join()而等待的线程正在等待另一个线程结束。 |
TIMED_WAITING | 一个在限定时间内等待的线程的状态。也称之为限时等待状态。造成线程限时等待状态的原因有三种,分别是:Thread.sleep(long),Object.wait(long)、join(long)。 |
TERMINATED | 一个完全运行完成的线程的状态。也称之为终止状态、结束状态 |
各个状态的转换,如下图所示:
Executor创建线程池
- 英*/ɪɡˈzekjətə®/*
- 美*/ɪɡˈzekjətər/*
体验代码
- 自己查询学习newCachedThreadPool 的底层队列SynchronousQueue
public static void main(String[] args) throws InterruptedException {
//1,创建一个默认的线程池对象.池子中默认是空的.默认最多可以容纳int类型的最大值.
ExecutorService executorService = Executors.newCachedThreadPool();
//Executors --- 可以帮助我们创建线程池对象
//ExecutorService --- 可以帮助我们控制线程池
//给线程池添加任务 这里用lambda 实际传入的是Runnable的实现类的对象
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() + "在执行了");
});
Thread.sleep(2000);
//给线程池添加任务 这里用lambda 实际传入的是Runnable的实现类的对象
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() + "在执行了");
});
//关闭线程池,
executorService.shutdown();
}
创建线程指定最大数量,并且获取线程数量
- static ExecutorService newFixedThreadPool(int nThreads)
代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class MyThreadPoolDemo2 {
public static void main(String[] args) {
//参数不是初始值而是最大值
ExecutorService executorService = Executors.newFixedThreadPool(10);
//强转为ThreadPoolExecutor对象
ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
System.out.println(pool.getPoolSize());//0 获取线程的数量
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + "在执行了");
});
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + "在执行了");
});
System.out.println(pool.getPoolSize());//2 获取线程的数量
executorService.shutdown();
}
}
线程池-ThreadPoolExecutor
创建线程池对象 :
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最大存活时间,时间单位,任务队列,创建线程工厂,任务的拒绝策略);
代码实现 :
package com.itheima.mythreadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolDemo3 {
// 参数一:核心线程数量
// 参数二:最大线程数
// 参数三:空闲线程最大存活时间
// 参数四:时间单位
// 参数五:任务队列
// 参数六:创建线程工厂
// 参数七:任务的拒绝策略
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,5,20,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
pool.submit(new MyRunnable());
pool.submit(new MyRunnable());
pool.shutdown();
}
}
线程池-参数详解
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 核心线程的最大值,不能小于0
maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
keepAliveTime: 空闲线程最大存活时间,不能小于0
unit: 时间单位
workQueue: 任务队列,不能为null
threadFactory: 创建线程工厂,不能为null
handler: 任务的拒绝策略,不能为null
线程池的运行过程
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, //核心线程数量 不被轻易销毁
5, // 最大数量 超出核心线程的 其它线程 临时线程(当任务结束后,空闲后超过指定时间就销毁)
2, //指定的空闲的临时线程存活时间
TimeUnit.SECONDS,//存活时间的单位
new ArrayBlockingQueue<>(3),//存放任务队列
Executors.defaultThreadFactory(),//线程工厂
new ThreadPoolExecutor.AbortPolicy());//当任务的数量 超过了 最大线程数量+队列的长度 就会拒绝
- 1 如果添加的任务数量 大于等于核心线程数量 但是 小于等于核心线程数量+任务队列的数量, 会创建核心线程,多余的任务放到队列里
- 2 如果任务数量大于等于核心线程数量+任务队列的数量, 多出的任务 会创建临时线程会执行,但是总线程数量不能超过设置的最大数量
1.7 线程池-非默认任务拒绝策略
- RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类。
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。是默认的策略。
ThreadPoolExecutor.DiscardPolicy: 丢弃任务,但是不抛出异常 这是不推荐的做法。
ThreadPoolExecutor.DiscardOldestPolicy: 抛弃队列中等待最久的任务 然后把当前任务加入队列中。
ThreadPoolExecutor.CallerRunsPolicy: 调用任务的run()方法绕过线程池直接执行。
- 注:明确线程池对多可执行的任务数 = 队列容量 + 最大线程数
代码演示1:演示ThreadPoolExecutor.AbortPolicy任务处理策略
public class ThreadPoolExecutorDemo01 {
public static void main(String[] args) {
/**
* 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.AbortPolicy()) ;
// 提交5个任务,而该线程池最多可以处理4个任务,当我们使用AbortPolicy这个任务处理策略的时候,就会抛出异常
for(int x = 0 ; x < 5 ; x++) {
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
});
}
}
}
控制台输出结果
pool-1-thread-1---->> 执行了任务
pool-1-thread-3---->> 执行了任务
pool-1-thread-2---->> 执行了任务
pool-1-thread-3---->> 执行了任务
控制台报错,仅仅执行了4个任务,有一个任务被丢弃了
案例演示2:演示ThreadPoolExecutor.DiscardPolicy任务处理策略
public class ThreadPoolExecutorDemo02 {
public static void main(String[] args) {
/**
* 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardPolicy()) ;
// 提交5个任务,而该线程池最多可以处理4个任务,当我们使用DiscardPolicy这个任务处理策略的时候,控制台不会报错
for(int x = 0 ; x < 5 ; x++) {
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
});
}
}
}
控制台输出结果
pool-1-thread-1---->> 执行了任务
pool-1-thread-1---->> 执行了任务
pool-1-thread-3---->> 执行了任务
pool-1-thread-2---->> 执行了任务
控制台没有报错,仅仅执行了4个任务,有一个任务被丢弃了