一、等待唤醒机制
生产者和消费者,也叫等待唤醒机制。他是一个十分经典的多线程协作的模式。我们来讲一个小故事:
在一个繁忙的工厂里,有一个生产线,我们称之为“共享资源”。这个生产线一次只能生产一个产品,而且需要在完全完成后才能进行下一个。工厂里有两类工人:一类是生产者,他们负责制造产品;另一类是消费者,他们负责将生产好的产品打包发货。
为了避免生产线混乱,工人们约定了一些规则:
生产者不会同时制造多个产品,每完成一个产品后,他们会通知消费者产品已准备好。 消费者不会试图包装未完成的产品,他们会等待生产者的通知。
有一天,生产者和消费者都开始工作了。生产者每制造出一个产品,就大声喊:“产品做好了!”然后消费者听到后,就会过来取走产品,并开始打包。但是,如果生产线上已经有了一个产品,生产者就不能开始制造下一个,他们必须等待消费者把当前的产品取走。同样,如果生产线上没有产品,消费者就不能进行打包,他们必须等待生产者制造出新的产品。
为了更好地协调工作,他们在生产线旁边放了一个铃铛。每当生产者完成一个产品,他们就会敲响铃铛,通知消费者:“产品做好了,快来取!”而消费者听到铃声后,就会过来取走产品,并开始打包。
这个故事中的生产者和消费者,就像Java中的线程一样,通过同步机制(铃铛)来协调对共享资源(生产线)的访问,确保生产和消费的过程既高效又有序。
在这个故事中,生产线就是Java中的一个对象,生产者和消费者是两个线程,他们通过调用这个对象的同步方法来访问共享资源,并通过wait()和notify()方法来控制对资源的访问,确保资源在任何时候只被一个线程使用。这样,生产者和消费者就能和谐地工作,既不会生产出未完成的产品,也不会让消费者无事可做。
1.1、生产者与消费者的方法
类型 | 方法名 | 描述 |
---|---|---|
线程 | start() | 启动线程,使线程开始执行其run()方法。 |
线程 | run() | 线程执行的入口方法,需要重写以定义线程要执行的操作。 |
同步控制 | wait() | 导致线程等待,直到另一个线程调用相同对象的notify()或notifyAll()方法。 |
同步控制 | notify() | 唤醒在此对象监视器上等待的单个线程。 |
同步控制 | notifyAll() | 唤醒在此对象监视器上等待的所有线程。 |
锁机制 | synchronized | 用于方法或代码块,确保同一时间只有一个线程可以执行该段代码。 |
显式锁 | ReentrantLock | 可重入的互斥锁,比synchronized提供更灵活的锁定机制。 |
显式锁 | lock() | 获取锁,如果锁被另一个线程持有,则等待直到锁被释放。 |
显式锁 | unlock() | 释放锁,允许其他线程获取该锁。 |
条件变量 | Condition | 与锁对象配合使用,用于更复杂的线程间协调。 |
条件变量 | await() | 导致当前线程等待,直到另一个线程调用相同Condition的signal()或signalAll()方法。 |
条件变量 | signal() | 唤醒在此Condition上等待的单个线程。 |
条件变量 | signalAll() | 唤醒在此Condition上等待的所有线程。 |
这其中最重要的代码就是:
类型 | 方法名 | 描述 |
---|---|---|
同步控制 | wait() | 导致线程等待,直到另一个线程调用相同对象的notify()或notifyAll()方法。 |
同步控制 | notify() | 唤醒在此对象监视器上等待的单个线程。 |
同步控制 | notifyAll() | 唤醒在此对象监视器上等待的所有线程。 |
我们来看一个例子:
1.1.1 消费者代码:
public class Foodie extends Thread{
@Override
public void run() {
/**
* 1. 循环
* 2. 同步代码块
* 3. 判断共享数据是否到达末尾(到了)
* 4. 判断共享数据是否到达末尾(没到)
*/
while (true) {
synchronized (Desk.lock) {
if (Desk.count == 0) {
break;
} else {
// 先去判断生产线上是否有产品
if (Desk.state == 0) {
// 如果没有就等
try {
Desk.lock.wait();// 让当前线程与锁对象进行绑定
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
// 货物总数-1
Desk.count--;
// 如果有就打包
System.out.println("[消费者]正在打包商品,还剩" + Desk.count + "件商品");
// 打包完毕唤醒生产者生产
Desk.lock.notifyAll();
// 修改生产线的状态为没有(0)
Desk.state=0;
}
}
}
}
}
}
1.1.2 生产者代码:
public class Cook extends Thread {
@Override
public void run() {
/**
* 1. 循环
* 2. 同步代码块
* 3. 判断共享数据是否到达末尾(到了)
* 4. 判断共享数据是否到达末尾(没到)
*/
while (true) {
synchronized (Desk.lock) {
// 判断是否到达末尾
if (Desk.count == 0) {
break;
} else {
// 判断生产线上是否有食物
if (Desk.state == 0) {
// 如果没有食物
// 生产食物
System.out.println("[生产者]生产了一份食物");
// 修改生产线上食物状态
Desk.state = 1;
// 叫醒消费者开始打包
Desk.lock.notifyAll();
} else {
// 如果有食物 则等待
try {
Desk.lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
}
}
1.1.3 中间代码与启动代码
public class Desk {
//生产线上是否有货物
public static int state=0;
//总个数
public static int count=10;
// 锁对象
public static Object lock = new Object();
}
public class TestDemo {
public static void main(String[] args) {
Cook c = new Cook();
Foodie f = new Foodie();
c.start();
f.start();
}
}
1.1.4 输出图片
可以看到,非常符合我们的 一交一提的理想状态,生产者生产然后消费者消费。
1.2 阻塞队列
1.2.1 阻塞队列的继承结构
在Java中,阻塞队列(BlockingQueue
)是java.util.concurrent
包的一部分,它提供了一个线程安全的队列,可以在队列为空时阻塞插入操作,或者在队列为满时阻塞移除操作。阻塞队列继承自java.util.Queue
接口,并且是java.io.Serializable
的实现。
1.2.2 继承结构图
java.lang.Object
|
|-- java.io.Serializable
|
|-- java.util.Collection
|
|-- java.util.Queue
|
|-- java.util.concurrent.BlockingQueue
1.2.3 阻塞队列成员方法
BlockingQueue
接口定义了以下阻塞操作的方法:
put(E e)
: 插入一个元素,如果需要的话,等待空间变得可用。take()
: 移除并返回队列头部的元素,如果需要的话,等待直到有一个元素变得可用。offer(E e, long timeout, TimeUnit unit)
: 如果可能的话,尝试插入一个元素,否则在指定的时间内等待。poll(long timeout, TimeUnit unit)
: 如果可能的话,尝试移除并返回队列头部的元素,否则在指定的时间内等待。
1.2.4 阻塞队列的实现
BlockingQueue
接口有多个实现,例如:
ArrayBlockingQueue
: 一个固定大小的数组实现,它是有界的。LinkedBlockingQueue
: 一个基于链表的实现,可以选择有界或无界。
1.2.5 阻塞队列的细节
- 生产者和消费者必须使用同一个阻塞队列
- ArrayBlockingQueue阻塞队列在声明时需要填写参数(也就是阻塞队列的长度)
1.2.6 阻塞队列实现代码以及讲解
我们先来看代码
有一个问题就是:我们的阻塞队列定义在哪里?
因为生产者和消费者必须使用同一个阻塞队列,那如果我们在生产者类和消费者类中都定义了阻塞队列,很明显那就是他俩分别用了不同的阻塞队列,所以我们需要将阻塞队列定义在额外的一个类中,然后通过构造方法的形式将阻塞队列传递进去,这样他俩就是一个类了。
来看生产者的代码,由于我们的队列长度是
1
,这里用put
添加。
ublic class Cook extends Thread {
ArrayBlockingQueue<String> queue;
public Cook(ArrayBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
queue.put("食品");
System.out.println("生产者放了一个食品在产线上");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
这里我们可以看一下
put
的源码,我们可以看到这里面已经声明了锁,所以在我们写代码的时候切记不要在外面再嵌套一层锁了,两层锁容易发生死锁的现象。
消费者的代码同样如此,用
take
取出。
public class Foodie extends Thread{
ArrayBlockingQueue<String> queue;
public Foodie(ArrayBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
String take = queue.take();
System.out.println("消费者取出了一个"+take+"并且打包");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
这里我们可以看一下
take
源码:
然后是main方法
public class TestDemo {
public static void main(String[] args) {
// 创建阻塞队列要指定数量
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
Cook cook = new Cook(queue);
Foodie foodie = new Foodie(queue);
cook.start();
foodie.start();
}
}
最后我们来看看执行结果,你会发现,诶?怎么又多条相同的?不应该是一替一换吗?
这其中的原因就是我们的输出语句不在锁
里面,但是数据确实是一替一换的
二、线程的状态
三、巩固练习题
3.1 打印奇数
同时开启两个线程,获取1-10000之间的数字,将输出所有的奇数。
public class ThreadTest1 extends Thread {
private final Object object;
public ThreadTest1(Object object) {
this.object = object;
}
@Override
public void run() {
while (true) {
synchronized (object) {
if (TestDemo.count > 10000) {
break;
} else {
// 这是奇数
if (TestDemo.count % 2 == 1) {
System.out.println(getName() + " 获取了奇数 " + TestDemo.count);
}
TestDemo.count++;
}
}
}
}
}
public class ThreadTest2 extends Thread {
private final Object object;
public ThreadTest2(Object object) {
this.object = object;
}
@Override
public void run() {
while (true) {
synchronized (object) {
if (TestDemo.count > 10000) {
break;
} else {
// 这是奇数
if (TestDemo.count % 2 == 1) {
System.out.println(getName() + " 获取了奇数 " + TestDemo.count);
}
TestDemo.count++;
}
}
}
}
}
public class TestDemo {
public static int count = 1;
public static void main(String[] args) {
Object o = new Object();
ThreadTest1 threadTest1 = new ThreadTest1(o);
ThreadTest2 threadTest2 = new ThreadTest2(o);
threadTest1.setName("线程1");
threadTest2.setName("线程2");
threadTest1.start();
threadTest2.start();
}
}
3.2 抢红包
首先是工具类 返回一个随机数,这里用到了BigDecimal,对于精确计算,要用BigDecimal ,这个之前我们讲过,可以看看JAVA小知识9
public class Random {
public static BigDecimal returnBalance(BigDecimal balance){
java.util.Random random = new java.util.Random();
// 怕随机到0,所以在最外面+0.01
double v = (random.nextDouble()+0.01);
// 转化为BigDecimal 类型
BigDecimal bigDecimal = new BigDecimal(v);
// 乘以balance 这就类似于一个数学题 x/1=y/balance 占比多少份
// 例如0.1/1 就等于 10/100 如果balance是100 0.1是x 如何算出来这个10 就是用100*0.1
bigDecimal=bigDecimal.multiply(balance);
BigDecimal roundedDecimal = bigDecimal.setScale(2, RoundingMode.DOWN);
return roundedDecimal;
}
}
接下来是实现类
public class ThreadTest extends Thread {
@Override
public void run() {
synchronized (Demomain.o) {
// 首先判断红宝数是否为1 是的话直接返回剩余余额
if (Demomain.count == 1) {
System.out.println(getName() + "抢到了" + Demomain.balance);
Demomain.count--;
} else if (Demomain.count < 1) {
System.out.println(getName() + "没抢到");
} else {// 证明还有两个以上的红包
// 随机出现在抢的钱数
BigDecimal i = Random.returnBalance(Demomain.balance);
Demomain.balance = Demomain.balance.subtract(i);
System.out.println(getName() + "抢到了" + i);
Demomain.count--;
}
}
}
}
然后是启动类
public class Demomain {
// 红包余额
public static BigDecimal balance= BigDecimal.valueOf(100);
// 红包剩余个数
public static int count=3;
public static Object o = new Object();
public static void main(String[] args) {
ThreadTest threadTest = new ThreadTest();
ThreadTest threadTest2 = new ThreadTest();
ThreadTest threadTest3 = new ThreadTest();
ThreadTest threadTest4 = new ThreadTest();
ThreadTest threadTest5 = new ThreadTest();
threadTest.setName("线程1");
threadTest2.setName("线程2");
threadTest3.setName("线程3");
threadTest4.setName("线程4");
threadTest5.setName("线程5");
threadTest.start();
threadTest2.start();
threadTest3.start();
threadTest4.start();
threadTest5.start();
}
}
看程序截图
四、线程池
4.1、核心原理
① 创建一个池子,池子中是空的
② 提交任务时,池子会创建新的线程对象,任务执行完毕,线程归还给池子下回再次提交任务时,不需要创建新的线程,直接复用已有的线程即可
③ 但是如果提交任务时,池子中没有空闲线程,也无法创建新的线程,任务就会排队等待
4.2 线程池的构造方法
构造方法 | 说明 |
---|---|
public static ExecutorService newFixedThreadPool(int nThreads) | 创建一个固定线程数量的线程池。 |
public static ExecutorService newCachedThreadPool() | 创建一个根据需要创建新线程的线程池。(没有上限的线程池) |
这种方法构造的线程池有很大的弊端,一般来讲我们都会使用自定义的线程池。
4.3 自定义线程池
4.3.1 构造方法
构造方法 | 说明 |
---|---|
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) | 创建一个线程池,具有给定的初始参数。核心线程池大小、最大线程池大小、线程空闲时间、时间单位和任务队列。 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) | 创建一个线程池,具有给定的初始参数和线程工厂。线程工厂用于创建新线程。 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) | 创建一个线程池,具有给定的初始参数和拒绝策略。拒绝策略用于处理无法执行的新任务。 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) | 创建一个线程池,具有给定的初始参数、线程工厂和拒绝策略。 |
我们以最后一个构造函数举例, 来看代码
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,//核心线程数量 最小值为0
8,// 最大线程数量 不小于零 且要大于核心线程数量
60,// 空闲线程最大存活时间
TimeUnit.SECONDS,// 空闲线程最大存活时间单位
new ArrayBlockingQueue<>(3),// 任务队列
Executors.defaultThreadFactory(), // 创建线程的工厂
new ThreadPoolExecutor.AbortPolicy() // 任务的拒绝策略
);
executor.submit(new demo1());
executor.submit(new demo1());
executor.submit(new demo1());
executor.submit(new demo1());
}
public class demo1 implements Runnable{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println( Thread.currentThread().getName()+ ": " + i);
}
}
}