文章目录
- 线程池
- 1 什么是线程池
- 2 标准库中的线程池
- 2.1 什么是工厂模式
- 2.2 如何使用标准库中的线程池完成任务
- 2.3 ThreadPoolExecutor 构造方法的解释
- 3 实现一个线程池
线程池
1 什么是线程池
随着并发程度的提高,随着对性能要求标准的提高会发现,好像线程创建也没有那么的轻量。
当需要频繁的创建和销毁线程的时候,就会发现开销好像还挺大的。
解决办法:
- “轻量级线程”,也就是 协程 和 纤程
- 使用 线程池,来降低创建和销毁线程的开销。
线程池就像是一个存储多个线程的容器,事先把需要使用的线程创建好并放到池中。
后续使用的时候,直接从池子里拿即可,用完了就再还给池子。
创建和销毁线程,是交由操作系统内核完成的
从池子里获取和还给池子,是用户代码就可以实现的。
所以,从池里获取和还给池,要比创建和销毁更加的高效。
2 标准库中的线程池
在 java 标准库中,也是提供了现成的的线程池,可以直接使用。
ExecutorService pool = Executors.newFixedThreadPool(10);
上述代码的作用是 创建一个线程池,池子里的线程数目固定为10个
此处的 new (newFixedThreadPool)是方法名字的一部分,而不是 new 关键字。
这个操作使用某个类中的某个静态方法,直接构造出一个对象来。
(相当于是把 new 操作给隐藏到这样的方法背后了)
像这样的方法就称为 “工厂方法”,提供这个工厂方法的类就叫做 “工厂类”。
此处的代码使用的是 “工厂模式”,这种设计模式。
2.1 什么是工厂模式
工厂模式可以用一句话表示,使用普通的方法来代替构造方法创建对象。
如果构造方法只是构造一种对象,那还比较好办;但如果要构造多种不同情况的对象就不好办了。
举个例子:
class Point {
public Point(double x, double y) {}
public Point(double r, double a) {}
}
很明显这里的代码有问题,正常来说,多个构造方法是通过 “重载” 的方式来提供的。
重载的要求是,方法名相同,参数的个数或者类型不同。
上述的代码,编译器会因为这是一个方法写了两次而报错。
为了解决上面的问题就可使用工厂模式。
class PointFactory {
public static Point makePointByXY(double x, double y) {}
public static Point makePointByRA(double r, double a) {}
}
Point p = PointFactory.makePointByXY(10, 20);
普通方法,方法名字没有限制。因此有很多方式构造,就可以直接使用不同的方法名即可。
此时方法的参数是否要区分,已经不重要了。
2.2 如何使用标准库中的线程池完成任务
线程池提供了一个重要的方法,submit 可以给线程池提交若干个任务。
package thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 使用标准库的线程池
public class ThreadDemo6 {
public static void main(String[] args) {
// 创建一个有十个线程的线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int num = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("酒國最牛!!!" + num);
}
});
}
}
}
此处要注意的是,当前操作是往线程池里放 1000 个任务。
这 1000 个任务就是这 10 个线程来平均分配的,差不多就是一个线程执行 100 个。
但是这里的平均又不是严格意义上的平均,可能有的多一个,有的少一个。
(每个线程都执行完一个任务之后,再立即执行下一个任务。由于每个任务执行的时间都差不多,因此每个线程做的任务数量就差不多)
进一步的可以认为,这 1000 个任务,就再一个队列中排队。
这 10 个线程,就一次来去队列中的任务,取一个就执行一个,执行完之后再执行下一个。
标准库中有哪几种不同的线程池
newCachedThreadPool() 这个线程池里的线程数量是动态变化的。
如果任务多了,就多搞几个线程,如果任务少了,就少搞几个线程。
newSingleThreadExecutor() 这个线程池里的线程只有一个。
newFixedThreadPool 这个线程池上面代码介绍了。
newScheduledThreadPool() 这个线程池类似于定时器,也是让任务延时执行。
只不过执行的时候不是有扫描线程自己执行了,而是由单独的线程池来执行。
2.3 ThreadPoolExecutor 构造方法的解释
上述这些线程池,本质都是通过包装 ThreadPoolExecutor 来实现出来的
ThreadPoolExecutor 这个线程池使用起来更麻烦一点,所以才提供了工厂类,让其的使用变得简单。
这里指的麻烦的意思是功能更强大
打卡 java 文档 ,ThreadPoolExecutor 这个线程池就在下面这个包里。
找到上面的包并点击它,下滑找到下面的圈出的内容。
1、corePoolSize 这个参数是核心线程数,maximumPoolSize 这个参数是最大核心线程数。
2、ThreadPoolExecutor 线程池相当于把里面的线程分为两类:
- 一类是正式员工(核心线程)
- 一类是临时工/实习生
这两者加在一起就组成了最大线程数。
如果任务比较多,显然需要更多线程,此时多搞一些线程,成本也是值得的。
但是一个程序有时候任务多,有时候任务少,如果此时的任务比较少,线程还是那么多,
就非常不合适了。此时就需要对现有的线程进行一定的淘汰。
整体的策略是。正式员工保底,临时工动态调节。
不同的程序特点不同,需要设置的线程数也是不同的。
这里考虑两个极端情况:
-
CPU 密集型
每个线程执行的任务都是狂转CPU(进行一系列的算数运算),此时线程池线程数,最多也不该超过CPU核数。
如果搞的线程特别多了,也没有足够的空间使用。 -
IO 密集型
每个线程干的工作就是等待 IO(读写硬盘、读写网卡、等待用户输入…),不吃 CPU。
此时这样的线程处于阻塞状态,不参与CPU调度。这个时候多搞一些线程都无所谓,不再受制于CPU核数了。
然而实际开发中并没有程序符合这两种理想模型,真实的程序往往一部分是CPU密集型,一部分是IO密集型。
具体这个程序多少工作量是 吃CPU,多少工作量是等待IO,这是不确定的。
我们也只能在实践中确定线程的数量,也就是通过测试和实验的方式。
3、long keepAliveTime 这个参数描述了临时工可以摸鱼的最大时间,也就是临时使用的线程。
4、TimeUnit unit 这个参数描述的是时间单位(s、ms、分钟)
5、BlockingQueue workQueue 这个参数描述了这是线程池的任务队列。
每个工作线程都是在不停的尝试 take 的,如果有任务,就 take 成功;如果没有就阻塞。
6、ThreadFactory threadFactory 这个参数是用于创建线程的,线程池是需要创建线程的。
7、RejectedExecutionHandler handler 这个参数描述了线程池的 “拒接策略”
也是一个特殊的对象,描述了当线程池任务队列满了,如果继续添加任务会有什么样的行为。
以下是几种拒绝策略:
- 第一种策略是,如果任务太多了,队列满了,就直接抛出异常
- 第二种策略是,如果队列满了,多出来的任务是谁添加的,谁就负责执行
- 第三种策略是,如果队列满了,丢弃最早的任务
- 第四种策略是,丢弃最新的任务
3 实现一个线程池
一个线程池,至少要有两大部分。
- 阻塞队列 - 保存任务
- 若干个工作线程
1、创建若干个线程
public MyThreadPool(int n) {
// 创建线程
for (int i = 0; i < n; i++) {
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable runnable = queue.take(); //拿到任务
runnable.run();//执行任务
}catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start(); //启动线程
}
}
2、给线程池里的线程注册任务
public void submit(Runnable runnable) {
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3、测试结果
MyThreadPool myThreadPool = new MyThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
myThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("酒國:" + n);
}
});
}
完整代码
package thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class MyThreadPool {
// 此处不涉及时间,只有任务,使用 Runnable 即可
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// n 表示线程的数量
public MyThreadPool(int n) {
// 创建线程
for (int i = 0; i < n; i++) {
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable runnable = queue.take(); //拿到任务
runnable.run();//执行任务
}catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start(); //启动线程
}
}
// 注册任务给线程池
public void submit(Runnable runnable) {
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadDemo7 {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
myThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("酒國:" + n);
}
});
}
}
}