本文介绍两种BlockingDeque在多线程任务处理时正确结束的方法
一般最开始简单的多线程处理任务过程
- 把总任务放入BlockingDeque
- 创建多个线程,每个线程内逻辑时,判断BlockingDeque任务是否处理完,处理完退出,还有任务就BlockingDeque.take()取任务处理
- 主线程join等待多线程处理完,收尾处理完成任务。
最开始版本代码,10个任务,3个线程来处理
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class Main3 {
public static void main(String[] args){
System.out.println("start");
BlockingDeque<Integer> task = new LinkedBlockingDeque<>();
for (int i = 0; i < 10; i++) {
task.add(i);
}
List<Thread> workers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Thread worker = new Thread(()->{
while (true) {
Integer data = null;
try {
if (task.size()==0) {
System.out.println(Thread.currentThread().getName() +" quit");
break;
}
// Thread.sleep(100); // 默认任务耗时
data = task.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() +" do "+ data);
}
});
workers.add(worker);
worker.start();
}
for (Thread worker: workers) {
try {
worker.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("job done");
}
}
运行之后,感觉非常好,完美实现逻辑
但是当把上面的任务数加到200,线程数加到30,上面线程sleep的注释打开,再次运行,就会发现主进程最后会被一直卡着不结束,说明多线程没有正确判断任务结束,线程不安全
上面的子线程内的size()等于0到下面的BlockingDeque.take()取任务这段之间的代码,这段不是线程安全的
让线程正确判断任务结束,而且要线程安全的三种方法,推荐第二种,兼顾效率和兼容正确性
- 判断任务结束这段代码加synchronized约束起来,实现线程安全(太慢)
- 给总任务task内,加入和线程相同数量的停止标志marker
- 使用BlockingDeque.poll(超时时间) + 异常数据检查(需要检查异常数据)
使用synchronized约束
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class Main {
public static void main(String[] args) {
System.out.println("start");
BlockingDeque<Integer> task = new LinkedBlockingDeque<>();
for (int i = 0; i < 20; i++) {
task.add(i);
}
List<Thread> workers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Thread worker = new Thread(()->{
while (true) {
Integer data = null;
synchronized (task) {
if (task.size() ==0) {
System.out.println(Thread.currentThread().getName() +" quit");
break;
}
try {
data = task.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() +" do "+ data);
}
});
workers.add(worker);
worker.start();
}
for (Thread worker: workers) {
try {
worker.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("job done");
}
}
总任务添加stop marker停止标志
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class Main2 {
public static void main(String[] args){
System.out.println("start");
BlockingDeque<Integer> task = new LinkedBlockingDeque<>();
for (int i = 0; i < 20; i++) {
task.add(i);
}
List<Thread> workers = new ArrayList<>();
for (int i = 0; i < 3; i++) task.add(99);
for (int i = 0; i < 3; i++) {
Thread worker = new Thread(()->{
while (true) {
Integer data = null;
try {
data = task.take();
if (data == 99) {
System.out.println(Thread.currentThread().getName() +" quit");
break;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() +" do "+ data);
}
});
workers.add(worker);
worker.start();
}
for (Thread worker: workers) {
try {
worker.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("job done");
}
}
使用BlockingDeque.poll(超时时间),避免了take的永久性等待问题,但是会取到null值,要加判断处理
package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class Main4 {
public static void main(String[] args){
System.out.println("start");
BlockingDeque<Integer> task = new LinkedBlockingDeque<>();
for (int i = 0; i < 200; i++) {
task.add(i);
}
List<Thread> workers = new ArrayList<>();
for (int i = 0; i < 30; i++) {
Thread worker = new Thread(()->{
while (true) {
Integer data = null;
try {
if (task.size()==0) {
System.out.println(Thread.currentThread().getName() +" quit");
break;
}
Thread.sleep(100); // 默认任务耗时
data = task.poll(1000, TimeUnit.MILLISECONDS);
if (data == null) {
System.out.println(Thread.currentThread().getName() +" get null");
continue;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() +" do "+ data);
}
});
workers.add(worker);
worker.start();
}
for (Thread worker: workers) {
try {
worker.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("job done");
}
}