多线程与高并发——并发编程(7)

news2025/1/21 7:18:56

文章目录

  • 七、JUC并发工具
    • 1 CountDownLatch应用&源码分析
      • 1.1 CountDownLatch介绍
      • 1.2 CountDownLatch应用
      • 1.3 CountDownLatch源码分析
        • 1.3.1 有参构造
        • 1.3.2 await 方法
        • 1.3.3 countDown方法
    • 2 CyclicBarrier应用&源码分析
      • 2.1 CyclicBarrier介绍
      • 2.2 CyclicBarrier应用
      • 2.3 CyclicBarrier源码分析
        • 2.3.1 CyclicBarrier的核心属性
        • 2.3.2 CyclicBarrier的有参构造
        • 2.3.3 CyclicBarrier中的await方法
    • 3 Semaphone应用&源码分析
      • 3.1 Semaphore介绍
      • 3.2 Semaphore应用
      • 3.3 Semaphore源码分析
        • 3.3.1 Semaphore的整体结构
        • 3.3.2 Semaphore的非公平获取资源
        • 3.3.3 Semaphore公平实现
        • 3.3.4 Semaphore释放资源
      • 3.4 AQS中PROPAGATE节点
        • 3.4.1 掌握JDK1.5 Semaphore执行流程图
        • 3.4.2 分析JDK1.8的变化

七、JUC并发工具

1 CountDownLatch应用&源码分析

1.1 CountDownLatch介绍

  • CountDownLatch 就是 JUC 包下的一个工具,整个工具最核心的功能就是计数器。
  • 假设,有三个业务需要并行处理,并且需要知道三个业务全部都处理完毕了,以及需要一个并发安全的计数器来操作,那么 CountDownLatch 就可以实现。给 CountDownLatch 设置一个数值 3,每个业务处理完毕之后,执行 countDown 方法,指定的 3 在每次执行 countDown 方法时,对 3 进行 -1。主线程可以在业务处理时,执行 await 方法,主线程会阻塞等待任务处理完毕。当设置的 3 基于 countDown 方法减到 0 之后,主线程就会被唤醒,继续处理后续业务。

image.png

  • 当咱们得业务中,出现 2 个以上允许并行处理的任务,并且需要在任务都处理完毕之后,再做其他处理时,可以采用 CountDownLatch 去实现这个功能。

1.2 CountDownLatch应用

  • 模拟三个任务需要并行处理,在三个任务全部处理完毕后,再执行后续操作。CountDownLatch 中,执行 countDown 方法,代表一个任务结束,对计数器 -1;执行 await 方法,代表等待计数器变为 0 时,再继续执行;执行 await(time,unit) 代表等待 time 时长,如果计数器不为 0,返回 false,如果在等待期间,计数器为 0,方法返回 true。
static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

static CountDownLatch countDownLatch = new CountDownLatch(3);

public static void main(String[] args) throws InterruptedException {
    System.out.println("主业务开始执行");
    sleep(1000);
    executor.execute(CompanyTest::a);
    executor.execute(CompanyTest::b);
    executor.execute(CompanyTest::c);
    System.out.println("三个任务并行执行,主业务线程等待");
    // 死等任务结束
    // countDownLatch.await();
    // 如果在规定时间内,任务没有结束,返回false
    if (countDownLatch.await(10, TimeUnit.SECONDS)) {
        System.out.println("三个任务处理完毕,主业务线程继续执行");
    }else{
        System.out.println("三个任务没有全部处理完毕,执行其他的操作");
    }
}

private static void a() {
    System.out.println("A任务开始");
    sleep(1000);
    System.out.println("A任务结束");
    countDownLatch.countDown();
}
private static void b() {
    System.out.println("B任务开始");
    sleep(1500);
    System.out.println("B任务结束");
    countDownLatch.countDown();
}
private static void c() {
    System.out.println("C任务开始");
    sleep(2000);
    System.out.println("C任务结束");
    countDownLatch.countDown();
}

