文章目录
- 一、并发集合框架
- 1. 简介
- 2. 接口Iterable
- 2. 接口Collection
- 3. 接口List
- 4. 接口Set
- 5. 接口Queue
- 6. Deque
- 二、非阻塞队列
- 1. 概念
- 2. ConcurrentHashMap使用
- (1)简介
- (2)验证HashMap不是线程安全的
- (3)验证HashTable是线程安全的
- (4)验证HashTable不支持并发remove()
- (5)验证ConcurrentHashMap是线程安全的
- (6)验证ConcurrentHashMap支持并发remove()删除操作
- 3. ConcurrentSkipListMap使用
- 4. ConcurrentLinkedQueue使用
- 5. ConcurrentLinkedDeque使用
- 6. 类CopyOnWriteArrayList的使用
- 7. 类CopyOnWriteArraySet的使用
- 三、阻塞队列
- 1. 概念
- 2. 类ArrayBlockingQueue与公平/非公平锁的使用
- 3. 类PriorityBlockingQueue的使用
- 4. 类LinkedBlockingQueue的使用
- 5. 类LinkedBlockingDeque的使用
- 6. 类SynChronousQueue的使用
- 7. 类DelayQueue的使用
- 8. 类LinkedTransferQueue的使用
一、并发集合框架
1. 简介
JDK提供了丰富的集合框架工具,Java语言的集合框架的父接口时Iterable,从这个接口向下一一继承可以得到完整的Java集合架构。集合架构的继承与实现关系相当的复杂,出现了3个继承分支(List、Set、Queue-图中没画出来),下面来分析一下:
2. 接口Iterable
源码如下:
public interface Iterable<T> {
Iterator<T> iterator();
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}
该接口的主要作用是迭代循环,接口Iterable结构非常简洁,其中包括iterator(),它返回一个iterator对象,以进行遍历使用。接口Iterable是Java中的一个泛型接口,它位于java.lang包下。它定义了一种表示可迭代对象的协议,该协议要求实现类能够生成一个迭代器(Iterator)对象,以便对元素进行迭代访问。
2. 接口Collection
源码如下:
public interface Collection<E> extends Iterable<E> {
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
default boolean removeIf(Predicate<? super E> filter) {
Objects.requireNonNull(filter);
boolean removed = false;
final Iterator<E> each = iterator();
while (each.hasNext()) {
if (filter.test(each.next())) {
each.remove();
removed = true;
}
}
return removed;
}
boolean retainAll(Collection<?> c);
void clear();
boolean equals(Object o);
int hashCode();
@Override
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
}
Collection接口提供了一种通用的方式来操作集合,并定义了基本的集合操作,例如添加、删除、查找等。它的子接口和实现类(例如List、Set、Queue等)在其基础上添加了更多特定的行为和约束
3. 接口List
接口list对接口Conllection进行了扩展,运行根据索引位置操作数据(数组下标),并且允许内容重复,接口List最常用的非并发实现类是ArrayList,它是非线程安全的,可以对数据以链表的形式进行组织,使数据呈现有序的效果。如果想使用线程安全的链表则可以使用Vectror类。类Vector是线程安全的,所以在多线程并发操作数据时可以无误的处理集合中的数据。需要说明一下,多个线程分别调用Vector的iterator方法,返回Iterator对象,再调用remove()时会出现ConcurrentModificationException异常,也就是说并不支持Iterator并发删除,所以在这点功能上还是有欠缺的。
4. 接口Set
接口Set也就是对接口Conllection进行了扩展,特点是不允许内容重复,排序的方式为自然排序,其防止元素重复的原理是元素需要重写hashcode()方法和equals方法。接口set最常用的非并发实现类是HashSet,HashSet默认以无序的方式组织元素,LinkedHashSet类可以有序的组织元素,接口Set还有一个实现类,即TreeSet,它不仅实现了接口Set还实现了SortedSet和NavigaleSet。接口SortedSet的父接口是Set,接口SortedSet和接口NavigableSet在功能上得到了扩展,比如可以获取Set中内容的子集,支持获取表头和表尾的数据。
5. 接口Queue
接口Queue对接口Conllection进行了扩展,它可以方便的操作列头,接口Queue的非并发实现类有PriorityQueue,它是一个基于优先级的无界优先级队列。
6. Deque
接口Qeque支持对表头的操作,而接口Deque不仅支持对表头的操作,而且还支持对表尾的操作,所以Deque的全称为双端队列,接口Deque的非并发实现类有ArrayDeque和LinkedList,他们之间有一些区别,如果只想从队列两端获取数据,则使用ArrayDeque,如果想从队列两端获取数据的同时还可以根据索引的位置操作数据,则使用LinkedList。
二、非阻塞队列
1. 概念
非阻塞队列(Non-blocking Queue)是一种线程安全的队列数据结构,它提供了一组非阻塞的操作,使得多个线程可以并发地对队列进行操作而无需互斥锁。在传统的阻塞队列中,当队列为空时,获取元素的操作会被阻塞,直到队列中有新的元素可用;当队列已满时,插入元素的操作会被阻塞,直到队列中有空的位置可用。这种阻塞行为需要使用锁或其他同步机制来实现,会导致线程在队列操作上进行等待,从而可能降低系统的并发性能。相反,非阻塞队列通过使用一些特定的算法和原子操作,允许多个线程并发地进行插入和删除操作,而不会出现线程阻塞。在非阻塞队列中,线程在执行插入或删除操作时,会检查队列的状态并采取适当的措施,例如重试或者放弃操作。这样,即使某个线程的操作失败,其他线程仍然可以继续执行操作,从而提高了并发性能和响应性。非阻塞队列通常使用一些原子操作,如CAS(Compare and Swap)来实现线程安全和并发操作。常见的非阻塞队列实现包括ConcurrentLinkedQueue和LinkedTransferQueue等。非阻塞队列适用于高并发环境下的多线程场景,其中大量线程需要并发地进行队列操作。通过避免线程的阻塞和等待,非阻塞队列可以提供更好的性能和可伸缩性。然而,非阻塞队列的实现较为复杂,需要考虑并发一致性和原子操作等问题,因此在使用时需要谨慎处理相关逻辑。
在JDK包中,常见的非阻塞队列有:
ConcurrentHashMap
,ConcurrentSkipListMap
,ConcurrentSkipListSet
,ConcurrentLinkedQueue
,ConcurrentLinkedDeque
,CopyOnWriteArrayList
,CopyOnWriteArraySet
,下面一一来介绍:
2. ConcurrentHashMap使用
(1)简介
ConcurrentHashMap是支持并发操作的Map,ConcurrentHashMap是线程安全的主要基于以下几个原因:
- 分段锁机制:ConcurrentHashMap内部使用了分段锁(Segment Locking)的机制,将内部哈希表分割成多个段(Segment)。每个段拥有独立的锁,因此不同的线程可以并发地访问不同的段,减少了锁的竞争范围,提高了并发性能。相比于使用单一锁的全局同步机制,分段锁机制能够提供更好的并发度。
- 使用volatile关键字:ConcurrentHashMap内部的一些关键字段(如table、sizeCtl等)使用了volatile关键字进行修饰。volatile关键字确保了多个线程之间对这些字段的可见性,即一个线程对字段的修改对其他线程是可见的。这样,当一个线程修改了ConcurrentHashMap的状态时,其他线程能够立即感知到这个修改,避免了数据不一致的情况。
- CAS操作:ConcurrentHashMap使用了CAS(Compare and Swap)操作来实现对节点的原子操作。CAS操作是一种乐观锁机制,通过比较某个变量的当前值与期望值是否相等,如果相等则执行更新操作,否则重试或放弃操作。CAS操作允许多个线程并发地对数据进行修改,而不需要加锁,减少了锁的竞争开销。
通过上述机制的结合使用,ConcurrentHashMap能够提供高度的线程安全性和并发性能。它允许多个线程并发地读取和修改映射表,而无需显式的同步机制。这使得ConcurrentHashMap成为处理高并发场景下的首选数据结构,尤其适用于读多写少的情况。需要注意的是,虽然ConcurrentHashMap提供了线程安全的操作,但并不意味着所有操作都是原子的。对于一些复合操作(如putIfAbsent()),虽然单个操作是原子的,但多个操作的组合可能会导致一致性问题。因此,在特定的使用场景中,仍需要根据需求考虑额外的同步措施来保证一致性。
(2)验证HashMap不是线程安全的
代码如下:
public class Main{
private static HashMap<Integer, String> map = new HashMap<>();
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
map.put(i, "Thread1-" + i);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
map.put(i, "Thread2-" + i);
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
// 输出map的大小
System.out.println("Map size: " + map.size());
}
}
由输出我们可以知道,出现了问题,在这个例子中,由于两个线程同时执行循环插入操作,存在竞争条件。当线程1执行到某个键时,线程2也可能同时执行到相同的键,并尝试插入不同的值。因为HashMap不提供同步机制来处理并发冲突,结果可能是任意的。
(3)验证HashTable是线程安全的
上面的代码我们改装一下:
public class Main{
private static Hashtable<Integer, String> map = new Hashtable<>();
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
map.put(i, "Thread1-" + i);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
map.put(i, "Thread1-" + i);
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
// 输出map的大小
System.out.println("Map size: " + map.size());
}
}
结果为10000,输出正确,说明HashTable是线程安全的
(4)验证HashTable不支持并发remove()
import java.util.Hashtable;
import java.util.Iterator;
public class Main {
private static Hashtable<Integer, String> table = new Hashtable<>();
public static void main(String[] args) {
// 添加一些键值对到 Hashtable
table.put(1, "One");
table.put(2, "Two");
table.put(3, "Three");
// 创建一个线程,在迭代过程中删除元素
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Iterator<Integer> iterator = table.keySet().iterator();
while (iterator.hasNext()) {
int key = iterator.next();
if (key == 2) {
iterator.remove(); // 尝试删除元素
}
}
}
});
// 启动线程
thread.start();
// 在主线程中遍历 Hashtable
try {
for (Integer key : table.keySet()) {
System.out.println(key + ": " + table.get(key));
Thread.sleep(100); // 添加一些延迟
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码中,创建了一个 Hashtable 对象 table,并在其中添加了三个键值对。然后,创建了一个线程,在迭代过程中尝试删除键值对中的某个元素。在主线程中,使用增强的 for 循环遍历 Hashtable 并打印键值对。由于存在另一个线程在迭代过程中尝试删除元素,因此可能会抛出 ConcurrentModificationException 异常。所以HashTable仅仅支持多线程环境下的Put添加操作,却不支持remove操作
(5)验证ConcurrentHashMap是线程安全的
public class Main{
private static ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
map.put(i, "Thread1-" + i);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
map.put(i, "Thread1-" + i);
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
// 输出map的大小
System.out.println("Map size: " + map.size());
}
}
由输出结果可以知道,ConcurrentHashMap是线程安全的
(6)验证ConcurrentHashMap支持并发remove()删除操作
public class Main {
private static ConcurrentHashMap<Integer, String> table = new ConcurrentHashMap<>();
public static void main(String[] args) {
// 添加一些键值对到 Hashtable
table.put(1, "One");
table.put(2, "Two");
table.put(3, "Three");
// 创建一个线程,在迭代过程中删除元素
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Iterator<Integer> iterator = table.keySet().iterator();
while (iterator.hasNext()) {
int key = iterator.next();
if (key == 2) {
iterator.remove(); // 尝试删除元素
}
}
}
});
// 启动线程
thread.start();
// 在主线程中遍历 Hashtable
try {
for (Integer key : table.keySet()) {
System.out.println(key + ": " + table.get(key));
Thread.sleep(100); // 添加一些延迟
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
可以看到删除成功了,说明ConcurrentHashMap在执行remove操作也是线程安全的,ConcurrentHashMap 在执行 remove 操作时也是线程安全的,这是因为它内部使用了一种称为分段锁(Segment Locking)的机制。ConcurrentHashMap 将整个数据结构分成了多个段(Segments),每个段都拥有自己的锁。不同的键值对会被分配到不同的段中,这样不同的线程可以同时操作不同的段,从而实现并发操作。当执行 remove 操作时,只需要获取对应段的锁,而不需要对整个 ConcurrentHashMap 加锁。这意味着在删除元素时,只有涉及到同一个段的线程才会被阻塞,其他线程可以继续访问其他段,提高了并发性能。因此,ConcurrentHashMap 的 remove 操作是线程安全的,并且能够在多线程环境下正确地处理删除操作。
3. ConcurrentSkipListMap使用
类ConcurrentHashMap不支持排序,类LinkedHashMap支持Key排序,但不支持并发。那么如果出现这种既要求并发安全,又要求排序的时候就可以使用ConcurrentSkipListMap
public class Main {
public static void main(String[] args) throws InterruptedException {
Myservice myservice=new Myservice();
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
myservice.testmethod();
}
});
Thread thread2=new Thread(new Runnable() {
@Override
public void run() {
myservice.testmethod();
}
});
Thread thread3=new Thread(new Runnable() {
@Override
public void run() {
myservice.testmethod();
}
});
Thread thread4=new Thread(new Runnable() {
@Override
public void run() {
myservice.testmethod();
}
});
Thread thread5=new Thread(new Runnable() {
@Override
public void run() {
myservice.testmethod();
}
});
Thread thread6=new Thread(new Runnable() {
@Override
public void run() {
myservice.testmethod();
}
});
thread1.start();
Thread.sleep(1000);
thread2.start();
Thread.sleep(1000);
thread3.start();
Thread.sleep(1000);
thread4.start();
Thread.sleep(1000);
thread5.start();
Thread.sleep(1000);
thread6.start();
}
}
class User implements Comparable<User>{
private int id;
private String username;
public User(int id,String username){
super();
this.id=id;
this.username=username;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
//定义比较规则
@Override
public int compareTo(User o) {
return this.getId()>o.getId()? 1: -1;
}
}
class Myservice{
public ConcurrentSkipListMap<User,String> map=new ConcurrentSkipListMap<>();
public Myservice(){
User user1=new User(1,"a");
User user2=new User(6,"b");
User user3=new User(3,"c");
User user4=new User(8,"f");
User user5=new User(2,"3");
User user6=new User(9,"23");
User user7=new User(11,"dasf");
map.put(user1,"u1");
map.put(user2,"u2");
map.put(user3,"u3");
map.put(user4,"u4");
map.put(user5,"u5");
map.put(user6,"u6");
map.put(user7,"u7");
}
public void testmethod(){
Map.Entry<User,String> entry=map.pollFirstEntry();
System.out.println("Map.size:"+map.size());
User user=entry.getKey();
System.out.println(user.getId()+" "+user.getUsername()+" "+map.get(user)+" "+entry.getValue());
}
}
由输出可以得出,是有序的,并且不支持数据重复
4. ConcurrentLinkedQueue使用
ConcurrentLinkedQueue 是 Java 中的一个线程安全的非阻塞队列实现,它是基于链表的先进先出(FIFO)队列。与传统的阻塞队列不同,ConcurrentLinkedQueue 不使用锁或阻塞操作来实现线程安全性。它利用无锁(lock-free)算法和原子操作,以实现高效的并发访问。ConcurrentLinkedQueue 的主要特点包括:
- 线程安全:ConcurrentLinkedQueue 是线程安全的,可以被多个线程同时操作而无需额外的同步措施。它使用原子操作和无锁算法来保证并发访问的安全性。
- 非阻塞算法:ConcurrentLinkedQueue 使用非阻塞算法,不会出现线程被阻塞等待队列操作的情况。这使得它具有良好的并发性能和可伸缩性。
- 无界队列:ConcurrentLinkedQueue 是一个无界队列,它不会限制队列的容量。可以根据需求添加任意数量的元素。
- FIFO顺序:ConcurrentLinkedQueue 采用先进先出(FIFO)的顺序,即最先插入的元素最先被移除。
- 迭代支持:ConcurrentLinkedQueue 提供了迭代器(Iterator)来遍历队列中的元素。迭代器遍历时不会抛出 ConcurrentModificationException 异常,并且可以在并发环境下安全使用。
ConcurrentLinkedQueue 提供了常见的队列操作,如插入元素、获取头部元素、移除头部元素等。一些常用的方法包括 offer()、poll()、peek() 等。需要注意的是,ConcurrentLinkedQueue 并不保证队列的一致性排序。在高度并发的情况下,可能会出现一些不确定的元素顺序。如果需要有序性,可以考虑使用 PriorityQueue 或其他有序队列实现。总结起来,ConcurrentLinkedQueue 是一个高效的线程安全非阻塞队列实现,适用于并发环境下的多线程操作。它的设计目标是提供高性能和可伸缩性,适用于高度并发的场景。
public class Main {
public static void main(String[] args) {
try{
Myservice myservice=new Myservice();
ThreadA a=new ThreadA(myservice);
ThreadB b=new ThreadB(myservice);
a.start();
b.start();
a.join();
b.join();
System.out.println(myservice.queue.size());
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class Myservice{
public ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue<>();
}
class ThreadA extends Thread{
private Myservice myservice;
public ThreadA(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
myservice.queue.add("threadA"+(i+1));
}
}
}
class ThreadB extends Thread{
private Myservice myservice;
public ThreadB(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
myservice.queue.add("threadB"+(i+1));
}
}
}
ConcurrentLinkedQueue有如下常用的方法:
- poll():没有获得数据时返回null,获得数据时则移除表头进行返回
- element():没有获得数据时出现NoSuchElementException异常,获得数据时则不移除表头,并将表头进行返回
- peek():没有获得数据时返回null,获得数据时则不移除表头,并将表头进行返回
5. ConcurrentLinkedDeque使用
ConcurrentLinkedDeque 是 Java 中的一个并发安全的双向链表实现,它是 Deque 接口的一个实现类。ConcurrentLinkedDeque 提供了高效的并发操作,可以在多线程环境下安全地进行插入、删除和访问元素的操作ConcurrentLinkedDeque 的特点如下:
-
无界容量:ConcurrentLinkedDeque 是一个无界容量的双向链表,可以根据需要动态地增加元素。
-
非阻塞算法:ConcurrentLinkedDeque 内部使用了非阻塞算法,不依赖于锁机制,因此可以在并发环境下提供更好的性能。
-
双向链表:ConcurrentLinkedDeque 支持从两端进行插入和删除元素的操作,可以在头部或尾部插入、删除和访问元素。
-
线程安全:ConcurrentLinkedDeque 是线程安全的,多线程可以同时进行操作而不会导致数据不一致的问题。它通过使用 CAS(Compare and Swap)操作和无锁算法来实现线程安全性。
-
迭代器支持:ConcurrentLinkedDeque 提供了迭代器,可以安全地遍历集合中的元素。迭代器遍历时不会抛出 ConcurrentModificationException 异常。
ConcurrentLinkedDeque 提供了一系列的方法,包括添加元素、删除元素、获取元素以及判断集合是否为空等操作。一些常用的方法有:
- addFirst(E e) 和 addLast(E e):在链表的头部或尾部添加指定元素。
- removeFirst() 和 removeLast():从链表的头部或尾部删除并返回元素。
- peekFirst() 和 peekLast():返回链表的头部或尾部的元素,但不删除。
- isEmpty():判断链表是否为空。
- size():返回链表中的元素个数。
需要注意的是,ConcurrentLinkedDeque 不允许存储 null 元素。由于 ConcurrentLinkedDeque 是线程安全的,它适用于多线程环境下需要高效并发操作的场景。
public class Main {
public static void main(String[] args) throws InterruptedException {
Myservice myservice=new Myservice();
ThreadA aFirst=new ThreadA(myservice);
ThreadA bFirst=new ThreadA(myservice);
ThreadB aLast=new ThreadB(myservice);
ThreadB bLast=new ThreadB(myservice);
aFirst.start();
Thread.sleep(1000);
aLast.start();
Thread.sleep(1000);
bFirst.start();
Thread.sleep(1000);
bLast.start();
}
}
class Myservice{
public ConcurrentLinkedDeque queue=new ConcurrentLinkedDeque<>();
public Myservice(){
for (int i = 0; i < 4; i++) {
queue.add("String"+(i+1));
}
}
}
class ThreadA extends Thread{
private Myservice myservice;
public ThreadA(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run() {
System.out.println("value="+myservice.queue.pollLast()+"queue.size()="+myservice.queue.size() );
}
}
class ThreadB extends Thread{
private Myservice myservice;
public ThreadB(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run() {
System.out.println("value="+myservice.queue.pollFirst()+"queue.size()="+myservice.queue.size() );
}
}
6. 类CopyOnWriteArrayList的使用
CopyOnWriteArrayList是Java集合框架中的一种线程安全的List实现。它是在Java 5中引入的,并位于java.util.concurrent包中。CopyOnWriteArrayList的特点是它通过在修改操作时创建底层数组的副本来实现线程安全。这意味着在对CopyOnWriteArrayList进行修改时,不会影响已经存在的迭代器或其他线程正在进行的读取操作。因此,它适用于读取操作频繁而修改操作相对较少的场景。下面是CopyOnWriteArrayList的一些关键特性:
-
线程安全性:CopyOnWriteArrayList是线程安全的,多个线程可以同时进行读取操作,而无需额外的同步操作。这是通过在修改操作时创建底层数组的副本来实现的。
-
不支持修改操作:CopyOnWriteArrayList不支持修改操作,例如add、remove和set等。如果需要对列表进行修改,必须先将其复制为一个新的副本,然后对副本进行修改。
-
高并发性能:由于CopyOnWriteArrayList的读取操作不需要进行同步,因此多个线程可以同时进行读取操作,而不会出现数据不一致的情况。这使得CopyOnWriteArrayList在读取操作频繁的场景下具有较好的性能。
-
迭代器一致性:CopyOnWriteArrayList的迭代器提供弱一致性的保证。即使在迭代器创建后,其他线程对列表进行修改,迭代器仍将返回创建时的快照数据。这可以防止在迭代过程中抛出ConcurrentModificationException异常。
需要注意的是,由于CopyOnWriteArrayList在修改操作时需要创建数组的副本,因此对于大型列表或频繁的修改操作,它可能会带来较高的内存开销。因此,应该根据具体情况选择使用CopyOnWriteArrayList
7. 类CopyOnWriteArraySet的使用
CopyOnWriteArraySet是Java集合框架中的一种线程安全的Set实现。它是在Java 5中引入的,并位于java.util.concurrent包中。CopyOnWriteArraySet基于CopyOnWriteArrayList实现,具有类似的特性。它采用了"写时复制"的策略,在修改操作时创建底层数组的副本,以保证线程安全性。下面是CopyOnWriteArraySet的一些关键特性:
-
线程安全性:CopyOnWriteArraySet是线程安全的,多个线程可以同时进行读取操作,而无需额外的同步操作。这是通过在修改操作时创建底层数组的副本来实现的。
-
基于HashSet实现:CopyOnWriteArraySet内部使用CopyOnWriteArrayList来存储元素。它使用HashSet的机制来保证集合中的元素唯一性。
-
不支持修改操作:与CopyOnWriteArrayList类似,CopyOnWriteArraySet不支持修改操作,例如add、remove等。如果需要对集合进行修改,必须先将其复制为一个新的副本,然后对副本进行修改。
-
高并发性能:由于CopyOnWriteArraySet的读取操作不需要进行同步,因此多个线程可以同时进行读取操作,而不会出现数据不一致的情况。这使得CopyOnWriteArraySet在读取操作频繁的场景下具有较好的性能。
-
迭代器一致性:CopyOnWriteArraySet的迭代器提供弱一致性的保证。即使在迭代器创建后,其他线程对集合进行修改,迭代器仍将返回创建时的快照数据。这可以防止在迭代过程中抛出ConcurrentModificationException异常。
需要注意的是,由于CopyOnWriteArraySet在修改操作时需要创建数组的副本,因此对于大型集合或频繁的修改操作,它可能会带来较高的内存开销。因此,应该根据具体情况选择使用CopyOnWriteArraySet。同时,由于它基于HashSet实现,不保证元素的顺序。
public class Main {
public static void main(String[] args) throws InterruptedException {
Myservice myservice=new Myservice();
ThreadA[] aArray=new ThreadA[100];
for (int i = 0; i < aArray.length; i++) {
aArray[i]=new ThreadA(myservice);
}
for (int i = 0; i < aArray.length; i++) {
aArray[i].start();
}
ThreadA.sleep(3000);
System.out.println(myservice.set.size());
}
}
class Myservice{
public CopyOnWriteArraySet set=new CopyOnWriteArraySet<>();
}
class ThreadA extends Thread{
private Myservice myservice;
public ThreadA(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
myservice.set.add(ThreadA.currentThread().getName()+" "+"anything"+(i+1));
}
}
}
100个线程每个线程加100个元素,最终结果为10000,类ConcurrentSkipListSet是线程安全的有序集合,类CopyOnwriteArraySet是线程安全的无序集合,我们可以将类CopyOnwriteArraySet理解成线程安全HashSet
三、阻塞队列
1. 概念
阻塞队列(Blocking Queue)是Java集合框架中的一种特殊队列,提供了线程安全的操作和阻塞的特性。它主要用于在多线程环境下进行数据交换和协调。阻塞队列的特点是当队列为空时,获取操作(例如take()方法)会被阻塞,直到队列中有可用元素;当队列已满时,插入操作(例如put()方法)会被阻塞,直到队列有空闲位置。下面是阻塞队列的一些关键特性:
- 线程安全性:阻塞队列是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它通过内部的同步机制来保证线程安全性。
- 阻塞操作:阻塞队列提供了阻塞的特性。当队列为空时,获取操作会被阻塞,直到队列中有可用元素。当队列已满时,插入操作会被阻塞,直到队列有空闲位置。
- 支持限定容量:阻塞队列可以设置容量上限,限制队列中的元素数量。当队列达到容量上限时,后续的插入操作将被阻塞,直到队列有空闲位置。
- 支持等待超时:阻塞队列的插入和获取操作可以设置等待超时时间。如果在指定的超时时间内无法进行操作,将返回特定的结果或抛出异常。
- 提供多种实现:Java提供了多种阻塞队列的实现,例如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等,每种实现都具有不同的特性和适用场景。
阻塞队列在多线程编程中常用于实现生产者-消费者模型,其中一个线程负责生产数据并插入队列,另一个线程负责从队列中获取数据进行消费。它可以有效地进行线程间的数据交换和协调,避免了手动实现线程同步和等待/通知机制的复杂性。
2. 类ArrayBlockingQueue与公平/非公平锁的使用
ArrayBlockingQueue是Java集合框架中的一种阻塞队列实现,它基于数组实现,并提供了线程安全的操作和阻塞的特性。它是在Java 1.5中引入的,并位于java.util.concurrent包中。ArrayBlockingQueue的特点如下:
-
有界队列:ArrayBlockingQueue是一个有界队列,它在创建时需要指定一个固定的容量,即队列中元素的最大数量。这意味着队列的容量是固定的,无法动态扩展。
-
先进先出(FIFO):ArrayBlockingQueue遵循先进先出的原则,即最早插入的元素会最早被获取。
-
线程安全性:ArrayBlockingQueue是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:ArrayBlockingQueue提供了阻塞的特性。当队列为空时,获取操作会被阻塞,直到队列中有可用元素。当队列已满时,插入操作会被阻塞,直到队列有空闲位置。
-
公平性:ArrayBlockingQueue可以在构造时指定是否为公平队列。如果设置为公平队列,那么线程在进行插入和获取操作时将遵循先进先出的顺序。
-
可选的等待超时:ArrayBlockingQueue的插入和获取操作可以选择设置等待超时时间。如果在指定的超时时间内无法进行操作,将返回特定的结果或抛出异常。
-
由于ArrayBlockingQueue是有界队列,所以在插入操作时如果队列已满,后续的插入操作将会被阻塞。同样地,当队列为空时,获取操作将被阻塞,直到队列中有可用元素。这使得ArrayBlockingQueue非常适合于实现生产者-消费者模型和限流控制等场景。
需要注意的是,ArrayBlockingQueue的固定容量可能会导致一些限制和性能影响。当队列已满时,插入操作将会被阻塞,这可能会对生产者的速度造成影响。而且,由于它使用了内部锁机制,存在一定的竞争和上下文切换开销。因此,在选择使用ArrayBlockingQueue时,需要根据具体场景和需求进行权衡。
public class Main {
public static void main(String[] args) throws InterruptedException {
//限定阻塞队列的容量为3
ArrayBlockingQueue queue=new ArrayBlockingQueue(3);
queue.put("a1");
queue.put("a2");
queue.put("a3");
System.out.println("添加第四个元素,发生阻塞");
queue.put("a4");
System.out.println("这句话打印不出来");
}
}
出现阻塞的原因是,ArrayBlockingQueue的容量为3,而添加第四个元素的时候,put方法呈现阻塞状态,需要等待有空余时间再添加。
- put方法用于存放元素,如果没有空余的空间来存放数据 ,则呈阻塞状态
- take方法用于获取元素,如果没有元素获取,也会呈现阻塞状态
下面测试ArrayBlockingQueue的公平/非公平锁的效果
public class Main {
public static void main(String[] args) throws InterruptedException {
//非公平锁
Myservice myservice=new Myservice(false);
TakeThread[] array1=new TakeThread[10];
TakeThread[] array2=new TakeThread[10];
for (int i = 0; i < array1.length; i++) {
array1[i]=new TakeThread(myservice);
array1[i].setName("+++");
}
for (int i = 0; i < array1.length; i++) {
array1[i].start();
}
for (int i = 0; i < array2.length; i++) {
array2[i]=new TakeThread(myservice);
array2[i].setName("---");
}
Thread.sleep(300);
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
myservice.queue.put("abc");
for (int i = 0; i < array2.length; i++) {
array2[i].start();
}
}
}
class Myservice{
public ArrayBlockingQueue queue;
public Myservice(boolean fair){
//fair为公平锁,否则为非公平锁
queue=new ArrayBlockingQueue(10,fair);
}
public void take(){
try{
System.out.println(Thread.currentThread().getName()+"take");
String takeString=" "+queue.take();
System.out.println(Thread.currentThread().getName()+"take value="+takeString);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class TakeThread extends Thread{
private Myservice myservice;
public TakeThread(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run(){
myservice.take();
}
}
输出结果说明后启动的线程有机会优先获得锁,这证明线程的执行顺序是随机的,也就是存在不公平性,下面测试公平锁。
Myservice myservice=new Myservice(true);
因为是公平锁,所以获取数据时全部都是+++线程,是按先来后到的顺序获取的。
3. 类PriorityBlockingQueue的使用
PriorityBlockingQueue是Java集合框架中的一种阻塞队列实现,它基于优先级堆(Priority Heap)实现,并提供了线程安全的操作和阻塞的特性。它是在Java 1.5中引入的,并位于java.util.concurrent包中。PriorityBlockingQueue的特点如下:
-
无界队列:PriorityBlockingQueue是一个无界队列,它可以根据需要动态地增加元素的数量,没有固定的容量限制。
-
优先级排序:PriorityBlockingQueue中的元素按照优先级进行排序。元素的排序可以通过元素自身的比较器(Comparator)或元素自身的自然顺序(如果元素实现了Comparable接口)来确定。
-
线程安全性:PriorityBlockingQueue是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:PriorityBlockingQueue提供了阻塞的特性。当队列为空时,获取操作会被阻塞,直到队列中有可用元素。当队列已满时,插入操作会被阻塞,直到队列有空闲位置。
-
公平性:PriorityBlockingQueue可以在构造时指定是否为公平队列。如果设置为公平队列,那么线程在进行插入和获取操作时将遵循优先级顺序。
-
可选的等待超时:PriorityBlockingQueue的插入和获取操作可以选择设置等待超时时间。如果在指定的超时时间内无法进行操作,将返回特定的结果或抛出异常。
-
PriorityBlockingQueue主要用于按优先级顺序处理元素的场景。它适用于生产者-消费者模型中的任务调度,其中生产者生成具有不同优先级的任务,并将其插入到PriorityBlockingQueue中,消费者按照优先级顺序获取任务进行处理。
需要注意的是,PriorityBlockingQueue并不保证元素的顺序在并发环境下的完全一致性。如果有多个具有相同优先级的元素,它们在队列中的顺序可能不确定。如果需要严格的顺序控制,可能需要使用其他的并发数据结构或手动进行同步。
public class Main {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<user> priorityBlockingQueue=new PriorityBlockingQueue<>();
priorityBlockingQueue.add(new user(12));
priorityBlockingQueue.add(new user(1123));
priorityBlockingQueue.add(new user(2));
priorityBlockingQueue.add(new user(14));
priorityBlockingQueue.add(new user(1123123));
System.out.println(priorityBlockingQueue.poll().getId());
System.out.println(priorityBlockingQueue.poll().getId());
System.out.println(priorityBlockingQueue.poll().getId());
System.out.println(priorityBlockingQueue.poll().getId());
System.out.println(priorityBlockingQueue.poll().getId());
System.out.println(priorityBlockingQueue.poll().getId());
System.out.println(priorityBlockingQueue.poll().getId());
}
}
class user implements Comparable<user>{
private int id;
public user(){
}
public user(int id){
this.id=id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public int compareTo(user o) {
return this.id>o.id? 1:(this.id==o.id? 0:-1);
}
}
4. 类LinkedBlockingQueue的使用
类LinkedBlockingQueue和类ArrayBlockingQueue的功能大体上是一致的,都是有界的,都有阻塞特性。两者使用上的区别是类ArrayBlocking比类LinkedBlockingQueue运行效率快得多。LinkedBlockingQueue是Java集合框架中的一种阻塞队列实现,它基于链表(Linked List)实现,并提供了线程安全的操作和阻塞的特性。它是在Java 1.5中引入的,并位于java.util.concurrent包中。LinkedBlockingQueue的特点如下:
-
可选的有界队列:LinkedBlockingQueue可以选择创建有界队列或无界队列。如果在创建时指定了容量上限,那么它是一个有界队列;如果没有指定容量上限,则为无界队列。
-
先进先出(FIFO):LinkedBlockingQueue遵循先进先出的原则,即最早插入的元素会最早被获取。
-
线程安全性:LinkedBlockingQueue是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:LinkedBlockingQueue提供了阻塞的特性。当队列为空时,获取操作会被阻塞,直到队列中有可用元素。当队列已满时,插入操作会被阻塞,直到队列有空闲位置。
-
可选的等待超时:LinkedBlockingQueue的插入和获取操作可以选择设置等待超时时间。如果在指定的超时时间内无法进行操作,将返回特定的结果或抛出异常。
-
可选的公平性:LinkedBlockingQueue可以在构造时指定是否为公平队列。如果设置为公平队列,那么线程在进行插入和获取操作时将遵循先进先出的顺序。
-
LinkedBlockingQueue适用于多线程环境下的生产者-消费者模型,其中一个线程负责生产数据并插入队列,另一个线程负责从队列中获取数据进行消费。它可以提供可靠的数据传递和协调,避免了手动实现线程同步和等待/通知机制的复杂性。
需要注意的是,当使用无界队列时,由于队列没有容量上限,可能会占用较大的内存空间。而在使用有界队列时,当队列已满时插入操作将会被阻塞,这可能会对生产者的速度造成影响。因此,在选择使用LinkedBlockingQueue时,需要根据具体场景和需求进行权衡。
5. 类LinkedBlockingDeque的使用
LinkedBlockingDeque是Java集合框架中的一种双向阻塞队列实现,它基于链表(Linked List)实现,并提供了线程安全的操作和阻塞的特性。它是在Java 1.6中引入的,并位于java.util.concurrent包中。LinkedBlockingDeque的特点如下:
-
双向队列:LinkedBlockingDeque是一个双向队列,可以在队列的两端进行插入和获取操作。可以在队列的头部和尾部插入元素,也可以从头部和尾部获取元素。
-
先进先出(FIFO):LinkedBlockingDeque遵循先进先出的原则,即最早插入的元素会最早被获取。
-
线程安全性:LinkedBlockingDeque是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:LinkedBlockingDeque提供了阻塞的特性。当队列为空时,获取操作会被阻塞,直到队列中有可用元素。当队列已满时,插入操作会被阻塞,直到队列有空闲位置。
-
可选的等待超时:LinkedBlockingDeque的插入和获取操作可以选择设置等待超时时间。如果在指定的超时时间内无法进行操作,将返回特定的结果或抛出异常。
-
可选的公平性:LinkedBlockingDeque可以在构造时指定是否为公平队列。如果设置为公平队列,那么线程在进行插入和获取操作时将遵循先进先出的顺序。
-
LinkedBlockingDeque可以用于需要同时支持队列和栈操作的场景,以及需要双向数据传递和协调的多线程环境。它提供了更灵活的数据操作方式,既可以从队列的头部获取和插入元素,也可以从尾部获取和插入元素。
需要注意的是,当使用LinkedBlockingDeque时,同样需要根据具体的需求和场景选择是否使用有界队列或无界队列。无界队列可能会占用较大的内存空间,而有界队列可能会对生产者的速度造成影响。
6. 类SynChronousQueue的使用
SynchronousQueue是Java集合框架中的一种特殊的阻塞队列实现,它在内部不保持任何元素,用于进行线程间的直接传递。它是在Java 1.5中引入的,并位于java.util.concurrent包中。SynchronousQueue的特点如下:
-
无容量:SynchronousQueue在内部不保持任何元素,它不具备容量。与其他阻塞队列不同,SynchronousQueue没有固定的容量上限。
-
线程配对:SynchronousQueue用于在线程之间进行直接传递。一个线程在调用SynchronousQueue的插入方法(put()或offer())时,将会被阻塞,直到另一个线程调用了相应的获取方法(take()或poll())来接收元素。
-
线程安全性:SynchronousQueue是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:SynchronousQueue提供了阻塞的特性。当插入线程没有匹配的获取线程时,插入操作将会被阻塞。同样地,当获取线程没有匹配的插入线程时,获取操作将会被阻塞。
-
公平性:SynchronousQueue默认是非公平的,但可以在构造时指定是否为公平队列。如果设置为公平队列,那么线程在进行插入和获取操作时将遵循先进先出的顺序。
SynchronousQueue主要用于实现生产者和消费者之间的直接传递,而不是在队列中进行存储。当一个线程尝试插入元素时,它会等待另一个线程来获取这个元素。这种特性使得SynchronousQueue非常适合于实现一些任务分配和线程池的场景,可以实现更紧密的线程协作和任务调度。需要注意的是,使用SynchronousQueue时,插入和获取操作必须是成对出现的,否则会导致线程阻塞。此外,由于SynchronousQueue不保持元素,因此它不适用于缓存或存储数据的需求。
public class Main {
public static void main(String[] args) throws InterruptedException {
Myservice myservice=new Myservice();
ThreadPut threadPut=new ThreadPut(myservice);
ThreadTake threadTake=new ThreadTake(myservice);
threadTake.start();
Thread.sleep(2000);
threadPut.start();
}
}
class Myservice{
public static SynchronousQueue queue=new SynchronousQueue();
public void putMethod(){
try{
String putString="anyString"+Math.random();
System.out.println("put="+putString);
queue.put(putString);
}catch (InterruptedException e){
e.printStackTrace();
}
}
public void takeMethod(){
try{
System.out.println("take"+queue.take());
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class ThreadPut extends Thread{
private Myservice myservice;
public ThreadPut(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run(){
for (int i = 0; i < 10; i++) {
myservice.putMethod();
}
}
}
class ThreadTake extends Thread{
private Myservice myservice;
public ThreadTake(Myservice myservice){
super();
this.myservice=myservice;
}
@Override
public void run(){
for (int i = 0; i < 10; i++) {
myservice.takeMethod();
}
}
}
7. 类DelayQueue的使用
DelayQueue是Java集合框架中的一种特殊阻塞队列实现,它用于存储实现了Delayed接口的元素,并按照元素的延迟时间进行排序。它是在Java 1.5中引入的,并位于java.util.concurrent包中。DelayQueue的特点如下:
-
延迟元素:DelayQueue中的元素必须实现Delayed接口,该接口定义了一个getDelay(TimeUnit unit)方法,用于返回元素的剩余延迟时间。根据元素的延迟时间,DelayQueue会对元素进行排序。
-
线程安全性:DelayQueue是线程安全的,多个线程可以同时进行插入和获取操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:DelayQueue提供了阻塞的特性。当调用获取方法(take()或poll())时,如果队列为空,获取操作会被阻塞,直到有元素到达延迟时间。插入操作没有阻塞特性,元素会根据其延迟时间进行排序。
-
定时移除:DelayQueue提供了定时移除元素的功能。当一个元素的延迟时间到期时,它将可以从DelayQueue中被获取,即使其他线程还未显式地调用获取方法。
-
DelayQueue适用于需要延迟处理任务的场景,例如定时任务调度、缓存过期策略等。通过使用DelayQueue,可以将元素按照延迟时间的顺序进行排序,确保在到达指定延迟时间后才能获取和处理元素。这样可以实现更精确的任务调度和时间控制。
需要注意的是,DelayQueue并不保证元素的顺序在并发环境下的完全一致性。如果多个元素具有相同的延迟时间,它们在队列中的顺序可能不确定。如果需要严格的顺序控制,可能需要使用其他的并发数据结构或手动进行同步。
public class Main {
public static void main(String[] args) throws InterruptedException {
User user5=new User("中国5",5);
User user4=new User("中国4",4);
User user3=new User("中国3",3);
User user2=new User("中国2",2);
User user1=new User("中国1",1);
DelayQueue<User> queue=new DelayQueue<>();
queue.add(user5);
queue.add(user4);
queue.add(user3);
queue.add(user2);
queue.add(user1);
System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis());
System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis());
System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis());
System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis());
System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis());
}
}
class Myservice{
public static SynchronousQueue queue=new SynchronousQueue();
public void putMethod(){
try{
String putString="anyString"+Math.random();
System.out.println("put="+putString);
queue.put(putString);
}catch (InterruptedException e){
e.printStackTrace();
}
}
public void takeMethod(){
try{
System.out.println("take"+queue.take());
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class User implements Delayed{
private String username;
private long runNanoTime;
public String getUsername() {
return username;
}
public User(String username, long runNanoTime)
{
this.username=username;
runNanoTime=System.nanoTime()+ TimeUnit.SECONDS.toNanos(runNanoTime);
}
@Override
public long getDelay(TimeUnit unit) {
return runNanoTime-System.nanoTime();
}
@Override
public int compareTo(Delayed o) {
User other=(User) o;
return this.runNanoTime>other.runNanoTime? 1:-1;
}
}
8. 类LinkedTransferQueue的使用
LinkedTransferQueue是Java集合框架中的一种特殊阻塞队列实现,它结合了队列和传输机制的特性。它是在Java 7中引入的,并位于java.util.concurrent包中。LinkedTransferQueue的特点如下:
-
链表结构:LinkedTransferQueue使用链表结构来存储元素,类似于LinkedBlockingQueue。它支持高效地插入和获取操作,并且不限制容量。
-
双模式:LinkedTransferQueue支持两种模式的操作:传输模式和队列模式。
-
传输模式:当一个线程调用transfer()方法时,它会等待另一个线程来接收该元素。如果没有其他线程等待接收,当前线程将被阻塞,直到有线程接收该元素。
-
队列模式:当一个线程调用put()或offer()方法时,它将把元素插入队列尾部,并继续执行其他操作。此时,元素可以被其他线程从队列中获取。
-
线程安全性:LinkedTransferQueue是线程安全的,多个线程可以同时进行插入、获取和传输操作,而无需额外的同步操作。它使用内部的锁机制来保证线程安全性。
-
阻塞操作:LinkedTransferQueue提供了阻塞的特性。在传输模式下,当一个线程调用transfer()方法时,如果没有其他线程等待接收,当前线程将被阻塞。在队列模式下,如果队列已满,插入操作将会被阻塞,直到队列有空闲位置。
-
LinkedTransferQueue适用于需要实现生产者和消费者之间直接传递数据的场景。与SynchronousQueue相比,
LinkedTransferQueue具有更多的灵活性和扩展性。它可以用于实现更复杂的数据交换模式,允许生产者和消费者之间进行双向的数据传递和交互。需要注意的是,使用LinkedTransferQueue时,传输模式和队列模式的选择应根据具体的需求和场景来决定。传输模式适用于需要确保数据被接收的情况,而队列模式适用于需要在队列中进行缓存和异步处理的情况。
public class Main {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();
// 添加元素
transferQueue.add(1);
transferQueue.add(2);
transferQueue.add(3);
// 演示传输模式
Thread transferThread = new Thread(() -> {
try {
System.out.println("Transfer thread is waiting to transfer element...");
transferQueue.transfer(4); // 阻塞直到元素被接收
System.out.println("Element transferred successfully");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
transferThread.start();
// 演示队列模式
Thread queueThread = new Thread(() -> {
System.out.println("Queue thread is putting an element...");
transferQueue.put(5); // 阻塞直到队列有空闲位置
System.out.println("Element put successfully");
});
queueThread.start();
// 获取并处理元素
for (int i = 0; i < 5; i++) {
Integer element = transferQueue.take();
System.out.println("Processing element: " + element);
}
// 等待线程完成
transferThread.join();
queueThread.join();
}
}
在上述示例代码中,我们首先创建了一个LinkedTransferQueue并向其中添加了一些元素。然后,我们创建了两个线程来演示传输模式和队列模式。传输模式线程使用transfer()方法来传输一个元素,它会等待另一个线程来接收该元素。在我们的示例中,我们启动了一个传输线程,它将尝试传输元素4。由于没有其他线程等待接收,传输线程将被阻塞。队列模式线程使用put()方法将一个元素插入队列中。在我们的示例中,我们启动了一个队列线程,它将尝试将元素5放入队列中。由于队列已满,队列线程将被阻塞。接下来,我们使用take()方法从LinkedTransferQueue中获取并处理元素。在我们的示例中,我们循环5次,并输出处理的元素。最后,我们使用join()方法等待传输线程和队列线程完成。