JAVA并发编程--4.1理解Condition

news2025/1/10 18:00:05

背景:Condition 多线程条件并发控制,与Lock配合可以实现等待/通知模式;

1 condition 使用demo(生产者与消费者模型):

package org.lgx.bluegrass.bluegrasscoree.util.testcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Description TODO
 * @Date 2022/11/25 16:19
 * @Author lgx
 * @Version 1.0
 */
public class TestCondition {
    public static void main(String[] args) {
    	// 声明一把lock锁
        Lock lock = new ReentrantLock();
        // 声明队列不为空的条件
        Condition notEmpty = lock.newCondition();
        // 声明队列不满的条件
        Condition notFull = lock.newCondition();
        // 声明队列的最大长度
        int maxSize = 10;
        List<String> msg = new ArrayList<>();
        // 构造生产者
        Producer producer = new Producer(msg, lock, notEmpty, notFull, maxSize);
         // 构造消费者
        Consumer Consumer = new Consumer(msg, lock, notEmpty, notFull, maxSize);

        new Thread(producer).start();
        new Thread(Consumer).start();
    }

}
// 生产者
class Producer implements Runnable {
    private List<String> msg;
    private Lock lock;
    private Condition notEmpty;
    private Condition notFull;
    private Integer maxSize;

    public Producer(List<String> msg, Lock lock, Condition notEmpty, Condition notFull, Integer maxSize) {
        this.msg = msg;
        this.lock = lock;
        this.notEmpty = notEmpty;
        this.notFull = notFull;
        this.maxSize = maxSize;
    }
	/**
	* 生产者产生数据模型
	**/
    @Override
    public void run() {
        while (true) {
        	// 获取lock 锁,如果获取失败则进入到AQS 同步阻塞队列(双向队列)
            lock.lock();
            try {
                while (msg.size() >= maxSize) {
                    // 消息已满-- 需要阻塞
                    System.out.println(" 消息已满-- 需要阻塞");
                    notFull.await();
                }
                String msgStr = "写入消息" + UUID.randomUUID();
                msg.add(msgStr);
                System.out.println(msgStr);
                Thread.sleep(1000);
                // 生产者产生消息后通知对应的消费者
                notEmpty.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
            	// 生产者释放锁
                lock.unlock();
            }
        }


    }


}
/**
	* 消费者产生数据模型
	**/
class Consumer implements Runnable {
    private List<String> msg;
    private Lock lock;
    private Condition notEmpty;
    private Condition notFull;
    private Integer maxSize;

    public Consumer(List<String> msg, Lock lock, Condition notEmpty, Condition notFull, Integer maxSize) {
        this.msg = msg;
        this.lock = lock;
        this.notEmpty = notEmpty;
        this.notFull = notFull;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
        // 获取lock 锁,如果获取失败则进入到AQS 同步阻塞队列(双向队列)
            lock.lock();
            try {
                while (msg.isEmpty()) {
                    // 消息队列为空-- 需要阻塞
                    System.out.println("消息队列为空-- 需要阻塞:");
                    notEmpty.await();
                }
                System.out.println("获取消息:" + msg.get(0));
                msg.remove(0);
                Thread.sleep(1000);
                // 消费者消费消息后通知对应的生产者
                notFull.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
              // 消费者释放锁
                lock.unlock();
            }
        }


    }

}

2 生产者与消费者模型过程分析:
线程获取锁的过程,参考:JAVA并发编程–4.1理解ReentrantLock
2.1 生产者获取lock 锁, 生产消息,当队列满时,调用awaitt() 释放当前线程持有的锁,并阻塞当前线程:
AbstractQueuedSynchronizer.await():

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
             // 构建Condition单向链表,将当前节点加入到此单向链表中
            Node node = addConditionWaiter();
            //  // 完全释放锁,返回当前线程对锁的重入次数
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
            // 如果当前node 节点只在Condition单向链表 不在AQS 同步阻塞队列中,则返回false,进入此while 循环
                LockSupport.park(this);// 挂起档当前的线程
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;.// 当前线程中断则跳出循环
            }
            //  在AQS 同步队列中唤醒的node 节点去抢占锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();//  将Condition单向链表中年已经是取消状态的线程从队列中剔除
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);// 线程中断标记
        }

