多线程代码案例
- 单例模式
- 初步了解
- 饿汉模式
- 懒汉模式
- 线程安全问题分析
- 存在的问题
- 生产者消费者模型
- 初识生产者消费者模型
- 初识阻塞队列
- 生产者消费者模型的意义
- BlockingQueue
- 阻塞队列模拟实现
- 定时器
- 初识计时器
- 初识Timer类
- 初识 schedule() 方法
- 简易定时器的实现
- 思路讲解
- 代码书写
- 线程池
- 线程池是什么
- Java中的线程池
- 工厂模式
- ThreadPoolExecutor介绍
- 实现一个简单的线程池
单例模式
初步了解
单例模式, 是非常经典的一个设计模式. 那什么是设计模式呢?
设计模式, 可以说是一种类似于模板的东西. 在实际的开发过程中, 由于会经常遇到一些重复的场景, 此时一些优秀的程序开发者, 就根据这种场景去定制了一系列的解决方案, 此时后续遇到类似的场景的时候, 就可以直接进行复用, 同时参考这些解决方案去写代码, 可以一定程度上的保证代码的质量. 而这个解决方案就是所谓的设计模式
而这个单例模式, 它主要就是约定了在特定场景中, 希望类只有一个对象.
那此时可能有人就要问了: 你这个要求也太简单了, 你不让我创建第二个对象, 那我不创建不就行了? 为啥还要用什么设计模式?
实际上, 如果不采用设计模式, 那么此时的这个约定就类似于一种口头协议, 那此时可能你前几天写代码的时候创建了一个对象, 过几天可能就忘记了, 然后又创建了一个此时这个口头协议可能就会被破坏掉了. 而通过设计模式去设计一个单例模式, 此时就能够让编译器去强行的帮我们进行检查. 其实我们之前也遇到过很多这样的机制, 例如@Override
注解, 就是用于让编译器强制检查你的方法是不是真正的重写了的, 如果不是一个重写方法, 那么编译器就会报错来进行提示.
接下来我们就来学习一下如何实现一个单例模式, 一般来说单例模式分为两种, 分别是饿汉模式和懒汉模式, 我们依次来看.
饿汉模式
饿汉模式, 主要指的就是这个类它会在初始化的时候直接创建好对象. 同时, 为了能够防止它被调用构造方法创建新的对象, 我们需要将其的对象设置为私有构造方法.
随后如果其他地方需要用到这个对象, 那么我们就直接把这个对象拿出来就行, 因此我们还需要提供一个方法来用于获取对象.
总结一下饿汉模式的实现步骤:
- 内部直接提供一个静态对象, 类加载的时候就会直接创建
- 私有化构造方法, 防止别人调用
- 提供一个 get 方法用于获取实例
那么这个代码实现起来还是比较简单的, 我们直接看代码
public class HungerSingleTon {
// 创建对象
private static HungerSingleTon instance = new HungerSingleTon();
// 私有构造方法
private HungerSingleTon(){}
// 获取实例方法
public static HungerSingleTon getInstance(){
return instance;
}
}
此时我们就实现了一个饿汉版本的单例模式, 但是此时这个单例模式有一个问题: 如果我们的这个类用不上呢? 难道就要让这个实例占着一块空间吗? 这明显不够好, 因此就有了懒汉模式
懒汉模式
懒汉模式与饿汉模式的唯一区别就是, 懒汉模式比较懒, 如果不使用实例的话就不创建, 等到要用的时候才会进行创建.
此时要实现这个懒汉模式, 我们就可以默认该实例为 null, 当第一次使用的时候才会创建实例, 后面使用则直接返回已创建的实例.
那我们要如何才能判断是否是第一次使用呢?
其实也很简单, 只要实例为 null, 那么此时不就代表了是第一次调用吗? 此时就先进行创建, 然后再返回. 后续再获取实例的时候, 就不需要再创建了, 而是可以直接返回.
这个思路也是比较容易理解的, 直接在饿汉模式的基础上进行一些修改即可, 因此我们直接看代码
public class LazySingleTon {
private static LazySingleTon instance = null;
private LazySingleTon(){}
public static LazySingleTon getInstance() {
if(instance == null){
instance = new LazySingleTon();
}
return instance;
}
}
线程安全问题分析
既然是一个多线程实例章节, 而这里的单例模式还没有涉及到多线程, 自然就说明我们这里的单例模式就还没有结束. 因此我们继续上面的话题, 提出一个问题: 上面的这两个单例模式, 是否含有线程安全问题? 换一个更加具体的问题, 如果多个线程同时调用 getInstance() 方法, 会不会引发线程安全问题?
很明显, 我们的饿汉模式中调用 getInstance() 方法的时候, 只涉及到了对象的读取, 因此此时并不会涉及到线程安全问题.
但是反观懒汉模式, 懒汉模式在第一次调用 getInstance() 方法的时候的时候, 涉及到了修改操作, 那么此时就可能会有线程不安全问题.
例如张三和李四, 假设它们前面有一个苹果, 那么如果它们两个只是看着, 不去拿走/更换这个苹果, 那么此时就不会有问题. 但是假如它们两个的其中一个, 想要拿走/更换这个苹果, 那么此时就有可能发生冲突.
比如张三正在观察这个苹果, 然后李四给它拿走了, 此时张三再次去观察苹果的时候, 就会发现苹果不见了, 此时观察出来的结果就是错误的了.
而对于线程安全问题, 我们学习过的一个的解决方案就是加锁. 但是加锁还涉及到一个关键问题, 要在什么地方加锁? 此时我们就需要对这个代码的执行流程进行分析
很明显, 涉及到修改的主要就是这个判断语句以及下面的 new 操作
此时如果有两个线程按照下面这个执行流程来执行的话, 此时就是有问题的
很明显, 此时的这两个操作如果可以被拆分, 那么就会导致创建了两次对象, 违背了单例模式的初衷.
此时可能有人就要问了: 你这个多创建一次, 似乎也不会有什么问题吧?
目前我们看不出区别, 是因为这个对象非常的轻量. 但是假如说这个对象创建的过程中需要做巨大的准备工作, 例如读取几个 G 的文件, 那么这个多创建一次对象的开销就会变得非常大了.
根据我们上面的讨论, 这个 if 判断的过程和创建对象的过程是不可拆分的, 如果拆分就会引发线程安全问题. 因此为了合并这两个操作, 我们就要对这个创建的过程进行加锁
public class LazySingleTon {
private static LazySingleTon instance = null;
private LazySingleTon(){}
public static LazySingleTon getInstance() {
// 加锁
synchronized (LazySingleTon.class){
if(instance == null){
instance = new LazySingleTon();
}
}
return instance;
}
}
但是此时又引出了另一个问题, 这个线程安全问题只会在第一次发生, 而加锁又是一个耗时操作, 也就是说后面每一次我们都会进行一个没有必要但是又比较耗时的操作.
因此我们又要想办法来只让第一次操作的时候才加锁, 那么此时我们就可以再加一层 if 判定
public class LazySingleTon {
private static LazySingleTon instance = null;
private LazySingleTon(){}
public static LazySingleTon getInstance() {
if(instance == null){
// 加锁
synchronized (LazySingleTon.class){
if(instance == null){
instance = new LazySingleTon();
}
}
}
return instance;
}
}
这里我们看到, 虽然这里套了两个相同的 if 判断, 但是这里这两个 if 的目的却是完全不同的, 并且也是缺一不可的.
其中第一个 if 用于判断是否要加锁, 核心目的是为了减少后面获取对象的开销. 而第二个 if 则是用于去判断是否要创建对象, 核心目的是为了保证只创建一次对象.
我们现在的代码, 虽然看似已经非常的不错了, 不过还是有一个隐藏的问题. 这个问题也是由于编译器的一个优化导致的, 这个优化被称作指令重排序.
指令重排序指的是编译器会保证在代码逻辑不变的前提下将指令的执行顺序进行改变, 从而保证代码的执行效率能够提升. 例如张三和李四今天去旅游了, 张三原本规划的路线是如下所示的
但是李四发现张三的这个路线不太行, 效率有点低, 此时就给张三的路线进行了优化
在上面的这个过程中, 李四就相当于是编译器, 它把张三指令的执行顺序进行了改变, 保证在结果不变的前提下, 更加高效的运行.
在单线程的情况下, 这个优化是没有问题的, 但是在多线程的环境下, 也可能引发线程安全问题.
一般来说, 创建对象的过程可以粗略分为三步: 1. 申请空间 2. 初始化对象 3. 赋值引用
而这里, 执行第二步和第三步的指令, 就有可能会被编译器重排序, 那么此时可能会导致什么问题呢? 我们依旧是看一个图
很明显, 一旦后面的两个指令被重排序了, 那么此时就有可能会导致有其他的线程会操作到一个没有初始化好的一个对象, 那么此时自然就是不科学的.
因此为了防止这种问题的发生, 我们就要让编译器不要优化我们的代码. 那么我们就需要用到 volatile 来修饰对象, 防止编译器对其进行优化
public class LazySingleTon {
private volatile static LazySingleTon instance = null;
private LazySingleTon(){}
public static LazySingleTon getInstance() {
if(instance == null){
// 加锁
synchronized (LazySingleTon.class){
if(instance == null){
instance = new LazySingleTon();
}
}
}
return instance;
}
}
存在的问题
那么到此为止, 一个基本的单例模式就完成了. 不过这里实际上还是有一些问题的, 例如通过反射是否能够打破这个单例模式?
实际上, 由于反射是能够通过设定访问权限访问到私有方法的, 因此即使我们使用 private 来修饰构造方法, 同样可以通过反射来进行初始化从而构造出新的对象. 例如下面的这个代码, 就可以打破我们实现的单例模式
public class Main {
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
// 尝试通过反射来创建一个新的对象
Constructor<LazySingleTon> declaredConstructor = LazySingleTon.class.getDeclaredConstructor();
declaredConstructor.setAccessible(true);
LazySingleTon lazySingleTon = declaredConstructor.newInstance();
// 打印两个对象的引用, 查看是否是同一个对象
System.out.println(lazySingleTon);
System.out.println(LazySingleTon.getInstance());
}
}
那此时难道就没有反制手段了吗? 实际上, 在反射的代码中, 有一个特殊的类型是显式的被排除的, 这个类型就是枚举
那么此时我们就可以通过使用枚举, 来保证我们的单例模式不会被反射打破
不过正常来说, 上面我们实现的那个单例模式已经完全够用了, 因此这里的避免反射的版本我们这里就不实现了, 简单了解一下即可, 感兴趣的可以自行尝试一下.
生产者消费者模型
初识生产者消费者模型
生产者消费者模型, 这个名字听起来似乎有一点高大上, 但是实际上也是一个非常简单的东西. 我们现实中可能会遇到很多类似的情形, 比如我们的一些自助餐厅, 它里面打菜的地方可能比较有限, 那么此时新来的人就会进行排队. 那么此时当前面的人打完了菜后, 就可以让队伍里面的人继续进来打菜. 此时的这个情景就比较类似于一个生产者消费者模型
一般来说, 一个生产者消费者模型会分为三个部分, 一个就是生产者, 一个就是消费者, 还有一个中间的存储空间. 其中消费者就会从存储空间里面, 去取出事物进行处理, 而生产者则是会不断地往存储空间中放入事物.
例如我们上面的那个例子里面, 不断加入队伍的人群就是一个生产者, 队伍就是一个中间存储空间, 打菜的位置就类似于一个消费者.
那么这里讨论到的中间结构, 最常见的一个结构就是阻塞队列, 那么阻塞队列又是什么东西呢?
初识阻塞队列
其实阻塞队列从它的名字, 我们也差不多能看出大概是什么意思. 它实际上就是一种特殊的队列, 不过能够在多线程环境下保证线程安全, 同时具备一定的阻塞功能. 例如
- 如果队列为空还想出, 那么就阻塞等待来等待别的线程给队列塞元素
- 如果队列为满还想入, 那么就阻塞等待来等待别的线程拿队列的元素
而这个阻塞队列, 它最核心的意义就是用于去实现生产者消费者模型
生产者消费者模型的意义
那么使用生产者消费者模型能够有什么优点呢? 其实主要就是两个优点: 1. 解耦合 2. 削峰填谷. 下面我们来依次讨论一下这两个优点
我们现在考虑有一个系统, 它由多台主机共同构成.
假设现在客户端发送出来一些请求, 会先由主机 A 接收到, 然后转发给 B 进行处理. 那么此时它们的耦合程度就是非常高的, 因为一旦 B 这边出现了什么修改, 那么 A 就需要进行对应的修改. 同时, 如果我们还要添加一个主机 C, 此时主机 A 也需要进行修改来适配主机 C.
此时我们就可以引入一个阻塞队列, 去装载请求. 同时, 一般这个阻塞队列会单独的作为一个服务器程序, 也部署到一个主机上面, 此时的这个队列也有一个新的名字, 叫做消息队列.
那么在这样的情况下, 此时主机 A 就只需要关心如何把请求塞到队列里面即可, 而主机 B 也只需要关心如何把请求从队列里面取出即可. 那么即使主机 B 做出改动, 主机 A 也不需要进行任何的变化. 同时, 如果需要添加主机, 也只需要关系和消息队列的交互即可, 没有必要关心和其他主机的交互, 此时就做到了解耦合.
上面我们提到了生产者消费者模型的解耦合优点, 那么接下来我们来看其的另一个优点, 削峰填谷. 我们依旧是看这个图
此时可以看到, 此时 A 是直接把请求塞给 B 的, 那假设我们的 A 接收的请求非常多, 超出了 B 的处理能力, 那么当 A 把所有请求一股脑塞给 B 的时候, B 就可能会因为无法处理而直接崩溃
但是如果使用消息队列作为中间人, 此时队列如果满了, 那么 A 就需要阻塞等待, B 也就可以慢慢处理而不用担心处理不过来然后崩溃
那么此时在请求量高的时候, 消息队列就可以把这个峰值给平下来, 让 B 缓慢处理, 这就是削峰. 随后请求量低下来后, B 就可以一直处理刚刚残留下来的一些请求, 这就是填谷.
虽然这样的做法可能会一定程度上降低请求的处理效率, 但是这样也比系统直接崩溃更好
BlockingQueue
BlockingQueue 就是 Java 中提供的阻塞队列接口, 其中最主要的有两个实现, 分别是 ArrayBlockingQueue 和LinkedBlockingQueue, 实际上就是由数组和链表实现的两个阻塞队列
虽然这个接口继承了 Queue 接口, 但是使用阻塞队列的时候并不推荐使用 Queue 中的方法, 因为它们并没有线程安全和阻塞的特性. 如果要使用阻塞队列, 则应该使用里面特殊的两个方法 put() 和 take(), 分别对应阻塞性的入队列和出队列
下面我们就简单的通过这个阻塞队列来实现一个生产者消费者模型, 具体要求如下:
- 阻塞队列中存储的是 Integer 类型的数字
- 一个线程作为生产者, 间隔一段时间会往阻塞队列里面丢数字
- 一个线程作为消费者, 间隔一段时间会往阻塞队列里面拿数字
public class BlockingQueueDemo {
public static void main(String[] args) {
// 创建一个容量为 10 的阻塞队列
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
// 生产者
new Thread(() -> {
for (int i = 0; ; i++) {
try {
blockingQueue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("生产了: " + i);
// 休眠 50 ms
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
System.out.println("消费了: " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 休眠 10 s
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
很明显, 在上面的代码中, 虽然刚开始生产者生产的特别快, 但是一旦阻塞队列已经慢了, 此时放入元素的操作就会直接阻塞等待, 等待消费者消费完后, 随后再进行生产
阻塞队列模拟实现
接下来, 我们就来实现一个生产者消费者模型的核心部分, 阻塞队列. 那么既然是实现一个基于队列的东西, 自然就需要先实现一个基本的队列. 我们这里就实现一个基于整型数组实现的队列, 具体如何实现这里不进行讲解, 直接看代码
public class MyBlockingQueue {
// 存储数据的数组
private int[] data;
// 队列中实际存储数据的个数
private int size;
// 头指针
private int head;
// 尾指针
private int tail;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public void put(int val){
// 队列已满, 不能添加
if(size == data.length){
throw new ArrayStoreException("队列已满");
}
// 存储, 尾指针后移, size 加一
data[tail++] = val;
size++;
// 如果到达了数组的最后,则重置为 0
if(tail == data.length){
tail = 0;
}
}
public int take(){
// 队列为空, 不能取出
if(size == 0){
throw new ArrayStoreException("队列为空");
}
// 拿到返回值, 头指针后移, size 减一
int ret = data[head++];
size--;
// 如果到达了数组的最后,则重置为 0
if(head == data.length){
head = 0;
}
return ret;
}
}
随后我们就需要保障这两个操作的线程安全, 此时就可以通过加锁操作来说. 但是涉及到加锁就会涉及到一个问题, 锁应该加在哪里?
实际上我们可以看到, 这两个操作无论是哪一个都涉及到了一定的读写操作, 因此我们这里就先粗暴的直接给两个方法整体都加上锁, 也就是在两个方法前面加上 synchronized
public class MyBlockingQueue {
// 存储数据的数组
private int[] data;
// 队列中实际存储数据的个数
private int size;
// 头指针
private int head;
// 尾指针
private int tail;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public synchronized void put(int val){
// 队列已满, 不能添加
if(size == data.length){
throw new ArrayStoreException("队列已满");
}
// 存储, 尾指针后移, size 加一
data[tail++] = val;
size++;
// 如果到达了数组的最后,则重置为 0
if(tail == data.length){
tail = 0;
}
}
public synchronized int take(){
// 队列为空, 不能取出
if(size == 0){
throw new ArrayStoreException("队列为空");
}
// 拿到返回值, 头指针后移, size 减一
int ret = data[head++];
size--;
// 如果到达了数组的最后,则重置为 0
if(head == data.length){
head = 0;
}
return ret;
}
}
当然, 这里也是可以通过内部把所有代码包裹起来, 提供 this 或者是一个锁对象来进行的, 不过这里无比需要保证的是两个锁对象需要是同一个.
此时可能有人要问了: 这样直接就把整个方法进行加锁是否会过于粗暴呢?
实际上, 这里对整个方法进行加锁, 也是后续实现阻塞的一个必要条件, 因此这里我们就先这样做即可, 接下来实现阻塞的时候自然会发现这一点.
那么接下来, 我们就需要通过 wait() 和 notify() 去实现组合和唤醒的效果.
其实时机也很好判断, 我们说过阻塞队列的阻塞时机就两个
- 如果队列为空还想出, 那么就阻塞等待来等待别的线程给队列塞元素
- 如果队列为满还想入, 那么就阻塞等待来等待别的线程拿队列的元素
很明显, 这两个时机分别就对应了我们原先代码中抛出异常的两个位置. 同时, 唤醒的操作我们就直接在每一次操作的最末尾进行即可.
修改后代码如下
public class MyBlockingQueue {
// 存储数据的数组
private int[] data;
// 队列中实际存储数据的个数
private int size;
// 头指针
private int head;
// 尾指针
private int tail;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public synchronized void put(int val) throws InterruptedException {
// 队列已满, 阻塞等待
if(size == data.length){
wait();
}
// 存储, 尾指针后移, size 加一
data[tail++] = val;
size++;
// 如果到达了数组的最后,则重置为 0
if(tail == data.length){
tail = 0;
}
// 唤醒其他线程
notify();
}
public synchronized int take() throws InterruptedException {
// 队列为空, 阻塞等待
if(size == 0){
wait();
}
// 拿到返回值, 头指针后移, size 减一
int ret = data[head++];
size--;
// 如果到达了数组的最后,则重置为 0
if(head == data.length){
head = 0;
}
// 唤醒其他线程
notify();
return ret;
}
}
但是此时我们的阻塞队列并没有完成, 依旧存在问题, 问题就在于这个 wait() 的唤醒时机. 一般情况下我们的wait() 都只能由 notify() 唤醒, 但是有没有意外情况呢?
那这里既然提到了, 肯定就是有的. 假如使用 interrupt() 唤醒, 那么此时 wait() 就会直接抛出异常, 那么此时就是可能会有问题的.
虽然在我们实现的这个方法里面, 是直接抛出异常的, 因此没有什么问题
但是假如使用try-catch处理异常, 并且没有抛出的时候, 就有可能有问题, 如下就是一个通过try-catch处理异常, 并且没有抛出的代码
public class MyBlockingQueue {
// 存储数据的数组
private int[] data;
// 队列中实际存储数据的个数
private int size;
// 头指针
private int head;
// 尾指针
private int tail;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public synchronized void put(int val){
// 队列已满, 阻塞等待
if(size == data.length){
try {
wait();
} catch (InterruptedException e) {}
}
// 存储, 尾指针后移, size 加一
data[tail++] = val;
size++;
// 如果到达了数组的最后,则重置为 0
if(tail == data.length){
tail = 0;
}
// 唤醒其他线程
notify();
}
public synchronized int take() {
// 队列为空, 阻塞等待
if(size == 0){
try {
wait();
} catch (InterruptedException e) {}
}
// 拿到返回值, 头指针后移, size 减一
int ret = data[head++];
size--;
// 如果到达了数组的最后,则重置为 0
if(head == data.length){
head = 0;
}
// 唤醒其他线程
notify();
return ret;
}
}
那么为什么会有问题呢? 实际上是因为触发 interrupt() 后抛出异常, 如果 catch 后不进行任何处理, 此时就会直接执行下面的代码了. 那此时假如是放入操作, 而此时并不是通过 notify() 唤醒, 此时证明队列还是满的, 那么此时我们如果放入数据, 就会直接替换有效的数据, 这很明显是一个严重的问题.
那么为了防止它进行操作, 难道要再检测一次看看这个队列是不是满的吗? 那假如我一直调用 interrupt 呢? 难道要加无数个 if 检测?
既然是一个不确定次数的 if, 不如我们直接使用 while 循环解决问题. 那么我们最终修改的代码如下, 这里虽然我们没有采用 try catch, 不过还是推荐采用 while 循环来处理 wait()
package blockingqueue;
import java.util.NoSuchElementException;
public class MyBlockingQueue {
// 存储数据的数组
private int[] data;
// 队列中实际存储数据的个数
private volatile int size;
// 头指针
private volatile int head;
// 尾指针
private volatile int tail;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public synchronized void put(int val) throws InterruptedException{
// 队列已满, 阻塞等待
while(size == data.length){
wait();
}
// 存储, 尾指针后移, size 加一
data[tail++] = val;
size++;
// 如果到达了数组的最后,则重置为 0
if(tail == data.length){
tail = 0;
}
// 唤醒其他线程
notify();
}
public synchronized int take() throws InterruptedException {
// 队列为空, 阻塞等待
while(size == 0){
wait();
}
// 拿到返回值, 头指针后移, size 减一
int ret = data[head++];
size--;
// 如果到达了数组的最后,则重置为 0
if(head == data.length){
head = 0;
}
// 唤醒其他线程
notify();
return ret;
}
}
实际上在官方文档中, wait() 也被推荐配合 while 使用, 每次唤醒后再进行一次检查, 从而防止中断或者虚假唤醒的情况发生
此时可能有人还看到了, 下面我们的无参 wait(), 实际上是一个 wait(0)
此时可能有人就要问了: 这个 wait() 的参数不是等待时间吗? wait(0) 又是什么操作?
实际上这是 Java 中的一个约定, wait(0) 代表的就是无期限的等待.这一点也会偶尔被作为问题进行提问, 我们简单了解一下即可
回到我们的阻塞队列, 这里实际上还是有一些细节问题需要我们处理. 其中的 size, head 和 tail 这几个变量, 在操作的代码中, 既涉及到了读操作也涉及到了写操作(读写操作对于 CPU 来说是多条指令, 一旦执行次数过多, 可能就会触发编译器的优化机制), 那么此时就推荐将这些成员变量加上 volatile 关键字, 防止内存可见性问题
最终代码如下
public class MyBlockingQueue {
// 存储数据的数组
private int[] data;
// 队列中实际存储数据的个数
private volatile int size;
// 头指针
private volatile int head;
// 尾指针
private volatile int tail;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public synchronized void put(int val){
// 队列已满, 阻塞等待
while(size == data.length){
try {
wait();
} catch (InterruptedException e) {}
}
// 存储, 尾指针后移, size 加一
data[tail++] = val;
size++;
// 如果到达了数组的最后,则重置为 0
if(tail == data.length){
tail = 0;
}
// 唤醒其他线程
notify();
}
public synchronized int take() {
// 队列为空, 阻塞等待
while(size == 0){
try {
wait();
} catch (InterruptedException e) {}
}
// 拿到返回值, 头指针后移, size 减一
int ret = data[head++];
size--;
// 如果到达了数组的最后,则重置为 0
if(head == data.length){
head = 0;
}
// 唤醒其他线程
notify();
return ret;
}
}
这里我们就可以将上面实现的生产者消费者模型中的阻塞队列换成我们自己实现的这个队列, 来看看效果
package blockingqueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) {
// 创建一个容量为 10 的阻塞队列
MyBlockingQueue blockingQueue = new MyBlockingQueue(10);
// 生产者
new Thread(() -> {
for (int i = 0; ; i++) {
try {
blockingQueue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("生产了: " + i);
// 休眠 500 ms
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
System.out.println("消费了: " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 休眠 1 s
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
很明显没有任何问题, 生产者消费者模型正常运行
定时器
初识计时器
定时器也很好理解, 就是类似于一个闹钟, 用于指定时间执行逻辑, 它的用途十分的广泛, 尤其是在网络通信中非常常见
举一个例子, 假设有一个客户端向一个服务器发送请求, 但假如一直没有响应, 无限等下去也不现实, 因为我们并不知道这个响应是丢失了还是响应丢失了还是什么其他问题. 此时就可以设定一个等待时间, 然后执行重发一次请求或者是断开连接等操作
初识Timer类
Java 中也提供了一个现成的定时器的, 就是 Timer 类. 接下来我们就来简单使用一下这个类来写一个 Hello World 程序
这里我们先按照这个代码书写即可, 随后我们会对其中的细节进行一些介绍
public class Demo {
public static void main(String[] args) {
// 创建定时器对象
Timer timer = new Timer();
// 设定 2 秒后打印 Hello World
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello World");
}
}, 2000);
System.out.println("程序启动");
}
}
随后尝试运行代码, 我们可以发现这个 Hello World 并没有立即打印, 而是稍微等了一段时间后进行的打印.
此时我们的第一个 Hello World 程序就完成了, 接下来我们就来简单的介绍一下这个代码的组成部分
初识 schedule() 方法
schedule() 方法中, 最核心的部分就是这个 TimerTask 类型的参数. 这个参数实际上是一个实现了 Runnable 接口的抽象类
既然是实现了 Runnable 接口, 里面肯定就有一个 run() 方法, 实际上我们上面书写 Hello World 程序的时候, 重写的就是这个 run() 方法, 它和我们之前写过的 run() 方法没有什么区别, 都是去描述一个任务的.
而上面我们代码, 就是直接在提供参数的时候, 采用了匿名内部类的写法, 创建了一个匿名对象作为参数.
而后面还提供的一个 delay 参数, 这代表的是过多少时间后执行这个 run() 方法. 这里的意思就是过 2000ms 后, 打印 Hello World.
同时, 这个 schedule() 方法还有一些其他的实现, 例如还有一个提供 period 参数的方法
这个 period 指的就是一个执行间隔, 当我们提供 period 后, 这个任务就会变为一个循环执行的任务, 并且执行的间隔时长是 period.
例如我们这里让我们上面的 Hello World 代码变为间隔 5s 打印一次, 那么我们就可以额外提供一个 period 参数
package timer;
import java.util.Timer;
import java.util.TimerTask;
public class Demo {
public static void main(String[] args) {
// 创建定时器对象
Timer timer = new Timer();
// 设定 2s 后打印 Hello World, 5s 打印一次
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello World");
}
}, 2000, 5000);
System.out.println("程序启动");
}
}
执行代码后可以看到, 它就会一直间隔一段时间打印这个 Hello World
简易定时器的实现
思路讲解
接下来我们就来实现一个简单的定时器, 它主要就包括了上面的那个到达指定时间执行任务的方法.
那么既然要实现一个定时器, 我们肯定要知道它的大体实现逻辑.
首先, 我们肯定需要一个东西用来扫描任务是否到时间了, 是否要执行任务, 那这个操作肯定是需要一个线程去执行的. 从上面我们的 Hello World 程序也可以看出, 这个 Timer 肯定自己也有一个线程去执行想要的工作, 和主线程并不是同一个. 因为如果是同一个, 那么这个程序启动就不应该打印出来.
其次, 我们还需要一个存储空间, 去用于存储我们的任务. 并且这个存储结构还要尽可能的对任务进行优先级排列, 因为时间少的我们就要优先级更高, 等会优先执行它. 并且此时我们也不用关心后面的任务, 因为时间最少的任务都没执行, 后面时间更多的也不用着急. 那么很明显, 都提到优先级了, 自然就是可以使用优先级队列来存储
最后我们还需要一个用来描述任务的类, 不然我们的优先级队列也不知道要存什么类型的数据. 实际上这个对应的就是上面的 TimeTask 类.
同时这个类至少应该包含两个信息:1. 执行时间 2. 任务逻辑.
对于这个时间信息, 我们这里就采取绝对时间的方式去存储, 即直接去存储时间戳的方式去实现. 因为这样实现, 我们后续检查任务是否要执行就可以直接看看它和当前时间差多少, 比较容易实现.
代码书写
那么有了大概思路, 我们接下来就来实现代码
首先我们先实现一个类, 用于表示任务, 这个代码还是比较简单的, 直接看代码即可
public class MyTimerTask {
// 执行时间
private long executeTime;
// 任务
private Runnable task;
// 构造方法
public MyTimerTask(long delay, Runnable task) {
this.executeTime = System.currentTimeMillis() + delay;
this.task = task;
}
public long getExecuteTime() {
return executeTime;
}
public Runnable getTask() {
return task;
}
}
接下来就来实现 MyTimer 类的大体框架
public class MyTimer {
private PriorityQueue<MyTimerTask> taskQueue = new PriorityQueue<>();
private Thread thread = new Thread(() -> {
while (true) {
if (taskQueue.isEmpty()) {
// 如果队列为空...
// 这里先不做处理
return;
}
// 拿到第一个任务
MyTimerTask task = taskQueue.peek();
// 查看任务是否已经到时间了
long currentTime = System.currentTimeMillis();
long taskTime = task.getExecuteTime();
// 是就执行并且出队列
if (currentTime >= taskTime) {
taskQueue.poll();
task.getTask().run();
}
}
});
public MyTimer() {
// 启动线程
thread.start();
}
public void schedule(long delay, Runnable task) {
// 创建任务对象, 添加到队列中
taskQueue.add(new MyTimerTask(delay, task));
}
}
这样很明显是有很大的问题的, 首先就是如果队列为空, 那么应该如何处理?
很明显, 如果任务队列为空, 我们的扫描线程就应该等待后续任务的添加, 因此这里需要添加一个 wait(), 同时我们放入任务的位置也需要添加一个 notify() 来唤醒线程.
同时, 由于这个扫描线程和 schedule() 方法都涉及到了对队列的修改操作, 因此为了保证线程安全, 同时也是为了能够使用 wait() 和 notify(), 我们需要给这两个方法加锁.
public class MyTimer {
private PriorityQueue<MyTimerTask> taskQueue = new PriorityQueue<>();
private final Object lock = new Object();
private Thread thread = new Thread(() -> {
synchronized (lock) {
while (true) {
if (taskQueue.isEmpty()) {
// 队列为空, 等待
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 拿到第一个任务
MyTimerTask task = taskQueue.peek();
// 查看任务是否已经到时间了
long currentTime = System.currentTimeMillis();
long taskTime = task.getExecuteTime();
// 是就执行并且出队列
if (currentTime >= taskTime) {
taskQueue.poll();
task.getTask().run();
}
}
}
});
public MyTimer() {
// 启动线程
thread.start();
}
public void schedule(long delay, Runnable task) {
synchronized (lock) {
// 创建任务对象, 添加到队列中
taskQueue.add(new MyTimerTask(delay, task));
// 唤醒线程
lock.notify();
}
}
}
此时我们的代码大体完成了, 但是还有一个致命的问题, 就是我们没有给 MyTimerTask 实现 Comparable 接口, 此时 PriorityQueue 就会不知道如何对其进行比较, 因此接下来我们来实现一下
public class MyTimerTask implements Comparable<MyTimerTask>{
// 执行时间
private long executeTime;
// 任务
private Runnable task;
public MyTimerTask(long delay, Runnable task) {
this.executeTime = System.currentTimeMillis() + delay;
this.task = task;
}
public long getExecuteTime() {
return executeTime;
}
public Runnable getTask() {
return task;
}
@Override
public int compareTo(MyTimerTask o) {
// 时间戳小的任务更小
return (int) (this.executeTime - o.executeTime);
}
}
那么接下来, 我们来尝试执行一下下面的这个程序
public class Demo {
public static void main(String[] args) {
// 创建定时器对象
MyTimer timer = new MyTimer();
// 设定 2s, 5s, 7s 后打印 Hello World
timer.schedule(() -> System.out.println("Hello World: 2s"), 2000);
timer.schedule(() -> System.out.println("Hello World: 5s"), 5000);
timer.schedule(() -> System.out.println("Hello World: 1s"), 1000);
System.out.println("程序启动");
}
}
此时会发现, 我们的结果似乎不太对劲, 这是为什么呢?
实际上, 这里涉及到的是加锁的位置的问题, 我们上面是直接粗暴的把锁加在了所有代码外面, 此时就会引发问题. 那么是为什么呢?
我们仔细看一下这一段代码
可以发现, 这段代码除了执行到 wait() 部分会释放锁, 其他部分都会一直持有锁. 那么这会导致什么问题呢? 这就会导致, 如果执行不到 wait(), 那么我们的主线程完全没有办法获取锁, 往队列里面添加元素.
换句话说, 只要你的队列里面有东西, 此时就不会 wait(), 也就无法添加元素. 最终导致我们的队列里面一直只有一个元素.
这肯定不是我们想要的, 我们需要的效果肯定是主线程能够一并的把所有的任务放到里面, 然后按照时间顺序执行. 那么我们如何解决这个 Bug 呢?
其实解决方法有两个:
- 增加主线程获取锁的机会
- 让队首元素不到时间的时候也进入等待
增加主线程获取锁的机会, 实际上做起来也非常简单, 我们直接把这个锁放到 while 循环里面即可
此时我们尝试去运行上面的测试代码, 就可以发现正常执行了
这里是因为我们把 while 循环的判断放到外面后, 此时判断的间隔就允许主线程去获取锁从而添加元素, 此时就可以正常执行了.
虽然上面我们通过了第一个方式, 使得代码可以正常执行了, 不过此时代码并不够好. 我们假设一种情况, 假如我们的任务需要半个小时后执行, 那么此时我们的代码会如何执行呢?
可以看到, 它此时就会一直在这个循环里面空转. 这很明显并不是我们想要的, 那么有没有什么办法可以解决这个问题呢?
实际上这里就需要用到我们针对于上面问题提到的另一个解决方案: 让队首元素不到时间的时候也进入等待
这个也很好理解, 我们在最后这个判断这里, 放一个等待分支, 等待时间就是任务执行时间 - 当前时间
public class MyTimer {
private PriorityQueue<MyTimerTask> taskQueue = new PriorityQueue<>();
private final Object lock = new Object();
private Thread thread = new Thread(() -> {
while (true) {
synchronized (lock) {
if (taskQueue.isEmpty()) {
// 队列为空, 等待
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 拿到第一个任务
MyTimerTask task = taskQueue.peek();
// 查看任务是否已经到时间了
long currentTime = System.currentTimeMillis();
long taskTime = task.getExecuteTime();
// 是就执行并且出队列
if (currentTime >= taskTime) {
taskQueue.poll();
task.getTask().run();
}else{
// 没到时间, 等待
try {
lock.wait(taskTime - currentTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
});
public MyTimer() {
// 启动线程
thread.start();
}
public void schedule(Runnable task, long delay) {
synchronized (lock) {
// 创建任务对象, 添加到队列中
taskQueue.add(new MyTimerTask(delay, task));
// 唤醒线程
lock.notify();
}
}
}
此时可能有人就要问了: 你这个等待可不可以通过 sleep() 实现呢?
其实这里通过 wait() 实现是一个更好的选择, 因为我们采用 wait() 时, 如果此时其他线程丢进来一个新的要马上执行的任务, 此时就会调用 notify() 并且唤醒当前线程, 防止执行不到这个需要更快执行的任务.
到这里, 我们的简易计时器就算完成了, 这个计时器的功能还是比较简单的, 并没有包含那种周期执行任务的功能. 我们这里简单介绍一下周期功能的实现.
其实这个功能也是比较简单的, 我们给任务中添加一个字段, 表示执行间隔, 随后在执行任务的时候, 看看这个任务是不是一个周期任务, 如果是的话, 就根据间隔重新生成一个任务并且丢到队列里面即可. 这里就不详细介绍了, 感兴趣的可以自行实现.
线程池
线程池是什么
线程虽然相对于进程来说, 虽然更加轻量, 但是加入进一步提高创建销毁的频率, 线程的开销也就无法忽视. 因此为了更加进一步的追求效率, 就有了使用线程池的这个方案
线程池并不是很难理解, 假设池就是一个临时仓库, 并且我已知我后面可能要用到多个线程, 但是目前先不用, 那么我就可以先创建好几个线程, 然后存到线程池里面, 后续要用的时候就不用创建, 而是直接从线程池拿就行
但是为什么从池里面取的操作会比直接创建效率更高呢?
实际上这个就涉及到了操作系统中, 用户态和内核态的问题. 我们从池子里面取出来, 是在应用程序层面进行的, 也就是用户态进行的, 不需要去调用系统内核. 而如果我们要创建线程, 就需要通过调用系统 API 去让操作系统去进行创建, 此时就涉及到了内核态的操作.
那为什么内核态操作就会相对于用户态更慢呢?
实际上也很好理解, 因为操作系统的内核很忙, 它负责给整个计算机所有的应用程序提供支持, 并不是只负责你一个应用, 既然要干的活多了, 自然速度就会更慢, 同时系统内核也是我们不可控的, 具体它进行什么操作, 我们从应用程序层面没有办法控制, 只能通过操作系统去进行指挥.
而我们的应用程序基本上是只需要去兼顾自身的, 此时自然要干的活更少, 就会更加的快, 同时应用程序是我们自己实现的, 它要干什么活我们说了算, 此时也不会有不可控的情况发生.
Java中的线程池
下面是 Java 中创建线程池的方式, 我们可以通过调用 Executors 里面的静态方法来创建
public class Demo05 {
public static void main(String[] args) {
//创建一个带有缓存的线程池
ExecutorService service1 = Executors.newCachedThreadPool();
//创建一个固定容量的线程池, 需要指定容量
ExecutorService service2 = Executors.newFixedThreadPool(10);
//创建只有一个线程的线程池
ExecutorService service3 = Executors.newSingleThreadExecutor();
//创建一个线程池用于执行定时任务, 需要制定核心线程数(核心线程是什么后面解释)
ExecutorService service4 = Executors.newScheduledThreadPool(20);
}
}
此时我们可以看到, 它的这个类提供了非常多的线程池创建方式. 其中有两个方式允许我们指定相关线程的数量
此时可能有人会有一个问题: 线程池的线程数目应该设定多少合适呢?
实际上这个问题并没有标准答案, 一定是根据具体情景, 进行一次次的性能测试最后得出来的. 那这是为什么呢?
首先一般来说, 我们的代码从核心逻辑上来说可以分为两类: 1. CPU密集型 2. IO密集型
CPU 密集型代码也就是这段代码主要做的是逻辑判断和算数运算这样的工作, 这类工作主要是依靠CPU来进行的, 此时最合适的情况就是, 线程的数量和 CPU 差不多, 因为这样就能够让 CPU 专注于去执行这些运算.
如果此时线程数量过多, 大于了 CPU 核心数目, 那此时调度的开销反而可能导致效率变低, 因此此时线程数不应该超过CPU核心数目. 就类似于张三让李四算几个数学题, 张三还不让李四一次算完一道, 每次做一半就让李四换一题做, 此时效率自然没有一个一个题做更高.
而 IO 密集型的代码就不会那么占用 CPU 资源, 核心工作主要是由 IO 设备来进行, 此时 CPU 主要是一个指挥作用. 那此时就可以让 CPU 进行调度来提高效率, 那么此时线程数就可以大于核心数.
不过上面说了这么多, 但是还是有一个问题. 在实际代码中, 我们怎么知道这两个类型的代码哪个多哪个少呢? 也不可能说所有的代码都是 CPU 密集或者 IO 密集. 因此此时也就只能够去通过一次次实验的方式, 去观察现象从而得出最优的数值.
工厂模式
在上面创建线程池的时候, 我们会发现我们创建对象并不是和传统的一样直接通过 new 来进行, 而是通过了一个什么类的一个方法. 这样设计又是为了什么呢? 有什么优势呢?
实际上, 这个地方也用到了一个经典的设计模式, 这个设计模式就叫做工厂模式.
工厂我们日常生活中也听说过不少, 他核心的作用就是去生产一些东西. 而这里实际上也是差不多的, 主要就是基于一个工厂类, 然后对象的生产操作就会由这些工厂类去进行.
例如我们刚刚的Executors
就是一个工厂类, 其中创建线程池的方法就被叫做工厂方法.
那么为什么要工厂模式呢? 其实因素无非大体就以下几点:
- 解耦合: 将创建对象的过程和具体的代码流程进行分离, 后续修改创建对象的过程的时候只需要修改工厂方法即可.
- 封装简化使用: 假如创建对象的过程非常繁琐, 那么此时通过工厂模式可以提供一个简易的创建方式来创建方法
- 弥补构造方法的缺陷: Java 的构造方法在某些情况无法满足创建对象的需求, 此时可以借助工厂方法来创建对象
其中前面两点还是比较容易理解的, 我们主要看一下第三点, 什么叫做弥补构造方法的缺陷呢? 我们来看下面这个例子, 假如我现在有一个类, 它可以采取两种方式去描述二维坐标的一个点
//这个类用于描述一个点
class Point{
double X;
double Y;
double R;
double A;
//使用平面直角坐标系来描述一个点, X 是横坐标, Y是纵坐标
public Point(double X, double Y){}
//使用极坐标描述一个点, R 是半径, A 是角度
public Point(double R, double A){}
}
但是此时就很明显会发生问题, 因为构造方法的名字必须与类相同, 那么此时写多个构造方法时就一定会构成重载, 而重载又要求这两个方法的方法签名不能一样
但是有时候又会有虽然是构造两个不同意义的对象, 但是参数类型和参数数量却一样的情况发生. 那么此时构造方法就无法满足需求
此时我们就可以去创建一个工厂类, 专门用来生产这个类的对象
假设我们对上述代码进行修改, 写一个工厂类专门用于生产对应的对象
//这个类用于描述一个点
class Point{
double X;
double Y;
double R;
double A;
}
//一个用于生产点对象的类
class PointFactory{
//使用平面直角坐标系来描述一个点
public static Point newPointByXY(double X, double Y){
Point ret = new Point();
ret.X = X;
ret.Y = Y;
return ret;
}
//使用极坐标描述一个点
public static Point newPointByRA(double R, double A){
Point ret = new Point();
ret.R = R;
ret.A = A;
return ret;
}
}
很明显, 此时就不会出现任何问题了, 后续要创建对象的时候, 就直接调用对应的静态方法即可
ThreadPoolExecutor介绍
当我们点击刚刚哪个创建线程池的工厂方法后, 发现其实际上就是创建了一个 ThreadPoolExecutor 对象. 同时还提供了一大堆参数.
很明显, 此时我们就可以看到这个工厂类的一部分优势了, 如果我们要自己手动创建线程池, 还需要去提供一大堆的参数, 但是我们通过一个工厂方法, 就可以非常简单的创建出一个线程池
同时, 我们继续进入构造方法, 会发现其实虽然有很多构造方法, 它们的内部都是指向了同一个参数最复杂的方法, 没有提供的参数它都会直接提供一个默认值过去
接下来我们就来了解一下这个完全体构造方法的各个参数分别代表着的是什么意思
- corePoolSize: 核心线程数, 核心线程数也可以理解为是最少线程数, 也就是这个线程池最少都会有几个线程, 不会低于这个值, 并且核心线程是不会被清除的, 但是非核心线程如果空闲一段时间就会被销毁
- maximumPoolSize: 最大线程数, 也就是允许容纳的最大线程数
- keepAliveTime: 空闲时间, 非核心线程允许空闲的最大时间, 空闲超过这个时间那么非核心线程就会被销毁
- TimeUnit: 时间单位, 用于作为上面这个空闲时间的单位的
- BlockingQueue<Runnable>: 用于存储任务的阻塞队列, 可以根据需求选择不同的阻塞队列.
- 如果需要优先级, 选择PriorityBlockingQueue.
- 如果任务数量相对恒定且不需要优先级, 使用ArrayBlockingQueue.
- 如果任务数量变动大且不需要优先级, LinkedBlockingQueue
- ThreadFactory: 也是工厂模式的体现, 是用于生产线程的工厂类, 可以更方便的设置线程的属性
- RejectedExecutionHandler: 拒绝策略, 线程池中的任务数量是有上限的. 这个策略就是用于说明如果任务数量已经达到上限, 此时还往此线程池输入任务, 线程池会如何操作. 这里主要有四种策略
- ThreadPoolExecutor.AbortPolicy: 直接抛出异常
- ThreadPoolExecutor.CallerRunsPolicy: 新添加的任务由布置任务的线程负责执行
- ThreadPoolExecutor.DiscardOldestPolicy: 将队列中最老的任务丢了, 把新任务塞进去
- ThreadPoolExecutor.DiscardPolicy: 直接把这个新任务丢了
举一个例子, 假如现在李四已经要忙疯了, 干不过来了, 此时张三还想给他分配任务. 下面是李四分别采取各个策略的情况
- AbortPolicy: 李四就直接坐地上直接开始哇哇大哭, 直接开始吐槽"我都累成这样了, 你还给我加活?".
- CallerRunsPolicy: 李四把这个活还给张三, 让张三自己干.
- DiscardOldestPolicy: 李四就把自己任务里面的最早安排的任务给放弃, 接下这个新的活.
- DiscardPolicy: 李四直接忽略这个活, 张三也不会管这个活了, 这个活就彻底没人干了, 直接丢了.
实现一个简单的线程池
我们就简单的实现一个线程池, 原理也是比较简单的:
- 创建任务存储的地方: 这里直接采取线程的阻塞队列实现
- 创建多个线程: 直接在构造方法中创建并且启动即可
- 让线程去跑任务: 在创建线程的时候, 指定它们的工作是从阻塞队列中取出任务, 并且执行即可
public class MyThreadPool {
// 存放任务的地方
BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(10);
public MyThreadPool(int poolSize) {
// 创建10个线程
for (int i = 0; i < poolSize; i++) {
Thread thread = new Thread(() -> {
while (true) {
try {
// 从任务队列中取出一个任务
Runnable task = taskQueue.take();
// 执行任务
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
// 添加任务的方法
public void submit(Runnable task) throws InterruptedException {
taskQueue.put(task);
}
}