1. 说一下线程池的核心参数(线程池的执行原理知道嘛)?
线程池核心参数主要参考ThreadPoolExecutor
这个类的7个参数的构造函数
-
corePoolSize
核心线程数目 -
maximumPoolSize
最大线程数目 = (核心线程+救急线程的最大数目) -
keepAliveTime
生存时间 - 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放 -
unit
时间单位 - 救急线程的生存时间单位,如秒、毫秒等 -
workQueue
- 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务 -
threadFactory
线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等 -
handler
拒绝策略 - 当所有线程都在繁忙,workQueue
也放满时,会触发拒绝策略
工作流程
1,任务在提交的时候,首先判断核心线程数是否已满,如果没有满则直接添加到工作线程执行
2,如果核心线程数满了,则判断阻塞队列是否已满,如果没有满,当前任务存入阻塞队列
3,如果阻塞队列也满了,则判断线程数是否小于最大线程数,如果满足条件,则使用临时线程执行任务
如果核心或临时线程执行完成任务后会检查阻塞队列中是否有需要执行的线程,如果有,则使用非核心线程执行任务
4,如果所有线程都在忙着(核心线程+临时线程),则走拒绝策略
拒绝策略:
AbortPolicy
:直接抛出异常,默认策略;CallerRunsPolicy
:用调用者所在的线程来执行任务;DiscardOldestPolicy
:丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy
:直接丢弃任务;
参考代码:
public class TestThreadPoolExecutor {
static class MyTask implements Runnable {
private final String name;
private final long duration;
public MyTask(String name) {
this(name, 0);
}
public MyTask(String name, long duration) {
this.name = name;
this.duration = duration;
}
@Override
public void run() {
try {
LoggerUtils.get("myThread").debug("running..." + this);
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "MyTask(" + name + ")";
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger c = new AtomicInteger(1);
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
3,
0,
TimeUnit.MILLISECONDS,
queue,
r -> new Thread(r, "myThread" + c.getAndIncrement()),
new ThreadPoolExecutor.AbortPolicy());
showState(queue, threadPool);
threadPool.submit(new MyTask("1", 3600000));
showState(queue, threadPool);
threadPool.submit(new MyTask("2", 3600000));
showState(queue, threadPool);
threadPool.submit(new MyTask("3"));
showState(queue, threadPool);
threadPool.submit(new MyTask("4"));
showState(queue, threadPool);
threadPool.submit(new MyTask("5",3600000));
showState(queue, threadPool);
threadPool.submit(new MyTask("6"));
showState(queue, threadPool);
}
private static void showState(ArrayBlockingQueue<Runnable> queue, ThreadPoolExecutor threadPool) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<Object> tasks = new ArrayList<>();
for (Runnable runnable : queue) {
try {
Field callable = FutureTask.class.getDeclaredField("callable");
callable.setAccessible(true);
Object adapter = callable.get(runnable);
Class<?> clazz = Class.forName("java.util.concurrent.Executors$RunnableAdapter");
Field task = clazz.getDeclaredField("task");
task.setAccessible(true);
Object o = task.get(adapter);
tasks.add(o);
} catch (Exception e) {
e.printStackTrace();
}
}
LoggerUtils.main.debug("pool size: {}, queue: {}", threadPool.getPoolSize(), tasks);
}
}
2. 线程池中有哪些常见的阻塞队列?
workQueue
- 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
比较常见的有4个,用的最多是ArrayBlockingQueue
和LinkedBlockingQueue
ArrayBlockingQueue
:基于数组结构的有界阻塞队列,FIFO
。LinkedBlockingQueue
:基于链表结构的有界阻塞队列,FIFO
。DelayedWorkQueue
:是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的SynchronousQueue
:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
ArrayBlockingQueue
的LinkedBlockingQueue
区别
LinkedBlockingQueue | ArrayBlockingQueue |
---|---|
默认无界,支持有界 | 强制有界 |
底层是链表 | 底层是数组 |
是懒惰的,创建节点的时候添加数据 | 提前初始化Node 数组 |
入队会生成新Node | Node 需要是提前创建好的 |
两把锁(头尾) | 一把锁 |
左边是LinkedBlockingQueue
加锁的方式,右边是ArrayBlockingQueue
加锁的方式
LinkedBlockingQueue
读和写各有一把锁,性能相对较好ArrayBlockingQueue
只有一把锁,读和写公用,性能相对于LinkedBlockingQueue
差一些
3. 如何确定核心线程数?
在设置核心线程数之前,需要先熟悉一些执行线程池执行任务的类型
IO
密集型任务
一般来说:文件读写、DB
读写、网络请求等
推荐:核心线程数大小设置为2N+1
(N
为计算机的CPU
核数)
CPU
密集型任务
一般来说:计算型代码、Bitmap
转换、Gson
转换等
推荐:核心线程数大小设置为N+1
(N
为计算机的CPU
核数)
java
代码查看CPU
核数
参考回答:
① 高并发、任务执行时间短 -->
( CPU核数+1
),减少线程上下文的切换
② 并发不高、任务执行时间长
-
IO
密集型的任务-->
(CPU核数 * 2 + 1
) -
计算密集型任务
-->
(CPU核数+1
)
③ 并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考(2)
4. 线程池的种类有哪些?
在java.util.concurrent.Executors
类中提供了大量创建连接池的静态方法,常见就有四种
-
创建使用固定线程数的线程池
-
核心线程数与最大线程数一样,没有救急线程
-
阻塞队列是
LinkedBlockingQueue
,最大容量为Integer.MAX_VALUE
-
适用场景:适用于任务量已知,相对耗时的任务
-
案例:
public class FixedThreadPoolCase { static class FixedThreadDemo implements Runnable{ @Override public void run() { String name = Thread.currentThread().getName(); for (int i = 0; i < 2; i++) { System.out.println(name + ":" + i); } } } public static void main(String[] args) throws InterruptedException { //创建一个固定大小的线程池,核心线程数和最大线程数都是3 ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { executorService.submit(new FixedThreadDemo()); Thread.sleep(10); } executorService.shutdown(); } }
-
-
单线程化的线程池,它只会用唯一的工作线程来执行任 务,保证所有任务按照指定顺序(
FIFO
)执行-
核心线程数和最大线程数都是1
-
阻塞队列是
LinkedBlockingQueue
,最大容量为Integer.MAX_VALUE
-
适用场景:适用于按照顺序执行的任务
-
案例:
public class NewSingleThreadCase { static int count = 0; static class Demo implements Runnable { @Override public void run() { count++; System.out.println(Thread.currentThread().getName() + ":" + count); } } public static void main(String[] args) throws InterruptedException { //单个线程池,核心线程数和最大线程数都是1 ExecutorService exec = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { exec.execute(new Demo()); Thread.sleep(5); } exec.shutdown(); } }
-
-
可缓存线程池
-
核心线程数为0
-
最大线程数是
Integer.MAX_VALUE
-
阻塞队列为
SynchronousQueue
:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。 -
适用场景:适合任务数比较密集,但每个任务执行时间较短的情况
-
案例:
public class CachedThreadPoolCase { static class Demo implements Runnable { @Override public void run() { String name = Thread.currentThread().getName(); try { //修改睡眠时间,模拟线程执行需要花费的时间 Thread.sleep(100); System.out.println(name + "执行完了"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { //创建一个缓存的线程,没有核心线程数,最大线程数为Integer.MAX_VALUE ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { exec.execute(new Demo()); Thread.sleep(1); } exec.shutdown(); } }
-
-
提供了“延迟”和“周期执行”功能的
ThreadPoolExecutor
。-
适用场景:有定时和延迟执行的任务
-
案例:
public class ScheduledThreadPoolCase { static class Task implements Runnable { @Override public void run() { try { String name = Thread.currentThread().getName(); System.out.println(name + ", 开始:" + new Date()); Thread.sleep(1000); System.out.println(name + ", 结束:" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { //按照周期执行的线程池,核心线程数为2,最大线程数为Integer.MAX_VALUE ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); System.out.println("程序开始:" + new Date()); /** * schedule 提交任务到线程池中 * 第一个参数:提交的任务 * 第二个参数:任务执行的延迟时间 * 第三个参数:时间单位 */ scheduledThreadPool.schedule(new Task(), 0, TimeUnit.SECONDS); scheduledThreadPool.schedule(new Task(), 1, TimeUnit.SECONDS); scheduledThreadPool.schedule(new Task(), 5, TimeUnit.SECONDS); Thread.sleep(5000); // 关闭线程池 scheduledThreadPool.shutdown(); } }
-
5. 为什么不建议用Executors
创建线程池?
参考阿里开发手册《Java
开发手册-嵩山版》
6. 线程池使用场景CountDownLatch
、Future
(你们项目哪里用到了多线程)?
6.1 CountDownLatch
CountDownLatch
(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)
-
其中构造参数用来初始化等待计数值
-
await()
用来等待计数归零 -
countDown()
用来让计数减一
案例代码:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//初始化了一个倒计时锁 参数为 3
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"-begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//count--
latch.countDown();
System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"-begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//count--
latch.countDown();
System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"-begin...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//count--
latch.countDown();
System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
}).start();
String name = Thread.currentThread().getName();
System.out.println(name + "-waiting...");
//等待其他线程完成
latch.await();
System.out.println(name + "-wait end...");
}
}
6.2 案例一(es
数据批量导入)
在我们项目上线之前,我们需要把数据库中的数据一次性的同步到es
索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom
异常),当时我就想到可以使用线程池的方式导入,利用CountDownLatch
来控制,就能避免一次性加载过多,防止内存溢出
整体流程就是通过CountDownLatch+线程池
配合去执行
详细实现流程:
6.3 案例二(数据汇总)
在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息;这三块信息都在不同的微服务中进行实现的,我们如何完成这个业务呢?
-
在实际开发的过程中,难免需要调用多个接口来汇总数据,如果所有接口(或部分接口)的没有依赖关系,就可以使用
线程池+future
来提升性能 -
报表汇总
6.4 案例三(异步调用)
在进行搜索的时候,需要保存用户的搜索记录,而搜索记录不能影响用户的正常搜索,我们通常会开启一个线程去执行历史记录的保存,在新开启的线程在执行的过程中,可以利用线程提交任务
7. 如何控制某个方法允许并发访问线程的数量?
Semaphore
信号量,是JUC
包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。
Semaphore
两个重要的方法
lsemaphore.acquire()
: 请求一个信号量,这时候的信号量个数-1(一旦没有可使用的信号量,也即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)
lsemaphore.release()
:释放一个信号量,此时信号量个数+1
线程任务类:
public class SemaphoreCase {
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 3. 获取许可
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end...");
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}
}