一、同步模式之保护性暂停定义
保护性暂停即Guarded Suspension,用在一个线程等待另一个线程的执行结果。
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联到同一个Guarded Object。
- 如果有结果不断从一个线程到另一个线程,那么可以使用消息队列(见生产者/消费者模型)
- JDK中,join的实现,Future的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
代码实例
public class GuardedObject {
//结果
private Object response;
/**
* 获取结果
* @param timeout:表示要等待的超时时间,单位毫秒
* @return Object
*/
public Object getResponse(long timeout){
synchronized (this){
//开始时间
long beginTime = System.currentTimeMillis();
//已经等待时间
long passedTime = 0;
while(response == null){
//这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
//经历的时间超过了超时时间,则退出循环,不再等待
if(waitTime<=0){
break;
}
//等待结果
try {
//此处的wait时间为剩余的最大等待时间,如果中间被虚假唤醒,再次进入循环时,wait的等待时间会越来越小
this.wait(waitTime);
}catch (Exception e){
e.printStackTrace();
}
//经历的等待时间
passedTime = System.currentTimeMillis()-beginTime;
}
return response;
}
}
public void generateResponse(Object response){
synchronized (this){
//给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
测试代码实例
@Slf4j
public class TestGuarded {
public static void main(String[] args){
GuardedObject obj = new GuardedObject();
//线程1 等待线程2的结果
new Thread(()->{
log.info("begin......");
//2000表示最多等待2秒,2秒后不论是否拿到结果,都会返回response
Object response = obj.getResponse(3000);
log.info("结果为{}",response);
},"t1").start();
new Thread(()->{
log.info("begin......");
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
obj.generateResponse(12);
},"t2").start();
}
}
测试结果
10:46:42.839 [t2] INFO com.atorientsec.TestGuarded - begin......
10:46:42.839 [t1] INFO com.atorientsec.TestGuarded - begin......
10:46:43.857 [t1] INFO com.atorientsec.TestGuarded - 结果为12
二、线程同步模式扩展
解耦的中间类代码示例:
/**
* 解耦多线程的中间类,通用的类
*/
public class Mailboxes {
private static Map<Integer,GuardedObject> boxes= new ConcurrentHashMap<>();
private static int id =0;
//产生唯一的id
private static synchronized int generateId(){
return id++;
}
public static GuardedObject getGuardedObject(int id){
return boxes.remove(id);
}
public static GuardedObject createGuardedObject(){
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(),go);
return go;
}
public static Set<Integer> getIds(){
return boxes.keySet();
}
}
GuardedObject改造后的代码示例:
//保护性暂停模式:一个等待者对应一个结果生产者
public class GuardedObject {
//结果
private Object response;
//标识GuardedObject
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
/**
* 获取结果
* @param timeout:表示要等待的超时时间,单位毫秒
* @return Object
*/
public Object getResponse(long timeout){
synchronized (this){
//开始时间
long beginTime = System.currentTimeMillis();
//已经等待时间
long passedTime = 0;
while(response == null){
//这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
//经历的时间超过了超时时间,则退出循环,不再等待
if(waitTime<=0){
break;
}
//等待结果
try {
//此处的wait时间为剩余的最大等待时间,如果中间被虚假唤醒,再次进入循环时,wait的等待时间会越来越小
this.wait(waitTime);
}catch (Exception e){
e.printStackTrace();
}
//经历的等待时间
passedTime = System.currentTimeMillis()-beginTime;
}
return response;
}
}
public void generateResponse(Object response){
synchronized (this){
//给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
测试代码示例:
@Slf4j
public class TestGuarded {
public static void main(String[] args){
for (int i = 0; i < 3; i++) {
new People().start();
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
for (Integer id:Mailboxes.getIds()) {
new Postman(id,"内容"+id).start();
}
}
}
/**
* 收信
*/
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run(){
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.info("开始收信,id:{}",guardedObject.getId());
Object response = guardedObject.getResponse(5000);
log.info("收到信,id:{},内容:{}",guardedObject.getId(),response);
}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread{
private int id;
private String mail;
public Postman(int id,String mail){
this.id = id;
this.mail = mail;
}
@Override
public void run(){
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.info("送信 id:{},内容:{}",id,mail);
guardedObject.generateResponse(mail);
}
}