private static void sleep(long timeout){
    try {
        Thread.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

1.3 CountDownLatch源码分析

  • CountDownLatch 就是一个计数器,没有什么特殊功能,查看源码也只是查看计数器实现的方式。
  • 发现 CountDownLatch 的内部类 Sync 继承了 AQS,CountDownLatch 就是基于 AQS 实现的计数器。而 AQS 就是一个 state 属性,以及 AQS 双向链表,所以猜测计数器的数值实现就是基于 state 实现的,主线程阻塞的方式,也是阻塞在了 AQS 的双向链表中。

1.3.1 有参构造

  • 就是构造内部类 Sync,并且给 AQS 的 state 赋值
// CountDownLatch的有参构造
public CountDownLatch(int count) {
    // 健壮性校验
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 构建内部类,Sync传入count
    this.sync = new Sync(count);
}
// AQS子类,Sync的有参构造
Sync(int count) {
    // 就是给AQS中的state赋值
    setState(count);
}

1.3.2 await 方法

  • await 方法就是判断当前 CountDownLatch 中的 state 是否为 0,如果为 0,直接正常执行后续任务;如果不为 0,以共享锁的方式,插入到 AQS 的双向链表中,并且挂起线程。
// 一般主线程await的方法,阻塞主线程,等待state为0
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// 执行了AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判断线程是否中断,如果中断标记位是true,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 共享锁挂起的操作
        doAcquireSharedInterruptibly(arg);
}
// tryAcquireShared在CountDownLatch中的实现
protected int tryAcquireShared(int acquires) {
    // 查看state是否为0,如果为0,返回1,不为0,返回-1
    return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 封装当前先成为Node,属性为共享锁
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 在这,就需要挂起当前线程。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

1.3.3 countDown方法

  • countDown 方法本质就是对 state -1,如果 state -1后变为 0,需要去 AQS 的链表中唤醒挂起的节点。
// countDown对计数器-1
public void countDown() {
    sync.releaseShared(1);	// 是-1
}
// AQS提供的功能
public final boolean releaseShared(int arg) {
    // 对state - 1
    if (tryReleaseShared(arg)) {
        // state - 1后,变为0,执行doReleaseShared
        doReleaseShared();
        return true;
    }
    return false;
}
// CountDownLatch的tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
    // 死循环是为了避免CAS并发问题
    for (;;) {
        int c = getState();		// 获取state
        if (c == 0)				// state已经为0,直接返回false
            return false;
        int nextc = c-1;		// 对获取到的state - 1
        // 基于CAS的方式,将值赋值给state
        if (compareAndSetState(c, nextc))
            // 赋值完,发现state为0了。此时可能会有线程在await方法处挂起,那边挂起,需要这边唤醒
            return nextc == 0;
    }
}

// 如何唤醒在await方法处挂起的线程
private void doReleaseShared() {
    for (;;) {	// 死循环
        Node h = head;	// 拿到head
        // head不为null,有值,并且head != tail,代表至少2个节点
        // 一个虚拟的head,加上一个实质性的Node
        if (h != null && h != tail) {
            int ws = h.waitStatus;	// 说明AQS队列中有节点
            if (ws == Node.SIGNAL) {// 如果head节点的状态为 -1
                // 先对head节点将状态从-1,修改为0,避免重复唤醒的情况
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;  
                // 正常唤醒节点即可,先看head.next,能唤醒就唤醒,如果head.next有问题,从后往前找有效节点
                unparkSuccessor(h);
            }
            // 会在Semaphore中谈到这个位置
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;  
        }
        // 会在Semaphore中谈到这个位置
        if (h == head)  
            break;
    }
}

2 CyclicBarrier应用&源码分析

2.1 CyclicBarrier介绍

从名字上来看 CyclicBarrier,就是代表循环屏障。

  • Barrier屏障:让一个或多个线程到达一个屏障点,会被阻塞。屏障点会有一个数值,当到达一个线程阻塞在屏障点时,就会对屏障点的数值进行 -1 操作。当屏障点的数值减到 0 时,屏障就会打开,唤醒所有阻塞在屏障点的线程。在释放屏障点之后,可以先执行一个任务,再让所有阻塞被唤醒的线程继续之后后续任务。
  • Cyclic循环:所有线程被释放后,屏障点的数值可以再次被重置。

CyclicBarrier 一般被称为栅栏,是一种同步机制,允许一组线程相互等待。线程达到屏障点其实是基于 await 方法在屏障点阻塞。

CyclicBarrier 并没有基于 AQS 实现,它是基于 ReentrantLock 锁机制实现了对屏障点的 --,以及线程挂起的操作。(CountDownLatch本身是基于 AQS,对 state 进行 release 操作后,可以 -1)

CyclicBarrier 每来一个线程执行 await 方法,都会对屏障数值 -1 操作,每次 -1 后,立即查看数值是否为 0。如果为 0,直接唤醒所有的相互等待的线程。

CyclicBarrier 和 CountDownLatch 区别?

  • 底层实现不同:CyclicBarrier 基于 ReentrantLock 实现,CountDownLatch 直接基于 AQS 实现。
  • 应用场景不同:CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 在计数器达到 0 之后,可以重置计数器。CyclicBarrier 可以实现相比 CountDownLatch 更复杂的业务,执行业务时出现错误,可以充值 CyclicBarrier 计数器,再执行一次。
  • CyclicBarrier 还提供了很多其他功能:
    • 可以获取到阻塞的线程有多少;
    • 在线程相互等待是,如果等待的线程中断,可以抛异常,避免无限等待的问题。
  • CountDownLatch 一般是让主线程等待,让子线程对计数器 --。CyclicBarrier 更多的是让子线程一起技术和等待,等待的线程达到数值后,再统一唤醒。
  • CyclicBarrier 多个线程相互等待,直到达到同一个同步点,再一起执行。

2.2 CyclicBarrier应用

举个🌰:出国旅游

导游小姐姐需要等待所有乘客都到位后,发护照、签证等文件,再一起出发。

比如 Tom、Jack、Rose 三个人组团出门旅游

public static void main(String[] args) throws InterruptedException {
    CyclicBarrier barrier = new CyclicBarrier(3,() -> {
        System.out.println("等到各位大佬都到位之后,分发护照和签证等内容!");
    });

    new Thread(() -> {
        System.out.println("Tom到位!!!");
        try {
            barrier.await();
        } catch (Exception e) {
            System.out.println("悲剧,人没到齐!");
            return;
        }
        System.out.println("Tom出发!!!");
    }).start();
    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("Jack到位!!!");
        try {
            barrier.await();
        } catch (Exception e) {
            System.out.println("悲剧,人没到齐!");
            return;
        }
        System.out.println("Jack出发!!!");
    }).start();
    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("Rose到位!!!");
        try {
            barrier.await();
        } catch (Exception e) {
            System.out.println("悲剧,人没到齐!");
            return;
        }
        System.out.println("Rose出发!!!");
    }).start();
    /*
    tom到位,jack到位,rose到位
    导游发签证
    tom出发,jack出发,rose出发
     */

}
  • 在构建 CyclicBarrier 时可以选择性指定 barrierAction,如果指定了,那么会在 barrier 归为 0 后,优先执行 barrierAction 任务,然后再去唤醒所有阻塞挂起的线程,并行去处理后续任务。
  • 所有相互等待的线程,可以指定等待时间,并且再等待过程中,如果有线程中断,所有相互等待的线程都会被唤醒。
  • 如果在等待期间,有线程终端了,唤醒所有线程后,CyclicBarrier 无法继续使用。如果需要继续使用当前 CyclicBarrier,需要调用 reset 方法,让 CyclicBarrier 重置。
  • 如果 CyclicBarrier 的屏障数值达到 0 之后,会默认重置屏障数值,CyclicBarrier 在没有线程中断时,是可以重复使用的。

