并发专题(7)之JUC并发工具源码分析

news2025/1/9 14:31:18

一、CountdownLatch源码分析

1.1 CountdownLatch应用

        CountDownLatch本身就好像一个计数器,可以让一个线程或多个线程等待其他线程完成后再执行。

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    // 声明CountDownLatch,有参构造传入的值,会赋值给state,CountDownLatch基于AQS实现
    // 3 - 1 = 2 - 1 = 1 - 1
    CountDownLatch countDownLatch = new CountDownLatch(3);

    new Thread(() -> {
        System.out.println("111");
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        System.out.println("222");
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("333");
        countDownLatch.countDown();
    }).start();

    // 主线会阻塞在这个位置,直到CountDownLatch的state变为0
    countDownLatch.await();
    System.out.println("main");
}

1.2 CountdownLatch构造函数源码剖析 

// CountDownLatch 的有参构造
public CountDownLatch(int count) {
    // 健壮性校验
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 构建Sync给AQS的state赋值
    this.sync = new Sync(count);
}

1.3 countDown源码剖析 

// countDown本质就是调用了AQS的释放共享锁操作
public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 唤醒在AQS中等待的线程
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        // 通过CAS修改state的值
        if (compareAndSetState(c, nextc))
            // 判断state的值是否已经等于0,如果是等于0的话,那么直接doReleaseShared唤醒线程
            return nextc == 0;
    }
}

// 如果CountDownLatch中的state已经为0了,那么再次执行countDown跟没执行一样。
// 而且只要state变为0,await就不会阻塞线程。

private void doReleaseShared() {
    for (;;) {
        // 获取头结点
        Node h = head;
        // 判断是否有线程等待
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头结点ws为-1说明后继节点存在等待
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后继线程
                unparkSuccessor(h);
            }
            // 这里是为了修复jdk1.5中的bug,后续会说明
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

1..4 await源码剖析 

public void await() throws InterruptedException {
    // 调用AQS提供的获取共享锁并且允许中断的方法
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 允许线程中断
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 如果返回的是-1,代表state肯定是大于0
        doAcquireSharedInterruptibly(arg);
}
// 判断state是否为0,如果为0则直接结束返回
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// countDownLatch实现
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 封装当前线程为node节点,并且加入到aqs队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 继续判断state是否为0
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 会将当前线程和后面所有排队的线程都唤醒
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 跟lock一样,挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 二、Samaphore源码分析

2.1 Samaphore应用

         也是常用的JUC并发工具,一般用于流控。比如有一个公共资源,多线程都可以访问时,可以用信号量做限制。

        连接池,内部的链接对象有限,每当有一个线程获取连接对象时,对信号量-1,当这个线程归还资源时对信号量+1。如果线程拿资源时,发现Semaphore内部的资源个数为0,就会被阻塞。例如:Hystrix的隔离策略 - 线程池,信号量。

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    // 声明信号量
    Semaphore semaphore = new Semaphore(1);
    // 能否去拿资源
    semaphore.acquire();
    // 拿资源处理业务
    System.out.println("main");
    // 归还资源
    semaphore.release();
}

2.2 有参构造 

    public Semaphore(int permits) {
        // 默认非公平锁实现
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        // 设置资源个数,State其实就是信号量的资源个数
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

2.3 acquire源码剖析 

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 公平锁实现
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 上来就先判断队列是否有线程在等待,如果有那就取排队
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        // 通过CAS去修改state的值,并且返回还剩余的state
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
// 非公平锁实现
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 拿不到资源那就挂起线程
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 将当前线程封装到node并且加入到aqs队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 拿到前继节点
            final Node p = node.predecessor();
            if (p == head) {
                // 获取剩余的资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 唤醒当前节点以及后继节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 2.4 release源码剖析

public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 唤醒形成去竞争资源
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        // state +1
        int next = current + releases;
        // state +1 等于负数,最大值+1为负数表示超过最大值
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 通过CAS一定要把state +1
        if (compareAndSetState(current, next))
            return true;
    }
}

 2.5 分析AQS中的PROPAGATE类型节点的作用

        在JDK1.5中,使用信号量时,可能会造成在有资源的情况下,后继节点无法呗唤醒,如下流程:

        在JDK1.8中,问题被修复,修复方式就是追加了PROPAGATE节点状态来解决。

        共享锁在释放资源后,如果头节点为0,无法确认真的没有后继节点。如果头节点为0,需要将头节点的状态修改为-3,当最新拿到锁资源的线程,查看是否有后继节点并且为共享锁,就唤醒排队的线程  【这里需要对比JDK1.5中的实现来看才能理解这个场景】。

       jdk8多了以下操作。

