剑指JUC原理-6.wait notify

news2024/12/25 1:05:52

wait nofity

小故事 - 为什么需要 wait

由于条件不满足,小南不能继续进行计算

但小南如果一直占用着锁,其它人就得一直阻塞,效率太低

在这里插入图片描述

于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,
其它人可以由老王随机安排进屋

直到小M将烟送来,大叫一声 [ 你的烟到了 ] (调用 notify 方法)

在这里插入图片描述

小南于是可以离开休息室,重新进入竞争锁的队列

在这里插入图片描述

原理之 wait / notify

在这里插入图片描述

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入
    EntryList 重新竞争

API 介绍

obj.wait() 让进入 object 监视器的线程到 waitSet 等待

obj.notify() 在 object 上正在 waitSet 等待的线程中随机挑一个唤醒

obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

final static Object obj = new Object();
public static void main(String[] args) {
 new Thread(() -> {
 synchronized (obj) {
 log.debug("执行....");
 try {
 obj.wait(); // 让线程在obj上一直等待下去
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 log.debug("其它代码....");
 }
 }).start();
 new Thread(() -> {
 synchronized (obj) {
 log.debug("执行....");
 try {
 obj.wait(); // 让线程在obj上一直等待下去
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 log.debug("其它代码....");
 }
 }).start();
 // 主线程两秒后执行
 sleep(2);
 log.debug("唤醒 obj 上其它线程");
 synchronized (obj) {
 obj.notify(); // 唤醒obj上一个线程
 // obj.notifyAll(); // 唤醒obj上所有等待线程
 }
}

notify 的一种结果

20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 
20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 
20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码.... 

notifyAll 的结果

19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 
19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 
19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 
19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码.... 

wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到
notify 为止

wait(long n) 有时限的等待, 到 n 毫秒后结束等待(如果等待期间,被其他线程唤醒了,就不会继续等待了),或是被 notify

进入wait源码查看:

在这里插入图片描述

无参方法相当于调用了一个wait(0)

wait notify 的正确姿势

开始之前先看看

sleep(long n) 和 wait(long n) 的区别

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  • sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
  • 它们状态都是 TIMED_WAITING

以一个例子来展示:

static final Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (lock) {

                System.out.println("子线程1获得锁");
                try {
//                    Thread.sleep(20000);
                    // 如果使用sleep 那么主线程不能释放锁
                    lock.wait(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        Thread.sleep(1000);
        synchronized (lock) {
            System.out.println("子线程2获得锁");
        }
    }

输出:

如果使用Thread.sleep(20000);

输出 子线程1获得锁 20s后输出 子线程2获得锁

如果使用lock.wait(20000);
输出 子线程1获得锁 1s后输出 子线程2获得锁

step 1

static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;

思考下面的解决方案好不好,为什么?

new Thread(() -> {
 synchronized (room) {
 log.debug("有烟没?[{}]", hasCigarette);
 if (!hasCigarette) {
 log.debug("没烟,先歇会!");
 sleep(2);
 }
 log.debug("有烟没?[{}]", hasCigarette);
 if (hasCigarette) {
 log.debug("可以开始干活了");
 }
 }
}, "小南").start();
for (int i = 0; i < 5; i++) {
 new Thread(() -> {
 synchronized (room) {
 log.debug("可以开始干活了");
 }
 }, "其它人").start();
}
sleep(1);
new Thread(() -> {
 // 这里能不能加 synchronized (room)?
 hasCigarette = true;
 log.debug("烟到了噢!");
}, "送烟的").start()

输出

20:49:49.883 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:49:49.887 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:49:50.882 [送烟的] c.TestCorrectPosture - 烟到了噢!
20:49:51.887 [小南] c.TestCorrectPosture - 有烟没?[true] 
20:49:51.887 [小南] c.TestCorrectPosture - 可以开始干活了
20:49:51.887 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.887 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.888 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.888 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.888 [其它人] c.TestCorrectPosture - 可以开始干活了
  • 其它干活的线程,都要一直阻塞,效率太低
  • 小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
  • 加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加
    synchronized 就好像 main 线程是翻窗户进来的
  • 解决方法,使用 wait - notify 机制

step 2

思考下面的实现行吗,为什么?

new Thread(() -> {
 synchronized (room) {
 log.debug("有烟没?[{}]", hasCigarette);
 if (!hasCigarette) {
 log.debug("没烟,先歇会!");
 try {
 room.wait(2000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 log.debug("有烟没?[{}]", hasCigarette);
 if (hasCigarette) {
 log.debug("可以开始干活了");
 }
 }
}, "小南").start();
for (int i = 0; i < 5; i++) {
 new Thread(() -> {
 synchronized (room) {
 log.debug("可以开始干活了");
 }
 }, "其它人").start();
}
sleep(1);
new Thread(() -> {
 synchronized (room) {
 hasCigarette = true;
 log.debug("烟到了噢!");
 room.notify();
 }
}, "送烟的").start();

输出:

20:51:42.489 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:51:42.493 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:51:42.493 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.493 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.494 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.494 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.494 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:43.490 [送烟的] c.TestCorrectPosture - 烟到了噢!
20:51:43.490 [小南] c.TestCorrectPosture - 有烟没?[true] 
20:51:43.490 [小南] c.TestCorrectPosture - 可以开始干活了
  • 解决了其它干活的线程阻塞的问题
  • 但如果有其它线程也在等待条件呢?

step 3

new Thread(() -> {
 synchronized (room) {
 log.debug("有烟没?[{}]", hasCigarette);
 if (!hasCigarette) {
 log.debug("没烟,先歇会!");
 try {
 room.wait();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 log.debug("有烟没?[{}]", hasCigarette);
 if (hasCigarette) {
 log.debug("可以开始干活了");
 } else {
 log.debug("没干成活...");
 }
 }
}, "小南").start();

new Thread(() -> {
 synchronized (room) {
 Thread thread = Thread.currentThread();
 log.debug("外卖送到没?[{}]", hasTakeout);
 if (!hasTakeout) {
 log.debug("没外卖,先歇会!");
 try {
 room.wait();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 log.debug("外卖送到没?[{}]", hasTakeout);
 if (hasTakeout) {
 log.debug("可以开始干活了");
 } else {
 log.debug("没干成活...");
 }
 }
}, "小女").start();

sleep(1);

new Thread(() -> {
 synchronized (room) {
 hasTakeout = true;
 log.debug("外卖到了噢!");
 room.notify();
 }
}, "送外卖的").start();

输出

20:53:12.173 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:53:12.176 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:53:12.176 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:53:12.176 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:53:13.174 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:53:13.174 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:53:13.174 [小南] c.TestCorrectPosture - 没干成活... 
  • notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线
    程,称之为**【虚假唤醒】**
  • 解决方法,改为 notifyAll

step 4

new Thread(() -> {
 synchronized (room) {
 hasTakeout = true;
 log.debug("外卖到了噢!");
 room.notifyAll();
 }
}, "送外卖的").start();

输出

20:55:23.978 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:55:23.982 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:55:23.982 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:55:23.982 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:55:24.979 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:55:24.979 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
20:55:24.980 [小女] c.TestCorrectPosture - 可以开始干活了
20:55:24.980 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:55:24.980 [小南] c.TestCorrectPosture - 没干成活... 
  • 用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新
    判断的机会了
  • 解决方法,用 while + wait,当条件不成立,再次 wait

step 5

将 if 改为 while

if (!hasCigarette) {
 log.debug("没烟,先歇会!");
 try {
 room.wait();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
}

改动后

while (!hasCigarette) {
 log.debug("没烟,先歇会!");
 try {
 room.wait();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
}

输出

20:58:34.322 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:58:34.326 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:58:34.326 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:58:34.326 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:58:35.323 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:58:35.324 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
20:58:35.324 [小女] c.TestCorrectPosture - 可以开始干活了
20:58:35.324 [小南] c.TestCorrectPosture - 没烟,先歇会!

模板:

synchronized(lock) {
 while(条件不成立) {
 lock.wait();
 }
 // 干活
}
//另一个线程
synchronized(lock) {
 lock.notifyAll();
}

同步模式之保护性暂停

定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

在这里插入图片描述

实现

class GuardedObject {
 	private Object response;
 	private final Object lock = new Object();
 	public Object get() {
			synchronized (lock) {
			// 条件不满足则等待
				while (response == null) {
					try {
						lock.wait();
					}catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			return response;
		}
	}
 	public void complete(Object response) {
		synchronized (lock) {
			// 条件满足,通知等待线程
			this.response = response;
			lock.notifyAll();
		}
 	}
}

应用

一个线程等待另一个线程的执行结果

join的局限性是只能等第二个线程结束

用join那种办法,等待结果那个变量只能设计成全局的,而使用保护性暂停的话可以设计成局部的

public static void main(String[] args) {
 GuardedObject guardedObject = new GuardedObject();
 new Thread(() -> {
	try {
		// 子线程执行下载
		List<String> response = download();// 执行下载操作,去完成网页的信息
		log.debug("download complete...");
		guardedObject.complete(response);
	} catch (IOException e) {
		e.printStackTrace();
	}
 }).start();
 log.debug("waiting...");
 // 主线程阻塞等待
 Object response = guardedObject.get();
 log.debug("get response: [{}] lines", ((List<String>) response).size());
}

执行结果:

08:42:18.568 [main] c.TestGuardedObject - waiting...
08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...
08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines

可以看到,是存在等待时间的,大概5s左右

带超时版 GuardedObject

如果要控制超时时间呢

class GuardedObjectV2 {
 	private Object response;
 	private final Object lock = new Object();
 	public Object get(long millis) {
		synchronized (lock) {
				// 1) 记录最初时间
				long begin = System.currentTimeMillis();
				// 2) 已经经历的时间
				long timePassed = 0;
				while (response == null) {
					// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
					long waitTime = millis - timePassed;
					log.debug("waitTime: {}", waitTime);
					if (waitTime <= 0) {
						log.debug("break...");
						break;
					}
					try {
						lock.wait(waitTime);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
					timePassed = System.currentTimeMillis() - begin;
					log.debug("timePassed: {}, object is null {}", timePassed, response == null);
				}
			return response;
		}
 	}
 	public void complete(Object response) {
		synchronized (lock) {
			// 条件满足,通知等待线程
			this.response = response;
			log.debug("notify...");
			lock.notifyAll();
		}
 	}
}

测试,没有超时

		public static void main(String[] args) {
            GuardedObjectV2 v2 = new GuardedObjectV2();
            new Thread(() -> {
                sleep(1);
                v2.complete(null);
                sleep(1);
                v2.complete(Arrays.asList("a", "b", "c"));
            }).start();
            Object response = v2.get(2500);
            if (response != null) {
                log.debug("get response: [{}] lines", ((List<String>) response).size());
            } else {
                log.debug("can't get response");
            }
        }

输出:

08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500
08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify...
08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true
08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497
08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify...
08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false
08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines

测试,超时

// 等待时间不足
List<String> lines = v2.get(1500);

输出:

08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500
08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify...
08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true
08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498
08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true
08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0
08:47:56.461 [main] c.GuardedObjectV2 - break...
08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response
08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...

join原理

保护性暂停,是一个线程等待另一个线程的结果

而join是一个线程等待另一个线程的结束

进入源码查看 thread.join 和 带超时版 GuardedObject 有异曲同工之妙

在这里插入图片描述

进入源码可以发现,join的底层也是使用的保护性暂停的手段

是调用者轮询检查线程 alive 状态

t1.join();

等价于下面的代码

synchronized (t1) {
 // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
 while (t1.isAlive()) {
 t1.wait(0);
 }
}

多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右
侧的 t1,t3,t5 就好比邮递员

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,
这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

在这里插入图片描述

新增 id 用来标识 Guarded Object

class GuardedObject {
    // 标识 Guarded Object
    private int id;
    public GuardedObject(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    // 结果
    private Object response;
    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虚假唤醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

中间解耦类

class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}

其中中间解耦类中的共享资源要设置成线程安全的,如synchronizedHashtable

业务相关类

class People extends Thread{
 @Override
 public void run() {
 // 收信
 GuardedObject guardedObject = Mailboxes.createGuardedObject();
 log.debug("开始收信 id:{}", guardedObject.getId());
 Object mail = guardedObject.get(5000);
 log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
 }
}

class Postman extends Thread {
 private int id;
 private String mail;
 public Postman(int id, String mail) {
 this.id = id;
 this.mail = mail;
 }
 @Override
 public void run() {
 GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
 log.debug("送信 id:{}, 内容:{}", id, mail);
 guardedObject.complete(mail);
 }
}

测试

public static void main(String[] args) throws InterruptedException {
 for (int i = 0; i < 3; i++) {
 new People().start();
 }
 Sleeper.sleep(1);
 for (Integer id : Mailboxes.getIds()) {
 new Postman(id, "内容" + id).start();
 }
}

某次运行结果

10:35:05.689 c.People [Thread-1] - 开始收信 id:3
10:35:05.689 c.People [Thread-2] - 开始收信 id:1
10:35:05.689 c.People [Thread-0] - 开始收信 id:2
10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2
10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1
10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3

异步模式之生产者/消费者

之所以是异步,是因为,产生了一个消息,并不能立刻被消费。而保护性暂停是同步的

定义

要点

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

在这里插入图片描述

实现

class Message {
    private int id;
    private Object message;
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }
    public Object getMessage() {
        return message;
    }
}
class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

应用

public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                try {

                    System.out.println("download...");
                    List<String> response = Downloader.download();
                    System.out.println("try put message()"+ id);
                    messageQueue.put(new Message(id, response));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "生产者" + i).start();
        }
// 1 个消费者线程, 处理结果
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                System.out.println("take message({}):" + message.getId());
            }
        }, "消费者").start();

    }
10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait

多加几个消费者线程本质上也是一样的。

Park & Unpark

基本使用

它们是 LockSupport 类中的方法

// 暂停当前线程
LockSupport.park(); 

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先 park 再 unpark

Thread t1 = new Thread(() -> {
 log.debug("start...");
 sleep(1);
 log.debug("park...");
 LockSupport.park();
 log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park... 
18:42:54.583 c.TestParkUnpark [main] - unpark... 
18:42:54.583 c.TestParkUnpark [t1] - resume... 

先 unpark 再 park

Thread t1 = new Thread(() -> {
 log.debug("start...");
 sleep(2);
 log.debug("park...");
 LockSupport.park();
 log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark... 
18:43:52.769 c.TestParkUnpark [t1] - park... 
18:43:52.769 c.TestParkUnpark [t1] - resume... 

特点

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor (必须先获得monitor锁才可以)一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll
    是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

原理之 park & unpark

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex

原理分为两种情况

先park再unpark

在这里插入图片描述

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

在这里插入图片描述

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0
先unpark后park

在这里插入图片描述

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

注意:多次调用unpark也仅仅只是将 counter设置为1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1147714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++可视化 有穷自动机NFA 有穷自动机DFA

一、项目介绍 根据正则表达式,可视化显示NFA&#xff0c;DFA&#xff1b;词法分析程序 二、项目展示

洛谷 B2007 A+B问题 C++代码

目录 题目描述 AC Code 题目描述 AC Code #include<bits/stdc.h> using namespace std; typedef long long ll; int main() { int a,b;cin>>a>>b;cout<<ab<<endl;return 0; }

F5修复了允许远程代码执行攻击的BIG-IP认证绕过漏洞

图片 导语 近日&#xff0c;网络安全公司Praetorian Security的研究人员发现了一项影响F5 BIG-IP配置工具的严重漏洞&#xff0c;该漏洞被命名为CVE-2023-46747。攻击者可以通过远程访问配置工具来执行未经身份验证的远程代码&#xff0c;从而对系统进行攻击。本文将详细介绍该…

基于 Python 的个性化电影推荐系统的研究与实现

1 简介 本毕业设计的内容是设计并且实现一个电影个性化推荐系统。它是在Windows下&#xff0c;以MYSQL为数据库开发平台&#xff0c;Python技术和Tomcat网络信息服务作为应用服务器。电影个性化推荐系统的功能已基本实现&#xff0c;主要实现首页&#xff0c;个人中心&#xf…

热力学第三定律

热力学第三定律能斯特定理 凝聚系的熵在等温过程中的改变随绝对温度趋于零 如果有 即有&#xff1a; 也就是说&#xff1a;当绝对温度趋于零时&#xff0c;熵和状态参量y无关。 普朗克绝对熵 当绝对温度趋于零时&#xff0c;一个化学均匀系统的熵趋向于一个极限值 也就是说&a…

【数据结构】面试OJ题——时间复杂度2

目录 一&#xff1a;移除元素 思路&#xff1a; 二&#xff1a;删除有序数组中的重复项 思路&#xff1a; 三&#xff1a;合并两个有序数组 思路1&#xff1a; 什么&#xff1f;你不知道qsort&#xff08;&#xff09; 思路2&#xff1a; 一&#xff1a;移除元素 27. 移…

离线语音通断器开发-稳定之后顺应新需求

使用云知声的US516p6方案开发了一系列的离线语音通断器&#xff0c;目前已经取得了不小的收获&#xff0c;有1路的&#xff0c;3路的&#xff0c;4路的&#xff0c;唛头和扬声器包括唛头线材也在不断的更新打磨中找到了效果特别好的供应商。 离线语音通断器&#xff0c;家用控…

【OpenCV实现平滑图像金字塔,轮廓:入门】

文章目录 概要图像金字塔轮廓&#xff1a;入门 概要 文章内容的概要&#xff1a; 平滑图像金字塔&#xff1a; 图像金字塔是什么&#xff1f; 图像金字塔是指将原始图像按照不同的分辨率进行多次缩小&#xff08;下采样&#xff09;得到的一系列图像。这种处理方式常用于图像…

openpnp - Warning - Unknown firmware

文章目录 openpnp - Warning - Unknown firmware概述笔记https://github.com/openpnp/openpnp/wiki/Motion-Controller-Firmwares备注END openpnp - Warning - Unknown firmware 概述 接上飞达控制板后, 显示未知固件的警告 开始没看源码之前, 总以为是回答的版本号不合适, …

L2-1 插松枝

L2-1 插松枝 分数 25 全屏浏览题目 切换布局 作者 陈越 单位 浙江大学 人造松枝加工场的工人需要将各种尺寸的塑料松针插到松枝干上&#xff0c;做成大大小小的松枝。他们的工作流程&#xff08;并不&#xff09;是这样的&#xff1a; 每人手边有一只小盒子&#xff0c;初始…

【算法-数组2】有序数组的平方 和 长度最小的子数组

今天&#xff0c;带来数组相关算法的讲解。文中不足错漏之处望请斧正&#xff01; 理论基础点这里 有序数组的平方 给你一个按 非递减顺序 排序的整数数组 nums&#xff0c;返回 每个数字的平方 组成的新数组&#xff0c;要求也按 非递减顺序 排序。 示例 1&#xff1a; 输…

多线程---阻塞队列+生产者消费者模型

文章目录 阻塞队列自己实现一个阻塞队列&#xff08;三步&#xff09;标准库中的阻塞队列使用阻塞队列的优势 生产者消费者模型 阻塞队列 队列&#xff08;Queue&#xff09;是我们熟悉的一个数据结构&#xff0c;它是“先进先出”的。但是并不是所有的队列都是“先进先出”的…

椭圆曲线在SM2加解密中的应用(三)

一、SM2加密运算 1.1加密原始数据 SM2加密运算首先是用户A对数据加密,用户A拥有原始数据 椭圆曲线系统参数长度为klen比特的消息M公钥Pb椭圆曲线系统参数,已经在 椭圆曲线参数(二)中详细介绍;M就是需要加密消息,长度为klen; 1.1.1 公钥Pb的计算方式 公钥Pb=dBG,其中…

【MySQL--->复合查询】

文章目录 [TOC](文章目录) 一、基本查询二、多表查询三、自连接四、子查询1. 单行子查询2. 多行查询3.多列子查询4.在from语句中使用子查询5.合并查询 一、基本查询 查询工资高于500或岗位为MANAGER的雇员&#xff0c;同时还要满足他们的姓名首字母为大写的J 按照部门号升序而…

C++进阶篇3---二叉搜索树(Binary Search Tree)

一、二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值 若它的右子树不为空&#xff0c;则右子树上所有节点的值都大于根节点的值 它的…

java之输入与输出的详细介绍

文章目录 输出的相关格式使用 Scanner 类进行控制台输入步骤&#xff1a;示例&#xff1a; 如何格式化输出&#xff1f;1. 使用 System.out.printf2. 使用 String.format printf与println 的区别printfprintln主要区别&#xff1a; 输出的相关格式 控制台输入是指通过命令行或…

JAVA中的垃圾回收器(2)------G1

一)G1垃圾回收器:-XX:UseG1GC:使用G1收集器 1)垃圾收集器迭代停顿时间越少越好&#xff0c;但是垃圾回收的总时间会增多&#xff0c;默认暂停时间默认是200ms&#xff0c;G1的内部底层算法非常复杂比CMS复杂&#xff0c;如果大内存&#xff0c;G1还比较有效果&#xff0c;但是如…

leetcode-数组

1.二分法手撕704&#xff08;诀窍在于用合法区间判断&#xff09;230810 左闭右闭: while(left<right)合法&#xff0c;middle(leftright)/2, if(nums[middle]>target)说明nums[middle]一定不是我们搜索的值&#xff0c;所以rightmiddle-1; elseif(nums[middle]<targe…

基于单片机的太阳跟踪系统的设计

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 技术交流认准下方 CSDN 官方提供的联系方式 文章目录 概要 一、设计的主要内容二、硬件电路设计2.1跟踪控制方案的选择2.1.1跟踪系统坐标系的选择2.2系统总体设计及相关硬件介绍…