addConditionWaiter:

 /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;// 最后一个等待节点 初始为null,后续线程进入时 t指向行单向链表的尾节点
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();// 清除失效节点
                t = lastWaiter;
            }
             // 构建一个新的节点 static final int CONDITION = -2;
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)// 第一次 t 为null
                firstWaiter = node;// firstWaiter指针指向新创建的node
            else // 尾节点的下一节点指向新创建的node 节点;即将 Node 节点加入到单向链表中
                t.nextWaiter = node;
            lastWaiter = node;// lastWaiter 指针指向新创建的node
            return node;
        }

第一次:ThreadA(单向链表构建示意)
在这里插入图片描述
第二个ThreadB(单向链表构建示意)
在这里插入图片描述
fullyRelease 完全释放锁 :

 final int fullyRelease(Node node) {
        boolean failed = true;
        try {
        // 获取当前lock 的state (锁的次数)
            int savedState = getState();
            if (release(savedState)) {{// 释放锁
                failed = false;// 释放锁成功,失败标识置为false
                return savedState;
            } else {// 释放失败抛出异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)// 如果释放锁失败,则证明释放锁过程中线程出现异常
                node.waitStatus = Node.CANCELLED;// 将当前condition 单向链表中的改节点置为取消状态
        }
    }

release(int arg):

public final boolean release(int arg) {
        if (tryRelease(arg)) {
        	// 释放锁成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);// 唤醒AQS 中的头部节点去抢占锁
            return true;
        }
        return false;
    }

unparkSuccessor:

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);// 唤醒线程
    }

isOnSyncQueue:是否在AQS同步双向链表中:

