第十四章 Java多线程--阻塞队列--SynchronousQueue

news2025/1/13 11:46:41

目录

一、SynchronousQueue基础概念

主要特点

使用场景

示例代码

二、SynchronousQueue深入了解

1 SynchronousQueue介绍

2 SynchronousQueue核心属性

3 SynchronousQueue的TransferQueue源码

3.1 QNode源码信息

3.2 transfer方法实现

3.3 tansfer方法流程图


一、SynchronousQueue基础概念

SynchronousQueue是Java并发包java.util.concurrent中的一种特殊的BlockingQueue实现类。它并不像其他的队列那样拥有固定的容量大小,而是仅仅充当生产者和消费者之间的“传递”作用。当一个元素被放入队列时,必须立即有一个消费者来获取它,否则生产者的线程将会阻塞。同样地,如果试图从队列中取出一个元素,那么必须立即有一个生产者来放入一个元素,否则消费者的线程也会被阻塞。

主要特点

  1. 无缓冲:SynchronousQueue不存储元素,它仅仅作为一个传递元素的场所。

  2. 生产者消费者模式:SynchronousQueue非常适合用于实现生产者-消费者模式,其中生产者产生的元素必须立即被消费者消费掉。

  3. 线程阻塞:如果生产者尝试向队列中插入元素,但没有消费者来接收,则生产者的线程会被阻塞;反之亦然。

使用场景

SynchronousQueue适用于如下几种场景:

  • 需要立即处理数据的情况,不能有任何延迟。

  • 不希望在队列中保留任何数据,而是希望尽快传递给下一个处理者。

  • 需要在两个线程之间直接传递数据。

示例代码

