1.多线程设计模式
1.1.同步模式之保护性暂停
1.1.1.定义
1>.即Guarded Suspension,用在一个线程等待另一个线程的执行结果的场景中;
2>.使用场景
①.有一个结果(数据)需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject;
②.如果有结果(数据)不断从一个线程到另一个线程那么可以使用消息队列;
③.JDK中,join的实现、Future的实现,采用的就是此模式;
④.因为要等待另一方的结果,因此归类到同步模式;
1.1.2.代码实现
@Slf4j
public class TestGuardedObject {
public static void main(String[] args) {
//创建用于线程之间传递结果的桥梁类
GuardedObject guardedObject = new GuardedObject();
//线程1等待线程2的处理结果
new Thread(() -> {
//线程t1调用该方法获取线程t2的结果会被阻塞
log.info("线程t1等待线程t2的执行结果");
Object result = guardedObject.get();
log.info(result.toString());
}, "t1").start();
//线程2处理业务,将结果传递到GuardedObject中传递到线程1
new Thread(() -> {
log.info("线程t2处理业务...");
Integer result = 0;
for (int i = 0; i < 10; i++) {
result = result + i;
}
//将处理结果传递到线程t2
//方法内部会唤醒其他处于WAITING状态的线程
guardedObject.complete(result);
}, "t2").start();
}
}
//创建一个GuardedObject桥梁对象用于在线程之间传递结果
class GuardedObject {
//要传递的结果,共享变量
private Object response;
//获取结果
public Object get() {
synchronized (this) {
//线程进入等待状态的条件,为了防止虚假唤醒,需要使用while循环进行不停的判断,而不是只判断一次
while (this.response == null) {
//没有获取到结果,等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return this.response;
}
}
//产生结果
public void complete(Object response) {
synchronized (this) {
//给成员变量赋值
this.response = response;
//唤醒所有处于WAITING状态的线程
this.notifyAll();
}
}
}
可以看到使用保护性暂停同步模式可以让多个线程同时执行(/一个线程等待另一个线程的结果),而join必须一个等待某一个线程执行完毕,该线程才能继续执行;
1.2.同步模式之保护性暂停增强1
1.2.1.带超时版GuardedObject
@Slf4j
public class TestGuardedObject {
public static void main(String[] args) {
//创建用于线程之间传递结果的桥梁类
GuardedObject guardedObject = new GuardedObject();
//线程1等待线程2的处理结果
new Thread(() -> {
//线程t1调用该方法获取线程t2的处理结果会被阻塞
log.info("线程t1等待线程t2的执行结果");
Object result = guardedObject.get(2000);
log.info("线程t1获取到的结果:{}",result == null ? null : result.toString());
}, "t1").start();
//线程2处理业务,将结果传递到GuardedObject中传递到线程1
new Thread(() -> {
log.info("线程t2处理业务...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer result = 0;
for (int i = 0; i < 10; i++) {
result = result + i;
}
//将处理结果传递到线程t2
//方法内部会唤醒其他处于WAITING状态的线程
log.info("业务处理完成,准备唤醒其他线程...");
guardedObject.complete(result);
}, "t2").start();
}
}
//创建一个GuardedObject桥梁对象用于在线程之间传递结果
@Slf4j
class GuardedObject {
//要传递的结果,共享变量
private Object response;
//获取结果
//timeout表示要等待的最大时间
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passTime = 0; //记录线程已经等待的时间
//线程进入等待状态的条件,为了防止虚假唤醒,需要使用while循环进行不停的判断,而不是只判断一次!
while (response == null) {
//waitTime表示本轮循环中线程要等待的时间,如果该值小于等于0就表示,线程无需再等待了!!!
long waitTime = timeout - passTime;
if (waitTime <= 0) {
log.info("线程等待时间已到达,没有获取到结果,自动结束等待!");
//注意:由于目前使用while循环防止虚假唤醒,因此当线程等待超过一定的时间(/最大等待时间被用完),
//那么线程就需要自动结束等待继续往后执行,即需要退出while循环,避免再进入下一轮等待!!!
break;
}
//没有获取到结果,等待
try {
//注意:线程等待时间并不总是timeout,假如被提前虚假唤醒了,
//那么下一轮等待时间就是(等待的最大时间timeout - 上一轮已经等待过的时间)!!
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程已经等待的时间或者说线程经历过的时间
passTime = System.currentTimeMillis() - begin;
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (this) {
//给成员变量赋值
this.response = response;
//唤醒所有处于WAITING状态的线程
this.notifyAll();
}
}
}
1.3.join的原理
1>.join是Thread线程类中的方法;
2>.join源码
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
//join底层也使用了保护性暂停同步模式
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
1.4.同步模式之保护性暂停增强2
1.4.1.多任务版GuardedObject
1>.架构说明
说明:
①.图中Futures就好比居民楼一层的信箱(每个信箱有房间编号),左侧的t0,t2,t4就好比等待邮件的居民,右侧的t1,t3,t5就好比邮递员;
②.如果需要在多个类之间使用GuardedObject对象作为参数传递不是很方便,因此需要设计一个用来解耦的中间类,这样不仅能够解耦"结果等待者"和"结果生产者",还能够同时支持多个任务的管理;
2>.代码实现
@Slf4j
public class TestGuardedObject2 {
public static void main(String[] args) throws InterruptedException{
for (int i = 0; i < 3; i++) {
new People(i+1).start();
}
TimeUnit.SECONDS.sleep(1);
//找到所有的信箱编号
for (Integer id : MailBoxes.getIds()) {
//根据信箱编号找到信箱,将对应的信件
new Postman(id, "将信件投递到编号为{" + id + "}的信箱中").start();
}
}
}
//业务类,居民从对应的GuardedObject对象中获取信件
@Slf4j
class People extends Thread {
//居民编号
private int no;
public People(int no) {
this.no = no;
}
@Override
public void run() {
//关联指定的GuardedObject对象(信箱)
GuardedObject2 guardedObject = MailBoxes.createGuardedObject();
log.info("居民{}开始从编号为{}的信箱中收信", this.no, guardedObject.getId());
//如果获取到的信件为空,则该线程会释放对象锁进入WaitSet中变成WAITING状态进行等待一段时间
Object result = guardedObject.get(5000);
log.info("居民{}从编号为{}的信箱中收到内容为{}的信件", this.no, guardedObject.getId(), result == null ? null : result.toString());
}
}
//业务类,邮递员将信件投递到对应的GuardedObject对象中
@Slf4j
class Postman extends Thread {
//信件属性
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
//根据信件上的信箱ID查询到指定的GuardedObject对象(信箱)
GuardedObject2 guardedObject = MailBoxes.getGuardedObject(id);
log.info("邮递员将信件投递到编号为{}的信箱中,信件内容为{}", guardedObject.getId(), mail);
//将信件投递到对应的GuardedObject对象(信箱)中
guardedObject.complete(mail);
}
}
//中间解耦类,里面管理了多个GuardedObject对象
class MailBoxes {
//存放多个GuardedObject对象的容器,共享变量;线程安全
private static Map<Integer, GuardedObject2> boxes = new ConcurrentHashMap<Integer, GuardedObject2>();
//共享变量
//ID就是信箱编号
private static int id = 1;
//产生唯一ID
private static synchronized int generateId() {
//临界区
return id++;
}
//根据信件ID取出Map容器中指定的信箱,用于邮递员投递信件
public static GuardedObject2 getGuardedObject(int id) {
//避免线创建的GuardedObject对象多而导致Map容器占用大量的堆内存,这里使用remove()方法删除每次取出的GuardedObject对象
return boxes.remove(id);
}
//创建信箱存入到Map容器中,便于后续使用
public static GuardedObject2 createGuardedObject() {
GuardedObject2 guardedObject2 = new GuardedObject2(generateId());
boxes.put(guardedObject2.getId(), guardedObject2);
return guardedObject2;
}
//获取所有信箱ID
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
//信箱类,用于邮递员投递信件以及居民类接收信件
@Slf4j
class GuardedObject2 {
//标识多个GuardedObject对象
//信箱编号
private int id;
//要传递的结果,共享变量
private Object response;
public GuardedObject2(int id) {
this.id = id;
}
public int getId() {
return id;
}
//获取结果
//timeout表示要等待的最大时间
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passTime = 0; //记录线程已经等待的时间
//线程进入等待状态的条件,为了防止虚假唤醒,需要使用while循环进行不停的判断,而不是只判断一次!
while (response == null) {
//waitTime表示本轮循环中线程要等待的时间,如果该值小于等于0就表示,线程无需再等待了!!!
long waitTime = timeout - passTime;
if (waitTime <= 0) {
log.info("线程等待时间已到达,没有获取到结果,自动结束等待!");
//注意:由于目前使用while循环防止虚假唤醒,因此当线程等待超过一定的时间(/最大等待时间被用完),
//那么线程就需要自动结束等待继续往后执行,即需要退出while循环,避免再进入下一轮等待!!!
break;
}
//没有获取到结果,等待
try {
//注意:线程等待时间并不总是timeout,假如被提前虚假唤醒了,
//那么下一轮等待时间就是(等待的最大时间timeout - 上一轮已经等待过的时间)!!
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程已经等待的时间或者说线程经历过的时间
passTime = System.currentTimeMillis() - begin;
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (this) {
//给成员变量赋值
this.response = response;
//唤醒所有处于WAITING状态的线程
this.notifyAll();
}
}
}
1.5.异步模式之生产者与消费者
1.5.1.定义
1>.与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应;
2>.消费队列可以用来平衡生产和消费的线程资源;
3>.生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据;
4>.消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据;
5>.JDK中各种阻塞队列,采用的就是这种模式;
注意:这里的消息队列是用于在Java进程内多个线程之间进行通信的,而不是其他的如RabbitMQ,RocketMQ等下消息队列框架是在多个Java进程之间通信,千万别弄混了!!!
1.5.2.实现
@Slf4j
public class TestConsumerOrProduct {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i+1;
new Thread(() -> {
//注意:lambda表达式中引用的外部局部变量必须是"final"修饰的!!!
//可以在lambda表达式外部定义的一个局部变量接收外部的局部变量!
messageQueue.put(new Message(id, "消息:" + id));
}, "生产者线程:" + (i + 1)).start();
}
new Thread(()->{
while (true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
messageQueue.take();
}
},"消费者线程").start();
}
}
//消息队列类,Java进程内多个线程之间的通信
@Slf4j
class MessageQueue {
//创建一个存储message对象的容器--双向链表
private LinkedList<Message> linkedList = new LinkedList<Message>();
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//消费消息
public Message take() {
synchronized (linkedList) {
while (linkedList.isEmpty()) {
try {
log.info("消息队列为空,无法消费消息,[{}]等待", Thread.currentThread().getName());
//消息队列为空,线程等待,释放对象锁
linkedList.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从链表的头部取元素(Message对象)
Message message = linkedList.removeFirst();
log.info("[{}]已经消费了一个消息{}", Thread.currentThread().getName(), message);
//消费了一个消息,消息队列中有空位,唤醒生产者线程,生产消息
linkedList.notifyAll();
return message;
}
}
//生产消息
public void put(Message message) {
synchronized (linkedList) {
while (linkedList.size() == this.capacity) {
try {
log.info("消息队列已满,无法生产消息,[{}]等待", Thread.currentThread().getName());
//消息队列已满,线程等待,释放对象锁
linkedList.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从链表的尾部添加元素(Message对象)
linkedList.addLast(message);
log.info("[{}]已经生产了一个消息{}", Thread.currentThread().getName(), message);
//生产一个消息,消息队列中不为空,唤醒消费者线程,消费消息
linkedList.notifyAll();
}
}
}
//消息类
//保证线程安全:①.final修饰,没有子类;②.没有修改属性的setter方法;
final class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}