目录
wait , notify
wait vs sleep
正确使用方法
同步保护性暂停
join的源码
Future
异步生产者/消费者模型
定义
Park & Unpark
原理
wait , notify
小故事小南需要烟才能工作,但它又要占这锁让别人无法进来。那么这个时候开一个waitSet相当于就是休息室让小南进去。并且释放锁。如果烟到了,那么notify小南就能够继续工作了。
Blocked和Waiting区别其实就是waiting是释放了锁,blocked是没有锁waiting被notify之后仍然需要进入到entrylist进行等待。
@Slf4j(topic = "c.TestWaitNotify") public class Test { // 锁对象 final static Object obj = new Object(); public static void main(String[] args) throws InterruptedException { new Thread(() -> { synchronized (obj) { log.debug("执行...."); try { obj.wait(); // 让线程在obj上一直等待下去 } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } },"t1").start(); new Thread(() -> { synchronized (obj) { log.debug("执行...."); try { obj.wait(); // 让线程在obj上一直等待下去 } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } },"t2").start(); // 主线程两秒后执行 Thread.sleep(5000); log.debug("唤醒 obj 上其它线程"); synchronized (obj) { // obj.notify(); // 唤醒obj上一个线程 obj.notifyAll(); // 唤醒obj上所有等待线程 } } }
20:17:53.579 [t1] DEBUG c.TestWaitNotify - 执行.... 20:17:53.581 [t2] DEBUG c.TestWaitNotify - 执行.... 20:17:58.584 [main] DEBUG c.TestWaitNotify - 唤醒 obj 上其它线程 20:17:58.584 [t2] DEBUG c.TestWaitNotify - 其它代码.... 20:17:58.584 [t1] DEBUG c.TestWaitNotify - 其它代码.... 进程已结束,退出代码0
wait vs sleep
sleep:Thread调用,静态方法,而且不会释放锁
wait:所有obj,但是要配合synchronize使用,可以释放锁
sleep在睡眠时,不会释放锁,wait会释放对象锁
通常锁会加上final防止被修改
正确使用方法
小南需要烟才能工作,如果是使用sleep不释放锁,那么其他需要等待干活的人就会干等着,等烟来。但是wait可以让小南释放锁,让其他线程工作,并且唤醒小南
@Slf4j(topic = "c.TestCorrectPosture") public class Test { static final Object room = new Object(); // 有无烟 static boolean hasCigarette = false; static boolean hasTakeout = false; public static void main(String[] args) throws InterruptedException { new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); if (!hasCigarette) { log.debug("没烟,先歇会!"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } } }, "小南").start(); for (int i = 0; i < 5; i++) { new Thread(() -> { synchronized (room) { log.debug("可以开始干活了"); } }, "其它人").start(); } Thread.sleep(1000); // 送烟线程 new Thread(() -> { synchronized (room) { hasCigarette = true; log.debug("烟到了噢!"); } }, "送烟的").start(); } }
20:32:22.014 [小南] DEBUG c.TestCorrectPosture - 有烟没?[false] 20:32:22.019 [小南] DEBUG c.TestCorrectPosture - 没烟,先歇会! 20:32:24.024 [小南] DEBUG c.TestCorrectPosture - 有烟没?[false] 20:32:24.024 [送烟的] DEBUG c.TestCorrectPosture - 烟到了噢! 20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了 20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了 20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了 20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了 20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了 进程已结束,退出代码0
存在的问题 :
-
其它干活的线程,都要一致阻塞,效率低
-
就算烟提前送到,也无法立刻醒来
-
送烟加上锁之后,相当于门一直锁着,烟送不进去
改进 :
@Slf4j(topic = "c.TestCorrectPosture") public class Test { static final Object room = new Object(); // 有无烟 static boolean hasCigarette = false; static boolean hasTakeout = false; public static void main(String[] args) throws InterruptedException { new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); if (!hasCigarette) { log.debug("没烟,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } } }, "小南").start(); for (int i = 0; i < 5; i++) { new Thread(() -> { synchronized (room) { log.debug("可以开始干活了"); } }, "其它人").start(); } Thread.sleep(1000); // 送烟线程 new Thread(() -> { synchronized (room) { hasCigarette = true; log.debug("烟到了噢!"); room.notify(); // 叫醒小南线程 } }, "送烟的").start(); } }
存在问题
会不会有其他线程在等待着锁?如果是那么会不会唤醒错了线程?(虚假唤醒)
解决 :
可以通过while多次判断条件是否成立,直接使用notifyAll来唤醒所有的线程。然后线程被唤醒之后先再次判断条件是否成立,成立那么往下面执行,如果不成立那么继续执行wait。
while (!hasCigarette) { log.debug("没烟,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
正确使用 :
synchronized(lock){ while(条件不成立){ lock.wait(); } // 干活 } // 另一个线程 synchronized(lock){ lock.notifyAll(); }
同步保护性暂停
定义
-
t1需要t2的结果,那么就可以通过一个中间对象guardedObject来充当这个中间商,t2执行完就发送消息到obj,然后obj交给t1
-
如果是不断发送结果那么可以使用消息队列
-
要等待所以是同步
-
join和future就是用的这个原理
public class Test { public static void main(String[] args) { GuaObj guaObj = new GuaObj(); Thread thread = new Thread(() -> { System.out.println("锁住,等待结果"); guaObj.get(2000); System.out.println("解锁"); }, "t1"); thread.start(); Thread thread1 = new Thread(() -> { System.out.println("先睡两秒"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("解锁,设置对象"); guaObj.set(new Object()); }, "t2"); thread1.start(); } } class GuaObj{ // 结果 public Object response; // 获取结果 // timeout表示最多等多久 public Object get(long timeout){ synchronized (this){ // 开始时间 long cur = System.currentTimeMillis(); // 经历的时间 long paseTime=0; while(response==null){ try { // 这一轮应该等的时间 long waitTime=timeout-paseTime; //超时就不等了 if(waitTime<=0) break; this.wait(waitTime); paseTime=System.currentTimeMillis()-cur; } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("等待结束"); return response; } } // 产生结果 public void set(Object response){ synchronized (this){ this.response=response; this.notifyAll(); } } }
锁住,等待结果 先睡两秒 解锁,设置对象 等待结束 解锁 进程已结束,退出代码0
-
需要记录超时的时间,并且重新设置waittime,原因是可能会有虚假唤醒,那么这个时候超时时间不是timeout而是timeout-passedTime,也就是线程执行的时间。
-
如果超时的话,那么就会自动结束
join的源码
public final synchronized void join(long millis) throws InterruptedException { //一开始的时间 long base = System.currentTimeMillis(); //线程执行的时间 long now = 0; //如果是<0那么就抛出异常 if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } //如果是0那么就一直等待线程执行完,isAlive是否生存 if (millis == 0) { while (isAlive()) { wait(0); } } else { //timeout超时那么就结束 while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
Future
相当于就是一个信箱,里面装了很多GuardObject对象,线程可以通过对应的地址访问对象获取结果
异步生产者/消费者模型
定义
相当于就是生产者给队列生产结果,消费者负责处理结果
-
不需要一一对应
-
平衡资源
-
消息队列有容量控制
-
阻塞队列控制结果出队列
public class Test { public static void main(String[] args) { MesageQueue queue = new MesageQueue(2); for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { queue.set(new Message(id, "值" + id)); },"生产者" + i).start(); } new Thread(() -> { while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } Message message = queue.take(); } }, "消费者").start(); } } @Slf4j class MesageQueue{ //存消息的集合 private LinkedList<Message> list = new LinkedList(); // 消息容量 private int capacity; public MesageQueue(int capacity){ this.capacity = capacity; } // 获取消息 public Message take() { // 检查队列是否为空 synchronized (list){ while(list.isEmpty()){ try { log.debug("队列为空,消费者线程等待"); list.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } Message message = list.removeFirst(); log.debug("已经消费了消息 {}",message); list.notifyAll(); return message; } } // 存入消息 public void set(Message message) { // 检查是不是满了 synchronized (list){ while(list.size() == capacity){ try { log.debug("队列已满,生产者线程等待"); list.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } list.addLast(message); log.debug("已经生产了消息 {}",message); list.notifyAll(); } } } // 消息类 final class Message{ private int id; private Object value; public Message(int id, Object value){ this.id = id; this.value = value; } public int getId() { return id; } public Object getValue() { return value; } @Override public String toString() { return "Message{" + "id=" + id + ", value=" + value + '}'; } }
12:58:24.373 [生产者1] DEBUG MesageQueue - 已经生产了消息 Message{id=1, value=值1} 12:58:24.375 [生产者2] DEBUG MesageQueue - 已经生产了消息 Message{id=2, value=值2} 12:58:24.377 [生产者0] DEBUG MesageQueue - 队列已满,生产者线程等待 12:58:25.371 [消费者] DEBUG MesageQueue - 已经消费了消息 Message{id=1, value=值1} 12:58:25.371 [生产者0] DEBUG MesageQueue - 已经生产了消息 Message{id=0, value=值0} 12:58:26.386 [消费者] DEBUG MesageQueue - 已经消费了消息 Message{id=2, value=值2} 12:58:27.397 [消费者] DEBUG MesageQueue - 已经消费了消息 Message{id=0, value=值0} 12:58:28.405 [消费者] DEBUG MesageQueue - 队列为空,消费者线程等待
Park & Unpark
与wait和notify的区别
-
不需要与monitor一起使用
-
可以精准唤醒和阻塞线程
-
可以先unpark,但是不能先notify。但是unpark之后park不起作用。
原理
①park,先去到counter里面判断是不是0,如果是那么就让线程进入队列。接着就是把counter设置为0
②unpark,那么唤醒线程,恢复运行,并且把counter设置为1
③先unpark后park,那么就unpark补充counter为1,那么park判断counter是1,认为还有体力可以继续执行。