J.U.C并发工具集实战及原理分析

news2024/9/21 14:48:00

​在J.U.C里提供了很多的并发控制工具类,这些工具类可以使得线程按照业务的某种约束来执行。本节包含CountDownLatch、Semaphore、CyclicBarrier等工具类。目的是了解他们基本使用、原理及实际应用。

1. CountDownLatch主题

1.1 CountDownLatch简介

CountDownLatch是一个线程同步工具,它允许一个或多个线程一直处于等待状态,直到其他线程执行结束。

从名字上看CountDown是倒数的意思,类似于一个倒计时的概念,而CountDownLatch本身的作用也是如此,它提供了两个核心方法。

  • countdown()方法,对计数器进行递减。
  • await()方法,使调用该方法的线程进入等待状态。

CountDownLatch在构造的时候需要传递的是一个正整数,现成每调用一次countdown()方法,都会对该正整数减1。当计数器为0时,会唤醒所有处于await()方法阻塞线程。

1.1.1 CountDownLatch的基本使用
/**
 * 构建一个倒计时为2的CountDownLatch实例
 * 定义2个线程调用,主方法负责阻塞,当计数器为0唤醒
 * 
 */
public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(new RelationService(countDownLatch),"t1").start();
        new Thread(new RelationService(countDownLatch),"t2").start();
        countDownLatch.await();
    }

    static class RelationService implements Runnable{

        private CountDownLatch countDownLatch;

        public RelationService(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"已经完成");
            countDownLatch.countDown();
        }
    }

}
1.1.2 CountDownLatch的实际落地

假设一个场景:当我们启动一个应用的时候,希望能够检查依赖的第三方服务是否运行正常,一旦依赖的服务没有启动,那么当前应用在启动时需要等待。

1.2 CountDownLatch底层原理

在这里插入图片描述

