工作时,时常会遇到,线程相关的问题与解法,本人会持续对开发过程中遇到的关于线程相关的问题及解决记录更新记录在此篇博客中。
目录
一、线程基本知识
1. 线程和进程
二、问题与解法
1. 避免乘法级别数量线程并行
1)使用线程池
2)任务队列
3)限制并发任务数量
4)任务分批处理
2. 避免函数被重复调用
1)使用标志位
2)使用同步锁
3)使用AOP和缓存
4)使用消息队列
5)使用限流算法
一、线程基本知识
1. 线程与进程
进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。例如,任务管理器显示的就是进程:
线程是比进程更小的执行单位。一个进程在其执行过程中可以产生多个线程。与进程不同的是同类的多个线程共享堆和方法区资源,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈。
JDK 1.2之前,Java线程是基于绿色线程实现的,这是一种用户级线程。即JVM自己模拟了多线程的运行,而不依赖于操作系统。由于绿色线程和原生线程比起来的在使用时有一些限制(比如,绿色线程不能直接使用操作系统提供的功能如异步 I/O、只能在一个内核线程上运行无法利用多核);在JDK 1.2之后,Java线程改为使用原生线程来实现。也就是说 JVM 直接使用操作系统原生的内核级线程(内核线程)来实现 Java 线程,由操作系统内核进行线程的调度和管理。
- 用户线程:由用户空间程序管理和调度的线程,运行在用户空间(专门给应用程序使用)。
- 内核线程:由操作系统内核管理和调度的线程,运行在内核空间(只有内核程序可以访问)。
顺便简单总结一下用户线程和内核线程的区别和特点:用户线程创建和切换成本低,但不可以利用多核。内核态线程,创建和切换成本高,可以利用多核。现在的 Java 线程的本质其实就是操作系统的线程。
线程模型是用户线程和内核线程之间的关联方式,常见的线程模型有这三种:
- 一对一(一个用户线程对应一个内核线程)
- 多对一(多个用户线程映射到一个内核线程)
- 多对多(多个用户线程映射到多个内核线程)
在 Windows 和 Linux 等主流操作系统中,Java 线程采用的是一对一的线程模型,也就是一个 Java 线程对应一个系统内核线程。
一个进程中可以有多个线程,多个线程共享进程的堆和方法区(JDK1.8 之后的元空间)资源,但是每个线程有自己的程序计数器、虚拟机栈 和 本地方法栈。
- 程序计数器为什么私有:程序计数器是为了线程切换后,能够回到正确的执行位置。
- 虚拟机栈为什么私有:每个Java栈帧在执行之前会创建一个栈帧用于存储局部变量表、操作数栈、常量池引用等信息。从方法调用直至完成的过程,对应着一个栈帧在Java虚拟机栈中入栈和出栈的过程。
- 本地方法栈为什么私有:和虚拟机栈所发挥的作用非常相似,区别是:虚拟机栈为虚拟机执行 Java 方法 (也就是字节码)服务,而本地方法栈则为虚拟机使用到的 Native 方法服务。 在 HotSpot 虚拟机中和 Java 虚拟机栈合二为一。
堆和方法区是所有线程共享的资源,其中堆是进程中最大的一块内存,主要用于存放新创建的对象(几乎所有对象都在这里分配内存),方法区主要用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。
二、问题与解法
1. 避免乘法级别数量线程并行
当一个多线程函数被重复调用,就造成了并行线程数量成为乘法级别。
这种情况通常不是故意的,而是不小心。
一旦发生这种情况,需要及时排查,不然线程太多、资源混乱,服务器上其他服务请求被搁置,就变成了事故。
例如:一个大批量执行、耗时、数据量大的任务。现在使用三个线程、并行生成结果。这个函数 每三分钟执行一次。可能下一次调用开始时,旧的线程还没执行结束。
count (并行线程数)= m(调用次数) * n(单词调用线程数)
乘法级别的线程在处理这些数据,不仅不能提高处理效率、反而由于线程太多造成了资源混乱、大量资源浪费。导致正常的请求无法及时被相应,影响到服务器上其他服务的正常使用。甚至可能导致服务器宕机。
可以使用线程池、任务队列、限制并发任务数量、任务分批处理来解决。
1)使用线程池
线程池可以有效管理线程的生命周期,避免频繁创建和销毁线程带来的开销,并且可以限制并发线程的数量。Java提供了 ExecutorService 来实现线程池。
注意,线程池多线程调用也会出现乘法级别数量的线程
示例代码:
import java.util.concurrent.*;
public class TaskScheduler {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
public void scheduleTask(Runnable task) {
scheduler.scheduleAtFixedRate(() -> {
try {
// 提交任务到线程池
for (int i = 0; i < 3; i++) {
threadPool.submit(task);
}
} catch (RejectedExecutionException e) {
// 处理线程池拒绝任务的情况
System.out.println("Task rejected, possibly due to shutdown or overload.");
}
}, 0, 3, TimeUnit.MINUTES);
}
public void shutdown() {
scheduler.shutdown();
threadPool.shutdown();
}
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
Runnable task = () -> {
// 你的任务逻辑
System.out.println("Task executed by " + Thread.currentThread().getName());
};
scheduler.scheduleTask(task);
// 程序结束时关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdown));
}
}
优点:
-
限制线程数量:通过固定大小的线程池,可以避免线程过多导致的资源竞争。
-
任务调度:
ScheduledExecutorService
可以定时执行任务,避免任务重叠。 -
优雅关闭:通过
shutdown()
方法可以优雅地关闭线程池。
2)任务队列
如果任务的执行时间不确定,可以使用任务队列来管理任务。线程池会从任务队列中获取任务并执行。
import java.util.concurrent.*;
public class TaskQueueExample {
private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
public void addTask(Runnable task) {
try {
taskQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void startProcessing() {
for (int i = 0; i < 3; i++) {
threadPool.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}
public void shutdown() {
threadPool.shutdown();
}
public static void main(String[] args) throws InterruptedException {
TaskQueueExample example = new TaskQueueExample();
example.startProcessing();
// 模拟添加任务
for (int i = 0; i < 10; i++) {
example.addTask(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 程序结束时关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(example::shutdown));
}
}
优点:
-
任务排队:任务会被放入队列,按顺序执行,避免任务重叠。
-
线程复用:线程池中的线程会复用,减少线程创建和销毁的开销。
3)限制并发任务数量
如果任务的执行时间较长,可以限制同时执行的任务数量。例如,每次只允许 3 个任务并发执行。
示例代码:
import java.util.concurrent.*;
public class LimitedConcurrencyExample {
private final Semaphore semaphore = new Semaphore(3);
private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
public void addTask(Runnable task) {
threadPool.submit(() -> {
try {
semaphore.acquire(); // 获取许可
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
});
}
public void shutdown() {
threadPool.shutdown();
}
public static void main(String[] args) {
LimitedConcurrencyExample example = new LimitedConcurrencyExample();
for (int i = 0; i < 10; i++) {
example.addTask(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 程序结束时关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(example::shutdown));
}
}
优点:
-
限制并发:通过
Semaphore
限制同时执行的任务数量。 -
线程复用:线程池中的线程会复用,减少线程创建和销毁的开销。
4)任务分批处理
如果任务数据量很大,可以将任务分批处理,每批任务由一个线程处理。
示例代码:
import java.util.concurrent.*;
public class BatchTaskExample {
private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
public void processTasks(List<Runnable> tasks) {
int batchSize = tasks.size() / 3;
for (int i = 0; i < 3; i++) {
int start = i * batchSize;
int end = (i == 2) ? tasks.size() : start + batchSize;
threadPool.submit(() -> {
for (int j = start; j < end; j++) {
tasks.get(j).run();
}
});
}
}
public void shutdown() {
threadPool.shutdown();
}
public static void main(String[] args) {
BatchTaskExample example = new BatchTaskExample();
List<Runnable> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
example.processTasks(tasks);
// 程序结束时关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(example::shutdown));
}
}
优点:
-
分批处理:将任务分批处理,避免任务重叠。
-
线程复用:线程池中的线程会复用,减少线程创建和销毁的开销。
总结
根据你的需求,可以选择以下方案:
-
使用线程池:限制线程数量,避免线程过多导致的资源竞争。
-
任务队列:将任务放入队列,按顺序执行,避免任务重叠。
-
限制并发任务数量:通过
Semaphore
限制同时执行的任务数量。 -
任务分批处理:将任务分批处理,每批任务由一个线程处理。
选择合适的方案可以有效解决线程过多导致的问题,提高任务执行的效率。
2. 避免函数被重复调用
Java中,可以通过多种方式避免函数被重复调用。以下是常见的方法
1)使用标志位
函数调用前,设置一个标志位表示该函数是否已经被调用过。如果已经被调用多,则不再重复调用。
public class FunctionCall {
private static boolean isCalled = false;
public static void function() {
if (isCalled) {
System.out.println("Function is already being called.");
return;
}
isCalled = true;
try {
// Function logic here
System.out.println("Function is being called.");
} finally {
isCalled = false;
}
}
public static void main(String[] args) {
new Thread(() -> function()).start();
new Thread(() -> function()).start();
}
}
2)使用同步锁
使用同步锁(如 synchronized
或 ReentrantLock
)来确保函数在同一时间只能被一个线程调用。
import java.util.concurrent.locks.ReentrantLock;
public class FunctionCall {
private static final ReentrantLock lock = new ReentrantLock();
public static void function() {
lock.lock();
try {
// Function logic here
System.out.println("Function is being called.");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
new Thread(() -> function()).start();
new Thread(() -> function()).start();
}
}
3)使用AOP和缓存
通过AOP(面向切面编程)和缓存来检测函数是否正在被调用。如果函数正在被调用,则不再重复调用。
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
@Configuration
@EnableCaching
public class CacheConfig {
@Bean
public CacheManager cacheManager() {
return new ConcurrentMapCacheManager("functionCache");
}
}
@Aspect
public class FunctionCallAspect {
private final CacheManager cacheManager;
public FunctionCallAspect(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
@Pointcut("execution(* com.example.FunctionCall.function(..))")
public void functionCallPointcut() {}
@Around("functionCallPointcut()")
public Object aroundFunctionCall(ProceedingJoinPoint joinPoint) throws Throwable {
Cache cache = cacheManager.getCache("functionCache");
if (cache != null && cache.get("functionKey") != null) {
System.out.println("Function is already being called.");
return null;
}
cache.put("functionKey", "inProgress");
try {
return joinPoint.proceed();
} finally {
cache.evict("functionKey");
}
}
}
public class FunctionCall {
public static void function() {
// Function logic here
System.out.println("Function is being called.");
}
public static void main(String[] args) {
new Thread(() -> function()).start();
new Thread(() -> function()).start();
}
}
4)使用消息队列
将函数调用请求放入消息队列中,然后由后台服务从队列中取出并处理,避免了函数的重复调用。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class FunctionCall {
private static final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
public static void function() {
queue.add(() -> {
// Function logic here
System.out.println("Function is being called.");
});
}
public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
Runnable task = queue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
new Thread(() -> function()).start();
new Thread(() -> function()).start();
}
}
5)使用限流算法
通过限流算法(如漏桶算法或令牌桶算法)来控制函数的调用频率,避免函数的重复调用。
import java.util.concurrent.Semaphore;
public class FunctionCall {
private static final Semaphore semaphore = new Semaphore(1);
public static void function() {
try {
semaphore.acquire();
// Function logic here
System.out.println("Function is being called.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}
public static void main(String[] args) {
new Thread(() -> function()).start();
new Thread(() -> function()).start();
}
}
持续更新ing,推荐博客:
- Java 面试指南 | JavaGuide