1. 写在前面
Exchanger 是 Java 并发包 (java.util.concurrent) 中的一个同步点工具类,用于在两个线程之间交换数据。它提供了一种简单而强大的方式来实现线程之间的数据交换。不知道大家在日常工作中或者面试中 有遇到它?下面几个问题可以一块来探讨下:
- Exchanger 的工作原理是什么?
- Exchanger 在什么情况下会导致线程阻塞?
- Exchanger 是否线程安全?
- Exchanger 和其他同步工具类(如 CyclicBarrier 和 CountDownLatch)有什么区别?
- Exchanger 的常见使用场景有哪些?
- Exchanger 的 exchange 方法在交换数据时是否会丢失数据?
- Exchanger 能否用于多个线程之间的数据交换?
2. 从使用说起
2.1 生产者-消费者模式
在生产者-消费者模式中,Exchanger 可以用于生产者和消费者之间交换数据。
import java.util.concurrent.Exchanger;
public class ProducerConsumerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread producer = new Thread(() -> {
try {
String producedData = "Produced Data";
System.out.println("Producer is producing: " + producedData);
String consumedData = exchanger.exchange(producedData);
System.out.println("Producer received: " + consumedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer = new Thread(() -> {
try {
String consumedData = exchanger.exchange(null);
System.out.println("Consumer is consuming: " + consumedData);
String feedback = "Consumed Data";
exchanger.exchange(feedback);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
2.2 任务分配和结果收集
在任务分配和结果收集的场景中,Exchanger 可以用于任务分配线程和结果收集线程之间交换数据。
import java.util.concurrent.Exchanger;
public class TaskAssignmentExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread taskDistributor = new Thread(() -> {
try {
String task = "Task for Worker";
System.out.println("Distributor is distributing: " + task);
String result = exchanger.exchange(task);
System.out.println("Distributor received result: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread worker = new Thread(() -> {
try {
String task = exchanger.exchange(null);
System.out.println("Worker received task: " + task);
String result = "Result of " + task;
exchanger.exchange(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
taskDistributor.start();
worker.start();
}
}
2.3 双向数据传递
在某些场景中,两个线程可能需要进行双向数据传递。Exchanger 可以简化这一过程。
import java.util.concurrent.Exchanger;
public class BidirectionalDataExchangeExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread thread1 = new Thread(() -> {
try {
String data1 = "Data from Thread 1";
System.out.println("Thread 1 is exchanging: " + data1);
String receivedData = exchanger.exchange(data1);
System.out.println("Thread 1 received: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread thread2 = new Thread(() -> {
try {
String data2 = "Data from Thread 2";
System.out.println("Thread 2 is exchanging: " + data2);
String receivedData = exchanger.exchange(data2);
System.out.println("Thread 2 received: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
}
}
2.4 多步数据处理
在多步数据处理的场景中,Exchanger 可以用于在不同处理步骤之间交换数据。
import java.util.concurrent.Exchanger;
public class MultiStepProcessingExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread step1 = new Thread(() -> {
try {
String step1Data = "Step 1 Data";
System.out.println("Step 1 processing: " + step1Data);
String step2Data = exchanger.exchange(step1Data);
System.out.println("Step 1 received: " + step2Data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread step2 = new Thread(() -> {
try {
String step1Data = exchanger.exchange(null);
System.out.println("Step 2 received: " + step1Data);
String step2Data = "Processed " + step1Data;
System.out.println("Step 2 processing: " + step2Data);
exchanger.exchange(step2Data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step1.start();
step2.start();
}
}
2.5 超时处理
在某些情况下,线程可能需要在一定时间内完成数据交换,否则就放弃操作。Exchanger 提供了带超时的 exchange 方法。
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TimeoutExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread thread1 = new Thread(() -> {
try {
String data1 = "Data from Thread 1";
System.out.println("Thread 1 is exchanging: " + data1);
String receivedData = exchanger.exchange(data1, 3, TimeUnit.SECONDS);
System.out.println("Thread 1 received: " + receivedData);
} catch (InterruptedException | TimeoutException e) {
System.out.println("Thread 1 timed out or interrupted");
}
});
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(5000); // Simulate delay
String data2 = "Data from Thread 2";
System.out.println("Thread 2 is exchanging: " + data2);
String receivedData = exchanger.exchange(data2);
System.out.println("Thread 2 received: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
}
}
3. exchange(V x)底层实现
Exchanger 类的 exchange 方法是其核心方法,用于在两个线程之间交换数据。下面将详细介绍 public V exchange(V x) throws InterruptedException 方法的实现。
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
3.1 方法签名
public V exchange(V x) throws InterruptedException
3.2 参数和返回值
- 参数 x:当前线程希望交换的数据。
- 返回值 V:从另一个线程接收到的数据。
- 抛出 InterruptedException:如果线程在等待交换时被中断,则抛出此异常。
3.3 代码解析
3.3.1 处理 null 参数
Object item = (x == null) ? NULL_ITEM : x; // translate null args
由于 null 不能直接用于交换,因此将 null 转换为一个特殊的标记 NULL_ITEM。
3.3.2 尝试在单槽模式下交换数据
if ((arena != null || (v = slotExchange(item, false, 0L)) == null) &&
- arena 用于多槽模式,如果 arena 不为 null,则表示当前处于多槽模式。
- slotExchange 是单槽模式下的交换方法。尝试在单槽模式下进行数据交换。如果交换成功,则返回接收到的数据;否则返回 null。
3.3.3 处理中断情况
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
- 如果线程被中断,或者在单槽模式下交换失败,则尝试在多槽模式下进行交换。
- arenaExchange 是多槽模式下的交换方法。如果交换成功,则返回接收到的数据;否则返回 null。
3.3.4 抛出 InterruptedException
throw new InterruptedException();
如果线程在等待交换时被中断,或者在多槽模式下交换也失败,则抛出 InterruptedException。
3.3.5 返回接收到的数据
return (v == NULL_ITEM) ? null : (V)v;
- 如果接收到的数据是 NULL_ITEM,则返回 null。
- 否则,返回接收到的数据。
3.4 方法调用流程
3.4.1 单槽模式 (slotExchange)
- 适用于简单的两个线程之间的直接交换。
- 尝试在单槽模式下进行数据交换。如果交换成功,返回接收到的数据;否则返回 null。
3.4.2 多槽模式 (arenaExchange)
- 适用于高并发环境下的多线程数据交换。
- 如果单槽模式交换失败,或者线程被中断,则尝试在多槽模式下进行数据交换。
4. slotExchange()的底层实现
slotExchange 方法是 Exchanger 类中用于实现单槽模式下线程间数据交换的核心方法。下面将详细介绍该方法的实现和工作机制。
private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
if ((q = slot) != null) {
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
// await release
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}
4.1 方法签名
private final Object slotExchange(Object item, boolean timed, long ns)
4.2 参数和返回值
- 参数 item:当前线程希望交换的数据。
- 参数 timed:是否启用超时机制。
- 参数 ns:超时时间,单位为纳秒。
- 返回值 Object:从另一个线程接收到的数据。
4.3 代码解析
4.3.1 初始化和中断检查
Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
- 获取当前线程的 Node 对象。
- 检查当前线程是否被中断,如果是,则返回 null。
4.3.2 尝试在单槽模式下进行数据交换
for (Node q;;) {
if ((q = slot) != null) {
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
- 如果 slot 不为空,尝试通过 CAS 操作将 slot 设置为 null,表示当前线程占用了这个槽。
- 如果成功,获取槽中的数据 q.item,并将当前线程的数据 item 设置为槽的匹配数据 q.match。
- 如果槽中的线程 q.parked 不为空,唤醒该线程。
- 如果 slot 为空,尝试将当前线程的数据 item 放入槽中,并通过 CAS 操作将 slot 设置为当前线程的 Node 对象。
- 如果 arena 不为空,表示需要切换到多槽模式,返回 null。
4.3.3 等待数据交换完成
// await release
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
- 初始化一些变量,如 hash 值、超时时间 end 和自旋次数 spins。
- 在循环中等待数据交换完成,通过检查 p.match 是否为空来判断。
- 如果自旋次数 spins 大于 0,进行自旋操作。
- 如果 slot 不等于当前线程的 Node 对象 p,重置自旋次数。
- 如果线程没有被中断且 arena 为空且未超时,则将当前线程挂起等待数据交换完成。
- 如果超时或被中断,则通过 CAS 操作将 slot 设置为 null,并返回 null 或 TIMED_OUT。
- 最后,清理当前线程的 Node 对象,并返回接收到的数据。
系列文章
1.JDK源码阅读之环境搭建
2.JDK源码阅读之目录介绍
3.jdk源码阅读之ArrayList(上)
4.jdk源码阅读之ArrayList(下)
5.jdk源码阅读之HashMap
6.jdk源码阅读之HashMap(下)
7.jdk源码阅读之ConcurrentHashMap(上)
8.jdk源码阅读之ConcurrentHashMap(下)
9.jdk源码阅读之ThreadLocal
10.jdk源码阅读之ReentrantLock
11.jdk源码阅读之CountDownLatch
12.jdk源码阅读之CyclicBarrier
13.jdk源码阅读之Semaphore
14.jdk源码阅读之线程池(上)
15.jdk源码阅读之线程池(下)
16.jdk源码阅读之ArrayBlockingQueue
17.jdk源码阅读之LinkedBlockingQueue
18.jdk源码阅读之CopyOnWriteArrayList
19.jdk源码阅读之FutureTask
20.jdk源码阅读之CompletableFuture
21.jdk源码阅读之AtomicLong
22.jdk源码阅读之Thread(上)
23.jdk源码阅读之Thread(下)
24.jdk源码阅读之ExecutorService
25.jdk源码阅读之Executors
26.jdk源码阅读之ConcurrentLinkedQueue
27.jdk源码阅读之ConcurrentLinkedDeque
28.jdk源码阅读之CopyOnWriteArraySet