目录
前言:
定时器
使用标准库中定时器
模拟实现定时器
线程池
使用标准库中的线程池
代码实现
ThreadPoolExecutor类介绍
构造方法参数介绍
拒绝策略介绍
模拟实现线程池
代码实现
提出问题
小结:
前言:
这篇文章同上一篇文章都是介绍多线程下的一些案例。我们可以通过这些案例更加深入的了解多线程编程。
定时器
所谓定时器就是指定一些任务,在后面的某一时刻执行。这里先使用标准库中的定时器。标准库中用Timer类来表示定时器。使用schedule方法来提交任务和指定任务执行时间。
注意:
- delay是指定从当前代码执行时间开始以后多久时间,单位为毫秒。
- task就算具体指定的任务,我们可以清楚看到它的类型为TimerTask。TimerTask是一个抽象类,它实现了Runnable接口。
使用标准库中定时器
public class ThreadDemo23 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("11111");
}
}, 3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("22222");
}
}, 2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("33333");
}
}, 1000);
}
}
注意:这里可以看见是按照时间打印的日志,并且打印完进程并没有结束。这是因为执行任务的线程没有结束,这个线程是前台线程,阻止进程的结束。
模拟实现定时器
思路:
实现定时器需要添加任务。如果有很多任务,我们每次要执行的都是时间最早的任务,因此我们使用优先级队列PriorityBlockingQueue来保存任务(阻塞队列多线程情况下线程安全)。还需要一个线程始终检测任务队列的线程,时间到了就用这个线程来执行任务。
这里的任务需要添加时间,因此我们将任务和时间设计为一个类,优先级队列就存储这个对象即 可。这个对象需要有比较性,可以实现Comparable接口,重写compareTo方法,用时间进行比较。也可以直接传一个比较器Comparator对象即可。
检测线程每次拿出优先级队列里最早的任务比较,如果当前时间还没到,就把这个任务又入队列。否则就执行这个任务。
同样的,我们也采取schedule方法来提交任务,当我们往优先级队列提交任务时,需要进行时间转换:当前时间 + 所指定的时间。这样就符合需求了。先展示代码,随后介绍多线程情况下线程安全问题。
class MyTask implements Comparable<MyTask> {
private long time;
private Runnable runnable;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
public long getTime() {
return time;
}
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int) (this.time - o.time);
}
}
class MyTimer {
//检测线程
private Thread t = null;
//存储任务
private PriorityBlockingQueue<MyTask> priorityBlockingQueue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(()-> {
while (true) {
try {
//当代码执行到currentTime这一行,如果被调度走,再执行schedule改变了堆顶元素,即等待时间就变了,但这是没有wait那么notify就
//空打了一炮,当线程调度回来时,已经落后于时间了,即错过了依次一次schedule
//所以就需要保证原子性,让这个线程只有wait了,才执行notify刷新等待时间,不能空打一炮。那么就增加锁的范围
synchronized (this) {
//获取最小时间任务
MyTask myTask = priorityBlockingQueue.take();
//当前时间
long currentTime = System.currentTimeMillis();
//如果一直取的时间都小于,那么这个线程就会一直取,放。那么就需要wait等待一下
//等待的最大时间:currentTime - tmp.getTime()
if (currentTime < myTask.getTime()) {
priorityBlockingQueue.put(myTask);
this.wait(myTask.getTime() - currentTime);
} else {
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
public void schedule(Runnable runnable, long time) {
//时间转换
MyTask myTask = new MyTask(runnable, System.currentTimeMillis() + time);
priorityBlockingQueue.put(myTask);
//每入一个任务,就需要跟新一下wait的等待时间
synchronized (this) {
this.notify();
}
}
}
注意:
多线程情况下,我们如果不使用wait方法进行阻塞。那么这个检测线程就会一直检测,当前时间离最早任务执行时间差距很大的话,就比较浪费系统资源了。因此当发现时间还每到时就进行阻塞(最大阻塞时间为当前时间离最早执行时间差),这里锁首先加到wait的头顶。每当进行schedule提交任务时,就用notify通知wait重新检测一次,因为有可能这次提交的任务比之前提交的任务时间更早。
由于线程调度是随机的,假设检测线程代码执行到wait方法上面(还没有执行wait),就被调度走了。这个时候schedule提交了一次比之前更早要执行的任务,由于检测线程没有阻塞,这里的notify就会空打一炮。当检测线程被调度回来时,它所记的时间还是上一次的,就错过了这次提交。
基于上述问题,我们扩大wait锁的范围。保证每次notify的时候,检测线程都在阻塞状态。现在就算出现了上述问题,notify这里就会阻塞等待,直到执行wait释放锁后,才能执行notify。
这里的锁都是在this上加着,如果使用了匿名内部类的方法来实现线程的创建,就会出bug。因为检测线程里的this和schedule方法里的this不是同一个对象。可以使用lambda表达式或者指定一个锁对象,不再用this就可以解决这个问题。
线程池
线程池里提供了一组线程,我们只需要向其中的阻塞队列提交任务,就会使用线程池里的线程来执行任务。
线程相比于进程是轻量不少,还有比线程更加轻量的协程。使用线程池也是因为创建一个线程需要操作系统内核来实现,其实还是比较消耗系统资源的。我们使用线程池为我们提供的线程就和内核没有关系,我们通过代码就可以实现,做到更加节省系统资源。
当我们向内核申请创建一个线程,就算把任务交给了内核,需要执行内核中的代码。但是内核服务于很多程序,我们就不确定内核什么时候执行我们提交的任务,那么就是不可控的。当我们直接使用线程池的里线程就和内核没有关系,这个时候相对于就是可控的。
使用标准库中的线程池
- newCachedThreadPool()方法线程数量是动态变化的。(根据任务的大小来确定)
- newFixedThreadPool()方法可以指定线程池中具体的线程数量。
- newScheduledThreadPool()方法使用线程池里的线程让任务延迟执行。
- newSingleThreadExecutor()方法指定一个线程来执行任务。
代码实现
public class ThreadDemo26 {
public static void main(String[] args) {
//线程池里的线程都是前台线程,阻止进程的结束
//创建线程池,指定线程数量为10个
ExecutorService executorService = Executors.newFixedThreadPool(10);
//这里涉及变量捕获,变量的生命周期不一样,就存在变量捕获
//变量捕获的前提需要这个变量没有改变
//变量捕获先把这个变量存起来,使用时再创建这个变量,把值赋过去
for(int i = 0; i < 1000; i++) {
int n = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(n);
}
});
}
}
}
注意:
这里使用的是指定具体线程数量的线程池。使用submit方法提交任务。
这里涉及变量捕获,变量i在主线程里是局部变量,如果想要在线程池中的线程使用这个变量就存在变量捕获(变量的生命周期不一样)。当线程使用的时候可能主线程中局部变量i已经释放了,变量捕获就是先把这个变量拷贝一份,使用的时候再创建出这个变量。变量捕获有个重要的前提这个变量没有变化,这个变量i显然变化了,那么就无法捕获,所以创建了临时变量n,来进行捕获。
这里的线程池对象是通过直接通过Executors类调用的方法来获取实例对象。构造方法虽然可以重载,但是无法表示参数完全一致,但是意义却不同的不同构造方法。这里通过不同的普通方法,就可以实例出不同的线程池对象。这样的设计方式称为工厂模式,这样的类称为工厂类,方法称为工厂方法。
Executors里面都是一些静态方法。它其实是对ThreadPoolExecutor这个类的包装,在包装的时候把Executors设计为了工厂模式。
ThreadPoolExecutor类介绍
Java标准文档中就提出程序员被敦促使用更方便的Executors工厂方法。ThreadPoolExecutor类比较复杂所以就包装出了这个工厂方法。
构造方法参数介绍
- corePoolSize:核心线程数。
- maximumPoolSize:最大线程数。maximumPoolSize - corePoolSize:临时线程数,用来对线程池线程数量动态增加的一些线程。
- keepAliveTime:临时线程的存活时间。当临时线程很久都没有使用,这些线程就被销毁了。
- unit:时间单位。
- workQueue:存储任务的工作队列。线程执行的任务都是从这个队列中拿的。
- threadFactory:用来创建线程。
- habdler:当阻塞队列满的时候,再向线程池提交新任务的“拒绝策略”。
拒绝策略介绍
- 如果队列满了,再添加新任务,直接抛异常。
- 如果队列满了,哪个线程提交的这个任务,就交给哪个线程执行。
- ‘如果队列满了,接受新任务,丢弃最早的任务。
- 如果队列满了,接收新任务,丢弃最新任务。
模拟实现线程池
思路:
首先提供阻塞队列用来存储提交的任务,然后创建一些线程来从阻塞队列中取任务执行。设计submit方法为阻塞队列中提交任务。
代码实现
class MyThreadPool {
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
//创建线程,处理任务
public MyThreadPool(int n) {
for(int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
while (true) {
try {
Runnable take = queue.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
//提交任务
public void submit(Runnable take) throws InterruptedException {
queue.put(take);
}
}
注意:这里在多线程情况下,调用submit方法,没有线程安全问题。因为这里只涉及到了向阻塞队列里提交任务。
提出问题
线程池中的线程设计多少合适呢?
答:这个需要根据任务量的大小来确定。因为不同的任务量需要在一定效率下执行这些任务线程数量是不同的。我们需要根据实际进行测试,观察系统资源的使用是否符合我们的预期来最终确定线程数量。
小结:
与大家共勉一句名言:”我希望,自己今后能以一朵花的姿态行走。穿越四季轮回,在无声中不颓废,不失色,一生花开成景,花落成诗“。