2.3 CyclicBarrier源码分析

  • 分成两块内容去查看,首先查看 CyclicBarrier 的一些核心属性,然后再去查看 CyclicBarrier 的核心方法。

2.3.1 CyclicBarrier的核心属性

public class CyclicBarrier {
   
    // 这个静态内部类是用来标记是否中断的
    private static class Generation {
   
        boolean broken = false;
    }
	// CyclicBarrier是基于ReentrantLock实现的互斥操作,以及计数原子性操作
    private final ReentrantLock lock = new ReentrantLock();
    // 基于当前的Condition实现线程的挂起和唤醒
    private final Condition trip = lock.newCondition();
    // 记录有参构造出入的屏障数值,不会对这个数值做操作
    private final int parties;
    // 当屏障数值达到0之后,优先执行当前任务
    private final Runnable barrierCommand;
    // 初始化默认的Generation,用来标记线程中断情况
    private Generation generation = new Generation()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/996847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python DVWA文件上传POC练习

首先&#xff0c;构造POC我们首先要明白漏洞利用的流程&#xff0c;然后要知道请求包的格式&#xff0c;然后才能针对性的POC 这里先选择低难度的文件上传&#xff0c;低难度的是没有任何过滤可以直接上传的&#xff0c;先上传一个php一句话木马&#xff0c;使用burpsuite抓包 …

go channel实践与源码探索(初始化、发送消息、接收消息、关闭)

文章目录 概要一、并发编程1.1、Actor模型1.2、CSP模型 二、Go Channel实践三、源码分析3.1、初始化3.2、发送消息3.3、接收消息3.4、关闭通道 总结 概要 通道&#xff08;Channel&#xff09;是Go语言提供的协程之间通信与同步的方式。我们知道在并发编程&#xff08;多进程、…

mysql检查表、分析表、优化表

MySQL提供了分析表、检查表和优化表的语句。分析表主要是分析关键字的分布&#xff0c;检查表主要是检查表是否存在错误&#xff0c;优化表主要是消除删除或者更新造成的空间浪费。 文章目录 概述分析表检查表优化表 概述 对数据库的管理常规就是进行预防性的维护&#xff0c;…

SSM - Springboot - MyBatis-Plus 全栈体系(五)

第二章 SpringFramework 四、SpringIoC 实践和应用 2. 基于 XML 配置方式组件管理 2.5 实验五&#xff1a;高级特性&#xff1a;FactoryBean 特性和使用 2.5.1 FactoryBean 简介 FactoryBean 接口是Spring IoC容器实例化逻辑的可插拔性点。 用于配置复杂的Bean对象&#x…

前端list.push,封装多个对象

js var fruit [apple, banana];fruit.push(pear);console.log(fruit); // [apple, banana, pear]现在为对象 data1:{addUser: 1,editUser: 1,addTime: null,editTime: 1527410579000,userId: 3,systemNo: mc,userName: zengzhuo,userPassword: e10adc3949ba59abbe56e057f20f88…

Qt点亮I.MX6U开发板的一个LED

本篇开始将会介绍与开发版相关的Qt项目&#xff0c;首先从点亮一个LED开始。I.MX6U和STM32MP157的相关信息都会用到&#xff0c;但是后期还是将I.MX6U的学习作为重点。当然其他开发版的开发也可以参考本博文。 文章目录 1. Qt是如何操控开发板上的一个LED2. 出厂内核设备树中注…

linux修改最大线程数却未生效的原因

可能是没有重新对新文件进行编译 更改一个进程所能创建的最大进程数之前 更改一个进程所能创建的最大进程数之后 测试代码 #include <iostream> #include <unistd.h> #include <sys/wait.h> #include <string.h> #include <stdio.h> #include…

学生信息系统(python实现)

#codingutf-8 import os.path filenamestudent.txtdef menm():#菜单界面print(学生管理系统)print(-----------------------------功能菜单-----------------------------)print(\t\t\t\t\t\t1.录入学生信息)print(\t\t\t\t\t\t2.查找学生信息)print(\t\t\t\t\t\t3.删除学生信息…

合宙Air724UG LuatOS-Air LVGL API控件-图片(Gif)

图片&#xff08;Gif&#xff09; GIF图片显示&#xff0c;core版本号要>3211 示例代码 方法一 -- 创建GIF图片控件 glvgl.gif_create(lvgl.scr_act()) -- 设置显示的GIF图像 lvgl.gif_set_src(g,"/lua/test.gif") -- gif图片居中 lvgl.obj_align(g, nil, lvgl…

软件测试行业35岁职场魔咒,你准备怎么应对?

以前就流传一种说法“IT行业职场35岁危机”&#xff0c;那时我半信半疑。 或许那时觉得还离我比较遥远&#xff0c;也或许那时每天都重复着996&#xff0c;工作上的任务已经应接不暇&#xff0c;每天都处在忙碌中&#xff0c;也没精力花时间去过多思考这个问题。休息的时候能好…

机器学习实战-系列教程7:SVM分类实战2线性SVM(鸢尾花数据集/软间隔/线性SVM/非线性SVM/scikit-learn框架)项目实战、代码解读

&#x1f308;&#x1f308;&#x1f308;机器学习 实战系列 总目录 本篇文章的代码运行界面均在Pycharm中进行 本篇文章配套的代码资源已经上传 SVM分类实战1之简单SVM分类 SVM分类实战2线性SVM SVM分类实战3非线性SVM 3、不同软间隔C值 3.1 数据标准化的影响 如图左边是没…

GLSL ES着色器 精度限定字

目录 前言 WebGL支持的三种精度 数据类型的默认精度 float类型没有默认精度 预处理指令 在GLSL ES中常用的三种预处理指令。 预定义的内置宏 前言 GLSL ES新引入了精度限定字&#xff0c;目的是帮助着色器程序提高运行效率&#xff0c;削减内存开支。顾名思义&#xf…

Kafka3.0.0版本——消费者(自动提交 offset)

目录 一、自动提交offset的相关参数二、消费者&#xff08;自动提交 offset&#xff09;代码示例 一、自动提交offset的相关参数 官网文档 参数解释 参数描述enable.auto.commi默认值为 true&#xff0c;消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果…

《TCP/IP网络编程》阅读笔记--I/O复用

1--基于I/O复用的服务器 多进程服务器端具有以下缺点&#xff1a;当有多个客户端发起连接请求时&#xff0c;就会创建多个进程来分别处理客户端的请求&#xff0c;创建多个进程往往需要付出巨大的代价&#xff1b; I/O复用的服务器端可以减少进程数&#xff0c;无论连接多少个客…

Ubuntu22.04_如何调试ROS2_humble的源代码

这里的源码&#xff0c;是指的ros2 humble的官方源码。如果是自己手撸的节点或相关源码&#xff0c;请参考本人以前的贴子&#xff0c; Ubuntu20.04vscode快速调试ROS通用程序_ubuntu20.04vscode那个版本和ros 兼容_高精度计算机视觉的博客-CSDN博客 Ubuntu20.04&#xff0b;…

【pygame】01 pygame制作游戏的最小系统

这次使用sublimepython进行pygame的游戏开发&#xff0c;目的是学习使用python的基本操作和常用模块 添加一个文件夹到工程 最小系统 import pygame import sys ##导入sys模块 主要是为了 exit函数 from pygame.locals import * #导入一些常用的函数和常量pygame.init() …

计算机网络第四章——网络层(上)

提示&#xff1a;朝碧海而暮苍梧,睹青天而攀白日 文章目录 网络层是路由器的最高层次&#xff0c;通过网络层就可以将各个设备连接到一起&#xff0c;从而实现这两个主机的数据通信和资源共享&#xff0c;之前学的数据链路层和物理层也是将两端连接起来&#xff0c;但是却没有网…

C语言——指针进阶(2)

继续上次的指针&#xff0c;想起来还有指针的内容还没有更新完&#xff0c;今天来补上之前的内容&#xff0c;上次我们讲了函数指针&#xff0c;并且使用它来实现一些功能&#xff0c;今天我们就讲一讲函数指针数组等内容&#xff0c;废话不多说&#xff0c;我们开始今天的学习…

ESP32蓝牙主从站模式:主站发送,从站接收,同时附加简单通信协议

主站发送:WXAiBj,六个字符 蓝牙模式是一个字符一个字符发送 主站和从站设置通信协议 使得六个字符一句话完整接收,同时打印出接收完成信息 硬件电路连接如下: 主从站为两个ESP32,只使用了其中的蓝牙功能 代码如下: 主站: //主机模式 #include <Arduino.h> …