下面是一个简单的使用SynchronousQueue的例子:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                System.out.println("Producer: Adding element to the queue");
                queue.put(42); // 生产者放入数据
                System.out.println("Producer: Element added");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                Integer value = queue.take(); // 消费者获取数据
                System.out.println("Consumer: Got " + value + " from the queue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,生产者线程尝试向队列中放入一个整数42,而消费者线程则尝试从中取出数据。由于SynchronousQueue的特点,生产者线程只有在消费者线程成功取出数据后才能继续执行。

总之,SynchronousQueue是一个非常有用的工具,特别是在需要即时通信和传递数据的场景中。然而,由于其无缓冲的特性,使用时需要特别注意同步和线程安全问题。

二、SynchronousQueue深入了解

1 SynchronousQueue介绍

SynchronousQueue这个阻塞队列和其他的阻塞队列有很大的区别

在咱们的概念中,队列肯定是要存储数据的,但是SynchronousQueue不会存储数据的

SynchronousQueue队列中,他不存储数据,存储生产者或者是消费者

当存储一个生产者到SynchronousQueue队列中之后,生产者会阻塞(看你调用的方法)

生产者最终会有几种结果:

  • 如果在阻塞期间有消费者来匹配,生产者就会将绑定的消息交给消费者

  • 生产者得等阻塞结果,或者不允许阻塞,那么就直接失败

  • 生产者在阻塞期间,如果线程中断,直接告辞。

同理,消费者和生产者的效果是一样。

生产者和消费者的数据是直接传递的,不会经过SynchronousQueue。

SynchronousQueue是不会存储数据的。

经过阻塞队列的学习:

生产者:

  • offer():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待消息,这里直接返回,告辞。

  • offer(time,unit):生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待消息,阻塞time时间,如果还没有,告辞。

  • put():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有,死等。

消费者:poll(),poll(time,unit),take()。道理和上面的生产者一致。

测试效果:

public static void main(String[] args) throws InterruptedException {
    // 因为当前队列不存在数据,没有长度的概念。
    SynchronousQueue queue = new SynchronousQueue();

    String msg = "消息!";
    /*new Thread(() -> {
        // b = false:代表没有消费者来拿
        boolean b = false;
        try {
            b = queue.offer(msg,1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(b);
    }).start();

    Thread.sleep(100);

    new Thread(() -> {
        System.out.println(queue.poll());
    }).start();*/
    new Thread(() -> {
        try {
            System.out.println(queue.poll(1, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    Thread.sleep(100);

    new Thread(() -> {
        queue.offer(msg);
    }).start();
}

2 SynchronousQueue核心属性

进到SynchronousQueue类的内部后,发现了一个内部类,Transferer,内部提供了一个transfer的方法

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

当前这个类中提供的transfer方法,就是生产者和消费者在调用读写数据时要用到的核心方法。

生产者在调用上述的transfer方法时,第一个参数e会正常传递数据

消费者在调用上述的transfer方法时,第一个参数e会传递null

SynchronousQueue针对抽象类Transferer做了几种实现。

一共看到了两种实现方式:

  • TransferStack

  • TransferQueue

这两种类继承了Transferer抽象类,在构建SynchronousQueue时,会指定使用哪种子类

// 到底采用哪种实现,需要把对应的对象存放到这个属性中
private transient volatile Transferer<E> transferer;
// 采用无参时,会调用下述方法,再次调用有参构造传入false
public SynchronousQueue() {
    this(false);
}
// 调用的是当前的有参构造,fair代表公平还是不公平
public SynchronousQueue(boolean fair) {
    // 如果是公平,采用Queue,如果是不公平,采用Stack
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

TransferQueue的特点

 

代码查看效果

public static void main(String[] args) throws InterruptedException {
    // 因为当前队列不存在数据,没有长度的概念。
    SynchronousQueue queue = new SynchronousQueue(true);
    SynchronousQueue queue = new SynchronousQueue(false);

    new Thread(() -> {
        try {
            queue.put("生1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            queue.put("生2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            queue.put("生3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("消1:" + queue.poll());
    }).start();
    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("消2:" + queue.poll());
    }).start();
    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("消3:" + queue.poll());
    }).start();
}

3 SynchronousQueue的TransferQueue源码

为了查看清除SynchronousQueue的TransferQueue源码,需要从两点开始查看源码信息

3.1 QNode源码信息

static final class QNode {
    // 当前节点可以获取到next节点
    volatile QNode next;  
    // item在不同情况下效果不同
    // 生产者:有数据
    // 消费者:为null
    volatile Object item;   
    // 当前线程
    volatile Thread waiter;   
    // 当前属性是区分消费者和生产者的属性
    final boolean isData;
    // 最终生产者需要将item交给消费者
    // 最终消费者需要获取生产者的item
    // 省略了大量提供的CAS操作
    ....
}

3.2 transfer方法实现

// 当前方法是TransferQueue的核心内容
// e:传递的数据
// timed:false,代表无限阻塞,true,代表阻塞nacos时间
E transfer(E e, boolean timed, long nanos) {
    // 当前QNode是要封装当前生产者或者消费者的信息
    QNode s = null; 
    // isData == true:代表是生产者
    // isData == false:代表是消费者
    boolean isData = (e != null);
    // 死循环
    for (;;) {
        // 获取尾节点和头结点
        QNode t = tail;
        QNode h = head;
        // 为了避免TransferQueue还没有初始化,这边做一个健壮性判断
        if (t == null || h == null)   
            continue;  

        // 如果满足h == t 条件,说明当前队列没有生产者或者消费者,为空
        // 如果有节点,同时当前节点和队列节点属于同一种角色。
        // if中的逻辑是进到队列
        if (h == t || t.isData == isData) { 
            // ===================在判断并发问题==========================
            // 拿到尾节点的nextQNode tn = t.next;
            // 如果t不为尾节点,进来说明有其他线程并发修改了tail
            if (t != tail)   
                // 重新走for循环   
                continue;
            // tn如果为不null,说明前面有线程并发,添加了一个节点
            if (tn != null) {  
                // 直接帮助那个并发线程修改tail的指向   
                advanceTail(t, tn);
                // 重新走for循环   
                continue;
            }
            // 获取当前线程是否可以阻塞
            // 如果timed为true,并且阻塞的时间小于等于0
            // 不需要匹配,直接告辞!!!
            if (timed && nanos <= 0)   
                return null;
            // 如果可以阻塞,将当前需要插入到队列的QNode构建出来
            if (s == null)
                s = new QNode(e, isData);
            // 基于CAS操作,将tail节点的next设置为当前线程
            if (!t.casNext(null, s))   
                // 如果进到if,说明修改失败,重新执行for循环修改   
                continue;
            // CAS操作成功,直接替换tail的指向
            advanceTail(t, s);   
            // 如果进到队列中了,挂起线程,要么等生产者,要么等消费者。
            // x是返回替换后的数据
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果元素和节点相等,说明节点取消了
            if (x == s) {  
                // 清空当前节点,将上一个节点的next指向当前节点的next,直接告辞   
                clean(t, s);
                return null;
            }
            // 判断当前节点是否还在队列中
            if (!s.isOffList()) {   
                // 将当前节点设置为head
                advanceHead(t, s);   
                // 如果 x != null, 如果拿到了数据,说明我是消费者
                if (x != null)   
                    // 将当前节点的item设置为自己   
                    s.item = s;
                // 线程置位null
                s.waiter = null;
            }
            // 返回数据
            return (x != null) ? (E)x : e;
        } 
        // 匹配队列中的橘色
        else {   
            // 拿到head的next,作为要匹配的节点   
            QNode m = h.next;  
            // 做并发判断,如果头节点,尾节点,或者head.next发生了变化,这边要重新走for循环
            if (t != tail || m == null || h != head)
                continue;  
            // 没并发问题,可以拿数据
            // 拿到m节点的item作为x。Object x = m.item;
            // 如果isData == (x != null)满足,说明当前出现了并发问题,避免并发消费出现坑
            if (isData == (x != null) ||  
                // 如果排队的节点取消,就会讲当前QNode中的item指向QNode
                x == m ||   
                // 如果前面两个都没满足,可以交换数据了。 
                // 如果交换失败,说明有并发问题,
                !m.casItem(x, e)) {   
                // 重新设置head节点,并且再走一次循环  
                advanceHead(h, m);  
                continue;
            }
            // 替换head
            advanceHead(h, m);  
            // 唤醒head.next中的线程
            LockSupport.unpark(m.waiter);
            // 这边匹配好了,数据也交换了,直接返回
            // 如果 x != null,说明队列中是生产者,当前是消费者,这边直接返回x具体数据
            // 反之,队列中是消费者,当前是生产者,直接返回自己的数据
            return (x != null) ? (E)x : e;
        }
    }
}

3.3 tansfer方法流程图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2204591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++堆(优先队列)】1834. 单线程 CPU|1797

本文涉及知识点 C堆(优先队列) LeetCode1834. 单线程 CPU 给你一个二维数组 tasks &#xff0c;用于表示 n​​​​​​ 项从 0 到 n - 1 编号的任务。其中 tasks[i] [enqueueTimei, processingTimei] 意味着第 i​​​​​​​​​​ 项任务将会于 enqueueTimei 时进入任务…

QStandardItemModel的role

QStandardItemModel定义了一些标准的角色&#xff0c;而QAbstractItemModel允许自定义角色。以下是一些常见的数据角色&#xff1a;1. **Qt::DisplayRole**&#xff1a;这是最基本的角色&#xff0c;用于显示在视图中的文本。当一个单元格被绘制时&#xff0c;通常会查询这个角…

Go 语言应用开发:从入门到实战

Go 语言应用开发&#xff1a;从入门到实战 引言 Go&#xff08;Golang&#xff09;是由 Google 开发的一种开源编程语言&#xff0c;设计初衷是提高编程效率&#xff0c;尤其是在高并发场景下表现出色。Go 语言以其简洁、易学、高效并发的特性&#xff0c;逐渐成为开发者的首…

<Project-8.1.1 pdf2tx-mm> Python 调用 ChatGPT API 翻译PDF内容 历程心得

原因 用ZhipuAI&#xff0c;测试用的PDF里&#xff0c;有国名西部省穆斯林&#xff0c;翻译结果返回 “系统检测到输入或生成内容可能包含不安全或敏感内容&#xff0c;请您避免输入易产生敏感内容的提 示语&#xff0c;感谢您的配合” 。想过先替换掉省名、民族名等&#xff…

DM8数据库用户和表空间管理

1 说明 DM8用户管理和表空间管理常用的管理命令&#xff0c;包括创建、修改和查看信息操作等。 2 用户管理 2.1 创建用户 创建一个用户lu9up&#xff0c;密码为"admin2024."&#xff0c;未制定表空间&#xff0c;使用默认的表空间main。 SQL> create user lu…

银河麒麟桌面操作系统V10:解决激活时“无法获取硬件信息(错误码#0017)”问题

银河麒麟桌面操作系统V10&#xff1a;解决激活时“无法获取硬件信息&#xff08;错误码#0017&#xff09;”问题 1、问题描述2、问题解决方法步骤一&#xff1a;打开终端步骤二&#xff1a;删除/etc/.kyhwid文件步骤三&#xff1a;重新激活系统总结 &#x1f490;The Begin&…

【Ubuntu】在Ubuntu上安装IDEA

【Ubuntu】在Ubuntu上安装IDEA 零、前言 最近换了Ubuntu系统&#xff0c;但是还得是要写代码&#xff0c;这样就不可避免地用到IDEA&#xff0c;接下来介绍一下如何在Ubuntu上安装IDEA。 壹、下载 这一步应该很容易的&#xff0c;直接打开IDEA的下载页面&#xff0c;点击下…

Python RabbitMQ 入门 pika

Python RabbitMQ 入门 RabbitMQ是实现了高级消息队列协议&#xff08;AMQP&#xff09;的开源消息代理软件&#xff08;亦称面向消息的中间件&#xff09;。RabbitMQ服务器是用Erlang语言编写的&#xff0c;而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均…

基于vue的酒店预订管理系统(源码+定制+开发)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

Paperless-ngx文档管理系统本地部署并实现远程使用搜索查阅文件

文章目录 前言1. 部署Paperless-ngx2. 本地访问Paperless-ngx3. Linux安装Cpolar4. 配置公网地址5. 远程访问6. 固定Cpolar公网地址7. 固定地址访问 前言 本文主要介绍如何在Linux系统本地部署Paperless-ngx开源文档管理系统&#xff0c;并结合cpolar内网穿透工具解决本地部署…

Visual Studio 2022安装(含重生版)

前言&#xff1a; 昨天调试代码的时候发现程序怎么都运行不了&#xff0c;错误显示无法找到文件啊啊啊&#xff0c;能力有限&#xff0c;找不出错误源&#xff0c;然后就狠心删掉所有相关文件来“重新开始”&#xff01; 正文&#xff1a; 1.官网下载&#xff08;内定中文版…

欧科云链研究院深掘链上数据:洞察未来Web3的隐秘价值

目前链上数据正处于迈向下一个爆发的重要时刻。 随着Web3行业发展&#xff0c;公链数量呈现爆发式的增长&#xff0c;链上积聚的财富效应&#xff0c;特别是由行业热点话题引领的链上交互行为爆发式增长带来了巨量的链上数据&#xff0c;这些数据构筑了一个行为透明但与物理世…

前后分离项目记录

一.前端设置 1.打包问题 打包报错 Thread Loader时&#xff0c;增加以下代码&#xff1a; 上线&#xff0c;打包prod时: 2.上线时api设置 二.Nginx问题 1.缓存问题&#xff1a;添加如下代码以禁止缓存&#xff0c;否则在关闭nginx后仍然可以访问页面 2.跨域问题在后端加Cr…

六西格玛设计DFSS方法论在消费级无人机设计中的应用——张驰咨询

本文基于六西格玛设计方法论&#xff0c;对消费级无人机的设计流程进行系统化研究&#xff0c;探讨如何通过六西格玛设计的理念、工具和方法提升无人机产品的设计质量和市场竞争力。文章从市场定位、客户需求分析出发&#xff0c;深入到关键KPI指标的制定&#xff0c;并逐步阐述…

十、索引优化与查询优化

文章目录 1. 数据准备2. 索引失效案例2.1 全值匹配我最爱2.2 最佳左前缀法则2.3 主键插入顺序2.4 计算、函数、类型转换(自动或手动)导致索引失效2.5 类型转换导致索引失效2.6 范围条件右边的列索引失效2.7 不等于(!=或者<>)索引失效2.8 is null 可以使用索引,is not …

STM32 USB CUBEMX

开发背景 使用的平台&#xff1a;STM32H750 注意事项 时钟必须是48MHZ&#xff0c;其它都不行 2. 将默认任务的堆栈设大一点 如果使用操作系统&#xff0c;USB任务跑在默认任务里&#xff0c;因此需要设置默认任务的堆栈缓存是直接定义的全局变量&#xff0c;需要设置编译器…

通过阿里云Milvus和通义千问快速构建基于专属知识库的问答系统

本文展示了如何使用阿里云向量检索Milvus和灵积&#xff08;Dashscope&#xff09;提供的通用千问大模型能力&#xff0c;快速构建一个基于专属知识库的问答系统。在示例中&#xff0c;我们通过接入灵积的通义千问API及文本嵌入&#xff08;Embedding&#xff09;API来实现LLM大…

中标麒麟操作系统:如何查看系统激活状态

中标麒麟操作系统&#xff1a;如何查看系统激活状态 1、图形界面查看方法方法一&#xff1a;任务栏查看方法二&#xff1a;通过“我的电脑”属性查看 2、命令行查看方法 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 本文将介绍两种查看系…

【AI 工具分享】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

自动驾驶系列—厘米级精度:RTK技术如何革新自动驾驶定位

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…