Java并发工具包(Java Util Concurrent, 简称JUC)是Java提供的一组用于简化多线程编程的类和接口,它包含了用于线程同步、并发数据结构、线程池、锁、原子操作以及其他并发实用工具的丰富集合。
1. 线程池
线程池是 Java 并发编程中非常重要的工具,它可以管理和复用一组线程,减少线程创建和销毁的开销,提高程序性能和资源利用率。Java 提供了丰富的线程池实现,其中最常用的是 ThreadPoolExecutor。以下是对线程池及其相关概念的详细介绍:
1.1 线程池的核心概念
线程复用: 通过线程池,线程可以在执行完任务后被复用,从而避免频繁创建和销毁线程带来的性能开销。
任务队列: 当有新的任务提交给线程池时,如果没有空闲线程可用,任务会被放入队列中等待执行。
线程数量管理: 线程池可以通过配置核心线程数、最大线程数来灵活控制线程的数量。
1.2 ThreadPoolExecutor
ThreadPoolExecutor
是 Java 中线程池的核心实现类,提供了丰富的配置选项,允许我们根据需求创建不同类型的线程池。它的构造函数主要参数包括:
corePoolSize:
核心线程数,线程池在空闲时保留的最小线程数,即使它们处于空闲状态也不会被回收。
maximumPoolSize:
线程池中允许的最大线程数。当任务数超过核心线程数时,线程池会创建新的线程,直到线程数达到 maximumPoolSize。
keepAliveTime:
非核心线程的存活时间。当线程池中线程数量超过 corePoolSize 时,空闲时间超过 keepAliveTime 的线程会被回收。
unit:
keepAliveTime 的时间单位,可以是秒、毫秒等。
workQueue:
任务队列,用于存储等待执行的任务。常见的队列类型有 LinkedBlockingQueue(无界队列)、ArrayBlockingQueue(有界队列)等。
threadFactory:
线程工厂,用于创建线程,可以自定义线程的名称、优先级等。
handler:
拒绝策略,当任务数量超过线程池和队列容量限制时,执行的处理策略。常见的策略有 AbortPolicy(抛出异常)、CallerRunsPolicy(由调用线程执行任务)、DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃队列中最旧的任务)。
1.3 ThreadPoolExecutor 的工作流程
- 当一个新的任务提交到线程池时,线程池会判断当前运行的线程数是否小于 corePoolSize。
如果小于 corePoolSize,则创建一个新的线程执行任务。
如果大于或等于 corePoolSize,任务会被放入 workQueue 中等待执行。 - 如果 workQueue 已满,且运行的线程数小于 maximumPoolSize,则创建新的线程执行任务。
- 如果运行的线程数达到 maximumPoolSize,并且 workQueue 也满了,线程池会根据拒绝策略来处理任务。
1.4 常用线程池
Java 提供了一些预定义的线程池,使用 Executors 工具类可以方便地创建:
newFixedThreadPool(int nThreads): 创建固定大小的线程池,线程池中的线程数始终保持为 nThreads。适用于负载较为均衡的场景。
newCachedThreadPool(): 创建一个会根据需要创建新线程的线程池,但在之前构建的线程可用时将重用它们。适用于执行很多短期异步任务的小程序。
newSingleThreadExecutor(): 创建只有一个线程的线程池,任务会按照提交的顺序逐一执行。
newScheduledThreadPool(int corePoolSize): 创建支持定时和周期性任务执行的线程池。
1.5 线程池的优点
提高性能: 通过复用线程,减少了线程创建和销毁的开销,提高了性能。
资源管理: 通过限制线程池的最大线程数,防止过多的线程导致系统资源耗尽。
任务管理: 通过任务队列,可以对提交的任务进行排队、调度。
1.6 线程池的使用注意事项
合理配置线程池大小: 线程池大小需要根据系统资源和任务特性合理配置,以避免资源浪费或线程饥饿。
避免使用无界队列: 使用无界队列可能导致内存溢出,应根据实际情况选择合适的队列类型和大小。
设置合适的拒绝策略: 对于无法处理的任务,应选择合适的拒绝策略,以避免任务丢失或系统崩溃。
线程泄漏: 在使用线程池时要确保任务不会永远阻塞,避免线程泄漏导致线程池无法释放。
2. 并发数据结构
Java 并发工具包(java.util.concurrent)提供了许多线程安全的并发数据结构。这些数据结构在多线程环境中避免了传统同步方式可能带来的性能瓶颈,利用高效的机制来确保数据的一致性和操作的线程安全。下面对常用的并发数据结构进行详细介绍:
2.1 ConcurrentHashMap
简介: ConcurrentHashMap 是线程安全的哈希表,类似于 HashMap,但在多线程环境中表现更好。
实现机制: 在 Java 8 之前,ConcurrentHashMap 使用分段锁(Segment),将整个哈希表分成多个段,每个段维护自己的锁,从而提高并发度。Java 8 之后,改用 CAS(Compare-And-Swap)和 synchronized 实现,将锁粒度缩小到桶(bucket)级别,进一步提升性能。
特点:
不支持 null 键和 null 值。
读取操作不会被阻塞,写入时只锁定相应的桶,减少锁的争用。
computeIfAbsent、computeIfPresent、merge 等方法,提供原子操作,避免需要额外的同步。
2.2 CopyOnWriteArrayList
简介: CopyOnWriteArrayList 是一个线程安全的 ArrayList,通过在每次修改时创建数组的副本来实现线程安全。
实现机制: 在进行修改操作时,如 add、set 等,先复制数组,修改副本,然后将原子地替换原数组。这使得读操作无需加锁,始终读取到一个稳定的快照。
特点:
适用于读多写少的场景,如缓存、配置等场景。
因为每次修改都会复制整个数组,所以在写操作频繁的场景中性能较差。
在迭代过程中,可以安全地进行修改,不会抛出 ConcurrentModificationException。
2.3 ConcurrentLinkedQueue
简介: ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用非阻塞算法(CAS)实现。
实现机制: 基于 Michael & Scott 的无锁队列算法,实现了高效的入队和出队操作,offer 和 poll 操作使用 CAS 来保证线程安全。
特点:
适用于高并发场景。
非阻塞,不会因为队列满或空而阻塞线程。
无界队列,可能导致内存耗尽,因此需要确保生产者的速度不超过消费者。
2.4 BlockingQueue
简介: BlockingQueue 是一种支持阻塞操作的队列,在队列为空时,获取元素的操作将被阻塞;在队列满时,添加元素的操作将被阻塞。
常见实现:
ArrayBlockingQueue:
基于数组的有界阻塞队列,内部维护一个固定大小的数组作为缓冲区。
LinkedBlockingQueue:
基于链表的阻塞队列,默认无界(实际大小为 Integer.MAX_VALUE),也可以指定容量。
PriorityBlockingQueue:
支持元素按优先级排序的无界阻塞队列,不保证元素的 FIFO 顺序。
SynchronousQueue:
不存储元素的阻塞队列,每个插入操作必须等待一个对应的移除操作,反之亦然。
特点:
在生产者-消费者模式中非常有用。
支持阻塞的 put、take、offer、poll 操作,提供线程同步和通信的机制。
2.5 ConcurrentSkipListMap 和 ConcurrentSkipListSet
简介: ConcurrentSkipListMap 和 ConcurrentSkipListSet 是基于跳表的线程安全的有序集合和映射。
实现机制: 利用跳表(SkipList)的结构,提供有序的数据存储,并发性通过无锁算法(CAS)实现。
特点:
保证元素的有序性,适合需要有序访问的场景。
非阻塞的读操作,支持高并发。
比 TreeMap 和 TreeSet 在并发环境中表现更优。
2.6 其他数据结构
CopyOnWriteArraySet:
基于 CopyOnWriteArrayList 实现的线程安全集合,不允许重复元素。
LinkedTransferQueue:
比 LinkedBlockingQueue 更高效,支持生产者将元素直接传递给消费者。
DelayQueue:
支持延迟获取元素的无界阻塞队列,常用于定时任务。
LinkedBlockingDeque:
支持双端阻塞的双向队列,提供 putFirst、putLast 等操作。
2.7 使用注意事项
选择适当的数据结构: 根据应用场景和性能需求选择合适的并发数据结构。例如,如果需要频繁读操作且数据较少变化,选择 CopyOnWriteArrayList;需要线程安全且无序访问,可以选择 ConcurrentHashMap。
避免过度使用: 并发数据结构在多线程环境下表现优秀,但在单线程环境或低并发场景中可能存在不必要的性能开销。
内存使用: 一些并发数据结构(如 CopyOnWriteArrayList)可能在修改时占用较多内存,应根据应用需求进行权衡。
3. 同步辅助工具
Java 并发工具包提供了多种同步辅助工具类,用于协调多个线程之间的协作和同步。这些工具类能够简化线程间的通信,避免复杂的锁机制,使得多线程编程更加简单和高效。下面是对 Java 同步辅助工具的详细介绍:
3.1 CountDownLatch
简介: CountDownLatch 允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。它维护一个计数器,该计数器初始化为一个给定的正数,表示需要等待的操作数量。
实现机制:
countDown():
每当某个线程完成任务后调用该方法,计数器减一。
await():
使当前线程等待,直到计数器变为零。
使用场景:
让主线程等待一组线程执行完毕,比如等待多个服务初始化完成后再执行某个任务。
实现一个简单的开关或触发器,确保在某个操作完成之前,线程不会执行。
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
// 执行任务
latch.countDown();
}).start();
latch.await(); // 等待所有线程完成
3.2 CyclicBarrier
简介: CyclicBarrier 是一个同步辅助工具,允许一组线程互相等待,直到它们都达到一个公共的屏障点。与 CountDownLatch 不同,它可以被重用。
实现机制:
await():
每个线程调用 await() 方法,在达到指定数量的线程后,所有线程才能继续执行。
可选的 Runnable
参数:在所有线程到达屏障点时,先执行指定的 Runnable 操作。
使用场景:
多线程并发执行任务,等待所有线程完成一个阶段后再进行下一阶段。
模拟并行计算,比如分阶段的计算和数据汇总。
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程已到达屏障点");
});
new Thread(() -> {
// 执行任务
barrier.await();
}).start();
3.3 Semaphore
简介: Semaphore 是一种计数信号量,用于控制对资源的访问线程数。它维护一个许可证集合,线程可以通过 acquire() 获取许可证,通过 release() 释放许可证。
实现机制:
acquire():
从信号量中获取一个许可证,如果没有可用的许可证,线程将阻塞直到获得一个许可证。
release():
释放一个许可证,唤醒阻塞等待的线程。
使用场景:
限制同时访问某个资源的线程数,比如实现一个连接池。
控制并发执行的任务数量,比如限制一次只能有一定数量的线程进入某个关键区域。
Semaphore semaphore = new Semaphore(3); // 最大许可数量
semaphore.acquire();
try {
// 执行任务
} finally {
semaphore.release();
}
3.4 Exchanger
简介: Exchanger 提供了一种线程间数据交换的机制,两个线程可以在同步点交换数据。每个线程在到达同步点时,都会等待另一个线程到达,然后交换数据。
实现机制:
exchange():
用于交换数据,线程调用此方法后将阻塞,直到另一个线程调用 exchange()。
使用场景:
两个线程之间的数据交换,比如生产者和消费者之间交换缓冲区。
双线程协作任务,如游戏中的两个玩家进行数据交换。
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = exchanger.exchange("来自线程1的数据");
System.out.println("线程1收到:" + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
3.5 Phaser
简介: Phaser 提供了更灵活的多阶段任务协作机制,是 CountDownLatch 和 CyclicBarrier 的增强版。它允许线程在多个阶段中进行同步,每个阶段可以有不同的参与者数量。
实现机制:
arriveAndAwaitAdvance():
等待其他线程到达当前阶段,所有参与者都到达后,阶段推进到下一阶段。
register() 和 bulkRegister():动态注册新的参与者。
使用场景:
多阶段任务,如分阶段处理数据,阶段间有依赖关系。
可变参与者数量的任务协调。
Phaser phaser = new Phaser(1); // 注册主线程
for (int i = 0; i < 3; i++) {
phaser.register();
new Thread(() -> {
// 执行任务
phaser.arriveAndAwaitAdvance();
}).start();
}
phaser.arriveAndDeregister(); // 主线程到达并注销
3.6 使用注意事项
避免死锁: 使用这些工具类时,需要避免死锁等同步问题。确保线程不会永久阻塞在同步点上。
资源消耗: 一些工具类(如 CyclicBarrier)可以被重用,但也可能占用资源,需合理使用和释放。
异常处理: 在同步辅助工具中调用的方法通常会抛出 InterruptedException,需要适当处理这些异常,避免线程在等待过程中被意外中断。
4. 锁
在并发编程中,锁和原子操作是确保线程安全的两种主要机制。锁通过对关键区域加锁,确保只有一个线程可以访问;而原子操作利用硬件指令,实现对变量的无锁、原子性操作。下面详细介绍这两种机制:
4.1 内置锁 (synchronized)
简介: synchronized 是 Java 提供的内置锁,可以通过同步方法或同步代码块来使用。它在方法或代码块开始时锁定对象,执行完后自动释放锁。
实现机制:
每个对象都有一个关联的监视器锁(Monitor),线程进入同步方法或块时,必须先获取该对象的监视器锁。
Java 6 引入了偏向锁、轻量级锁和重量级锁的锁升级机制,以优化 synchronized 的性能。
优点:
简单易用,自动获取和释放锁,避免死锁的风险。
缺点:
粒度较粗,可能导致性能瓶颈,特别是在高并发环境中。
使用:
适用于简单的同步场景,或者当只需要对少量代码进行同步时。
使用 synchronized 可以避免手动管理锁的细节,代码简洁明了
特性:
-
简洁性:synchronized 是 Java 提供的关键字,可以直接应用于方法或代码块,使用简单,开发者无需手动管理锁的获取和释放。
-
隐式释放锁:线程执行完同步方法或代码块后,JVM 会自动释放锁,避免了忘记释放锁导致死锁的问题。
-
不可中断:synchronized 不可中断,线程在等待锁时只能阻塞,无法被中断或取消。
-
可重入性:synchronized 是可重入的,同一个线程可以多次进入同步代码块,且不会出现死锁。
-
效率优化:Java 6 以后对 synchronized 做了许多优化,包括偏向锁、轻量级锁和锁粗化等,提升了性能。
public synchronized void method() {
// 同步方法
}
public void method2() {
synchronized (this) {
// 同步代码块
}
}
4.2 显示锁 (Lock 接口)
简介: Lock 接口提供了更灵活的锁机制,允许手动获取和释放锁。常见实现包括 ReentrantLock 和 ReadWriteLock。
ReentrantLock:
可重入锁,支持公平锁和非公平锁。
提供了 lock() 和 unlock() 方法,必须手动释放锁,避免死锁。
支持条件变量(Condition),可以实现多条件等待与通知。
ReadWriteLock:
提供了读写锁(ReadLock 和 WriteLock),允许多个线程并发读取,同时只允许一个线程写入。
提高读多写少场景的性能。
优点:
灵活性高,可以控制锁的获取和释放。
支持公平锁,避免线程饥饿。
缺点:
必须手动管理锁的释放,容易导致死锁或锁泄漏。
使用:
适用于需要更高控制度的同步场景,例如:
需要尝试获取锁而不阻塞的情况。
需要可中断的锁获取。
需要使用条件变量来实现更复杂的同步逻辑。
需要公平锁的场景。
特性:
- 手动控制:Lock 提供了更灵活的锁机制,开发者需要手动获取和释放锁,这也意味着需要更加谨慎地管理锁,确保在 finally 块中释放锁,避免死锁。
- 可中断性:Lock 提供了可中断的锁获取方式,如 lockInterruptibly(),线程在等待锁时可以响应中断。
- 尝试获取锁:Lock 提供了 tryLock() 方法,允许线程尝试获取锁,如果锁不可用,则立即返回 false,而不阻塞线程。
- 条件变量:Lock 提供了 Condition 接口,用于实现复杂的等待/通知机制,可以有多个条件队列(相比于 synchronized 的 wait/notify 更灵活)。
- 公平锁:ReentrantLock 提供了公平锁(FIFO)和非公平锁两种模式,公平锁可以避免线程饥饿。
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock();
}
synchronized 和 Lock 的区别
4.3 其他锁
StampedLock:
Java 8 引入的读写锁,提供乐观读锁,支持 tryOptimisticRead(),在无竞争情况下性能更好。
ReentrantReadWriteLock:
读写锁实现,允许多个读线程并发执行,提高性能。
4.4 锁的优化
锁粗化: 将多次加锁和解锁操作合并成一个大的锁操作,减少锁的频繁请求和释放,提高性能。
锁消除: JIT 编译器在运行时分析程序,消除不可能存在竞争的锁操作,进一步提升性能。
自旋锁: 线程在短时间内反复尝试获取锁,而不是立即阻塞,提高线程竞争时的性能。
5. 原子操作
5.1 java.util.concurrent.atomic 包
简介: Java 提供了 java.util.concurrent.atomic 包,包含了一系列原子变量类,如 AtomicInteger
、AtomicLong
、AtomicReference
等。这些类利用底层的 CAS(Compare-And-Swap)操作实现了无锁的线程安全性。
实现机制: 利用 CAS 指令,通过硬件支持的原子操作来直接更新变量的值,避免使用锁。CAS 操作包括三个参数:内存位置、期望值和新值,只有当内存位置的当前值等于期望值时,才将其更新为新值。
常见类:
AtomicInteger:
提供了对 int 类型的原子操作,如 incrementAndGet()、compareAndSet() 等。
AtomicLong:
类似于 AtomicInteger,用于 long 类型。
AtomicReference:
提供对对象引用的原子操作,适用于不可变对象的线程安全操作。
AtomicStampedReference:
解决了 ABA 问题,除了存储值,还存储一个时间戳或标记。
优点:
无锁实现,性能高,避免了线程阻塞和上下文切换。
缺点:
适用于简单的原子性操作,对于复杂的同步逻辑仍需使用锁。
AtomicInteger atomicInteger = new AtomicInteger(0);
atomicInteger.incrementAndGet(); // 原子递增
atomicInteger.compareAndSet(1, 2); // CAS 操作
5.2 LongAdder 和 LongAccumulator
简介: LongAdder 和 LongAccumulator 是 Java 8 引入的,针对高并发情况下的性能优化。
LongAdder: 将变量分成多个分段,每个线程操作不同的分段,减少竞争,提高并发性能。适用于频繁增减操作。
LongAccumulator: 类似于 LongAdder,但可以自定义累加操作,如最大值、最小值。
LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.sum();
5.3 CAS 的局限性
ABA 问题: 线程在执行 CAS 时,如果某变量的值从 A 变为 B,再变回 A,CAS 无法检测到这种变化。可以使用 AtomicStampedReference 来解决。
自旋导致的性能问题: 在高并发情况下,CAS 操作可能反复失败,导致自旋等待,消耗 CPU 资源。
只能保证一个变量的原子性: 对于涉及多个变量的复杂操作,需要使用锁或更高级的同步机制。
6. 其他工具
Java 并发编程中,ForkJoinPool、Future、Callable、以及 CompletableFuture 是非常重要的工具和接口,用于处理异步任务、多线程任务的拆分与合并以及任务的执行和结果获取。
6.1 ForkJoinPool
概述
ForkJoinPool 是 Java 7 引入的框架,用于并行执行任务。它基于“工作窃取”算法,将一个大任务分解为若干小任务,然后通过多线程并行执行这些小任务,并将结果合并。
适用于可分解的任务(如递归任务),可以充分利用多核处理器的性能。
工作原理
任务拆分:任务分为 ForkJoinTask,通常是 RecursiveTask(有返回值)和 RecursiveAction(无返回值)。
工作窃取:每个线程维护一个双端队列,自己处理队列前端的任务,如果空闲则窃取其他线程队列后端的任务,提高资源利用率。
使用示例:
RecursiveTask:用于有返回值的任务。
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class SumTask extends RecursiveTask<Integer> {
private final int[] array;
private final int start, end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 10) { // 小任务直接计算
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else { // 大任务拆分
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork();
int rightResult = rightTask.compute();
int leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
public class ForkJoinExample {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] array = new int[100];
for (int i = 0; i < 100; i++) array[i] = i;
SumTask task = new SumTask(array, 0, array.length);
int result = pool.invoke(task);
System.out.println("Sum: " + result);
}
}
6.2 Future 和 Callable
1.Callable
概述:Callable 是 Java 5 引入的接口,用于并发任务的执行,可以返回一个结果并抛出异常。它是对 Runnable 的增强。
主要方法:call() 方法,用于执行任务并返回结果。
import java.util.concurrent.Callable;
Callable<Integer> task = () -> {
// 执行一些耗时操作
return 123;
};
2.Future
概述:Future 用于表示异步任务的结果,提供了检查任务是否完成、等待任务完成以及获取任务结果的方法。
主要方法:
get():阻塞等待任务完成,并获取结果。
isDone():检查任务是否完成。
cancel():取消任务。
使用示例
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> task = () -> {
// 模拟耗时任务
Thread.sleep(1000);
return 42;
};
Future<Integer> future = executor.submit(task);
try {
System.out.println("Result: " + future.get()); // 阻塞等待结果
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
6.3 CompletableFuture
1.概述
CompletableFuture 是 Java 8 引入的类,提供了更丰富的功能用于异步编程。它支持回调、组合多个异步任务,以及处理异常。
它实现了 Future 接口,提供了更强大的 API,如 thenApply、thenAccept、thenCombine、exceptionally 等方法。
2. 主要方法
创建异步任务:supplyAsync()、runAsync()。
结果处理:
thenApply():处理并转换结果。
thenAccept():处理结果,无返回值。
thenCombine():组合两个异步任务的结果。
异常处理:exceptionally()。
组合:allOf()、anyOf()。
3.使用示例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 异步任务
return 42;
}).thenApply(result -> result * 2) // 对结果进行处理
.exceptionally(ex -> {
ex.printStackTrace();
return 0; // 异常处理
});
try {
System.out.println("Result: " + future.get()); // 获取结果
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
6.4 工具选择与使用场景
1.ForkJoinPool:
适用于可分解的递归任务,如分治算法(快速排序、归并排序)、矩阵运算等。
通过“工作窃取”机制可以充分利用多核处理器,提升性能。
2.Future 和 Callable:
适用于需要从异步任务中获取返回结果的场景,或者任务需要抛出异常。
Future 提供了阻塞获取结果的方法,在任务未完成时会阻塞当前线程。
3.CompletableFuture:
适用于复杂的异步编排和处理,包括链式调用、任务组合和异步回调。
支持非阻塞获取结果,可以进行更丰富的任务组合与异常处理。