专栏简介: JavaEE从入门到进阶
题目来源: leetcode,牛客,剑指offer.
创作目标: 记录学习JavaEE学习历程
希望在提升自己的同时,帮助他人,,与大家一起共同进步,互相成长.
学历代表过去,能力代表现在,学习能力代表未来!
目录:
1. 线程池是什么?
2. 线程池的实现原理
3. 标准库中的线程池.
3.1 线程池的使用.
3.2 线程池的创建.
4. 实现线程池
1. 线程池是什么?
线程存在的意义:
想要搞清楚什么是线程池 , 首先要明白线程存在的意义. 由于使用进程并发编程开销过大 , 于是引入了线程 , 线程也叫做 "轻量级进程" , 创建/调度/销毁线程都比进程更加高效. 此时多线程在很多时候就可以代替多进程实现并发编程了.
随着并发度的提高 , 以及我们对性能标准的要求越来越高 , 线程已没有之前认为的那么"轻量"了 ,当我们需要频繁的创建/销毁线程 , 开销还是挺大的. 为了进一步提高效率 , 此时有两种方法:
- 1. 搞一个更轻量的线程=>协程/纤程 , Go语言之所以天然支持高并发 , 其原因之一就是内置了协程 , 但遗憾的是Java标准库中没有.
- 2. 使用线程池 , 来降低 创建/销毁线程的开销 , 很明显后者根符合实际情况.
线程池:
因此线程池就是我们的最终选择 , 说到线程池 , 可能会想到字符串常量池和数据库连接池 , 其实原理类似 , 就是把需要使用的线程创建好放到"池"中 , 后期需要使用的时候直接从池中取 , 使用完毕后再放回池中. 这两个操作比 创建/销毁 线程更高效 , 而且 创建/销毁 线程由操作系统内核调用 , 线程池 取出/放入 操作是代码就能是实现的.
操作系统内核:
相比于内核来说 , 用户态执行代码的行为是可控的 , 如果让内核在系统中 创建/和销毁 一个线程 , 就需要让内核来执行 , 但此时你不清楚内核背负着多少个任务(内核要为所有的程序提供服务) , 这样执行效率就非常不可控.
例如 , 将银行柜台当做操作系统内核 , 将银行大厅当做用户态. 在银行大厅 , 用户都是自由的想干啥干啥 , 类比线程池中 取出/放入 线程(非常干净利落的完成). 但有些操作需要在柜台内部完成 , 就好比是程序中的内核态 , 内核会给程序提供一些API 作为系统调用 , 程序可以通过系统调用 , 驱使内核完成一些工作(创建/销毁线程). 例如 , 用户想复印身份证 , 如果交给银行柜台去办 , 那么工作人员不一定立即去执行 , 因为工作人员为所有用户服务 , 可能需要先完成之前分配的任务再处理当前任务.
线程池的优点:
- 降低资源消耗. 通过重复利用创建好的线程 , 降低线程 创建/销毁 的造成的开销.
- 提高响应速度. 当任务到达时 , 任务可以不需等待就立即创建.
- 提高线程的可管理性. 线程是稀缺资源 , 如何无限制的创建不仅会消耗系统资源 , 还会降低系统的稳定性 , 使用线程池可以统一分配 , 调优和监控. 但是要合理使用线程池 , 还需对其实现原理了然于心.
2. 线程池的实现原理
当我们向线程池提交一个任务之后 , 线程池如何处理这个任务呢? 下面是线程池的主要处理流程:
- 1. 线程池判断核心线程池里的线程是否都在执行任务. 如果不是 , 则创建一个新的工作线程来执行任务. 如果核心线程池里的线程都在执行任务 , 则进入下个流程.
- 2. 线程池判断工作队列是否已满. 如果工作队列未满 , 则将新提交的任务存储在工作队列中. 如果工作队列满了 , 则进入下个流程.
- 3. 线程池判断线程池中的线程是否都处于工作状态. 如果不是 , 则创建一个新的工作线程来执行任务. 如果已经满了 , 则交给饱和策略来处理这个任务.
ThreadPoolExecutor 执行 execute() 方法的示意图:
- 1. 如果当前运行的线程少于 corePoolSize , 则创建新线程来执行任务(注意 , 这一步骤需要获取全局锁).
- 2. 如果运行的线程等于或多于 corePoolSize , 则将任务加入 BlockingQueue 队列.
- 3. 如果队列已满 , 则创建新的线程来执行任务.(注意 , 这一步骤需要获取全局锁)
- 4. 如果创建的线程将使当前运行的线程超过 maximumPoolSize , 任务将被拒绝 , 并调用 RejetctedExecutionHandler.rejectedExecution()方法.
3. 标准库中的线程池.
3.1 线程池的使用.
此处构造出一个 10 个线程的线程池 , 然后就可以随时安排任务让线程执行.
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("Hello" + n);
}
});
}
}
程序运行结束后 , 虽然 main 线程结束了 , 但整个进程没有结束. 这是因为线程池中的线程都是前台线程 , 会阻止进程结束.
变量捕获
这段代码有个疑惑的点 , 为什么变量 i 要赋值给 n 后再打印?
这时因为 , 此处的 run() 方法属于 Runnable. 这个方法的执行时机 , 不是立刻马上而是在未来的某个节点(由于线程的抢占式执行随机调度). 而 i 属于主线程中的局部变量(在主线程的栈上) , 随着主线程这里的 for 执行完就销毁了 , 但是很有可能当主线程执行完后 , 当前 run() 的任务在线程池里还没排到. 因此为了避免作用域的差异 , 导致后续执行 run() 时 i 已经销毁 , 于是有了变量捕获 , 也就是主线程的 i 给当前 run() 方法的栈上拷贝一份.
3.2 线程池的创建.
我们可以通过 ThreadPoolExecutor 来创建一个线程池.
创建一个线程池时需要输入以下几个参数 , 如下:
- 1. corePoolSize (核心线程数的大小): 当提交一个任务到线程池时 , 线程池会创建一个线程来执行任务 , 即使其他空闲的线程能够执行新任务也会创建线程 , 等到需要执行的任务数大于线程池基本大小就不再会创建. 如果调用了线程池的 prestartAllCoreThreads() 方法 , 线程池就会提前创建并启动所有基本线程.(核心线程就相当于公司中的正式员工 , 基本线程相当于实习员工)
- 2.maximumPoolSize(最大线程数): 线程池允许创建的最大线程数 , 如果队列满了 , 并且已创建的线程数小于最大线程数 , 则线程池会创建新的线程执行任务.(值得注意的是 , 如果使用了无界任务队列这个参数就没有什么效果)
举个例子 , 线程池中的线程扮演者两类角色 , 核心线程扮演者正式员工 , 其余线程扮演者实习员工.一但公司有任务都是正式员工优先做 , 实在缺人手才会招实习员工. 而且正式员工运行摸鱼 , 实习员工没有这项特权 , 一但摸鱼时间过久就会被销毁.所以线程池的整体策略就是: 正式员工保底 , 临时工动态调整. 那么实际开发中线程池的线程数设置为多少合适呢?
这时就需要分情况讨论 , 考虑两个极端情况:
- 1). CPU密集型 , 每个线程需要执行的任务都需要高速运行CPU(进行一系列算数操作) , 此时线程池的线程数 . 最多不应该超过 CPU 核数 , 因为设置的线程过多也没有机会执行.
- 2). IO密集型 , 每个线程的任务就是等待 IO (读写硬盘 , 读写网卡 , 等待用户输入.....) , 不会占用过多的 CPU , 即使线程处于阻塞状态也会参与 CPU 的调度 , 这时理论上来说 , 线程数可以设置为无穷大 , 不会受制于 CPU 核数.
综上 , 由于实际开发过程中都是两种情况的结合 , 因此需要我们进行实际的测试 , 看其结果的效率和实际资源占用是否符合我们预期.
- 3. keepAliveTime(线程活动保持时间): 线程池的工作空闲后 , 保持存活的时间.(也就是实习生摸鱼的最大时间)
- 4. TimeUmit(线程活动保持时间的单位): 可选时间单位有天 (DAYS) , 小时(HOURS) , 分钟(MINUTES) , 毫秒(MILLISECONDS) , 微秒(MICROSECONDS , 千分之一毫秒)和纳秒(NANOSECONDS , 千分之一微秒).
- 5. BlockingQueue<Runnable> workQueue(任务队列): 用于保存等待执行的任务的阻塞队列.
- 6.RejectedExecutionHandler(饱和策略): 当队列和线程都满了 , 说明线程池处于饱和状态 , 那么必须采取一种策略处理提交的新任务. 这个策略默认情况下是AbortPolicy.
- 7.threadFactory: 用线程提供的工厂类来创建线程.
在 JDK 1.5 之后 Java 线程池框架提供了以下四种策略:
- AbortPolicy: 直接抛出异常.
- CallerRunsPolicy: 只用调用者所在的线程来执行任务.
- DiscardOldestPolicy: 丢弃队列中最近的一个任务 , 并执行当前任务.
- DiscardPolicy: 不处理 , 丢弃掉.
4. 实现线程池.
实现线程池的核心就两点: 1.创建线程. 2. 注册任务给线程池.
class MyThreadPool {
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
//使用构造方法创建线程
public MyThreadPool(int n) {
for (int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
while (true) {
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
//注册任务给线程池
public void submit(Runnable runnable) {
try {
queue.put(runnable);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public class ThreadDemo13 {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
myThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("Hello" + n);
}
});
}
}
}