// 创建一个CountDownLatch实例
CountDownLatch countDownLatch = new CountDownLatch(2);
// count不允许为负数
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
// 给state赋值
Sync(int count) {
    setState(count);
}
protected final void setState(int newState) {
    state = newState;
}
1.2.1 await底层原理
public void await() throws InterruptedException {
    // acquireSharedInterruptibly是AbstractQueuedSynchronizer中共享锁的获取方法,该方法以Interruptibly结尾说明await()方法允许被interrupt()中断。在	        //  acquireSharedInterruptibly()中,先通过tryAcquireShared()判断返回结果
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 如果线程有中断标识,则直接抛出异常即可。 (interrupted:返回当前的中断标识,并且复位中断标识)
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果小于0,则说明有线程处理,需要调用doAcquireSharedInterruptibly()方法进行阻塞;
    // 如果不小于0,则说明没有线程处理,不需要做任何事
    if (tryAcquireShared(arg) < 0)
        
        doAcquireSharedInterruptibly(arg);
}
// 返回当前的中断标识,并且复位中断标识
public static boolean interrupted() {
    // 参数传递的是是否复位,如果为true代表着可复位;false代表不可复位。
    // JVM里中断标识是通过true和false来判断的,默认为false表示无中断标识
    // 如果参数传递为可复位即此方法传递true,则将JVM层中断标识改为false。
    return currentThread().isInterrupted(true);
}
protected int tryAcquireShared(int acquires) {
    // 获取当前的锁的状态,如果当前状态为0,则返回1;不为0返回-1.
    return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    
    // 添加新的Node节点时,维护双向链表
    final Node node = addWaiter(Node.SHARED);
    
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 根据判断的结果获取锁
                int r = tryAcquireShared(arg);
                // 如果r<0,这块是不走的
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 阻塞线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// 添加新的Node节点时,维护双向链表
private Node addWaiter(Node mode) {
    // 创建了一个Node.SHARED节点
    Node node = new Node(Thread.currentThread(), mode);
    
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// 构造一个双向链表,通过尾插法
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
1.2.2 countDown底层原理
public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    // 尝试释放锁(递减共享信号)
    if (tryReleaseShared(arg)) {
        // 
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // 通过自旋判断
    for (;;) {
        // 获取对应的状态值
        int c = getState();
        // 如果这个状态已经为0,说明锁已经释放完,无需处理
        if (c == 0)
            return false;
        // 做减一的动作
        int nextc = c-1;
        // 通过CAS去更新state的状态
        if (compareAndSetState(c, nextc))
            // 如果返回true,则可以进行锁的释放
            return nextc == 0;
    }
}
protected final int getState() {
    return state;
}
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // Node.SIGNAL 代表着当前节点往后的节点都是阻塞状态,可以唤醒的
            if (ws == Node.SIGNAL) {
                
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 解除挂起状态
                unparkSuccessor(h);
            }
            // 如果ws为初始状态,
            else if (ws == 0 &&
                     // 修改ws状态为PROPAGATE(-3)
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
/**
 * node = head
 *
 */
private void unparkSuccessor(Node node) {

    // 获取到head的waitStatus
    int ws = node.waitStatus;
    // 如果ws 小于0 更新waitStatus状态为初始状态
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 获取head节点的下一个节点s
    Node s = node.next;
    
    // 可能存在两种情况 1.s不存在,队列不存在。2. waitStatus > 0, 这个代表取消状态
    if (s == null || s.waitStatus > 0) {
        
        s = null;
        // 从尾部往前查找一直找到距离头部最近的那个waitStatus的节点。
        // 从尾部往前查找而不是从前往后查找的原因是因为,避免在多线程竞争激烈的情况下,尾部一直添加导致死循环问题
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 把挂起的线程唤醒(解除挂起状态)
    if (s != null)
        LockSupport.unpark(s.thread);
}

线程是怎么回收的?

2. Semaphore主题

2.1 Semaphore简介

Semaphore方法核心是一个许可证管理。通过acquire()方法获取一个许可证,通过release()方法释放一个许可证,实际上并没有一个真实令牌发放给线程。只是维护一个可分配数量进行计数维护。

2.1.1 Semaphore方法

Semaphore一共6个方法:

  • Semaphore(permits),permits表示令牌数,默认非公平锁实现。
  • Semaphore(permits,fair),permits表示令牌数,fair表示公平性,也就是说在令牌被释放的临界点是否允许提前抢占到令牌。
  • acquire(permits),获取指定permits数量的令牌,如果许可证数量不足,则会阻塞当前线程。
  • tryAcquire(permits),尝试获取指定permits数量的令牌,此过程是非阻塞的,如果令牌数不够,直接返回false,否则返回true。
  • release(permits),释放指定permits数量的令牌。
  • drainPermits(),当前线程获得剩下的所有可用令牌。
  • hasQueuedThread(),判断当前Semaphore实例上是否存在正在等待令牌的线程。
2.1.2 Semaphore基本使用

Semaphore常见的应用场景就是实现线程之间的限流,或者限制某些共享资源的访问数量。

public class SemaphoreExample {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new SomeTask(semaphore));
        }
        executorService.shutdown();
    }

    static class SomeTask implements Runnable{

        private Semaphore semaphore;

        public SomeTask(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"获得一个令牌");
                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println(Thread.currentThread().getName()+"释放一个令牌");
                semaphore.release();
            }

        }
    }
}
2.2 Semaphore底层原理

Semaphore实际上也是基于AQS中的共享锁来实现的,因为Semaphore中允许多个线程获得令牌被唤醒。

猜想:基于AQS的实现,在构建Semaphore实例时传递permits。实际上还是改变了AQS里的state属性,假设初始化permits=5,每次调用完release()方法,针对与state进行递减。

2.2.1 Semaphore初始化底层原理
// 实例化一个Semaphore对象
Semaphore semaphore = new Semaphore(2);

public Semaphore(int permits) {
    // 默认还是非公平的实现
    sync = new NonfairSync(permits);
}

