线程间的通信机制与生产者消费者案例
线程间通信
为什么要处理线程间通信:
当我们需要多个线程
来共同完成一件任务,并且我们希望他们有规律的执行
,那么多线程之间需要一些通信机制,可以协调他们的工作,以此实现多线程共同操作一份数据。
比如:线程A用来生产包子,线程B用来吃包子,包子可以理解为同一资源,线程A与线程B处理的动作,一个是生产,一个是消费,此时B线程必须等A线程完成后才执行,那么线程A与线程B之间就需要线程通信,即——等待唤醒机制。
等待唤醒机制
这是多个线程间的一种协作机制
。谈到线程我们经常想到的是线程间的竞争(race)
,比如去争夺锁,但这并不是故事的全部,线程间也会有协作机制。
在一个线程满足某个条件时,就进入等待状态(wait() / wait(time)
),等待某个线程执行完他们的指定代码过后再将其唤醒(notify()
);或可以指定wait的时间,等时间到了自动唤醒;在有多个线程进行等待时,如果需要,可以使用notifyAll()
来唤醒所有的等待线程。wait/notify就是线程间的一种协作机制。
-
wait:线程不再活动,不再参与调度,进入
wait set
中,因此不会浪费CPU资源,也不会去竞争锁了,这时的线程状态是WAITING或TIMED-WAITING。他还要等着别的线程执行一个特别的动作
,也即“通知(notify)
”或者等待时间到,在这个对象上等待的线程从wait set中释放出来,重新进入到调度队列(ready queue)中。 -
notify:则选取所通知对象的wait set中的一个线程释放;
-
notifyAll:则释放所通知对象的wait set上的全部线程。
1. 线程间通信的理解 当我们`需要多个线程`来共同完成一件任务,并且我们希望他们`有规律的执行`,那么多线程之间需要一些通信机制, 可以协调他们的工作,以此实现多线程共同操作一份数据。 2. 涉及到三个方法的使用: wait(): 线程一旦执行此方法,就进入等待状态。同时,会释放对同步监视器的调用。 notify(): 一旦执行此方法,就会唤醒被wait()的线程中优先级最高的那一个线程。(如果被wait的多个线程的优先级相同, 则随机唤醒一个)。被唤醒的线程从当初被wait的位置继续执行。 notifyAll(): 一旦执行此方法,就会唤醒所有被wait的线程。 3.注意点: > 此三个方法的使用,必须在同步代码块或者同步方法中。 (超纲:Lock需要配合Condition实现线程间的通信) > 此三个方法的调用者,必须是同步监视器。否则会报IllegalMonitorStateException异常。 > 此三个方法声明在Object类中。(这样才能每个同步监视器都能调用的到,同步监视器又是任何唯一的对象) 4. 案例: 案例1:使用两个线程打印1-100.线程1,线程2 交替打印 案例2:生产者&消费者 生产者(Producer)将产品交给店员(Clerk),而消费者(Consumer)从店员处取走产品,店员一次只能持有 固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员会叫生产者停一下,如果店中有空位放 产品了再通知生产者继续生产;如果店中没有产品了,店员会告诉消费者等一下,如果店中有产品了再通知消 费者来取走产品。 5. wait()和sleep()的区别? 相同点:一旦执行,当前线程都会进入阻塞状态 不同点: > 声明的位置:wait():声明在Object类中。 sleep():声明在Thread类中,静态的。 > 使用的场景不同:wait():只能使用在同步代码块或同步方法中。 sleep():可以在任何需要使用的场景。 > 使用在同步代码块或同步方法中:wait():一旦执行,会释放同步监视器。 sleep():一旦执行,不会释放同步监视器。 > 结束阻塞的方式:wait():到达结束时间,自动结束阻塞 或 通过被notify()唤醒,结束阻塞。 sleep():到达指定时间自动结束阻塞。
案例1:
package thread.demo05_communication; //案例1:使用两个线程打印1-100.线程1,线程2 交替打印 public class PrintNumberTest { public static void main(String[] args) { PrintNumber p = new PrintNumber(); Thread t1 = new Thread(p,"线程1"); Thread t2 = new Thread(p,"线程2"); t1.start(); t2.start(); } } class PrintNumber implements Runnable{ private int number = 1; Object obj = new Object(); @Override public void run() { while (true){ // synchronized (this) { synchronized (obj){ // this.notify(); obj.notify();//wait,notify,notifyAll这三个方法的调用必须是同步监视器。 if (number <= 100){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": " + number); number++; try { // this.wait(); obj.wait();//线程一旦执行此方法,就进入等待状态。同时,会释放对同步监视器的调用。 } catch (InterruptedException e) { e.printStackTrace(); } }else { break; } } } } }
输出:
案例2:
package thread.demo05_communication;
/**
* 案例2:生产者&消费者
* 生产者(Producer)将产品交给店员(Clerk),而消费者(Consumer)从店员处取走产品,店员一次只能持有
* 固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员会叫生产者停一下,如果店中有空位放
* 产品了再通知生产者继续生产;如果店中没有产品了,店员会告诉消费者等一下,如果店中有产品了再通知消
* 费者来取走产品。
*
* 分析:
* 1. 是否是多线程问题?是,生产者,消费者
* 2. 是否有共享数据?有! 共享数据是:产品
* 3. 是否有线程安全问题? 有!因为有共享数据
* 4. 是否需要处理线程安全问题?是! 如何处理?使用同步机制
* 5. 是否存在线程间的通信? 存在。
*
*/
public class ProducerConsumerTest {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Producer pro1 = new Producer(clerk);
Consumer con1 = new Consumer(clerk);
Consumer con2 = new Consumer(clerk);
pro1.setName("生产者1");
con1.setName("消费者1");
con2.setName("消费者2");
pro1.start();
con1.start();
con2.start();
}
}
class Clerk{ //店员
private int productNum = 0; //产品的数量
//增加产品数量的方法
public synchronized void addProduct(){
if (productNum >= 20){
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
productNum++;
System.out.println(Thread.currentThread().getName() + "生产了第" + productNum + "个产品");
//唤醒
notifyAll();
}
}
//减少产品数量的方法
public synchronized void minusProduct(){
if (productNum <= 0){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
System.out.println(Thread.currentThread().getName() + "消费了第" + productNum + "个产品");
productNum--;
//唤醒
notifyAll();
}
}
}
class Producer extends Thread{ //生产者
private Clerk clerk;
public Producer(Clerk clerk){
this.clerk = clerk;
}
@Override
public void run() {
while (true){
System.out.println("生产者开始生产产品...");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.addProduct();
}
}
}
class Consumer extends Thread{ //消费者
private Clerk clerk;
public Consumer(Clerk clerk){
this.clerk = clerk;
}
@Override
public void run() {
while (true){
System.out.println("消费者开始消费产品...");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.minusProduct();
}
}
}
输出: