文章目录
- 死锁
- 哲学家就餐问题
- 死锁的检测方式
- 死锁的产生条件
- 死锁的规避
- 死锁的恢复
- 锁死
- 信号丢失锁死
- 嵌套监视器锁死
- 线程饥饿
- 活锁
死锁
概念
如果两个或者更多的线程因为相互等待对方而被永远暂停,线程的生命周期变成了BLOCKED或者WAITING,则我们称这些线程产生了死锁(DeadLock)
哲学家就餐问题
现在来用代码描述一下“哲学家就餐问题”,先将问题简化,假设只有两个哲学家面对面坐着,每个哲学家吃饭都先拿自己左手边的筷子,再拿右手边;当哲学家A拿起他左手边的的筷子,打算拿右边的时,哲学家B拿起了他自己左手边的筷子,而其正好是哲学家A的右手边,由于双方都等着拿自己右手边筷子,但自己又不会先放下左手中的筷子,这就导致了“死锁”。
1、哲学家的抽象类
public abstract class AbsPhilosopher extends Thread {
protected String id;
protected Chopstick left;
protected Chopstick right;
public AbsPhilosopher(String id, Chopstick left, Chopstick right) {
this.id = id;
this.left = left;
this.right = right;
}
@Override
public void run() {
for (; ; ) {
think();
eat();
}
}
protected abstract void eat();
protected void think() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("the %s is thinking\n", id);
}
protected void doEat() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("the %s is eating\n", id);
}
}
2、具有死锁的哲学家实现类
public class DeadLockPhilosopherImpl extends AbsPhilosopher {
public DeadLockPhilosopherImpl(String id, Chopstick left, Chopstick right) {
super(id, left, right);
}
@Override
protected void eat() {
synchronized (left) {
left.pickup();
synchronized (right) {
right.pickup();
//两根筷子同时拿起开始吃饭
doEat();
right.putDown();
}
left.putDown();
}
}
}
3、筷子类
public class Chopstick {
public final int id;
public Chopstick(int id) {
this.id = id;
}
public void pickup(){
}
public void putDown(){
}
}
4、客户端
public class PhilosopherTest {
public static void main(String[] args) {
int nums = 2;
//两个人,两只筷子
Chopstick[] chopsticks = new Chopstick[nums];
for (int i = 0; i < 2; i++) {
chopsticks[i] = new Chopstick(i);
}
//创建两个哲学家
for (int i = 0; i < nums; i++) {
AbsPhilosopher philosopher0 = new DeadLockPhilosopherImpl(String.valueOf(i), chopsticks[i], chopsticks[(i + 1) % nums]);
AbsPhilosopher philosopher1 = new DeadLockPhilosopherImpl(String.valueOf(i), chopsticks[i], chopsticks[(i + 1) % nums]);
philosopher0.start();
philosopher1.start();
}
}
}
死锁的检测方式
上述问题出现了死锁,有两种检测方式
1、启动客户端之后,在控制台输入jps,找到启动类的pid,进行jstack,即可发现死锁的产生:
2、另外一种本地检查死锁的方式,如果是win电脑的话,可以按win+r,输入jconsole,选择执行的demo后按如下操作可以快速定位死锁。
死锁的产生条件
资源互斥
涉及的资源必须是独立的,即一个资源只能被一个线程访问。比如一根筷子只能被一个哲学家使用。
资源不可抢夺
涉及到的资源只能被线程持有者主动释放,其他线程无法抢夺。比如筷子只能由当前哲学家进行放下,其他哲学家不能抢夺。
占用并等待资源
涉及到的线程当前至少持有一份资源,并等待其他线程持有的资源,在资源等待中,当前线程并不释放自己持有的资源。比如哲学家A拿起自己左手的筷子,并申请他右手边的筷子,但并不会放下左手中的筷子。
循环等待资源
涉及的线程必须在等待别的线程的资源,而这些线程又等待第一个线程的资源。第一个哲学家在等待第二个哲学家左手的筷子,第二个哲学家在等待第一个哲学家的左手的筷子。
死锁的规避
死锁的四个产生条件时必要不充分条件,产生死锁的话,这四个条件一定要同时成立,但这四个条件同时成立不一定产生死锁。所以只要消除死锁的任意一个条件就可以规避死锁。
粗锁法:使用一个粗粒度的锁来代替多个锁,从而消除“占用并等待资源”和“循环等待资源”这两个条件。
public class GlobalLockPhilosopherImpl extends AbsPhilosopher {
//使用static变量
private static final Object GLOBAL_LOCK = "lock";
public GlobalLockPhilosopherImpl(String id, Chopstick left, Chopstick right) {
super(id, left, right);
}
@Override
protected void eat() {
synchronized (GLOBAL_LOCK) {
left.pickup();
right.pickup();
//两根筷子同时拿起开始吃饭
doEat();
right.putDown();
left.putDown();
}
}
}
粗锁法有一个缺点是会导致资源的浪费,因为同一时刻只能有一个线程使用资源,也就是同一时刻只能有一个哲学家就餐。假设有5个哲学家,当有1个哲学家就餐时,还剩下3根筷子,其实还够一个哲学家就餐的。
锁排序法:相关线程使用全局统一的顺序去申请锁。假设有多个线程要去申请资源,则可以给这些资源排上序号,让这些线程按照序号从小到大或者从大到小去申请,则可以打破“循环等待资源”这个条件。哲学家就餐导致死锁的一个原因是哲学家必须先拿起自己左手的筷子,再拿起右手的筷子。我们给这些筷子排上序号,当一个哲学家就餐时,他必须先拿起序号小的筷子,再拿起序号大的筷子,而不是先左手再右手。
public class SortLockPhilosopherImpl extends AbsPhilosopher {
private Chopstick one;
private Chopstick theOther;
private static final Object GLOBAL_LOCK = "lock";
public SortLockPhilosopherImpl(String id, Chopstick left, Chopstick right) {
super(id, left, right);
//对资源进行排序
int leftHash = Objects.hash(left);
int rightHash = Objects.hash(right);
if (leftHash < rightHash) {
one = left;
theOther = right;
} else if (leftHash > rightHash) {
one = right;
theOther = left;
} else { //hash相同
one = null;
}
}
@Override
protected void eat() {
if (Objects.nonNull(one)){
synchronized (one){
one.pickup();
synchronized (theOther){
theOther.pickup();
eat();
theOther.putDown();
}
one.putDown();
}
}else{
synchronized (GLOBAL_LOCK) {
left.pickup();
right.pickup();
doEat();
right.putDown();
left.putDown();
}
}
}
}
使用ReentrantLock.tryLock(long,TimeUnit):为申请锁设置一个超时时间,在超时时间内,如果获取锁则返回true,没有则返回false。可以打破“占用并等待资源这个条件”。
public class TryLockPhilosopherImpl extends AbsPhilosopher {
private static Map<Chopstick, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();
public TryLockPhilosopherImpl(String id, Chopstick left, Chopstick right) {
super(id, left, right);
LOCK_MAP.putIfAbsent(left, new ReentrantLock());
LOCK_MAP.putIfAbsent(right, new ReentrantLock());
}
@Override
protected void eat() {
if (pickUp(left) && pickUp(right)) {
try {
doEat();
} finally {
putDown(left);
putDown(right);
}
}
}
private boolean pickUp(Chopstick chopstick) {
ReentrantLock lock = LOCK_MAP.get(chopstick);
boolean tryLock = false;
try {
tryLock = lock.tryLock(30, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
//当前线程拿起另一只筷子,使其放下
Chopstick theOther = chopstick == left ? right : left;
if (LOCK_MAP.get(theOther).isHeldByCurrentThread()) {
theOther.putDown();
LOCK_MAP.get(theOther).unlock();
}
return false;
}
if (tryLock) {
chopstick.pickup();
return true;
} else {
return false;
}
}
private void putDown(Chopstick chopstick) {
ReentrantLock lock = LOCK_MAP.get(chopstick);
chopstick.putDown();
lock.unlock();
}
}
死锁的恢复
以上是死锁的规避方法,但如果死锁已经产生了,需要具有恢复的手段。如果代码使用的是内部锁或者Lock.lock(),则无法恢复,只能重启虚拟机。但如果代码中使用的是Lock.lockInterruptibly()实现的,那是可以进行恢复的。
public class DeadLockDetector extends Thread {
static final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
private final long monitorInterval;
public DeadLockDetector(long monitorInterval) {
super("DeadLockDetector");
setDaemon(true);
this.monitorInterval = monitorInterval;
}
public DeadLockDetector() {
this(2000);
}
public static ThreadInfo[] findDeadLockThreads() {
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
return null == deadlockedThreads ? new ThreadInfo[0] : threadMXBean.getThreadInfo(deadlockedThreads);
}
public static Thread findThreadById(long threadId) {
for (Thread thread : Thread.getAllStackTraces().keySet()) {
if (thread.getId() == threadId) {
return thread;
}
}
return null;
}
public static boolean interruptThread(long threadId) {
Thread thread = findThreadById(threadId);
if (null != thread) {
thread.interrupt();
return true;
}
return false;
}
@Override
public void run() {
ThreadInfo[] threadInfos;
ThreadInfo threadInfo;
int i = 0;
for (; ; ) {
//检测系统中是否存在死锁
threadInfos = DeadLockDetector.findDeadLockThreads();
if (threadInfos.length > 0) {
//选取任意一个死锁的线程,给它发送中断请求
threadInfo = threadInfos[i++ % threadInfos.length];
DeadLockDetector.interruptThread(threadInfo.getThreadId());
continue;
} else {
i = 0;
System.out.println("No deadLock found");
}
try {
//每隔一段时间进行一次死锁的检测
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
大概思想是建立一个心跳线程,间断性的去检测jvm有没有死锁产生,有的话就发送一个中断请求。
所以把哲学家就餐中加锁的代码换成如下:
private boolean pickUp(Chopstick chopstick) {
ReentrantLock lock = LOCK_MAP.get(chopstick);
try {
lock.lockInterruptibly();
chopstick.pickup();
return true;
} catch (InterruptedException e) {
//当前线程拿起另一只筷子,使其放下
Chopstick theOther = chopstick == left ? right : left;
theOther.putDown();
LOCK_MAP.get(theOther).unlock();
return false;
} finally {
lock.unlock();
}
}
锁死
由于唤醒等待线程所需的条件永远无法成立,或者其他线程无法唤醒该线程而一直处于非运行状态,我们称这个线程被锁死。
锁死和死锁的最终结果很相似,都是线程无法运行。但二者产生的条件是不同的,死锁产生的条件在如上四个;锁死产生的条件分为信号丢失锁死和嵌套监视器锁死。
信号丢失锁死
信号丢失锁死是没有相应的通知线程来唤醒等待线程,从而使线程一直等待下去。最常见的例子就是线程执行wait()/notify()的时候,没有对保护条件进行判断,但先执行了nofity,再执行的wait,从而错过了线程唤醒,导致等待线程一直等待下去。错误的代码如下:
synchronized (someObject) {
//调用someObject来暂停当前线程
someObject.wait();
//其它线程执行了notify,使条件满足时,执行目标动作
doAction();
}
正确的代码如下:
//原子操作,在调用之前需要新获取内部锁
synchronized (someObject){
while(保护条件不成立){
//调用someObject来暂停当前线程
someObject.wait()
}
//其它线程执行了notify,使条件满足时,执行目标动作
doAction();
}
嵌套监视器锁死
嵌套监视器锁死是嵌套锁导致等待线程永远无法被唤醒的一种活性障碍。
如上情况,左边是等待线程,右边是通知线程。当语句2准备执行的时候,语句1已经在执行了,从而导致通知线程被阻塞,当等待线程执行完语句1之后,并不会释放monitorX,继续执行到monitorY.wait()处,从而使等待线程阻塞,但是因为等待线程没有释放monitorX资源,导致通知线程也无法执行monitorY.notifyAll(),因此等待线程会一直等待下去。这种由于嵌套锁导致通知线程始终无法唤醒等待线程的活性障碍被称为嵌套监视器锁死。
线程饥饿
线程饥饿指的是线程一直无法获得其所需的资源而导致任务一直无法进展的一种活性障碍。
产生线程饥饿的一般条件都是非公平锁的使用。死锁也属于线程饥饿,因为死锁也是因为某种情况从而导致无法获取资源。导致线程饥饿的条件仅仅是无法获取资源,比如显示锁(非公平锁)的使用,但这并不意味会导致死锁。
活锁
没有阻塞当前线程申请资源,但申请资源一直未成功,也就是当前线程一直处于做无用功的状态,这是活锁。----屡战屡败,屡败屡战。