// 可选择是初始化公平锁还是非公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
	// 默认非公平实现,调用父类方法
    NonfairSync(int permits) {
        super(permits);
    }

    // 尝试获取共享锁 -acquires = 1;
    protected int tryAcquireShared(int acquires) {
        // 获取令牌,返回剩余可用令牌数
        return nonfairTryAcquireShared(acquires);
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
	
    // 给state设置初始值
    Sync(int permits) {
        setState(permits);
    }
    
    final int getPermits() {
        return getState();
    }

    // 获取令牌,返回剩余可用令牌数
    final int nonfairTryAcquireShared(int acquires) {
        // 自旋
        for (;;) {
            // 获取当前state的值
            int available = getState();
            // 当前剩余的令牌数
            int remaining = available - acquires;
            // 如果剩余的令牌数小于0,或者获得令牌成功,直接将剩余的令牌数返回
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
// 计数器的值
private volatile int state;

//给state设置初始值
protected final void setState(int newState) {
    state = newState;
}

// 获取state的值
protected final int getState() {
    return state;
}

线程怎么回收?

这个暴力stop == kill -9 非暴力的Interrupt

ObjectMonitor

JVM 层面 exit()

先判断这个线程有没有被中断

如果已经是可以安全退出的状态,kill -9 这个时候会抛出异常 -JVM层面吃掉所有的异常。

如果这个线程不可安全退出,调用en_join(join方法),kill -9 这个时候会抛出异常 -JVM层面吃掉所有的异常。

2.2.2 acquire底层原理
semaphore.acquire();

public void acquire() throws InterruptedException {
    // 获取可中断的共享锁
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 如果线程已经被中断,然后直接抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 获取令牌,返回剩余可用令牌数
    if (tryAcquireShared(arg) < 0)
        // 如果当前剩余的可用令牌数小于0,则进行...
        doAcquireSharedInterruptibly(arg);
}
// 
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 添加一个共享状态的等待队列
    final Node node = addWaiter(Node.SHARED);
    
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            // 获取到前一个节点
            final Node p = node.predecessor();
            if (p == head) {
                // 返回当前剩余可用令牌数
                int r = tryAcquireShared(arg);
                
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
// 获取前一个节点
final Node predecessor() throws NullPointerException {
    // 获取前一个节点
    Node p = prev;
    
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
// node:当前节点 propagate:可以令牌数
private void setHeadAndPropagate(Node node, int propagate) {
    // 头结点
    Node h = head; 
    
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
// 释放共享锁
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
public void release() {
    sync.releaseShared(1);
}

// permits 设置 Integer.MAX_VALUE 释放方法  可以改变state数量大小  即 可以改变令牌数
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
public final boolean releaseShared(int arg) {
    // 释放令牌桶
    if (tryReleaseShared(arg)) {
        
        doReleaseShared();
        return true;
    }
    return false;
}
// 改变令牌桶大小
protected final boolean tryReleaseShared(int releases) {
    // 自旋
    for (;;) {
        // 获取当前的state
        int current = getState();
        // 当前的state+1等于下一个next
        int next = current + releases;
        // 当添加到超过Integer.MAX_VALUE时,next为负数,此条件成立
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        
        if (compareAndSetState(current, next))
            return true;
    }
}
// 减少令牌桶的操作
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}
// 唤醒当前线程 主要调用unpark
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
纵观整个AQS的源码,只有在doReleaseShared方法中具体用到了PROPAGATE这个状态,在其他地方都是没有显式用到的,那么可能就会对这个状态存在的意义有些许质疑了。其实在早期版本的AQS源码中是没有PROPAGATE这个状态的,之所以要引入它是为了解决一个bug(JDK-6801020)

    时刻1:t3调用release->releaseShared->tryReleaseShared,将state+1变为1,同时发现此时的head节点不为null并且waitStatus为-1,于是继续调用unparkSuccessor方法,在该方法中会将head的waitStatus改为0;

    时刻2:t1被上面t3调用的unparkSuccessor方法所唤醒,调用了tryAcquireShared,将state-1又变为了0。注意,此时还没有调用接下来的setHeadAndPropagate方法;

    时刻3:t4调用release->releaseShared->tryReleaseShared,将state+1变为1,同时发现此时的head节点虽然不为null,但是waitStatus为0,所以就不会执行unparkSuccessor方法;

    时刻4:t1执行setHeadAndPropagate->setHead,将头节点置为自己。但在此时propagate也就是剩余的state已经为0了(propagate是在时刻2时通过传参的方式传进来的,那个时候-1后剩余的state是0),所以也不会执行unparkSuccessor方法。

3. CyclicBarrier主题

3.1 CyclicBarrier简介

CyclicBarrier的字面意思是可循环(Cyclic)使用的屏障(Barrier),它的主要作用是让一组线程到达一个屏障(也可以叫做同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续往下执行,线程进入屏障是通过CyclicBarrier的await()方法实现的。

3.1.1 CyclicBarrier方法
3.1.2 CyclicBarrier基本使用
public class CyclicBarrierExample {

    public static void main(String[] args) {
        int parties = 4;
        CyclicBarrier barrier = new CyclicBarrier(parties,()->{
            System.out.println("所有的线程执行完毕,继续处理其他任务");
        });

        for (int i = 0; i < parties; i++) {
            new ImportDataTask(barrier).start();
        }
    }


    static class ImportDataTask extends Thread {

        private CyclicBarrier cyclicBarrier;

        public ImportDataTask(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + "数据导入完毕,等待其他线程");
                cyclicBarrier.await();

            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
3.2 CyclicBarrier底层原理

在这里插入图片描述

3.2.1 CyclicBarrier实现原理及源码

CyclicBarrier包含两个层面的意思,第一个层面就是前面描述的屏障点,线程调用await()方法都会阻塞在屏障点,直到所有线程都到达屏障点再放行。第二个层面是Cyclic循环,当所有的线程通过屏障点之后,又可以进入下一轮的屏障点进行等待,可以不断地循环。

在CyclicBarrier中定义了两个int类型的变量,分别是parties和count,这两个变量的作用如下:

  • parties:表示每次要求到达屏障点的线程数,只有满足指定数量的线程,所有线程才会被唤醒。
  • count:用来实现内部的计数器,初始值就是parties,后续在每个线程调用await()方法时,会对count减1,当count为0的时候会唤醒所有的线程。
private static class Generation {
    // 默认给值为false
    boolean broken = false;
}

// 全局定义一把锁
private final ReentrantLock lock = new ReentrantLock();

// 条件等待/唤醒
private final Condition trip = lock.newCondition();
// 参与的线程数量
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;

// 静态内部类Generation,表示对象代表栅栏的当前的generation(代),每次当所有线程通过屏障点(栅栏)后,表示当前generation已经过去了,会进入下一个generation,CyclicBarrier利用它来实现循环等待
private Generation generation = new Generation();

// 内部计数器
private int count;
// 实例化CyclicBarrier 传入一个parties ,一个线程Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 传入parties如果小于等于0,则报错:非法参数异常
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化变量
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
3.2.2 await()方法
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 调用dowait()
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

// 自定义超时时间和单位的等待
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
// 默认情况下,调用的是timed=false,nanos=0;
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
               
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        // 确认当前generation的barrier是否失效
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程是否中断过,如果是则停止barrier
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 统计当前已经到达当前generation的线程数量
        int index = --count;
        // 如果为0,则表示所有线程都到达了屏障点
        if (index == 0) {  
            boolean ranAction = false;
            try {
                // 初始化的时候传入的线程实例
                final Runnable command = barrierCommand;
                // 如果传入的不为空,直接触发回调
                if (command != null)
                    command.run();
                // 表示执行结束,可以进行下一个屏障周期
                ranAction = true;
                // 执行结束,可以进入下一个屏障周期
                nextGeneration();
                return 0;
            } finally {
                // 如果还是false,则证明中间出了问题,则直接调用breakBarrier();
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 自旋
        for (;;) {
            try {
                // 是否带有等待超时时间,如果没有,直接调用await()阻塞当前线程
                if (!timed)
                    trip.await();
                // 采用带有超时时间的等待机制
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
                // 被其他线程通过Interrupt()方法唤醒
            } catch (InterruptedException ie) {
                // 当前的generation没有被broken,则让栅栏失效被抛出异常
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

           // 当有任何一个线程被中断时,都会调用breakBarrier()方法,而在该方法中会唤醒所有处于await()阻塞状态下的线程
            // 如果其他线程被唤醒,那么也需要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

           // 如果被唤醒的线程的generation和当前的generation不同,不做任何处理
            if (g != generation)
                return index;

            // 如果在等待超时之后被唤醒,说明还有线程没有到达此屏障点,则让栅栏失效
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
// 1.把generation,broken置为true,下次进入栅栏直接抛出异常
// 2.将count重置为初始状态(即parties状态)
// 3.唤醒所有的线程
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
// 重置代,进入下一个栅栏周期
private void nextGeneration() {
    
    trip.signalAll();
    
    count = parties;
    
    generation = new Generation();
}

4. 总结

分析了CountDownLatch、Semaphore、CyclicBarrier的使用及实现原理。CountDownLatch、Semaphore通过AbstractQueuedSynchroizer的共享锁实现。CyclicBarrier,通过ReentrantLock和Condition实现一个具有循环周期的栅栏机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2096538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ShardingSphere-JDBC实现数据加解密

一、什么是ShardingSphere&#xff1f; ShardingSphere定位为轻量级 Java 框架&#xff0c;在 Java 的 JDBC 层提供的额外服务。 它使用客户端直连数据库&#xff0c;以 jar 包形式提供服务&#xff0c;无需额外部署和依赖&#xff0c;可理解为增强版的 JDBC 驱动&#xff0c;完…

最优化理论(一)

什么是最优化问题 最优化问题是决策问题&#xff0c;选择一下可以执行的策略来使得目标最优。 一个最优化问题包括&#xff1a; 决策变量一个或多个目标函数一个由科兴策略组成的集合&#xff0c;可由等式或者不等式刻画 最优化问题的基本形式&#xff1a; 最优化问题的分类…

upload-labs通关攻略

Pass-1 这里上传php文件说不允许上传 然后咱们开启抓包将png文件改为php文件 放包回去成功上传 Pass-2 进来查看提示说对mime进行检查 抓包把这里改为image/jpg; 放包回去就上传成功了 Pass-3 这里上传php文件它说不允许上传这些后缀的文件 那咱们就可以改它的后缀名来绕过…

Guitar Pro 8.2.1 Build 32+Soundbanks Win/Mac音色库 开心激活版 音乐软件Guitar Pro 8中文破解版

音乐软件Guitar Pro 8中文破解版是一个受吉他手喜爱的吉他和弦、六线谱、BASS 四线谱绘制、打印、查看、试听软件&#xff0c;它也是一款优秀的 MIDI 音序器&#xff0c;MIDI 制作辅助工具&#xff0c;可以输出标准格式的 MIDI。GP 的过人之处就在于它可以直接用鼠标和键盘按标…

过滤器 与 拦截器

文章目录 过滤器 与 拦截器一、过滤器&#xff08;Filter&#xff09;1、特点2、生命周期3、实现4、过滤器链1&#xff09;配置 order2&#xff09;执行顺序 二、拦截器 Inteceptor1、特点2、生命周期3、实现4、拦截器链1&#xff09;配置 order2&#xff09;执行顺序&#xff…

【Linux】保姆级 Linux 常见命令使用

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. Linux 是什么1.1 Linux 是什么1.2 关于 Linux 我们需要学什么 2. 需提前准备的东西2.1 环境 —— 如何获取…

关于PowerDesigner的使用

1.PowerDesigner概述&#xff1a; 1.PowerDesigner是一款开发人员常用的数据库建模工具&#xff0c;用户利用该软件可以方便地制作 数据流程图、概念数据模型 、 物理数据模型 &#xff0c;它几乎包括了数据库模型设计的全过程&#xff0c;是Sybase公司为企业建模和设计提供的…

蓝色炫酷碎粒子HTML5导航源码

源码介绍 蓝色炫酷碎粒子HTML5导航源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面&#xff0c;重定向这个界面 效果预览 源码获取 蓝色炫酷碎粒…

火焰传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 三、程序设计 main.c文件 IR.h文件 IR.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 火焰传感器是一种常用于检测火焰或特定波长&#xff08;760nm-1100nm&#xff09;红外光的传感器。探测角度60左右&am…

MQTT学习:MQTT vs AMQP,mosquitto安装,调试工具mqttfx mqttx

前言 物联网vs互联网? 数据量/数据源:物联网的数据多是设备的自动采集,其数量远远超过互联网,互联网的数据更多是人工生成的 MQTT 协议(Message Queuing Telemetry Transport)vs AMQP 协议(Advanced Message Queuing Protocol)是两种在物联网中广泛使用的协议。 物联网…

推荐一款灵活,可靠和快速的开源分布式任务调度平台

今天给大家推荐一款灵活&#xff0c;可靠和快速的开源分布式任务调度平台——SnailJob。 前言 什么是任务调度&#xff1f; 任务调度&#xff0c;是指在多任务的环境下&#xff0c;合理地分配系统资源&#xff0c;调度各个任务在什么时候&#xff0c;由哪一个处理器处理&…

【简单】 猿人学web第一届 第15题 备周则意怠,常见则不疑

数据接口分析 数据接口 https://match.yuanrenxue.cn/api/match/15 请求时需要携带 page 页码&#xff0c;m为加密参数 cookie中没有加密信息&#xff0c;携带 SessionId请求即可 加密参数还原 查看数据接口对应的 requests 栈 m参数 是通过 window.m() 方法执行后得到的 打上…

分布式系统中的Dapper与Twitter Zipkin:链路追踪技术的实现与应用

目录 一、什么是链路追踪&#xff1f; 二、核心思想Dapper &#xff08;一&#xff09;Dapper链路追踪基本概念概要 &#xff08;二&#xff09;Trace、Span、Annotations Trace Span Annotation 案例说明 &#xff08;三&#xff09;带内数据与带外数据 带外数据 带…

『 C++ 』多线程相关

文章目录 极短临界区互斥锁的短板原子操作类 atomicatomic 原子操作原理 CASCAS 操作解决多线程创建链表的节点丢失问题多线程下的 shared_ptr 智能指针最简单的单例模式 极短临界区互斥锁的短板 如果两个线程同时对一个共享资源变量x进行自增操作将会出现线程安全问题,这个线程…

官方宣布Navicat免费使用!

官方宣布Navicat免费使用&#xff01; 对于开发者和数据库管理员来说&#xff0c;Navicat一直是不可或缺的工具之一。官方宣布Navicat可以免费使用&#xff0c;这无疑是个令人振奋的消息&#xff01;虽然是精简版&#xff0c;但足够日常使用。文末有下载链接。 无论你是管理M…

Linux 文件接口和文件管理

目录 一、回顾c语言文件操作 二、系统调用的文件操作 系统调用文件接口 open&#xff1a; close&#xff1a; write&#xff1a; 代码测试&#xff1a; ​编辑 ​编辑 read&#xff1a; 语言和系统函数间的关系&#xff1a; flags的实现思路 三、OS内文件的管理 语…

时序预测 | 基于MAMbaS+transformer时间序列预测模型(pytorch)

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 MAMBAS,transformer python代码&#xff0c;pytorch架构 可以发刊&#xff0c;先发先的&#xff0c;高精度代码。 需知&#xff1a;好的创新性模型可以事半功倍。。 适合功率预测&#xff0c;风电光伏预测&#xff0…

ubuntu通过smba访问华为设备

文章目录 ubuntu通过smba访问华为设备华为设备设置ubuntu设置访问测试 ubuntu通过smba访问华为设备 华为设备设置 华为设备在华为分享一栏下有共享至电脑的选项&#xff0c;打开即可&#xff0c;这里会创建用户名和密码进入设置 -> 关于手机/平板电脑 -> 状态信息&…

Android 10.0 mtk平板camera2横屏预览旋转90度功能实现

1.前言 在10.0的系统rom定制化开发中,在进行一些平板等默认横屏的设备开发的过程中,需要在进入camera2的 时候,默认预览图像也是需要横屏显示的,所以就需要看下mtk的camera2的相关预览功能,然后看下进入 launcher camera的时候看下如何实现预览横屏显示 如图所示: 2.mtk平…

【Linux】文件魔法师:时间与日历的解密(8/15完成)

欢迎来到 CILMY23 的博客 &#x1f3c6;本篇主题为&#xff1a;文件魔法师&#xff1a;时间与日历的解密 &#x1f3c6;个人主页&#xff1a;CILMY23-CSDN博客 &#x1f3c6;系列专栏&#xff1a;Python | C | C语言 | 数据结构与算法 | 贪心算法 | Linux | 算法专题 | 代码…