前言
在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!”
博客主页:KC老衲爱尼姑的博客主页
博主的github,平常所写代码皆在于此
共勉:talk is cheap, show me the code
作者是爪哇岛的新手,水平很有限,如果发现错误,一定要及时告知作者哦!感谢感谢!
文章目录
- 线程之间的协作
- wait()
- notify()方法
- notifyAll()方法
- wait 和 sleep 的对比(面试题)
- 阻塞式队列
- 阻塞队列是什么?
- 标准库中阻塞队列类
- 生产者-消费者模型
- 为什么需要使用生产者-消费者模型
- 生产者-消费者模型特点
- 生产者-消费者模型作用
- 基于BlockingQueue 实现生产者-消费者模型
- 模拟阻塞队列
- 基于模拟阻塞队列实现生产者-消费者模型
- 任务间使用管道进行输入/输出
线程之间的协作
再次之前我们已经解决了,如果多个任务交替着步入某项共享资源,可以使用互斥来使得任何时刻只有一个任务可以访问这项资源。现在我们需要学习如何使任务彼此之间可以协作,可以达到多个任务一起工作去解决某个问题。现在的问题不是线程之间的干涉,而是线程之间的协作。线程之间的协调涉及到某些部分任务必须在其他 部分被解决之前解决。这非常像盖房子,必须先挖好房子的地基,然后同时设计好地基所需的钢结构和和水泥,而这两项任务必须在浇筑地基之前完成。水泥浇筑完之后才可以在此基础上砌墙。在这些任务中,某些可以并行执行,但是某些步骤需要所有的任务结束之后才能开动。
当线程协作时,关键的问题是这些任务之间的握手,所谓的握手可以视为一种通知机制。为了实现这种握手,依旧需要使用到互斥,在多线程环境下,互斥能保证只有一个线程可以响应某个信号,这样就可以避免多个线程之间的竞争。在互斥的基础上,我们为线程添加了一种新途径,可以将自身挂起,直到某些外部条件发生变化时,表示是时候这个线程可以干活了。这种握手可以通过Object的方法wait()和notify()来安全地实现。
wait()
wait()使得线程可以等待某个条件发生变化,而自身是无法改变这个条件。通常,这种条件将由另一个任务来改变。你肯定不想你的线程不断测试这个任务,不断的进行空循环,这个被称为忙等,通常是一种不良好的CPU周期使用方式。这就好比张三的舍友率先进入了厕所,巧了此时张三也想上厕所,张三就不断在敲门说:“你好了没”。因此wait()方法会在等待外界条件的时候会将任务挂起,并且只有在notify()或notifyAll()触发时,即表示发生某些感感兴趣的事物,这个线程才会被唤醒去检查所产生的变化。这个通知就像,舍友告诉张三我已经解决了,你可以进去了。wait通常搭配synchronized使用,脱离synchronized使用wait会直接抛出异常。所以使用wait首先得获取锁,然后使当前执行代码的线程进行等待,然后释放锁,当满足条件时会被唤醒,重新尝试获取锁。
wait 结束等待的条件:
- 其他线程调用该对象的 notify 方法.
- wait 等待时间超时 (wait 方法提供一个带有 timeout 参数的版本, 来指定等待时间).
- 其他线程调用该等待线程的 interrupted 方法, 导致 wait 抛出 InterruptedException 异常
代码示例: 观察wait()方法使用
public class WaitTask implements Runnable{
private Object lock;
public WaitTask(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println("你好,我是:"+Thread.currentThread().getName());
try {
System.out.println("等待林妹妹回复");
lock.wait();
//lock.wait(1000);//具有时间的等待,过期不候。
System.out.println("林妹妹回复我了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Object lock = new Object();
Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
t1.start();
}
}
wait方法属于Object,而Object是被所有类都继承的。当我们调用的时候实际前面省略了this.wait是必须包含在同步代码块或者同步代码块中,其同步监视器的对象(锁 的对象)与this也就是当前的对象必须一致,不然会抛出IllegalMonitorStateException。
运行结果:
该程序执行到wait之后就会一直等待下去,那么程序不可能一直等待下去,这个时候就该唤醒方法notify()出场 了。
IllegalMonitorStateException复现
示例代码
public class WaitTask implements Runnable{
private Object lock;
public WaitTask(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (this) {
System.out.println("你好,我是:"+Thread.currentThread().getName());
try {
System.out.println("等待林妹妹回复");
lock.wait();
System.out.println("林妹妹回复我了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Object lock = new Object();
Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
t1.start();
}
}
运行结果:
notify()方法
notify 方法是唤醒等待的线程,notify()所在的同步代码块或者同步方法的锁对象必须和wait方法所在的同步代码块或者同步方法的锁对象一致,不然不会唤醒。
-
方法notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知notify,并使它们重新获取该对象的对象锁。
-
如果有多个线程等待,则有线程调度器随机挑选出一个呈 wait 状态的线程。(并没有 “先来后到”)
-
在notify()方法后,当前线程不会马上释放该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
示例代码
public class NotifyTask implements Runnable {
private Object lock;
public NotifyTask(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println("你好,我是:"+Thread.currentThread().getName());
lock.notify();
}
}
public static void main(String[] args) {
Object lock = new Object();
Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
t1.start();
Thread t2 = new Thread(new NotifyTask(lock),"林黛玉");
t2.start();
}
}
运行结果:
notifyAll()方法
notify方法只是唤醒某一个等待线程. 使用notifyAll方法可以一次唤醒所有的等待线程.
代码示例
使用notifyAll()方法唤醒所有等待线程, 在上面的代码基础上做出修改,创建 3 个 WaitTask 实例. 1 个 NotifyTask 实例.。
public class NotifyTask implements Runnable {
private Object lock;
public NotifyTask(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println("你们好,我是:"+Thread.currentThread().getName());
lock.notifyAll();
}
}
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
Thread t2 = new Thread(new WaitTask(lock),"妙玉");
Thread t3 = new Thread(new WaitTask(lock),"史湘云");
Thread t4 = new Thread(new NotifyTask(lock),"林黛玉");
t1.start();
t2.start();
t3.start();
Thread.sleep(2000);
t4.start();
}
}
运行结果:
wait 和 sleep 的对比(面试题)
理论上wait和sleep没有可比性,因为一个是用于线程通信,一个是让线程阻塞一段时间。唯一的相同点就是让线程放弃执行一段时间。
在此就浅浅的总结:
-
wait 需要搭配 synchronized 使用. sleep 不需要.
-
wait 是 Object 的方法 sleep 是 Thread 的静态方法.
阻塞式队列
阻塞队列是什么?
阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则,在此基础上,如果队列满的时候,继续入队列就会阻塞,到有其他线程从队列中取走元素。如果队列空的时候,继续出队列也会阻塞, 直到有其他线程往队列中插入元素。、
标准库中阻塞队列类
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可. BlockingQueue 是个接口,需要使用它的实现之一来使用 BlockingQueue,java.util.concurrent
包下具有以下 BlockingQueue 接口的实现类:
JDK 提供了 7 个阻塞队列。分别是
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列
- DelayQueue:一个使用优先级队列实现的无界阻塞队列
- SynchronousQueue:一个不存储元素的阻塞队列
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列(实现了继承于 BlockingQueue 的 TransferQueue)
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
BlockingQueue 主要提供四类方法,如下表所示
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
出队 | remove() | poll() | take() | poll(e,time,unit) |
获取队首元素 | element() | peek() |
生产者-消费者模型
假设有两个线程分别是线程A和线程B,两个线程共享一个缓冲区,线程A负责往缓冲区中放入数据,线程B往缓冲区取出数据,那么这就是 生产者-消费者模型,其中线程A就是生产者,线程B就是消费者。
为什么需要使用生产者-消费者模型
在多线程环境下,如果生产者生产数据的速度足够快,而消费者消费数据的速度相对于生产者比慢,那么生产者就得等到消费者把数据消费完了再生产,因为生产者再生产数据没地方放啊!!!。同理,如果消费者消费的速度赶上了生产者生产的速度,那么消费者就经常处于等待状态。所以 为了平衡生产者和消费者之间的生产和消费数据的能力,就引入了缓冲区来存储生产者生产的数据,所以就有生产者-消费者模型。
生产者-消费者模型特点
- 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据。
- 当缓冲区满的时候,生产者会进入等待状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入等待状态,直到生产者往缓冲区中添加数据时才会被唤醒
生产者-消费者模型作用
- 削峰填谷:当服务器短时间收到了大量的请求,服务器可能直接被打没了,为了避免服务器宕机,可以将请求放到一个阻塞队列中,然后再由消费者线程慢慢的来处理每个请求.
- 解耦:生产者不需要关心谁去消费数据,反正有人消费就行。消费者不需要关心生产数据,反正有人生产就行。
基于BlockingQueue 实现生产者-消费者模型
示例代码
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerModel {
private static int count;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQue = new LinkedBlockingDeque<>();
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer num = blockingQue.take();
TimeUnit.MILLISECONDS.sleep(1000);
count++;
System.out.println("消费者消费了"+count+"个数据,"+"数据是:"+num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
consumer.start();
Thread producer = new Thread(() -> {
while (true) {
Random rand = new Random();
try {
Integer num = rand.nextInt();
blockingQue.put(num);
TimeUnit.MILLISECONDS.sleep(1000);
count++;
System.out.println("生产者生产了"+count+"个数据,"+"数据是:"+num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
consumer.join();
producer.join();
}
运行结果:
模拟阻塞队列
使用循环队列以及synchronized来模拟阻塞队列
示例代码
public class BlockingQueue {
/**
* 队列数据
*/
private int[] elem = new int[100];
/**
* 队头指针
*/
private int head;
/**
* 队尾指针
*/
private int tail;
/**
* 队列元素个数
*/
private int size;
/**
* 出队头元素
* @return
*/
public Integer take() throws InterruptedException {
synchronized (this) {
if (size == 0) {
//队列为空
wait();
}
int ret = elem[head];
head++;
//作用等价于 head %= elem.length
if (head >= elem.length) {
head = 0;
}
size--;
notifyAll();
return ret;
}
}
/**
* 入队尾元素
* @param val
*/
public void put(int val) throws InterruptedException {
synchronized (this) {
while (size == elem.length) {
//队列满
wait();
}
elem[tail++] = val;
//作用等价于 tail %= elem.length
if (tail >= elem.length) {
tail = 0;
}
size++;
notifyAll();
}
}
}
基于模拟阻塞队列实现生产者-消费者模型
示例代码
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerModel {
private static int count;
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQue = new BlockingQueue();
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer num = (Integer) blockingQue.take();
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("消费者消费了"+count+"个数据,"+"数据是:"+num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
consumer.start();
Thread producer = new Thread(() -> {
while (true) {
Random rand = new Random();
try {
Integer num = rand.nextInt();
count++;
blockingQue.put(num);
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("生产者生产了"+count+"个数据,"+"数据是:"+num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
consumer.join();
producer.join();
}
}
运行结果:
任务间使用管道进行输入/输出
Java 中以标准库的形式支持了对线程间的输入/输出。其中输出类库中的对应物是PipedWriter类,允许任务向管道写,输入类库中的对应物是PipedReader类,允许不同的任务从同一个管道中读取。管道基本上是一个阻塞队列,而任务间使用管道进行输入/输出,可以看做是生产者-消费者”问题的变体。
示例代码
下面是一个简单例子,两个任务使用一个管道进行通信。
Sender负责向管道写数据
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class Sender implements Runnable {
private Random random = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() {
return out;
}
@Override
public void run() {
try {
while (true) {
for (char c='A'; c <= 'Z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
}
}
} catch (IOException e) {
System.out.println(e+"Sender writer exception");
} catch (InterruptedException e) {
System.out.println(e+"Sender sleep exception");
}
}
}
Receiver负责向管道读数据
import java.io.IOException;
import java.io.PipedReader;
public class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
@Override
public void run() {
try {
while (true) {
System.out.println("Read:"+(char)in.read());
}
} catch (IOException e) {
System.out.println(e+"Receiver read exception");
}
}
}
测试代码
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PipedIO {
public static void main(String[] args) throws IOException, InterruptedException {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(6);
exec.shutdownNow();
}
}
运行结果:
Read:A
Read:B
Read:C
Read:D
Read:E
Read:F
Read:G
Read:H
Read:I
Read:J
Read:K
Read:L
Read:M
Read:N
Read:O
Read:P
Read:Q
Read:R
Read:S
Read:T
Read:U
Read:V
Read:W
Read:X
Read:Y
Read:Z
java.lang.InterruptedException: sleep interruptedSender sleep exception
java.io.InterruptedIOExceptionReceiver read exceptionProcess finished with exit code 0