多线程交互时,满足条件才去执行,否则阻塞一直到满足条件。当然可以用wait/notify实现。
本文用JUC包下的reentrantlock和其条件变量来完成。
文章目录
- 首先定义Predicate 和GuardAction;
- 然后定义Blocker
- 如何使用
- 完整代码如下
- 嵌套死锁问题
首先定义Predicate 和GuardAction;
Predicate:保护条件。只有保护条件为真(或为假),才去真正执行目标动作。
GuardAction : 抽象了目标动作(里面的call()方法),并关联了目标动作所需的保护条件(predicate)。意思是,如果predicate为真,才去调用call(),否者就去阻塞。当然阻塞和唤醒都是通过Blocker实现的。
interface Predicate {
boolean evaluate();
}
abstract class GuardAction<V> implements Callable<V> {
protected final Predicate guard;
public GuardAction(Predicate guard) {
this.guard = guard;
}
}
然后定义Blocker
它的作用是判断保护条件并阻塞调用的线程;唤醒阻塞的线程
interface Blocker {
/**
* 在保护条件成立时立即执行目标动作。
* 否则阻塞当前线程,直到保护条件成立
*/
<V> V callWithGuard(GuardAction<V> guardAction) throws Exception;
/**
* 在执行stateOperation所指定的操作后,决定是否唤醒本Blocker所暂挂的所有线程中的一个
*/
void signalAfter(Callable<Boolean> stateOperation) throws Exception;
void signal() throws InterruptedException;
/**
* 在执行stateOperation所指定的操作后,决定是否唤醒本Blocker所暂挂的所有线程
*/
void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}
@Slf4j
class ConditionVarBlocker implements Blocker {
private final Lock lock;
private final Condition condition;
private final boolean allowAccess2Lock;
public ConditionVarBlocker(Lock lock) {
this(lock, true);
}
public ConditionVarBlocker(Lock lock, boolean flag) {
this.lock = lock;
this.allowAccess2Lock = flag;
this.condition = lock.newCondition();
}
public ConditionVarBlocker() {
this(false);
}
public ConditionVarBlocker(boolean flag) {
this(new ReentrantLock(), flag);
}
public Lock getLock() {
if (allowAccess2Lock) {
return this.lock;
}
throw new IllegalStateException("Access to this lock disallowed");
}
@Override
public <V> V callWithGuard(GuardAction<V> guardAction) throws Exception {
lock.lockInterruptibly();
V result;
try {
final Predicate guard = guardAction.guard;
while (!guard.evaluate()) {
log.info("waiting...");
condition.await();
}
result = guardAction.call();
return result;
} finally {
lock.unlock();
}
}
@Override
public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()) {
condition.signal();
}
} finally {
lock.unlock();
}
}
@Override
public void signal() throws InterruptedException {
lock.lockInterruptibly();
try {
condition.signal();
} finally {
lock.unlock();
}
}
@Override
public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()) {
condition.signalAll();
}
}finally {
lock.unlock();
}
}
}
如何使用
AlarmAgent是给告警服务器发送告警消息的。调用其sendAlarm()就可以发送。
但是必须与告警服务器建立了连接后才能发送。
若连接未建立(或者连接中断),sendAlarm()的执行线程应该被暂挂直到连接建立完毕(或者恢复)。
- 首先我们定义好什么是需要保护的条件Predicate:是否与服务器建立了连接
- 然后定义好此条件为true时,我们需要做什么。也就是写好guardAction:发送消息给服务器
- 通过Blocker调用
按照上面的方法,写出的代码如下:
class AlarmAgentDemo {
private volatile boolean connectedToServer = false;
private final Predicate predicate = new Predicate() {
@Override
public boolean evaluate() {
return connectedToServer;
}
};
private final GuardAction<Void> guardAction = new GuardAction<Void>(predicate) {
@Override
public Void call() throws Exception {
// 发送告警日志给服务器
return null;
}
};
private final Blocker blocker = new ConditionVarBlocker();
public void sendAlarm() {
// 当前main线程就是执行操作的线程。如果连接到了服务器,就直接发送;否者就去阻塞
try {
blocker.callWithGuard(guardAction);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
AlarmAgentDemo alarmAgentDemo = new AlarmAgentDemo();
alarmAgentDemo.sendAlarm();
}
}
继续完善:
4. 和服务器连接的代码:连接成功,就修改状态
5. 失连后重连的代码:搞一个定时任务(心跳),轮询步骤4的代码。
public void init() {
// 1.连接服务器
connectServer();
// 2.开启定时任务监控与服务器的连接状态
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
if(!testConnection()) {
// 失连后,重新连接
connectedToServer = false;
connectServer();
// 唤醒被阻塞的线程
try {
blocker.broadcastAfter(() -> {
return connectedToServer;
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 测试与服务器的连接状态
private boolean testConnection() {
return new Random().nextBoolean();
}
}, 60000, 2000);
}
private void connectServer() {
// 模拟连接
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 连接成功
connectedToServer = true;
}
完整代码如下
@Slf4j
class AlarmAgentDemo {
private volatile boolean connectedToServer = false;
private final Predicate predicate = new Predicate() {
@Override
public boolean evaluate() {
return connectedToServer;
}
};
private final GuardAction<Void> guardAction = new GuardAction<Void>(predicate) {
@Override
public Void call() throws Exception {
// 发送告警日志给服务器
log.info("send message to server success");
return null;
}
};
private final Blocker blocker = new ConditionVarBlocker();
public void sendAlarm() {
// 当前main线程就是执行操作的线程。如果连接到了服务器,就直接发送;否者就去阻塞
try {
blocker.callWithGuard(guardAction);
} catch (Exception e) {
e.printStackTrace();
}
}
public void init() {
// 1.连接服务器
connectServer();
// 2.开启定时任务监控与服务器的连接状态
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
if(!testConnection()) {
log.info("失连,重连服务器中...");
// 失连后,重新连接
connectedToServer = false;
connectServer();
// 唤醒被阻塞的线程
try {
blocker.broadcastAfter(() -> {
return connectedToServer;
});
} catch (Exception e) {
e.printStackTrace();
}
} else {
log.info("与服务器正常连接中");
}
}
// 测试与服务器的连接状态
private boolean testConnection() {
return new Random().nextBoolean();
}
}, 6000, 2000);
}
private void connectServer() {
// 模拟连接
try {
Thread.sleep(new Random().nextInt(50000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("connect server success");
// 连接成功
connectedToServer = true;
}
public static void main(String[] args) {
AlarmAgentDemo alarmAgentDemo = new AlarmAgentDemo();
alarmAgentDemo.init();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
alarmAgentDemo.sendAlarm();
}
}
}
嵌套死锁问题
文末最后讨论了一个嵌套死锁问题。
代码大概是这样的:
这两个方法在不同的线程中运行,可能导致死锁。
因为方法1可能会在条件变量上await(),释放了lock锁。但是外层的synchronized锁没有释放。导致方法2一直被阻塞。而方法1又需要方法2通知才能继续运行。
所以在上面的block初始化时,可以传入一个lock锁实例,这样就可以避免了锁嵌套问题。
ps: synchronized释放锁的条件:代码执行完、抛出异常、调用了wait方法。
方法1:
synchronized (obj) {
blocker.callWithGuard(); //
}
方法2:
synchronized (obj) {
blocker.signalAfter(); //
}
原文代码:
本文作者:WKP9418
原文地址:https://blog.csdn.net/qq_43179428/article/details/141760448