三、CyclicBarrier源码分析

        一般称为栅栏,和CountDownLatch很像。 CountDownLatch在操作时,只能使用一次,也就是state变为0之后,就无法继续玩了。 CyclicBarrier是可以复用的,他的计数器可以归位,然后再处理。而且可以在计数过程中出现问题后,重置当前CyclicBarrier,再次重新操作!

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    // 声明栅栏
    CyclicBarrier barrier = new CyclicBarrier(3,() -> {
        System.out.println("打手枪!");
    });

    new Thread(() -> {
        System.out.println("第一位选手到位");
        try {
            barrier.await();
            System.out.println("第一位往死里跑!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        System.out.println("第二位选手到位");
        try {
            barrier.await();
            System.out.println("第二位也往死里跑!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    System.out.println("裁判已经到位");
    barrier.await();
}

3.1 有参构造 

         CyclicBarrier没有直接使用AQS,而是使用ReentrantLock,简介的使用的AQS。

// CyclicBarrier的有参
public CyclicBarrier(int parties, Runnable barrierAction) {、
    // 健壮性判断!
    if (parties <= 0) throw new IllegalArgumentException();
    // parties是final修饰的,需要在重置时,使用!
    this.parties = parties;
    // count是在执行await用来计数的。
    this.count = parties;
    // 当计数count为0时 ,先执行这个Runnnable!在唤醒被阻塞的线程
    this.barrierCommand = barrierAction;
}

3.2 await源码剖析 

         线程执行await方法,会对count-1,再判断count是否为0,如果不为0,需要添加到AQS中的ConditionObject的Waiter队列中排队,并park当前线程。        

        如果为0,证明线程到齐,需要执行nextGeneration,会先将Waiter队列中的Node全部转移到AQS的队列中,并且有后继节点的,ws设置为-1。没有后继节点设置为0。然后重置count和broker标记。等到unlock执行后,每个线程都会被唤醒。

// 选手到位!!!
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 加锁??  因为CyclicBarrier是基于ReentrantLock-Condition的await和singalAll方法实现的。
    // 相当于synchronized中使用wait和notify
    // 别忘了,只要挂起,会释放锁资源。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 里面就是boolean,默认false
        final Generation g = generation;

        // 判断之前栅栏加入线程时,是否有超时、中断等问题,如果有,设置boolean为true,其他线程再进来,直接凉凉
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }


        // 对计数器count--
        int index = --count;
        // 如果--完,是0,代表突破栅栏,干活!
        if (index == 0) {  
            // 默认false
            boolean ranAction = false;
            try {
                // 如果你用的是2个参数的有参构造,说明你传入了任务,index == 0,先执行CyclicBarrier有参的任务
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // 设置为true
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // --完之后,index不是0,代表还需要等待其他线程
        for (;;) {
            try {
                // 如果没设置超时时间。  await()
                if (!timed)
                    trip.await();
                // 设置了超时时间。  await(1,SECOND)
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}



// 挂起线程
public final void await() throws InterruptedException {
    // 允许中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到队列(不是AQS队列,是AQS里的ConditionObject中的队列)
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 挂起当前线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
}


// count到0,唤醒所有队列里的线程线程
private void nextGeneration() {
    // 这个方法就是将Waiter队列中的节点遍历都扔到AQS的队列中,真正唤醒的时机,是unlock方法
    trip.signalAll();
    // 重置计数器
    count = parties;
    // 重置异常判断
    generation = new Generation();
}

3.3 总结 

        其实可以看到,上面已经把我们AQS中的node节点的每一个状态都说明了,1,-1,-2,-3,其中AQS中存在两个队列,其中是一个等待队列,另外一个是在AQS内部中的 ConditionObject维护的条件队列,当ConditionObject满足唤醒条件的时候,机会将条件队列的节点移动到AQS的等待队列中等待唤醒。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2251268.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL进阶技巧:如何寻找同一批用户 | 断点分组应用【最新面试题】

目录 0 问题描述 1 数据准备 2 问题分析 ​编辑 3 小结 0 问题描述 用户登录时间不超过10分钟的视为同一批用户,找出以下用户哪些属于同一批用户(SQL实现) 例如: user_name time a 2024-10-01 09:55 b 2024-10-01 09:57 c 2024-10-01…

分页查询日期格式不对

方式一:在属性上加入注解&#xff0c;对日期进行格式化 方式二:在 WebMvcConfiguration 中扩展Spring MVC的消息转换器&#xff0c;统一对日期类型进行格式化处理 /*** 统一转换处理扩展spring mvc* 后端返回前端的进行统一转化处理* param converters*/Overrideprotected voi…

redis的应用--分布式锁

redis的应用--分布式锁 一、分布式锁的概念二、分布式锁的基础实现2.1初步实现2.2引入过期时间2.3引入校验id2.4引入lua脚本2.5引入watch dog&#xff08;看门狗&#xff09;2.6引入Redlock算法 三、分布式锁的种类 一、分布式锁的概念 在⼀个分布式的系统中, 也会涉及到多个节…

构网型与跟网型混合直驱风电场并网稳定域研究

传统的风机变流器控制采用跟网型(grid-following&#xff0c;GFL)控制&#xff0c;需依赖于锁相环跟踪电网电压的频率/相位信息&#xff0c;以实现与电网的同步。随着能源电力系统的转型&#xff0c;电网逐渐转变为呈现低短路比&#xff08;short-circuitratio&#xff0c;SCR&…

异步处理优化:多线程线程池与消息队列的选择与应用

目录 一、异步处理方式引入 &#xff08;一&#xff09;异步业务识别 &#xff08;二&#xff09;明确异步处理方式 二、多线程线程池&#xff08;Thread Pool&#xff09; &#xff08;一&#xff09;工作原理 &#xff08;二&#xff09;直面优缺点和适用场景 1.需要快…

3DMAX星空图像生成器插件使用方法详解

3DMAX星空图像生成器插件&#xff0c;一键生成星空或夜空的二维图像。它可用于创建天空盒子或空间场景&#xff0c;或作为2D艺术的天空背景。 【主要特点】 -单击即可创建星空图像或夜空。 -星数、亮度、大小、形状等参数。 -支持任何图像大小&#xff08;方形&#xff09;。…

eltable el-table 横向 滚动条常显

又遇到了难受的问题&#xff0c;el-table嵌入在一个div里面&#xff0c;结果因为内容太多,横向、纵向我都得滚动查看&#xff01; 结果发现横向滚动时只能让它纵向触底后才能进行横向操作&#xff0c;这就很变态&#xff0c;明显不符合用户操作习惯。如下图&#xff1a; 要先纵…

《C++ Primer Plus》学习笔记|第8章 函数探幽 (24-11-30更新)

文章目录 8.1 内联函数8.2 引用变量8.2.1 创建引用变量8.2.2 将引用用作函数参数8.2.3 引用的属性和特别之处特点1&#xff1a;在计算过程中&#xff0c;传入的形参的值也被改变了。特点2&#xff1a;使用引用的函数参数只接受变量&#xff0c;而不接受变量与数值的运算左值引用…

在 Mac ARM 架构(例如 M1 或 M2 芯片)上安装 Node.js

文章目录 方法一&#xff1a;使用 Homebrew 安装 Node.js方法二&#xff1a;使用 Node Version Manager (NVM) 安装 Node.js方法三&#xff1a;从 Node.js 官方网站下载安装包注意事项 在 Mac ARM 架构&#xff08;例如 M1 或 M2 芯片&#xff09;上安装 Node.js 可以通过几种不…

电脑提示报错“Directx error”怎么解决?是什么原因导致的?游戏软件提示“Directx error”错误的解决方案

DirectX Error&#xff08;DX错误&#xff09;通常指的是在使用基于DirectX技术的应用程序&#xff08;尤其是游戏&#xff09;时遇到的问题。这个问题可能由多种因素导致&#xff0c;以下是一些可能的原因及相应的解决方案&#xff1a; 可能的原因 DirectX版本不匹配&#x…

JAVA:Spring Boot 3 实现 Gzip 压缩优化的技术指南

1、简述 随着 Web 应用的用户量和数据量增加&#xff0c;网络带宽和页面加载速度逐渐成为瓶颈。为了减少数据传输量&#xff0c;提高用户体验&#xff0c;我们可以使用 Gzip 压缩 HTTP 响应。本文将介绍如何在 Spring Boot 3 中实现 Gzip 压缩优化。 2、配置 Spring Boot 3 对…

哈希表,哈希桶的实现

哈希概念 顺序结构以及平衡树中&#xff0c;元素关键码与其存储位置之间没有对应的关系&#xff0c;因此在查找一个元素 时&#xff0c;必须要经过关键码的多次比较。顺序查找时间复杂度为O(N)&#xff0c;平衡树中为树的高度&#xff0c;即 O(logN)&#xff0c;搜索的效率取决…

从 EXCEL 小白到 EXCEL 高手的成长之路

在职场与日常生活中&#xff0c;Excel 作为一款强大的数据处理与分析工具&#xff0c;扮演着不可或缺的角色。无论是初学者还是资深职场人士&#xff0c;掌握 Excel 技能都能极大地提高工作效率。那么&#xff0c;从一个 Excel 小白蜕变成为 Excel 高手&#xff0c;究竟需要多久…

TiDB 无统计信息时执行计划如何生成

作者&#xff1a; weiyinghua 原文来源&#xff1a; https://tidb.net/blog/4c49ac0d 一、Pseudo 统计信息总体生成规则 TiDB 在表无统计信息时&#xff0c;不会进行动态采样&#xff0c;而是用静态的、预设规则以及经验假设来生成计划。用函数 PseudoTable 创建一个伪统…

org.apache.commons.lang3包下的StringUtils工具类的使用

前言 相信平时在写项目的时候&#xff0c;一定使用到StringUtils.isEmpty()&#xff1b;StringUtils.isBlank();但是你真的了解他们吗&#xff1f; 也许你两个都不知道&#xff0c;也许你除了isEmpty/isNotEmpty/isNotBlank/isBlank外&#xff0c;并不知道还有isAnyEmpty/isNon…

【工具推荐】dnsx——一个快速、多用途的 DNS 查询工具

basic/基本使用方式 echo baidu.com | dnsx -recon # 查询域名所有记录echo baidu.com | dnsx -a -resp # 查询域名的a记录echo baidu.com | dnsx -txt -resp # 查询域名的TXT记录echo ip | dnsx -ptr -resp # ip反查域名 A记录查询 TXT记录查询 ip反查域名 help/帮助信息 输…

Hive高可用配置

在hive的商用上没有集群一说&#xff0c;而且它本身也不是数据库&#xff0c;只是hadoop的数据sql化工具&#xff0c;但是hive可以配置高可用&#xff0c;通常业内对元数据服务会开5个&#xff0c;而HS2服务开3个&#xff0c;来保证hive服务的高可用 配置方式也很简单&#xf…

使用数学方法实现K-Nearest Neighbors(KNN)算法

目录 ​编辑 引言 KNN算法的数学基础 1. 距离度量 欧氏距离 曼哈顿距离 2. 寻找最近邻 3. 决策规则 分类 回归 4. 权重 KNN算法的实现步骤 1. 参数选择 2. 实现 导入必要的库 加载数据集 划分训练集和测试集 创建KNN模型 训练模型 预测测试集 计算准确率 …

提升用户体验、创新产品与高效运营,企业发展三驾马车

​在当今竞争激烈的市场环境中&#xff0c;企业要想脱颖而出并持续发展&#xff0c;需同时在提升用户体验、推动产品创新以及实现内部高效运营方面下功夫。 提升用户体验至关重要。它能提高用户满意度和忠诚度&#xff0c;增加用户口碑与推荐&#xff0c;提升企业品牌形象。可通…

在 Mac(ARM 架构)上安装 JDK 8 环境

文章目录 步骤 1&#xff1a;检查系统版本步骤 2&#xff1a;下载支持 ARM 的 JDK 8步骤 3&#xff1a;安装 JDK步骤 4&#xff1a;配置环境变量步骤 5&#xff1a;验证安装步骤 6&#xff1a;注意事项步骤7&#xff1a;查看Java的安装路径 在 Mac&#xff08;ARM 架构&#xf…