目录
并发集合
1. ConcurrentHashMap:
2. CopyOnWriteArrayList:
3. CopyOnWriteArraySet:
4. BlockingQueue系列:
5. ConcurrentSkipListMap 和 ConcurrentSkipListSet:
6. ConcurrentLinkedDeque:
注意事项
七种线程池
1. FixedThreadPool
2.CachedThreadPool
3.ScheduledThreadPool
4.SingleThreadExecutor
5.SingleThreadScheduledExecutor
6.ForkJoinPool
7. CustomThreadPool(自定义线程池)
四种阻塞队列
1. ArrayBlockingQueue
2. LinkedBlockingQueue
3. PriorityBlockingQueue
4. DelayQueue
综合示例
1.在实际应用中,应该确保线程池和阻塞队列的正确关闭和资源释放。
2.消费者任务中的无限循环需要额外的机制来优雅地终止,例如使用中断或额外的停止标志。
3.根据具体需求选择合适的线程池和阻塞队列实现。
并发集合
1. ConcurrentHashMap:
2. CopyOnWriteArrayList:
3. CopyOnWriteArraySet:
4. BlockingQueue系列:
5. ConcurrentSkipListMap 和 ConcurrentSkipListSet:
6. ConcurrentLinkedDeque:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentWriteExample {
// 定义线程数量和每个线程写入的次数
private static final int NUM_THREADS = 10;
private static final int NUM_WRITES_PER_THREAD = 100;
// 共享资源:一个线程安全的ArrayList(实际上这里不是线程安全的,因为我们会手动添加锁)
// 注意:如果直接使用ArrayList进行并发写入,需要外部同步机制。
// 更推荐使用ConcurrentLinkedQueue或其他并发集合。
private static final List<String> sharedList = new ArrayList<>();
// 用于控制对共享资源的访问的锁
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
// 提交多个任务到线程池
for (int i = 0; i < NUM_THREADS; i++) {
executorService.submit(() -> {
// 每个任务会循环写入数据到共享列表中
for (int j = 0; j < NUM_WRITES_PER_THREAD; j++) {
writeData("Data from thread " + Thread.currentThread().getId());
}
});
}
// 关闭线程池,不再接受新任务,但会继续执行已提交的任务
executorService.shutdown();
// 等待所有任务完成
// 注意:在实际应用中,更推荐使用awaitTermination来等待指定时间或直到任务完成,
// 并且要处理InterruptedException。这里的简单while循环只是为了演示。
while (!executorService.isTerminated()) {
// 等待所有任务完成
}
// 打印共享列表的大小和内容,以验证并发写入的结果
System.out.println("Shared list size: " + sharedList.size());
System.out.println("Shared list content: " + sharedList);
}
// 写入数据到共享列表的方法,使用锁来确保线程安全
private static void writeData(String data) {
// 获取锁,如果锁不可用,则当前线程会阻塞直到锁可用
lock.lock();
try {
// 在锁的保护下,安全地向共享列表添加数据
sharedList.add(data);
} finally {
// 确保无论如何都会释放锁,以避免死锁
lock.unlock();
}
}
}
注意事项
七种线程池
1. FixedThreadPool
// 创建一个固定大小的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
2.CachedThreadPool
示例:
// 创建一个可缓存的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
3.ScheduledThreadPool
特点:可以调度任务在给定的延迟后运行,或者定期执行。
示例:
// 创建一个调度线程池
ScheduledExecutorService scheduledThreadPool =
Executors.newScheduledThreadPool(3);
scheduledThreadPool.schedule(() -> {
System.out.println("Task executed after delay");
}, 1, TimeUnit.SECONDS);
4.SingleThreadExecutor
// 创建一个单线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
5.SingleThreadScheduledExecutor
// 实际上可以通过ScheduledThreadPool(1)来实现单线程调度
ScheduledExecutorService singleThreadScheduledExecutor =
Executors.newScheduledThreadPool(1);
6.ForkJoinPool
特点:用于执行分而治之算法的任务,适用于大量小规模任务的并行处理。
// 创建一个ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(() -> {
// 分而治之的任务
});
7. CustomThreadPool(自定义线程池)
// 创建一个自定义的线程池
ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<Runnable>() // 任务队列
);
四种阻塞队列
1. ArrayBlockingQueue
BlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
2. LinkedBlockingQueue
BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();
3. PriorityBlockingQueue
BlockingQueue<Integer> priorityBlockingQueue = new PriorityBlockingQueue<>();
4. DelayQueue
BlockingQueue<DelayedElement> delayQueue = new DelayQueue<>();
// DelayedElement 需要实现 Delayed 接口
综合示例
import java.util.concurrent.*;
public class ThreadPoolAndBlockingQueueExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 创建一个有界的阻塞队列
BlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
// 生产者任务
Runnable producer = () -> {
try {
for (int i = 0; i < 20; i++) {
arrayBlockingQueue.put(i); // 如果队列满了,会阻塞
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
// 消费者任务
Runnable consumer = () -> {
try {
while (!Thread.currentThread().isInterrupted()) {
Integer value = arrayBlockingQueue.take(); // 如果队列空了,会阻塞
System.out.println("Consumed: " + value);
// 模拟任务处理时间
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
// 提交生产者任务给线程池
fixedThreadPool.submit(producer);
// 提交多个消费者任务给线程池(这里为了演示只提交一个,实际可以根据需要提交多个)
fixedThreadPool.submit(consumer);
// 注意:为了示例简洁,这里没有添加关闭线程池的代码。在实际应用中,应该在适当的时候调用 shutdown() 方法。
// 优雅地关闭线程池
try {
Thread.sleep(10000); // 等待一段时间,允许生产者和消费者执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 关闭线程池
fixedThreadPool.shutdown();
}
}