/**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;// 当前节点的waitStatus  是CONDITION  或者当前节点的前置节点为空则标明在Condition 单向链表中
        if (node.next != null) // If has successor, it must be on queue 不在Condition 单向链表中 已定在AQS队列中
            return true;// 挡圈节点不为尾节点返回true
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

findNodeFromTail 遍历AQS队列 寻找node 节点:

 /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

acquireQueued(node, savedState) 当前线程获取锁:

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        // 是否中断标识
            boolean interrupted = false;
            for (;;) {
            	// 当前节点的前置节点是头结点,则尝试去获取锁
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                	// 获取锁成功从AQS中移除改node 节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 抢占不到锁则挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);// 从AQS 中移除失效节点
        }
    }

setHead(node):

 private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

2.2 消费者获取lock 锁 ,在消费消息后,调用signal() 唤醒生产者:
消费者获取lock 锁, 消费消息,当队列为空时,也会调用awaitt() 释放当前线程持有的锁,并阻塞当前线程:
AbstractQueuedSynchronizer:
signal() 将当前condition队列中的一个头部元素转移至AQS队列中:

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())// 如果当前线程没有获取锁则抛出异常
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;// 获取condition队列中的头部节点
            if (first != null)
                doSignal(first);// 转移改节点至AQS队列
        }

doSignal(Node first):

 /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)// condition队列中只有一个节点
                    lastWaiter = null;
                first.nextWaiter = null;// 从condition队列中移除改node 节点
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal(first):

 /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         // 设置node 的waitstate为0,设置失败意味改线程已经被取消
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
         // 将当前node 加入到同步阻塞队列中并返回之前AQS 中tail 节点
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果waitStatus  >0 (线程取消状态);或者设置node 的waitStatus   为SIGNAL 失败时 则唤醒之前AQS 中tail 节点线程;
            LockSupport.unpark(node.thread);// 优化方式此时唤醒可以使得AQS队列中及时的清除失效节点
     

消费者线程调用unlock() 方法从AQS 队列中唤醒线程去抢占锁。

3 await 和signal 过程:
(1)生产者(Producer ) 线程A ,线程B,去抢占锁;线程A获取到锁,线程B没有抢占到锁则进入AQS 队列;消费者线程C 没有抢占到锁则进入AQS 队列;
(2)线程A 执行任务后调用signal()/signalAll();此时condition 队列中中没有元素;
(3)线程A 在执行任务过程中,达到一定条件,则调用await() 方法;将当前的node 节点(new Node(Thread.currentThread(), Node.CONDITION))放入到condition 单向队列中;并且完全释放锁,并且挂起当前的线程;并且从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(4)线程B 抢占到锁同线程A一样,在达到一定条件,则调用await() 方法;将当前的node 节点(new Node(Thread.currentThread(), Node.CONDITION))放入到condition 单向队列中;并且完全释放锁,并且挂起当前的线程;并且从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(5)线程C(消费者) 抢占到锁,消费信息后,调用用signal()/signalAll();将位于condition 单向链表中的Node 一个/全部节点转移到AQS 队列中;
(6)线程C(消费者) 业务完成调unlock() 方法,从从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(7)线程A(生产者) 抢占锁,如果抢占到锁则进行执行任务,抢占不到锁则被park,挂起当前线程,等锁的抢占;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/42946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java 每日一练 (5)

java 每日一练(5) 文章目录单选不定项选择题编程题单选 1.下面的程序 编译运行后&#xff0c;在屏幕上显示的结果是&#xff08;&#xff09; A&#xff1a; 0 B &#xff1a; 2 C&#xff1a; 5 D &#xff1a;80 5的二进制 &#xff1a; 0101   右移 >>: 最右侧位不…

_gdb和进程概念

gdb 在windows下的vs2013下&#xff0c;我们可以进行调试&#xff0c;方便我们了解程序的具体的运行步骤和存在的问题&#xff0c;那么在Linux中&#xff0c;也存在这样一种调试机制&#xff0c;gdb就是在Linux系统下的调试指令。 Linux下和Windows下的调试的区别 答&#x…

如何在保证身份安全的同时提升员工体验

自疫情以来越来越多的企业员工的工作方式都发生了改版。远程和线上下混合工作形式已经成了新常态&#xff0c;企业员工希望随时随地都能访问其工作资源。而且为了方便办公他们更希望使用多种设备&#xff08;甚至是个人设备&#xff09;来访问公司数据。 运维有小邓 在如此复杂…

网上商城购物系统设计与实现(Java+Web+SSM+MySQL)

目 录 1 绪论 1 1.1 研究背景 1 1.2 目的和意义 1 1.3 开发工具及技术 1 2 需求分析 3 2.1 功能需求分析 3 2.1.1 网站前台功能 3 2.1.2 网站后台功能 3 2.2 性能分析 3 2.3 系统用户用例图 4 3 系统设计 5 3.1 系统的总体设计 5 3.2 数据库的分析与设计 5 3.2.1数据库概念设计…

DDR PCB设计布线时,拓扑结构的选择

在PCB设计时我们在处理DDR部分的时候都会进行一个拓扑的选择&#xff0c;一般DDR有T点和Fly-by两种拓扑结构&#xff0c;那么这两种拓扑结构的应用场景和区别有哪些呢&#xff1f; T点拓扑结构&#xff1a;CPU出来的信号线经过一个过孔后分别向两边进行连接,分叉点一般在信号的…

哪款蓝牙耳机打电话好用?打电话用的蓝牙耳机推荐

我们需要集中精神&#xff0c;闹中取静。特别是工作或者学习的时候&#xff0c;为了让意志力力更加集中&#xff0c;提高工作或者学习的效率&#xff0c;或是对于一个热爱音乐的人来说&#xff0c;蓝牙耳机肯定是必不可少的&#xff1b;蓝牙耳机现在的功能有很多&#xff0c;质…

锐捷交换机系统安装与升级

锐捷交换机系统安装与升级 文章目录锐捷交换机系统安装与升级一、实验步骤二、实验实施1、到锐捷官网下载交换机型号对应的系统文件2、配置本机IP&#xff0c;与将要升级设备互联3、打开文件中的TFTPserver4、进入BOOT模式恢复系统交换机在MGMT的情况下&#xff0c;通过MGMT口来…

主数据管理系统mdm哪个产品好,为什么,越详细越好?

当企业对主数据管理不善时&#xff0c;会出现数据冗余、数据不一致、业务低效、数据孤岛等问题&#xff0c;多个系统之间的数据难以协调&#xff0c;难以发挥数据的价值。这时我们就需要对企业主数据进行管理。 亿信华辰前不久在“2022中国数字经济创新发展大会”上荣获“2022年…

艾美捷QuickTiter 逆转录病毒定量试剂盒的制备方案

Cell Biolabs艾美捷QuickTiter逆转录病毒定量试剂盒提供了一种测定逆转录病毒滴度的快速方法。该测定法测量逆转录病毒的病毒核酸含量&#xff0c;可以在纯化病毒之前或之后进行。 试剂的制备&#xff1a; •1X QuickTiter™ 解决方案C&#xff1a;准备1X QuickTiter™ 溶液C通…

[Linux] 常用命令--文件操作grep/nl/more/less/head tail/set

✨✨个人主页:沫洺的主页 &#x1f4da;&#x1f4da;系列专栏: &#x1f4d6; JavaWeb专栏&#x1f4d6; JavaSE专栏 &#x1f4d6; Java基础专栏&#x1f4d6;vue3专栏 &#x1f4d6;MyBatis专栏&#x1f4d6;Spring专栏&#x1f4d6;SpringMVC专栏&#x1f4d6;SpringBoot专…

【仿牛客网笔记】项目进阶,构建安全高效的企业服务——将文件上传至云服务器

使用七牛云,首先进行注册&#xff0c;注册之后认证。 https://www.qiniu.com 登录七牛云 查看手册 存储的时候使用对象存储 SDK 存储空间 域名30天&#xff0c;到期后重新创建 具有独立域名可以绑定域名。 再创建一个空间为community_share 在项目中引用七牛云 首先…

python的继承知识点总结

python继承&#xff0c;python丰富的类因为继承而变得多姿多彩&#xff0c;如果语言不支持继承&#xff0c;那么类就没什么优势。 1、首先我们来定义两个类 一个dog类&#xff0c;一个bird类class Dog: def sleeping(self): print (dog 正在睡觉) def speaking(…

指纹浏览器是什么?可以用来解决广告投放的什么问题?

说到指纹浏览器&#xff0c;相信很多跨境电商人都不陌生&#xff0c;但是很多小伙伴不知道的是&#xff0c;指纹浏览器对于广告投放来说也是非常有帮助的工具&#xff01;为什么呢&#xff1f;今天&#xff0c;东哥就跟大家聊一聊指纹浏览器到底为什么适合用于广告投放和广告营…

项目管理中,项目干系人的角色和责任

项目干系人是指企业内部或外部的团体、单位、个人或组织&#xff0c;他们受到项目结果的影响&#xff0c;或能够影响到项目的结果。主要干系人是干系人的一个子集&#xff0c;由于他们直接受到项目结果变化的影响&#xff0c;如果他们的支持被撤回&#xff0c;将导致项目失败。…

D1. 388535 (Easy Version)(异或+二进制位)

Problem - 1658D1 - Codeforces 这是该问题的简单版本。两个版本的约束条件的差异在下面用红色标出。只有当所有版本的问题都解决了&#xff0c;你才能进行黑客攻击。 Marin和Gojou正在和一个数组玩捉迷藏。 Gojou最初执行了以下步骤。 首先&#xff0c;Gojou选择了2个整数l…

如何选择合适的 API 网关

如今&#xff0c;API 网关是设计具有多个 API 服务或微服务的分布式系统架构的重要组成部分。这篇文章帮助您了解什么是 API 网关、何时以及为何使用它&#xff0c;并指导您如何为您的应用程序选择最佳的 API 网关解决方案。 什么是 API 网关&#xff1f; API 网关是一种服务…

【项目_02】隐藏tabbar、对城市数据进行获取、处理、渲染到页面上、城市回显 | 基于Vue3全家桶

&#x1f4ad;&#x1f4ad; ✨&#xff1a;隐藏tabbar、对城市数据进行获取、处理、渲染到页面上 | 旅途拾景   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: 怎么会没有遗憾呢&#xff0c;一直向前就对了&#x1f49c;&#x1f49c;   &#x1f338;: 如有…

考 PMP 证书真有用吗?

有用还是有用的&#xff0c;但是毕竟是一纸证书&#xff0c;本身的作用有限&#xff0c;还是要看就业环境看行业对 PMP 证书的重视程度&#xff0c;目前来说&#xff0c;pmp 在行业还是吃香的。 ​ 一、PMP 证书的市场需求 1、行业认可度高&#xff0c;市场需求大 PMP 是由…

【附源码】计算机毕业设计JAVA郑工社团交流服务信息平台

【附源码】计算机毕业设计JAVA郑工社团交流服务信息平台 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; …

2009(408)数据结构有关链表代码题

算法思想 第一种&#xff08;普通算法&#xff09;:遍历链表&#xff0c;输出链表的长度&#xff0c;比较链表长度与k的关系&#xff0c;若长度小于k则失败&#xff0c;返回数值0。如果长度大于k&#xff0c;将指针移动到第倒数第k个位置&#xff0c;输出data的值&#xff0c;…