1 概述
非阻塞队列的特色是队列里面没有数据时,返回异常或null。在JDK的并发包中,常见的非阻塞队列有:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue、ConcurrentLinkedDeque、CopyOnWriteArrayList、CopyOnWriteArraySet。本篇将介绍这7个非阻塞队列的特点与使用。
2 ConcurrentHashMap类的使用
2.1 验证HashMap不是线程安全的
public class MyService1 {
public HashMap map = new HashMap();
public void testMethod(){
for (int i = 0; i < 5000; i++) {
map.put(Thread.currentThread().getName()+" " + (i+1),Thread.currentThread().getName()+" " + (i+1));
System.out.println(Thread.currentThread().getName() + " " + (i+1));
}
}
}
public class Thread1 extends Thread{
private MyService1 service1;
public Thread1(MyService1 service1) {
this.service1 = service1;
}
@Override
public void run(){
service1.testMethod();
}
}
public class Thread2 extends Thread{
private MyService1 service1;
public Thread2(MyService1 service1) {
this.service1 = service1;
}
@Override
public void run(){
service1.testMethod();
}
}
public class Run1 {
public static void main(String[] args) {
MyService1 service1 = new MyService1();
Thread1 t1 = new Thread1(service1);
t1.start();
}
}
如果只创建一个线程Thread1,那么运行结果是正确的。此时控制台运行结果是:
在创建一个线程Thread2,运行结果是:
public class Run1 {
public static void main(String[] args) {
MyService1 service1 = new MyService1();
Thread1 t1 = new Thread1(service1);
Thread2 t2 = new Thread2(service1);
t1.start();
t2.start();
}
}
程序运行后会有很小的概率出现异常(笔者试了很多次没有出现-_-||,读者请自行实验),说明hashMap不能被多个线程操作,也就证明HashMap是非线程安全的。
2.2 验证HashTable是线程安全的
public class MyService1 {
public Hashtable hashtable = new Hashtable();
public void method(){
for (int i = 0; i < 50000; i++) {
hashtable.put(Thread.currentThread().getName()+ " "+(i+1),
Thread.currentThread().getName()+ " "+(i+1));
System.out.println(Thread.currentThread().getName() + " " + (i+1));
}
}
}
public class Thread2 extends Thread{
private MyService1 service1;
public Thread2(MyService1 service1) {
this.service1 = service1;
}
@Override
public void run(){
service1.method();
}
}
public class Thread1 extends Thread{
private MyService1 service1;
public Thread1(MyService1 service1) {
this.service1 = service1;
}
@Override
public void run(){
service1.method();
}
}
public class Run1 {
public static void main(String[] args) {
MyService1 service1 = new MyService1();
Thread1 t1 = new Thread1(service1);
Thread1 t2 = new Thread1(service1);
t1.start();
t2.start();
}
}
程序运行正确,证明HashTable类在多线程环境中执行put操作不会出错,是线程安全的类。但是,多个线程分别调用该类的iteartor()方法返回Iterator对象,并调用next()方法取得元素,再执行remove()方法时会出现修改并发修改异常,说明HashTable不支持Iterator并发删除。
2.3 验证ConcurrentHashMap线程安全
ConcurrentHashMap类是JDK并发包中提供的支持并发操作的Map对象。其继承与实现信息如下:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
}
下面开始验证:
public class MyService1 {
public ConcurrentHashMap map = new ConcurrentHashMap();
public void method(){
for (int i = 0; i < 50000; i++) {
map.put(Thread.currentThread().getName()+ " "+(i+1),
Thread.currentThread().getName()+ " "+(i+1));
System.out.println(Thread.currentThread().getName() + " " + (i+1));
}
}
}
public class Thread2 extends Thread{
private MyService1 service1;
public Thread2(MyService1 service1) {
this.service1 = service1;
}
@Override
public void run(){
service1.method();
}
}
public class Thread1 extends Thread{
private MyService1 service1;
public Thread1(MyService1 service1) {
this.service1 = service1;
}
@Override
public void run(){
service1.method();
}
}
public class Run1 {
public static void main(String[] args) {
MyService1 service1 = new MyService1();
Thread1 t1 = new Thread1(service1);
Thread1 t2 = new Thread1(service1);
t1.start();
t2.start();
}
}
2.4 验证ConcurrentHashMap并发删除
public class MyService2 {
public ConcurrentHashMap map = new ConcurrentHashMap();
public MyService2() {
for (int i = 0; i < 50000; i++) {
map.put(Thread.currentThread().getName() + (i+1),"abc");
}
}
public void test(){
Iterator iterator = map.keySet().iterator();
while(iterator.hasNext()){
Object next = iterator.next();
iterator.remove();
System.out.println(map.size() + " " + Thread.currentThread().getName());
}
}
}
public class Thread1 extends Thread{
public MyService2 myService2;
public Thread1(MyService2 myService2) {
this.myService2 = myService2;
}
@Override
public void run(){
myService2.test();
}
}
public class Thread2 extends Thread{
public MyService2 service2;
public Thread2(MyService2 service2) {
this.service2 = service2;
}
@Override
public void run(){
service2.test();
}
}
public class Run2 {
public static void main(String[] args) {
MyService2 myService2 = new MyService2();
Thread1 t1 = new Thread1(myService2);
Thread2 t2 = new Thread2(myService2);
t1.start();
t2.start();
}
}
从运行结果看,ConcurrentHashMap在并发情况下支持put和remove。ConcurrentHashMap不支持排序,LinkedHashMap支持key排序,但不支持并发。如果出现既要求并发又要求排序的情况,就可以使用ConcurrentSkipListMap类。
3 ConcurrentSkipListMap类的使用
ConcurrentSkipListMap支持排序。
public class UserInfo implements Comparable<UserInfo>{
private int id;
private String username;
public UserInfo(int id, String username) {
this.id = id;
this.username = username;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
@Override
public int compareTo(UserInfo o) {
if(this.getId() > o.getId()){
return 1;
}else{
return -1;
}
}
}
public class MyService {
public ConcurrentSkipListMap<UserInfo,String> map = new ConcurrentSkipListMap<>();
public MyService(){
UserInfo userInfo1 = new UserInfo(1,"userinfo1");
UserInfo userInfo3 = new UserInfo(3,"userinfo1");
UserInfo userInfo5 = new UserInfo(5,"userinfo1");
UserInfo userInfo2 = new UserInfo(2,"userinfo1");
UserInfo userInfo4 = new UserInfo(4,"userinfo1");
map.put(userInfo1,"u1");
map.put(userInfo3,"u3");
map.put(userInfo5,"u5");
map.put(userInfo2,"u2");
map.put(userInfo4,"u4");
}
public void method(){
Map.Entry<UserInfo, String> entry = map.pollFirstEntry();
System.out.println("map size() = " + map.size());
UserInfo userInfo = entry.getKey();
System.out.println(userInfo.getId() + " " + userInfo.getUsername() + " " + map.get(userInfo) + " " +entry.getValue());
}
}
public class MyThread extends Thread{
private MyService service;
public MyThread(MyService service) {
this.service = service;
}
@Override
public void run(){
service.method();
}
}
public class Run1 {
public static void main(String[] args) throws InterruptedException {
MyService service = new MyService();
MyThread t1 = new MyThread(service);
MyThread t2 = new MyThread(service);
MyThread t3 = new MyThread(service);
MyThread t4 = new MyThread(service);
MyThread t5 = new MyThread(service);
t1.start();
Thread.sleep(1000);
t2.start();
Thread.sleep(1000);
t3.start();
Thread.sleep(1000);
t4.start();
Thread.sleep(1000);
t5.start();
}
}
控制台打印出null的值是使用pollFirstEntry()方法将当前的Entry对象从类ConcurrentSkipListMap中删除造成的。
4 ConcurrentSkipListSet类的使用
ConcurrentSkipListSet 类支持排序且不允许元素重复。
public class UserInfo implements Comparable<UserInfo> {
private int id;
private String username;
public UserInfo(int id, String username) {
this.id = id;
this.username = username;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
@Override
public int compareTo(UserInfo userInfo){
if(this.getId() < userInfo.getId()){
return -1;
}
if(this.getId() > userInfo.getId()){
return 1;
}
return 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserInfo userInfo = (UserInfo) o;
return id == userInfo.id && Objects.equals(username, userInfo.username);
}
@Override
public int hashCode() {
return Objects.hash(id, username);
}
}
public class MyService {
public ConcurrentSkipListSet set = new ConcurrentSkipListSet();
public MyService(){
UserInfo userInfo1 = new UserInfo(1,"username1");
UserInfo userInfo3 = new UserInfo(3,"username3");
UserInfo userInfo5 = new UserInfo(5,"username5");
UserInfo userInfo41 = new UserInfo(4,"username4");
UserInfo userInfo42 = new UserInfo(4,"username4");
UserInfo userInfo2 = new UserInfo(2,"username2");
set.add(userInfo1);
set.add(userInfo3);
set.add(userInfo5);
set.add(userInfo41);
set.add(userInfo42);
set.add(userInfo2);
}
}
public class MyThread extends Thread{
private MyService service;
public MyThread(MyService service) {
this.service = service;
}
@Override
public void run(){
UserInfo userInfo = (UserInfo) service.set.pollFirst();
System.out.println(userInfo.getId() + " " + userInfo.getUsername());
}
}
public class Run1 {
public static void main(String[] args) throws InterruptedException {
MyService service = new MyService();
MyThread t1 = new MyThread(service);
MyThread t2 = new MyThread(service);
MyThread t3 = new MyThread(service);
MyThread t4 = new MyThread(service);
MyThread t5 = new MyThread(service);
t1.start();
Thread.sleep(1000);
t2.start();
Thread.sleep(1000);
t3.start();
Thread.sleep(1000);
t4.start();
Thread.sleep(1000);
t5.start();
}
}
从运行结果可以看到,排序成功,并且不支持数据重复。
5 ConcurrentLinkedQueue类的使用
ConcurrentLinkedQueue类提供了并发环境下的队列操作。
public class MyService {
public ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque();
}
public class ThreadA extends Thread{
private MyService service;
public ThreadA(MyService service) {
this.service = service;
}
@Override
public void run(){
for (int i = 0; i < 50; i++) {
service.queue.add("threadA" + (i +1));
}
}
}
public class ThreadB extends Thread{
private MyService service;
public ThreadB(MyService service) {
this.service = service;
}
@Override
public void run(){
for (int i = 0; i < 50; i++) {
service.queue.add("ThreadB"+ (i+1));
}
}
}
public class Run1 {
public static void main(String[] args) throws InterruptedException {
MyService service = new MyService();
ThreadA a = new ThreadA(service);
ThreadB b = new ThreadB(service);
a.start();
b.start();
a.join();
b.join();
System.out.println(service.queue.size());
}
}