文章目录
- 同步模式之保护性暂停
- 带超时版 GuardedObject
- join 原理
- 多任务版 GuardedObject
- 异步模式之生产者/消费者模式
- 标准库中的阻塞队列
- 阻塞队列的实现
- 加锁实现
- 生产者消费者模型的作用是什么
同步模式之保护性暂停
定义
- 即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
带超时版 GuardedObject
public class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get(long time){
//1记录最初时间
long startTime = System.currentTimeMillis();
//2记录还需要等待的时间
long passedTime = 0;
synchronized (lock){
while (response==null){
//4计算当前还需要等待多久时间
long waitTime=time-passedTime;
if (waitTime <= 0) {
System.out.println("break");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//如果提前被唤醒(以为存在被虚假唤醒的情况)
passedTime = System.currentTimeMillis() - startTime;
System.out.println("已经过去了"+passedTime);
}
return response;
}
}
public void complete(Object response){
synchronized (lock){
this.response=response;
System.out.println("notify");
lock.notifyAll();
}
}
}
测试
public class Demo1 {
public static void main(String[] args) {
GuardedObject v1 = new GuardedObject();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
v1.complete(null);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
Object o = v1.get(2500);
if(o!=null){
System.out.println(o);
}else {
System.out.println("不能获取response");
}
}
}
notify
已经过去了1002
已经过去了2504
break
不能获取response
join 原理
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
- join(0)表示一直等待,因为wait(0)表示一直等待
- 这里的等待的结果其实就是对应线程是否存活,因为我们调用这个方法是采用对象.join()
- delay就是对应还需要等待多久时间 now就是已经过去多久时间
多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
- 如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
public class GuardedObjects {
// 标识 Guarded Object
private int id;
public GuardedObjects(int id) {
this.id = id;
}
public int getId() {
return id;
}
//结果
private Object response;
//获取结果
//timeOut表示要等待多久
public Object get(long timeout){
synchronized (this){
//1开始时间
long startTime = System.currentTimeMillis();
long passedTime= 0;
while (response==null){
long waitTime = timeout - passedTime;
if(waitTime<=0){
break;
}
try {
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis() - startTime;
}
return response;
}
}
public void complete(Object object){
synchronized (this){
this.response=object;
this.notifyAll();
}
}
}
邮箱结构
public class Mailboxes {
private static Map<Integer,GuardedObjects> boxes = new Hashtable<>();
private static int id = 1;
//产生唯一id
private static synchronized int generateId(){
return id++;
}
public static GuardedObjects getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObjects createGuardedObject() {
GuardedObjects go = new GuardedObjects(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
测试
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObjects guardedObject = Mailboxes.createGuardedObject();
System.out.println("开始收信 id:"+guardedObject.getId());
Object mail = guardedObject.get(5000);
System.out.println("收到信 id:"+guardedObject.getId() + "内容为"+ mail);
}
}
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObjects guardedObject = Mailboxes.getGuardedObject(id);
System.out.println("送信 id为"+id+"内容为"+mail);
guardedObject.complete(mail);
}
}
public class Demo2 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id,"你好"+id).start();
}
}
}
异步模式之生产者/消费者模式
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等 待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
-
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.
- 比如在 " 秒杀 " 场景下 , 服务器同一时刻可能会收到大量的支付请求 . 如果直接处理这些支付请求 , 服务器可能扛不住( 每个支付请求的处理都需要比较复杂的流程 ). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求 .这样做可以有效进行 " 削峰 ", 防止服务器被突然到来的一波请求直接冲垮 .
-
阻塞队列也能使生产者和消费者之间 解耦.
- 比如过年一家人一起包饺子 . 一般都是有明确分工 , 比如一个人负责擀饺子皮 , 其他人负责包 . 擀饺子皮的人就是 " 生产者 ", 包饺子的人就是 " 消费者 ". 擀饺子皮的人不关心包饺子的人是谁( 能包就行 , 无论是手工包 , 借助工具 , 还是机器包 ), 包饺子的人 也不关心擀饺子皮的人是谁( 有饺子皮就行 , 无论是用擀面杖擀的 , 还是拿罐头瓶擀 , 还是直接从超 市买的).
阻塞队列的大小
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列 . 如果我们需要在一些程序中使用阻塞队列 , 直接使用标准库中的即可 .
BlockingQueue 是一个接口 .真正实现的类是 LinkedBlockingQueue和ArrayBlockingQueue等
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
- 必须先生产 ,再消费,不然会阻塞
通过构造方法来确定阻塞队列的大小
若没有声明数字,就表示无界
阻塞队列的实现
- 通过 “循环队列” 的方式来实现.
- 使用 synchronized 进行加锁控制.
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一 定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
加锁实现
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
简单实现线程间阻塞队列
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
class BlockingQueue {
private LinkedList<Message> queue;
private int capacity;
public BlockingQueue(int capacity){
this.capacity=capacity;
queue = new LinkedList<>();
}
public Message take(){
synchronized (queue){
while (queue.isEmpty()){
System.out.println("没货了,wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
System.out.println("输出一个资源");
queue.notifyAll();
return message;
}
}
public void put(Message message){
synchronized (queue){
while (queue.size()==capacity){
System.out.println("队列满了,wait");
try {
TimeUnit.SECONDS.sleep(1);
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.push(message);
System.out.println("进入一个资源");
queue.notifyAll();
}
}
}
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
测试
public class BlockingQueueDemo{
public static void main(String[] args) {
BlockingQueue blockingQueue = new BlockingQueue(2);
// 4 个生产者线程
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(()->{
blockingQueue.put(new Message(id,"资源"+id));
},"t1").start();
}
// 1 个消费者线程, 处理结果
new Thread(() -> {
while (true) {
Message message = blockingQueue.take();
System.out.println("id为"+message.getId()+" 信息为"+message.getMessage());
}
}, "消费者").start();
}
}
没货了,wait
进入一个资源
输出一个资源
进入一个资源
进入一个资源
id为2 信息为资源2
队列满了,wait
输出一个资源
id为0 信息为资源0
输出一个资源
id为3 信息为资源3
进入一个资源
输出一个资源
id为1 信息为资源1
没货了,wait
生产者消费者模型的作用是什么
这个问题很理论,但是很重要:
- 1 通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率,这是生产者消费者模型最重要的作用
- 2解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费者之间的联系少,联系越少越可以独自发展而不需要收到相互的制约