29.jdk源码阅读之Exchanger

news2025/1/2 0:24:15

1. 写在前面

类继承和实现关系
Exchanger 是 Java 并发包 (java.util.concurrent) 中的一个同步点工具类,用于在两个线程之间交换数据。它提供了一种简单而强大的方式来实现线程之间的数据交换。不知道大家在日常工作中或者面试中 有遇到它?下面几个问题可以一块来探讨下:

  1. Exchanger 的工作原理是什么?
  2. Exchanger 在什么情况下会导致线程阻塞?
  3. Exchanger 是否线程安全?
  4. Exchanger 和其他同步工具类(如 CyclicBarrier 和 CountDownLatch)有什么区别?
  5. Exchanger 的常见使用场景有哪些?
  6. Exchanger 的 exchange 方法在交换数据时是否会丢失数据?
  7. Exchanger 能否用于多个线程之间的数据交换?

2. 从使用说起

2.1 生产者-消费者模式

在生产者-消费者模式中,Exchanger 可以用于生产者和消费者之间交换数据。

import java.util.concurrent.Exchanger;

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread producer = new Thread(() -> {
            try {
                String producedData = "Produced Data";
                System.out.println("Producer is producing: " + producedData);
                String consumedData = exchanger.exchange(producedData);
                System.out.println("Producer received: " + consumedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                String consumedData = exchanger.exchange(null);
                System.out.println("Consumer is consuming: " + consumedData);
                String feedback = "Consumed Data";
                exchanger.exchange(feedback);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

2.2 任务分配和结果收集

在任务分配和结果收集的场景中,Exchanger 可以用于任务分配线程和结果收集线程之间交换数据。

import java.util.concurrent.Exchanger;

public class TaskAssignmentExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread taskDistributor = new Thread(() -> {
            try {
                String task = "Task for Worker";
                System.out.println("Distributor is distributing: " + task);
                String result = exchanger.exchange(task);
                System.out.println("Distributor received result: " + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread worker = new Thread(() -> {
            try {
                String task = exchanger.exchange(null);
                System.out.println("Worker received task: " + task);
                String result = "Result of " + task;
                exchanger.exchange(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        taskDistributor.start();
        worker.start();
    }
}

2.3 双向数据传递

在某些场景中,两个线程可能需要进行双向数据传递。Exchanger 可以简化这一过程。

import java.util.concurrent.Exchanger;

public class BidirectionalDataExchangeExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread thread1 = new Thread(() -> {
            try {
                String data1 = "Data from Thread 1";
                System.out.println("Thread 1 is exchanging: " + data1);
                String receivedData = exchanger.exchange(data1);
                System.out.println("Thread 1 received: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread thread2 = new Thread(() -> {
            try {
                String data2 = "Data from Thread 2";
                System.out.println("Thread 2 is exchanging: " + data2);
                String receivedData = exchanger.exchange(data2);
                System.out.println("Thread 2 received: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();
    }
}

2.4 多步数据处理

在多步数据处理的场景中,Exchanger 可以用于在不同处理步骤之间交换数据。

import java.util.concurrent.Exchanger;

public class MultiStepProcessingExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread step1 = new Thread(() -> {
            try {
                String step1Data = "Step 1 Data";
                System.out.println("Step 1 processing: " + step1Data);
                String step2Data = exchanger.exchange(step1Data);
                System.out.println("Step 1 received: " + step2Data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread step2 = new Thread(() -> {
            try {
                String step1Data = exchanger.exchange(null);
                System.out.println("Step 2 received: " + step1Data);
                String step2Data = "Processed " + step1Data;
                System.out.println("Step 2 processing: " + step2Data);
                exchanger.exchange(step2Data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        step1.start();
        step2.start();
    }
}

2.5 超时处理

在某些情况下,线程可能需要在一定时间内完成数据交换,否则就放弃操作。Exchanger 提供了带超时的 exchange 方法。

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread thread1 = new Thread(() -> {
            try {
                String data1 = "Data from Thread 1";
                System.out.println("Thread 1 is exchanging: " + data1);
                String receivedData = exchanger.exchange(data1, 3, TimeUnit.SECONDS);
                System.out.println("Thread 1 received: " + receivedData);
            } catch (InterruptedException | TimeoutException e) {
                System.out.println("Thread 1 timed out or interrupted");
            }
        });

        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(5000); // Simulate delay
                String data2 = "Data from Thread 2";
                System.out.println("Thread 2 is exchanging: " + data2);
                String receivedData = exchanger.exchange(data2);
                System.out.println("Thread 2 received: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();
    }
}

3. exchange(V x)底层实现

Exchanger 类的 exchange 方法是其核心方法,用于在两个线程之间交换数据。下面将详细介绍 public V exchange(V x) throws InterruptedException 方法的实现。

 public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

3.1 方法签名

public V exchange(V x) throws InterruptedException

3.2 参数和返回值

  • 参数 x:当前线程希望交换的数据。
  • 返回值 V:从另一个线程接收到的数据。
  • 抛出 InterruptedException:如果线程在等待交换时被中断,则抛出此异常。

3.3 代码解析

3.3.1 处理 null 参数

Object item = (x == null) ? NULL_ITEM : x; // translate null args

由于 null 不能直接用于交换,因此将 null 转换为一个特殊的标记 NULL_ITEM。

3.3.2 尝试在单槽模式下交换数据

if ((arena != null || (v = slotExchange(item, false, 0L)) == null) &&
  • arena 用于多槽模式,如果 arena 不为 null,则表示当前处于多槽模式。
  • slotExchange 是单槽模式下的交换方法。尝试在单槽模式下进行数据交换。如果交换成功,则返回接收到的数据;否则返回 null。

3.3.3 处理中断情况

((Thread.interrupted() || // disambiguates null return
  (v = arenaExchange(item, false, 0L)) == null)))
  • 如果线程被中断,或者在单槽模式下交换失败,则尝试在多槽模式下进行交换。
  • arenaExchange 是多槽模式下的交换方法。如果交换成功,则返回接收到的数据;否则返回 null。

3.3.4 抛出 InterruptedException

throw new InterruptedException();

如果线程在等待交换时被中断,或者在多槽模式下交换也失败,则抛出 InterruptedException。

3.3.5 返回接收到的数据

return (v == NULL_ITEM) ? null : (V)v;
  • 如果接收到的数据是 NULL_ITEM,则返回 null。
  • 否则,返回接收到的数据。

3.4 方法调用流程

3.4.1 单槽模式 (slotExchange)

  • 适用于简单的两个线程之间的直接交换。
  • 尝试在单槽模式下进行数据交换。如果交换成功,返回接收到的数据;否则返回 null。

3.4.2 多槽模式 (arenaExchange)

  • 适用于高并发环境下的多线程数据交换。
  • 如果单槽模式交换失败,或者线程被中断,则尝试在多槽模式下进行数据交换。

4. slotExchange()的底层实现

slotExchange 方法是 Exchanger 类中用于实现单槽模式下线程间数据交换的核心方法。下面将详细介绍该方法的实现和工作机制。

private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }

4.1 方法签名

private final Object slotExchange(Object item, boolean timed, long ns)

4.2 参数和返回值

  • 参数 item:当前线程希望交换的数据。
  • 参数 timed:是否启用超时机制。
  • 参数 ns:超时时间,单位为纳秒。
  • 返回值 Object:从另一个线程接收到的数据。

4.3 代码解析

4.3.1 初始化和中断检查

Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
    return null;
  • 获取当前线程的 Node 对象。
  • 检查当前线程是否被中断,如果是,则返回 null。

4.3.2 尝试在单槽模式下进行数据交换

for (Node q;;) {
    if ((q = slot) != null) {
        if (U.compareAndSwapObject(this, SLOT, q, null)) {
            Object v = q.item;
            q.match = item;
            Thread w = q.parked;
            if (w != null)
                U.unpark(w);
            return v;
        }
        // create arena on contention, but continue until slot null
        if (NCPU > 1 && bound == 0 &&
            U.compareAndSwapInt(this, BOUND, 0, SEQ))
            arena = new Node[(FULL + 2) << ASHIFT];
    }
    else if (arena != null)
        return null; // caller must reroute to arenaExchange
    else {
        p.item = item;
        if (U.compareAndSwapObject(this, SLOT, null, p))
            break;
        p.item = null;
    }
}
  • 如果 slot 不为空,尝试通过 CAS 操作将 slot 设置为 null,表示当前线程占用了这个槽。
  • 如果成功,获取槽中的数据 q.item,并将当前线程的数据 item 设置为槽的匹配数据 q.match。
  • 如果槽中的线程 q.parked 不为空,唤醒该线程。
  • 如果 slot 为空,尝试将当前线程的数据 item 放入槽中,并通过 CAS 操作将 slot 设置为当前线程的 Node 对象。
  • 如果 arena 不为空,表示需要切换到多槽模式,返回 null。

4.3.3 等待数据交换完成

// await release
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
    if (spins > 0) {
        h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
        if (h == 0)
            h = SPINS | (int)t.getId();
        else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
            Thread.yield();
    }
    else if (slot != p)
        spins = SPINS;
    else if (!t.isInterrupted() && arena == null &&
             (!timed || (ns = end - System.nanoTime()) > 0L)) {
        U.putObject(t, BLOCKER, this);
        p.parked = t;
        if (slot == p)
            U.park(false, ns);
        p.parked = null;
        U.putObject(t, BLOCKER, null);
    }
    else if (U.compareAndSwapObject(this, SLOT, p, null)) {
        v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
        break;
    }
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
  • 初始化一些变量,如 hash 值、超时时间 end 和自旋次数 spins。
  • 在循环中等待数据交换完成,通过检查 p.match 是否为空来判断。
  • 如果自旋次数 spins 大于 0,进行自旋操作。
  • 如果 slot 不等于当前线程的 Node 对象 p,重置自旋次数。
  • 如果线程没有被中断且 arena 为空且未超时,则将当前线程挂起等待数据交换完成。
  • 如果超时或被中断,则通过 CAS 操作将 slot 设置为 null,并返回 null 或 TIMED_OUT。
  • 最后,清理当前线程的 Node 对象,并返回接收到的数据。

系列文章

1.JDK源码阅读之环境搭建

2.JDK源码阅读之目录介绍

3.jdk源码阅读之ArrayList(上)

4.jdk源码阅读之ArrayList(下)

5.jdk源码阅读之HashMap

6.jdk源码阅读之HashMap(下)

7.jdk源码阅读之ConcurrentHashMap(上)

8.jdk源码阅读之ConcurrentHashMap(下)

9.jdk源码阅读之ThreadLocal

10.jdk源码阅读之ReentrantLock

11.jdk源码阅读之CountDownLatch

12.jdk源码阅读之CyclicBarrier

13.jdk源码阅读之Semaphore

14.jdk源码阅读之线程池(上)

15.jdk源码阅读之线程池(下)

16.jdk源码阅读之ArrayBlockingQueue

17.jdk源码阅读之LinkedBlockingQueue

18.jdk源码阅读之CopyOnWriteArrayList

19.jdk源码阅读之FutureTask

20.jdk源码阅读之CompletableFuture

21.jdk源码阅读之AtomicLong

22.jdk源码阅读之Thread(上)

23.jdk源码阅读之Thread(下)

24.jdk源码阅读之ExecutorService

25.jdk源码阅读之Executors

26.jdk源码阅读之ConcurrentLinkedQueue

27.jdk源码阅读之ConcurrentLinkedDeque

28.jdk源码阅读之CopyOnWriteArraySet

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1960337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

028-GeoGebra中级篇-脚本的初步的探索

GeoGebra 的脚本功能允许用户通过不同的触发机制&#xff08;如点击、更新、输入框变化、拖动结束&#xff09;和全局 JavaScript 自定义图形和交互行为&#xff0c;实现动态数学模型和用户交互&#xff0c;同时 ggbOnInit() 函数可在应用初始化时设置默认状态&#xff0c;提供…

Git基本原理介绍及常用指令

文章目录 前言一、Git是什么&#xff1f;集中化的版本控制系统分布式版本控制系统 二、Git基本概念三、git命令操作配置用户信息常用指令 总结 前言 如果你用Microsoft Word写过论文&#xff0c;那你一定有这样的经历&#xff1a;想删除一个段落&#xff0c;又怕将来想恢复找不…

MySQL基础练习题8-每月交易1

题目&#xff1a; 查询来查找每个月和每个国家/地区的事务数及其总金额、已批准的事务数及其总金额。 准备数据 分析数据 第一步&#xff1a;用substr()函数来截取到月份&#xff0c;用group by为每个国家分组来查找每个国家 第二步&#xff1a;用count()和sum()来求事务数…

14.FineReport制作带筛选按钮的报表和图表

1.首先连接自带的sqlite数据库&#xff0c;具体方法参考下面的链接 FineReport连接自带的sqlite数据库 2.文件 – 新建普通报表 3.新建数据库查询 4.查询自带的销量表 5.模版&#xff0c;页面设置 6.方向选择横向 7.合并单元格&#xff0c;并添加斜线表头 8.表格中添加字段信…

iPhone手机识别提取藏文字体,推荐《藏语翻译通》藏文OCR识别神器!

如果你正在寻找一款支持藏文OCR识别提取文字的App&#xff0c;我们将向你推荐《藏语翻译通》App&#xff0c;一款专门为iPhone手机用户设计的藏文识别与翻译工具。 功能特点 《藏语翻译通》是一款集藏文OCR识别、藏汉互译、语音识别翻译于一体的应用。用户只需要拿起手机扫一…

【原创】java+ssm+mysql图书信息管理系统设计与实现

个人主页&#xff1a;程序员杨工 个人简介&#xff1a;从事软件开发多年&#xff0c;前后端均有涉猎&#xff0c;具有丰富的开发经验 博客内容&#xff1a;全栈开发&#xff0c;分享Java、Python、Php、小程序、前后端、数据库经验和实战 开发背景&#xff1a; 随着数字化和信…

【排序】快速排序详解

✨✨欢迎大家来到Celia的博客✨✨ &#x1f389;&#x1f389;创作不易&#xff0c;请点赞关注&#xff0c;多多支持哦&#x1f389;&#x1f389; 所属专栏&#xff1a;排序 个人主页&#xff1a;Celias blog~ 一、快速排序的思想 快速排序的核心思想是&#xff1a; 选定一个…

关于Unity转微信小程序的流程记录

1.准备工作 1.unity微信小程序转换工具,minigame插件,导入后工具栏出现“微信小游戏" 2.微信开发者工具稳定版 3.MP微信公众平台申请微信小游戏,获得游戏appid 4.unity转webgl开发平台,Player Setting->Other Settings->Color Space->Linear 5. unity工…

程序员面试的“八股文“:助力还是阻力?

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

知识付费培训考试题库h5小程序开源版开发

知识付费培训考试题库h5小程序开源版开发 企业内部培训与考试课堂系统&#xff0c;支持丰富课程类型&#xff0c;还拥有全面的题型体系&#xff0c;并能自动评分。应用具备响应式设计&#xff0c;加之学习进度跟踪与评论功能&#xff0c;打造互动式学习环境&#xff0c;是现代…

不知道服务器jenkins账户密码,利用自己账户和sudo登录jenkins账户

在服务器上不知道jenkins账户密码&#xff0c;只知道自己账户密码&#xff0c;如何登录jenkins账户 sudo -u jenkins -i

“八股文”在实际工作中是助力、阻力还是空谈

目录 1.概述 1.1.对实际工作的助力 1.2.存在的问题 2.“八股文”对招聘过程的影响 2.1.“八股文”在筛选候选人时的作用 2.2.面试中的比重及其合理性 2.3.如何平衡“八股文”与实际编程能力的考察 3.“八股文”在日常工作中的实用价值 3.1.在团队协作环境中进行有效沟…

Burning In 测试

什么是老化测试&#xff1f; 芯片Burning In测试系统是一种高度集成的测试设备&#xff0c;它结合了温度控制、电源控制、环境控制以及数据采集与分析等多个子系统。该系统能够在可控的条件下对芯片进行长时间的老化测试&#xff0c;从而有效地排查潜在问题&#xff0c;提高芯片…

MySQL 8.0 新特性汇总

文章目录 前言1. 运维管理1.1 可持久化变量1.2 管理员端口1.3 资源组1.4 数据库粒度只读1.5 show processlist 实现方式1.6 加速索引创建速度1.7 控制连接的内存使用量1.8 克隆插件1.9 mysqldump 新增参数1.10 慢日志增强1.11 快速加列1.12 InnoDB 隐藏主键1.13 Redo 配置1.14 …

快速方便地下载huggingface的模型库和数据集

快速方便地下载huggingface的模型库和数据集 方法一&#xff1a;用于使用 aria2/wgetgit 下载 Huggingface 模型和数据集的 CLI 工具特点Usage 方法二&#xff1a;模型下载【个人使用记录】保持目录结构数据集下载不足之处 方法一&#xff1a;用于使用 aria2/wgetgit 下载 Hugg…

java算法day26

java算法day26 207 课程表208 实现Trie(前缀树) 207 课程表 这题对应的知识是图论里的拓扑排序的知识。从题意就可以感受出来了。题目说如果要学习某课程&#xff0c;那么就需要先完成某课程。 这里我描述比较复杂的情况&#xff1a;课程与课程之间也有可能是多对一的场景或者…

实现halcon中的erosion、connection、fill_up

在halcon中&#xff0c;区域R是用一系列行程&#xff08;run&#xff09;的集合表示的&#xff0c;run的形式为&#xff08;Row&#xff0c;ColumnBegin&#xff0c;ColumnEnd&#xff09;&#xff0c;分别对应行坐标、列开始坐标、列结束坐标&#xff0c;这种保存区域的方法被…

C#中重写tospring方法

在C#中&#xff0c;重写ToString方法允许你自定义对象的字符串表示形式。当你想要打印对象或者在调试时查看对象的状态时&#xff0c;重写ToString方法非常有用。 默认情况下&#xff0c;ToString方法返回对象的类型名称。通过重写这个方法&#xff0c;你可以返回一个更有意义…

1.5 队列概念,应用及部分实现

1.基本概念 队列&#xff08; Queue &#xff09;&#xff1a;也是运算受限的线性表。是一种先进先出&#xff08; First In First Out &#xff0c;简称 FIFO &#xff09;的线性表。只允许在表的一端进行插入&#xff0c;而在另一端进行删除。 队首&#xff08; front &am…

C/C++编程-算法学习-数字滤波器

数字滤波器 一阶低通滤波器结论推导11. 基本公式推导2. 截止频率 和 采样频率 推导 实现 二阶低通滤波器实现1实现2推导1推导2 一阶低通滤波器 结论 其基本原理基于以下公式&#xff1a; o u t p u t [ n ] α ∗ i n p u t [ n ] ( 1 − α ) ∗ o u t p u t [ n − 1 ] …