多线程初阶
- 认识线程
- 线程的概念
- 线程和进程的区别
- Java 的线程 和 操作系统线程 的关系
- 创建线程
- 方法1 继承Thread类
- 方法2 实现Runnable接口
- 其他变形
- 匿名内部类创建Thread子类对象
- 匿名内部类创建Runnable子类对象
- lambda表达式创建Runnable子类对象
- Thread类及常见方法
- Thread的常见构造方法
- Thread的常见属性
- ID
- 优先级
- 后台线程
- 启动一个线程
- 终止一个线程
- 方式一:手动设定标志位
- 方式二:使用 Thread.interrupted() 或者 Thread.currentThread().isInterrupted() 代替自定义标志位.
- 等待一个线程
- 线程的状态
- 线程安全
- 线程不安全的案例
- 线程不安全的原因
- 修改共享数据
- 可见性
- 指令重排序
- synchronized关键字
- synchronized的特性
- 互斥
- 刷新内存
- 可重入
- synchronized使用案例
- 修饰普通方法
- 修饰静态方法
- 修饰代码块
- volatile关键字
- volatile能保证内存可见性
- volatile不保证原子性
- synchronized也能保证内存可见性
- wait和notify
- wait
- notify
- notifyAll
- wait和sleep对比
- 多线程案例
- 单例模式
- 饿汉式
- 懒汉式-单线程
- 懒汉模式-多线程
- 懒汉模式-多线程改进版
- 阻塞队列
- 生产者消费者模型
- 标准库中的阻塞队列
- 生产者消费者模型
- 实现阻塞队列
- 定时器
- 标准库中的定时器
- 模拟实现定时器
- 线程池
- 标准库中的线程池
- ThreadPoolExecutor
- 构造方法
- 实现线程池
认识线程
线程的概念
线程是操作系统调度的最小执行单位,是进程中的一个实体。一个进程可以包含多个线程,这些线程共享进程的资源,包括内存空间、文件句柄等。
线程具有以下特点:
并发执行:多个线程在同一时间段内可以并发执行,从而提高了系统的资源利用率和响应速度。
共享资源:线程之间共享进程的资源,可以直接访问进程的内存空间、文件、网络连接等。这使得线程之间通信和数据共享更加方便快捷。
轻量级:相对于进程来说,线程的创建、切换和销毁的开销较小,因为线程之间不需要独立的内存空间。
独立调度:线程是操作系统调度的最小执行单位,不同的线程可以有不同的优先级,操作系统可以对线程进行调度,以满足各个线程的运行需求。
同步与互斥:多个线程共享资源时,可能会出现竞态条件(Race
Condition)等问题,需要使用同步机制(如互斥锁、信号量等)来保证线程之间的正确协作。线程在多核处理器上可以实现真正的并行执行,不同的线程可以在不同的核上同时执行,从而进一步提高系统的性能。
线程广泛应用于操作系统、服务器、多线程编程等领域,可以有效地提高程序的并发能力和响应速度。但是在多线程编程中,也需要注意线程安全、死锁、资源竞争等问题,以保证多线程的正确性和稳定性。
线程和进程的区别
线程(Thread)和进程(Process)是操作系统中的两个核心概念,它们有以下区别:
定义:进程是操作系统中资源分配的基本单位,是一个独立的执行环境;而线程是进程内的一个独立执行单元,是进程的实际工作单位。
资源占用:每个进程都拥有独立的内存空间、文件句柄等系统资源,进程间的资源相互独立;而多个线程共享所属进程的资源,包括内存、打开的文件、网络连接等。
创建和销毁开销:创建或销毁进程的开销较大,需要分配和释放独立的内存空间等;而线程的创建或销毁开销较小,只需在进程内调整一些数据结构即可。
并发性:多个进程之间是并发执行的,在多核处理器上可以实现真正的并行执行;而多个线程共享所属进程的执行环境,但在单核处理器上多个线程只能通过时间片轮转方式实现并发。
通信和同步:进程间通信(IPC)的机制比较复杂,如管道、消息队列、共享内存等;而线程之间通信更加简便,可以直接读写共享内存、使用互斥锁、条件变量等同步机制。
安全性:进程之间相互隔离,一个进程崩溃不会影响其他进程的运行;而多线程共享同一进程的资源,一个线程的错误可能导致整个进程崩溃。
综上所述,进程和线程在资源占用、创建销毁开销、并发性、通信和同步等方面存在差异。选择进程还是线程取决于具体应用场景和需求,需要综合考虑资源利用、并发性能和系统安全等因素。
Java 的线程 和 操作系统线程 的关系
线程是操作系统中的概念. 操作系统内核实现了线程这样的机制, 并且对用户层提供了一些 API 供用户使用(例如 Linux 的 pthread 库).
Java 标准库中 Thread 类可以视为是对操作系统提供的 API 进行了进一步的抽象和封装.
创建线程
方法1 继承Thread类
**创建一个子类,这个子类继承自Thread
调用start方法启动线程 **
class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread");
}
}
public class demo1 {
public static void main(String[] args) {
MyThread t = new MyThread();
t.start();
}
}
方法2 实现Runnable接口
实现Runnable接口,创建Thread类实例,
调用Thread的构造方法时将Runnable对象作为参数,
调用start方法
class MyThread2 implements Runnable {
@Override
public void run() {
System.out.println("MyThread2");
}
}
public class Demo2 {
public static void main(String[] args) {
Thread t2 = new Thread(new MyThread2());
t2.start();
}
}
其他变形
匿名内部类创建Thread子类对象
public class Demo3 {
public static void main(String[] args) {
Thread t3 = new Thread(){
@Override
public void run() {
System.out.println("线程创建成功");
}
};
t3.start();
}
}
匿名内部类创建Runnable子类对象
public class Demo4 {
public static void main(String[] args) {
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程创建成功");
}
});
t4.start();
}
}
lambda表达式创建Runnable子类对象
public class Demo5 {
public static void main(String[] args) {
Thread t5 = new Thread(() -> {
System.out.printf("使用lambda创建线程");
});
t5.start();
}
}
Thread类及常见方法
Thread的常见构造方法
方法 | 说明 |
---|---|
Thread() | 创建线程对象 |
Thread(Runnable target) | 使用 Runnable 对象创建线程对象 |
Thread(String name) | 创建线程对象,并命名 |
Thread(Runnable target, String name) | 使用 Runnable 对象创建线程对象,并命名 |
Thread(ThreadGroup group,Runnable target) | 线程可以被用来分组管理,分好的组即为线程组 |
Thread的常见属性
属性 | 获取方法 |
---|---|
ID | getId() |
名称 | getName() |
状态 | getState() |
优先级 | getPriority() |
是否后台线程 | isDaemon() |
是否存活 | isAlive() |
是否被中断 | isInterrupter() |
ID
ID是一个线程的身份标识
一个人可以有好多个名字,一个线程也可以有好几个身份标识 (JVM有一个身份标识,pthread库也有一个线程的身份标识,内核里针对线程的pcb还有身份标识),他们都是相互独立的
优先级
通过getPriority()/setPricrity()方法来设置线程的优先级
但是作用不是很大,因为线程的调度主要是由系统的内核来负责的,而系统内核的调度速度实在是太快
后台线程
后台线程/守护线程 后台线程不影响进程结束
前台线程 前台线程会影响到进程的结束,如果前台线程没有执行完,线程是不会结束的
一个进程中所有的前台线程都执行完,退出了,即使后台线程还没有运行结束,也会随着线程一起退出
启动一个线程
Java中使用start()方法启动一个线程
我们通过重写run方法创建一个线程对象,但是线程被创建出来并不意味着线程就开始运行了 run方法中写的逻辑相当于提供给线程一个指令清单 而调用start方法才是真正在操作系统底层创建出一个线程,线程才真正独立去运行了
调用start方法会在系统中真正的创建出线程,系统内核完成先创建出PCB,再把PCB加入到对应的链表中的操作
终止一个线程
一个线程的run方法执行完毕,就算终止了,此处的终止线程,就是想办法,让run方法能够尽快的执行完毕
方式一:手动设定标志位
使用标志位:在线程内部定义一个标志位,通常是一个布尔类型的变量。线程会定期检查这个标志位,如果标志位被设置为指定的终止条件,线程就会自行终止执行。
public class Demo6 {
public static boolean isQuit = false;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while (!isQuit){
System.out.println("线程正在运行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
//sleep模拟主线程中要执行的逻辑
Thread.sleep(3000);
//此时想要终止t线程,就把标志位改为true,使t线程被终止
isQuit = true;
System.out.println("t线程被终止");
}
}
如果我们把标志位放在主线程中 代码还能正常运行吗?
答案是会出现报错
通过报错信息可以看到是因为lambda使用的标志位应该是final修饰或者实际上是final修饰的
也就是说lambda表达式使用的必须是一个常量,这里就涉及到了lambda表达式的变量捕获问题
lambda可以捕获到外面的变量,lambda表达式的执行时机也是比较靠后的,这就导致如果真正在执行lambda表达式时,万一isQuit标志位被销毁,此时就会出现错误,这种情况是客观存在的,让lambda表达式去访问一个已经被销毁的变量,明显是不合适的
此时lambda引入了"变量捕获"这样的机制,表面上是在lambda表达式中直接访问外部的变量,其实本质上是把外部的变量给复制了一份到lambda表达式中,这样就可以解决刚才的生命周期的问题
但是变量捕获这里的限制是要求捕获的变量必须是final修饰的(或者是在整个代码中没有进行修改的),原因是Java是通过复制的方式进行变量捕获,此时如果外面的代码逻辑对这个变量进行修改,就会出现lambda表达式中的值和外部的值不一样的情况,容易出现歧义
方式二:使用 Thread.interrupted() 或者 Thread.currentThread().isInterrupted() 代替自定义标志位.
Thread 内部包含了一个 boolean 类型的变量作为线程是否被中断的标记.
方法 | 说明 |
---|---|
public void interrupt() | 中断对象关联的线程,如果线程正在阻塞,则以异常方式通知,否则设置标志位 |
public static booleaninterrupted() | 判断当前线程的中断标志位是否设置,调用后清除标志位 |
public booleanisInterrupted() | 判断对象关联的线程的标志位是否设置,调用后不清除标志位 |
public class Demo7 {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()->{
//通过Thread.currentThread()获取到当前线程,在调用isInterruptted()方法判断标志位
while(!Thread.currentThread().isInterrupted()){
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
Thread.sleep(3000);
t.interrupt();
}
}
运行上述代码会发现一个问题
在这里会抛出一个异常,表示线程正在sleep被唤醒,此时就是因为我们调用的interrupt()方法
在线程正在sleep的过程中,其他线程调用interrupt方法,就会强制使sleep抛出一个异常,sleep就会被立即唤醒(无论设置的sleep时间是否达到)
但是sleep在被唤醒的同时,会自动清除前面设置的标志位,这样就留下更多的操作空间
- 立即停止循环,立即结束线程(在catch中加break);
- 继续做一些事情,过一会儿在结束线程(catch中执行别的逻辑,执行完了在break)
- 忽略终止的请求,继续进行循环(不加break)
等待一个线程
等待一个线程用到join方法
多个线程是并发执行的,具体的执行过程是由操作系统负责调度的,而操作系统的调度线程的过程是随机的,此时无法确定线程执行的先后顺序,而我们更喜欢确定的顺序而不是随机调度
等待线程就是一种规划线程结束顺序的手段,假设有A,B两个线程,我们此时希望B先结束而A后结束,此时就可以让A线程中调用B.join()方法,此时B线程还没执行完,A线程就会进入阻塞状态,给B线程留下执行时间,B执行完毕后,A再从阻塞状态中恢复回来,并且继续往后执行,如果A执行到B.join()的时候,B已经执行完了,A就不必阻塞,直接执行即可
阻塞让代码暂时不继续执行了,让线程暂时不去cpu上参与调度
sleep也能将线程阻塞,但是是有时间限制的,而join的阻塞,则是无休止的等待,但是我们也能给join的等待设置一个等待时间,只要时间到达,无论是否完成,都不进行等待
注意join也是可以被interrupt唤醒的
线程的状态
线程的状态是一个枚举类型
在pcb字段中,系统设定的状态有就绪状态和阻塞状态,在Java中,把上述状态又作出进一步细分了
NEW: 安排了工作, 还未开始行动
RUNNABLE: 可工作的. 又可以分成正在工作中和即将开始工作.
BLOCKED: 这几个都表示排队等着其他事情
WAITING: 这几个都表示排队等着其他事情
TIMED_WAITING: 这几个都表示排队等着其他事情
TERMINATED: 工作完成了.
线程安全
想给出一个线程安全的确切定义是复杂的,但我们可以这样认为:
如果多线程环境下代码运行的结果是符合我们预期的,即在单线程环境应该的结果,则说这个程序是线
程安全的。
线程不安全的案例
class Counter {
public int count;
public void increase() {
count++;
}
}
public class Demo8 {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(()->{
for (int i = 0; i < 50000; i++) {
counter.increase();
}
});
Thread t2 = new Thread(()->{
for (int i = 0; i < 50000; i++) {
counter.increase();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(counter.count);
}
}
这段代码是一个使用多线程进行计数的示例。其中,Counter 类表示一个计数器,具有一个 count 成员变量和一个 increase() 方法用于增加计数。
在 Demo8 主类中,创建了两个线程 t1 和 t2,分别通过 Lambda 表达式定义了它们的执行逻辑。这两个线程同时对同一个 Counter 对象的 count 变量进行自增操作,每个线程增加 50000 次。
接着,调用 t1.start() 和 t2.start() 启动这两个线程并发执行。然后,通过调用 t1.join() 和 t2.join(),主线程会等待 t1 和 t2 执行完毕后再继续执行。
最后,打印出 counter.count 的值,即两个线程共同增加的计数结果。
但是观察运行结果,并没有出现我们预想的100000的结果,而是小于100000的一个数字,这里就出现了线程安全问题
上述代码出现线程不安全主要是因为count++这一个操作,这个操作本质上是通过三个步骤
- 把内存中的数据,加载到CPU的寄存器中(load)
- 把寄存器中的数据进行+1(add)
3.把寄存器中的数据协会到内存中(save)
如果上述的操作,在两个线程,或者更多个线程并发执行的情况下,就可能会出现问题
在5w的循环过程中,有多少次这两个线程执行++是串行的,有多少次会出现覆盖结果的是不确定的,因为线程的调度是随机的,是抢占式的执行过程,所以每次的运行结果都不同
线程不安全的原因
- 根本原因是: 多个线程之间的调度是随机的,操作系统使用抢占式执行的策略来调度线程
- 多个线程同时修改同一个变量,容易产生线程安全问题
- 进行的修改不是原子的
- 内存可见性引起的线程安全问题
- 指令重排序引起的线程安全问题
修改共享数据
上面的线程不安全的代码中, 涉及到多个线程针对 counter.count 变量进行修改. 此时这个 counter.count 是一个多个线程都能访问到的 “共享数据”
counter.count 这个变量就是在堆上. 因此可以被多个线程共享访问.
可见性
可见性指, 一个线程对共享变量值的修改,能够及时地被其他线程看到.
Java 内存模型 (JMM): Java虚拟机规范中定义了Java内存模型.
目的是屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的并发效果
线程之间的共享变量存在 主内存 (Main Memory).
每一个线程都有自己的 “工作内存” (Working Memory) .
当线程要读取一个共享变量的时候, 会先把变量从主内存拷贝到工作内存, 再从工作内存读取数据.
当线程要修改一个共享变量的时候, 也会先修改工作内存中的副本, 再同步回主内存.
由于每个线程有自己的工作内存, 这些工作内存中的内容相当于同一个共享变量的 “副本”. 此时修改线程1 的工作内存中的值, 线程2 的工作内存不一定会及时变化
指令重排序
一段代码是这样的:
- 去前台取下 U 盘
- 去教室写 10 分钟作业
- 去前台取下快递
如果是在单线程情况下,JVM、CPU指令集会对其进行优化,比如,按 1->3->2的方式执行,也是没问
题,可以少跑一次前台。这种叫做指令重排序
编译器对于指令重排序的前提是 “保持逻辑不发生变化”. 这一点在单线程环境下比较容易判断, 但 是在多线程环境下就没那么容易了,
多线程的代码执行复杂程度更高, 编译器很难在编译阶段对代 码的执行效果进行预测, 因此激进的重排序很容易导致优化后的逻辑和之前不等价.
synchronized关键字
synchronized的特性
互斥
synchronized 会起到互斥效果, 某个线程执行到某个对象的 synchronized 中时, 其他线程如果也执行到同一个对象 synchronized 就会阻塞等待.
进入 synchronized 修饰的代码块, 相当于 加锁 退出 synchronized 修饰的代码块, 相当于 解锁
刷新内存
synchronized 的工作过程:
- 获得互斥锁
- 从主内存拷贝变量的最新副本到工作的内存
- 执行代码
- 将更改后的共享变量的值刷新到主内存
- 释放互斥锁
所以 synchronized 也能保证内存可见性.
可重入
synchronized 同步块对同一条线程来说是可重入的,不会出现自己把自己锁死的问题;
// 第一次加锁, 加锁成功
lock();
// 第二次加锁, 锁已经被占用, 阻塞等待.
lock();
Java 中的 synchronized 是 可重入锁, 因此没有上面的问题.
synchronized使用案例
synchronized 本质上要修改指定对象的 “对象头”. 从使用角度来看, synchronized 也势必要搭配一个具体的对象来使用.
修饰普通方法
public class SynchronizedDemo {
public synchronized void methond() {
}
}
修饰静态方法
public class SynchronizedDemo {
public synchronized static void method() {
}
}
修饰代码块
锁当前对象
public class SynchronizedDemo {
public void method() {
synchronized (this) {
}
}
}
锁类对象
public class SynchronizedDemo {
public void method() {
synchronized (SynchronizedDemo.class) {
}
}
}
两个线程竞争同一把锁, 才会产生阻塞等待.
两个线程分别尝试获取两把不同的锁, 不会产生竞争.
volatile关键字
volatile能保证内存可见性
volatile 修饰的变量, 能够保证 “内存可见性”.
代码再写入volatile修饰的变量时
- 改变线程工作内存中volatile变量副本的值
- 将改变后的副本的值从工作内存刷新到主内存
代码在读取volatile修饰的变量的时候
- 从主内存中读取volatile变量的最新值到线程的工作内存中
- 从共内存中读取volatile变量的副本
直接访问工作内存(CPU寄存器或者CPU的缓存),速度非常快,但是可能出现数据不一致的情况,加上volatile,强制读写内存,虽然速度变慢,但是数据变得更准确了
举例说明
public class Demo9 {
public static int flag = 0;
public static void main(String[] args) {
Thread t1 = new Thread(()->{
while (flag == 0){
}
System.out.println("循环结束");
});
Thread t2 = new Thread(()->{
Scanner scanner = new Scanner(System.in);
System.out.println("输入一个整数");
flag = scanner.nextInt();
});
t1.start();
t2.start();
}
}
这段代码是一个简单的多线程示例,其中包括了主线程、t1 线程和 t2 线程。
在主类 Demo9 中,定义了一个静态变量 flag,初始值为 0。然后创建了两个线程 t1 和 t2。
t1 线程通过 Lambda 表达式定义了它的执行逻辑。在循环中,它不断地检查 flag 的值,直到 flag 的值发生改变,才会跳出循环并打印出 “循环结束”。
t2 线程同样使用 Lambda 表达式定义了它的执行逻辑。它通过 Scanner 获取用户输入的整数,并将该整数赋值给 flag 变量。
接下来,通过调用 t1.start() 和 t2.start() 启动这两个线程并发执行。
按照代码逻辑我们应该在控制台输入非0的数据,即可将线程t1中的循环终止
但是运行发现线程t1并没有结束运行,这里就出现了内存可见性问题
因为t1读取的是自己工作内存中的数据,当t2对flag变量进行修改,此时t1感知不到flag的变化
此时如果给flag加上volatile 则可以解决问题
volatile不保证原子性
不保证原子性(Atomicity):volatile 并不能确保对变量的复合操作是原子的。如果多个线程同时对同一个 volatile 变量进行复合操作,还是需要额外的同步机制来保证线程安全。
synchronized也能保证内存可见性
synchronized 既能保证原子性, 也能保证内存可见性.
对上面的代码进行调整:
去掉 flag 的 volatile
给 t1 的循环内部加上 synchronized, 并借助 counter 对象加锁.
public class Demo9 {
public static int flag = 0;
public static void main(String[] args) {
Object locker = new Object();
Thread t1 = new Thread(() -> {
while (true) {
synchronized (locker) {
if (flag != 0) {
break;
}
}
}
System.out.println("循环结束");
});
Thread t2 = new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("输入一个整数");
flag = scanner.nextInt();
});
t1.start();
t2.start();
}
}
wait和notify
wait() 和 notify() 是 Java 中用于线程间通信的两个方法,常与 synchronized 关键字一起使用。
由于线程之间是抢占式执行的, 因此线程之间执行的先后顺序难以预知.
但是实际开发中有时候我们希望合理的协调多个线程之间的执行先后顺序.
完成这个协调工作, 主要涉及到三个方法
wait() / wait(long timeout): 让当前线程进入等待状态. notify() / notifyAll():
唤醒在当前对象上等待的线程.
wait
wait 做的事情:
- 使当前执行代码的线程进行等待. (把线程放到等待队列中)
- 释放当前的锁
- 满足一定条件时被唤醒, 重新尝试获取这个锁.
wait要搭配synchronized使用,脱离synchronized使用wait会直接抛出异常
wait结束等待的条件
- 其他线程调用该对象的 notify 方法.
- wait 等待时间超时 (wait 方法提供一个带有 timeout 参数的版本, 来指定等待时间).
- 其他线程调用该等待线程的 interrupted 方法, 导致 wait 抛出 InterruptedException 异常.
notify
notify 方法是唤醒等待的线程.
-
方法notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知notify,并使它们重新获取该对象的对象锁。
-
如果有多个线程等待,则有线程调度器随机挑选出一个呈 wait 状态的线程。(并没有 “先来后到”)
-
在notify()方法后,当前线程不会马上释放该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
notifyAll
notify方法只是唤醒某一个等待线程. 使用notifyAll方法可以一次唤醒所有的等待线程.
wait和sleep对比
其实理论上 wait 和 sleep 完全是没有可比性的,因为一个是用于线程之间的通信的,一个是让线程阻
塞一段时间,
唯一的相同点就是都可以让线程放弃执行一段时间.
当然为了面试的目的,我们还是总结下:
- wait 需要搭配 synchronized 使用. sleep 不需要.
- wait 是 Object 的方法 sleep 是 Thread 的静态方法.
多线程案例
单例模式
饿汉式
class Singleton{
private static Singleton singleton = new Singleton();
private Singleton(){
}
public static Singleton getInstance(){
return singleton;
}
}
懒汉式-单线程
class Singleton{
private static Singleton singleton = null;
private Singleton(){
}
public static Singleton getInstance(){
if (singleton == null) {
singleton = new Singleton();
}
return singleton;
}
}
懒汉模式-多线程
单线程懒汉模式是线程不安全的
线程安全问题发生在首次创建实例时,如果在多个线程中同时调用getInstance方法,就可能会导致创建出多个实例
我们可以给getInstance方法加synchronized来解决线程安全问题
class Singleton{
private static Singleton singleton = null;
private Singleton(){
}
public synchronized static Singleton getInstance(){
if (singleton == null) {
singleton = new Singleton();
}
return singleton;
}
}
懒汉模式-多线程改进版
加锁解锁操作是一件开销比较高的事情,而饿汉模式的线程不安全只是发生在首次创建实例的时候,在后续调用getInstance方法时,就可以不进行加锁操作
class Singleton {
//防止出现内存可见性问题,在此处加volatile防止内存可见性问题
private volatile static Singleton singleton = null;
private Singleton() {
}
public static Singleton getInstance() {
//第一个if判断此时是否有实例,优化加锁操作
if (singleton == null)
synchronized (Singleton.class) {
//第二个if防止其他线程修改这个实例,进行一个二次检查
if (singleton == null) {
singleton = new Singleton();
}
}
return singleton;
}
}
阻塞队列
阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
生产者-消费者模型是一种常见的多线程编程模型,用于解决生产者和消费者之间的数据共享与同步问题。它包括以下几个角色和操作:
生产者(Producer):负责生成数据,并将数据放入共享的缓冲区中。
消费者(Consumer):负责从共享的缓冲区中取出数据,并进行消费处理。
缓冲区(Buffer):用于存储待处理的数据,生产者将数据放入缓冲区,消费者从缓冲区中取出数据。
此处的阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,阻塞队列也能使生产者和消费者之间解耦合
阻塞队列本质上就是一个队列,只不过比普通队列多出一个阻塞功能
所谓阻塞功能是指:
当队列满时,继续入队列,就会出现阻塞,阻塞到其他线程从队列中取走元素为止
当队列空时,继续出队列,也会出现堵塞,堵塞到其他线程往队列中添加元素为止
生产者-消费者模型具有以下优势:
-
解耦和灵活性:生产者和消费者之间通过共享的缓冲区进行通信,彼此之间解耦,可以独立地进行操作。这种解耦性使得生产者和消费者可以以不同的速度进行工作,并且可以动态调整生产者和消费者的数量,从而提供更好的灵活性。
-
同步和互斥:生产者-消费者模型提供了同步和互斥机制,确保在多线程环境下正确处理共享数据。通过使用锁或其他同步机制,生产者和消费者可以协调访问共享的缓冲区,在确保数据完整性和一致性的同时避免竞争条件和数据冲突。
-
缓冲能力:生产者-消费者模型中的缓冲区充当了一个临时存储区域,可以缓冲一定量的数据。这样可以平衡生产者和消费者之间的速度差异,当生产者速度快于消费者时,缓冲区可以存储更多数据,反之亦然。
-
提高效率:通过同时运行多个生产者和消费者线程,并允许它们并行执行,生产者-消费者模型可以充分利用系统的多核处理能力,提高整体的处理效率和吞吐量。
-
易于扩展:生产者-消费者模型可以轻松扩展到更复杂的情况,例如多个生产者和多个消费者之间的数据共享和同步,或者在分布式系统中使用消息队列等更高级的技术来实现生产者和消费者之间的通信。
总而言之,生产者-消费者模型是一种可靠且有效的多线程编程模型,可以在多个线程之间实现数据共享和同步,提高系统的并发性和处理能力。它在很多场景下都能发挥重要的作用,如任务调度、消息传递、事件处理等。
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
生产者消费者模型
public class Demo11{
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(()->{
while (true){
int val = 0;
try {
val = blockingQueue.take();
System.out.println("消费元素"+val);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread producer = new Thread(()->{
for (int i = 0; i < 50; i++) {
try {
int num = i;
System.out.println("生产一个元素" + num);
blockingQueue.put(num);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
producer.start();
customer.join();
producer.join();
}
}
上述是实现了一个生产者消费者模型,
可以发现我们只在生产者中加入了sleep操作,而消费者中没有加sleep 但是根据结果可以看出来当队列中没有元素时,消费者会阻塞等待,直到新的元素被加入到队列
实现阻塞队列
通过"循环队列"的方式来实现
使用synchronized进行加锁控制
put插入元素时,判断队列是否满,如果满进行wait(要在循环中进行wait,被唤醒时如果正好另一个线程插入元素,此时队列仍然是满的)
take取出元素时,判断队列是否为空,如果为空,就进行wait 同样在循环中进行
public class MyBlockQueue {
private String[] items = new String[1000];
//head 指向队列的头部
private int head = 0;
//tail 指向队列尾部的下一个元素 队列中有效元素范围为[head,tail)
//当head和tail相等时,队列为空
private int tail = 0;
private int size = 0;
//入队列
public void put(String elem){
if(size >= items.length){
//队列满
return;
}
items[tail] = elem;
tail++;
//当tail到达末尾,就回到开头
if(tail >= items.length){
tail = 0;
}
size++;
}
//出队列
public String take(){
if(items.length == 0){
return null;
}
String elem = items[head];
head++;
if (head >= items.length){
head = 0;
}
size--;
return elem;
}
}
上述是一个环形队列的简单实现,我们要将这个环形队列变成一个阻塞队列
1.解决线程安全问题
先给put和take操作加锁,保证多线程调用的时候能够保证线程安全
考虑内存可见性问题,给size加volatile
public class MyBlockQueue {
private String[] items = new String[1000];
//head 指向队列的头部
private int head = 0;
//tail 指向队列尾部的下一个元素 队列中有效元素范围为[head,tail)
//当head和tail相等时,队列为空
private int tail = 0;
private volatile int size = 0;
//入队列
public synchronized void put(String elem){
if(size >= items.length){
//队列满
return;
}
items[tail] = elem;
tail++;
//当tail到达末尾,就回到开头
if(tail >= items.length){
tail = 0;
}
size++;
}
//出队列
public synchronized String take(){
if(items.length == 0){
return null;
}
String elem = items[head];
head++;
if (head >= items.length){
head = 0;
}
size--;
return elem;
}
}
- 实现阻塞的效果
当队列满时,在put就会产生阻塞
当队列空时,在take也会产生阻塞
public class MyBlockQueue {
private String[] items = new String[1000];
//head 指向队列的头部
private int head = 0;
//tail 指向队列尾部的下一个元素 队列中有效元素范围为[head,tail)
//当head和tail相等时,队列为空
private int tail = 0;
private volatile int size = 0;
//入队列
public synchronized void put(String elem){
if(size >= items.length){
//队列满
//return;
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
items[tail] = elem;
tail++;
//当tail到达末尾,就回到开头
if(tail >= items.length){
tail = 0;
}
size++;
this.notify();
// 此时的wait用来唤醒入队列的wait
}
//出队列
public synchronized String take(){
if(items.length == 0){
//return null;
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String elem = items[head];
head++;
if (head >= items.length){
head = 0;
}
size--;
this.notify();
//此处的notify用来唤醒入队列的wait
return elem;
}
}
上述代码完成了阻塞特性,但是没有解决我们最开始提出的问题,
如果插入操作被唤醒的同时又插入一个元素,而且此时队列正好满,此时就会出现元素被覆盖的情况,删除操作也同理,可能会出现越界问题
此时我们可以巧妙的考虑将判断队列是否为满和队列是否为空的判断由if改为while,这样就会在被唤醒是再次进行一次条件判断,此时就可以解决上述问题
我们也可以在wait源码中看到,推荐我们在使用wait时搭配循环来使用
public class MyBlockQueue {
private String[] items = new String[1000];
//head 指向队列的头部
private int head = 0;
//tail 指向队列尾部的下一个元素 队列中有效元素范围为[head,tail)
//当head和tail相等时,队列为空
private int tail = 0;
private volatile int size = 0;
//入队列
public synchronized void put(String elem){
while (size >= items.length){
//队列满
//return;
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
items[tail] = elem;
tail++;
//当tail到达末尾,就回到开头
if(tail >= items.length){
tail = 0;
}
size++;
this.notify();
// 此时的wait用来唤醒入队列的wait
}
//出队列
public synchronized String take(){
while (items.length == 0){
//return null;
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String elem = items[head];
head++;
if (head >= items.length){
head = 0;
}
size--;
this.notify();
//此处的notify用来唤醒入队列的wait
return elem;
}
}
定时器
在Java中,定时器是一种用于调度任务的机制。Java提供了java.util.Timer和java.util.TimerTask类,用于创建和执行定时任务。
定时器也是软件开发中的一个重要组件. 类似于一个 “闹钟”. 达到一个设定的时间之后, 就执行某个指定好的代码
标准库中的定时器
标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule .
schedule 包含两个参数.
第一个参数(TimeTask)指定即将要执行的任务代码,
第二个参数指定多长时间之后执行 (单位为毫秒).
public class Demo12 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello");
}
},3000);
}
}
一个定时器里,是可以有多个任务的
public class Demo12 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello1");
}
},3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello2");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello3");
}
},1000);
}
}
而且我们可以发现,当所有任务都执行完,程序并不会结束,说明Timer里面有自己的线程,为了保证随时可以处理新安排的任务,这个线程会持续进行,并且这个线程还是个前台线程
模拟实现定时器
首先一个定时器里面是可以有很多个任务的,首先是要将一个任务描述出来,再通过合适的数据结构来把多个任务组织起来
-
创建一个TimerTask这样的类,表示一个任务,这个任务需要包含任务的内容和任务的实际执行时间
-
使用一定的数据结构,把TimerTask组织起来,这里如果使用List这样的数据结构来组织的话,如果任务非常多,要确定每个任务什么时间能够执行,就需要一个线程不断的对List线程进行遍历,看这里的每个元素,是否到达了时间,这样的操作就会大量的消耗系统资源,明显是不合理的,我们这里可以采用优先级队列的方式,通过时间来判断优先级,此时队首元素就是时间最小的任务,并且针对这一个任务的扫描,也不需要一直反复进行,而是在获取到队首元素时,和当前系统时间做差值,根据差值,来决定扫描线程的休眠使劲按,在时间到达前不会进行扫描,这样大幅度降低了扫描的次数,减少了资源的利用
-
扫描线程,负责监控队首元素是否达到了时间,调用run方法完成任务
虽然上述方法就可以完成定时器的主题逻辑,但还是要注意一些问题
问题1
PriorityQueue这个集合类,不是线程安全的,我们创建出来这个实例,既会在主线程中使用,也会在扫描线程中使用,所以我们要给针对queue的操作,进行加锁
问题2
扫描线程是根据队首元素和当前系统时间的差值进行休眠,这个休眠应该用sleep还是wait呢?
答案是使用wait,因为sleep在休眠时,是不会释放锁的,会影响到其他线程执行schedule,我们想象一个场景,假设当前最靠前的任务是14:00执行,当前时刻是13:00,此时扫描线程应该休眠一小时,但如果此时新增任务要在13:30执行,这个新的任务,就成了最早要执行的任务,所以此时要唤醒这个休眠的扫描线程,并且根据最新的情况,重新判定休眠时间,这就需要每次加入新的任务,都要将扫描线程唤醒,此时使用wait比较合适,虽然sleep睡眠可以使用interrupt来打断,但是使用interrupt意味着线程应该要结束
问题3
因为我们使用了优先级队列这样的数据结构,这就对我们的元素有了要求,必须是可比较的元素,才可以放在优先级队列中,
代码实现
class MyTimerTask implements Comparable<MyTimerTask>{
private long time;
private Runnable runnable;
public MyTimerTask(Runnable runnable,long delay){
time = System.currentTimeMillis() + delay;
this.runnable = runnable;
}
public long getTime() {
return time;
}
public Runnable getRunnable() {
return runnable;
}
@Override
public int compareTo(MyTimerTask o) {
return (int) (this.time - o.time);
}
}
class MyTimer{
private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();
private Object locker = new Object();
public void schedule(Runnable runnable, long delay){
synchronized (locker){
MyTimerTask task = new MyTimerTask(runnable,delay);
queue.offer(task);
locker.notify();
}
}
//扫描线程
public MyTimer(){
Thread t = new Thread(() -> {
//最外层循环用来扫描
while (true){
try {
//此时的while主要是跟据wait搭配
while (queue.isEmpty()){
//此时队列为空,不应该取元素
//使用wait等待优于使用continue,continue会导致无限循环,增大cpu开销
locker.wait();
}
MyTimerTask task = queue.peek();
long curTime = System.currentTimeMillis();
if(curTime > task.getTime()){
//说明该执行任务了
queue.poll();
task.getRunnable().run();
//调用run方法执行任务
}else {
locker.wait(task.getTime()-curTime);
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
t.start();
}
}
线程池
在Java中,线程池(Thread Pool)是一种管理和重用线程的机制,它维护一个线程集合,并可以根据需要将任务分配给空闲的线程来执行。使用线程池可以提高线程的利用率和性能,避免频繁地创建和销毁线程。
Java提供了java.util.concurrent包中的ExecutorService接口和相关类来实现线程池。
创建线程池:可以使用Executors类提供的静态方法来创建不同类型的线程池,例如newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor等。
其中,newFixedThreadPool创建固定大小的线程池,newCachedThreadPool创建可根据需求自动调整大小的线程池,newSingleThreadExecutor创建只有一个线程的线程池。
ExecutorService executor = Executors.newFixedThreadPool(5);
此处采用了工厂模式 Executors称为工厂类,newFixedThreadPool称为工厂方法
我们通常创建对象,都是通过new来通过构造方法来创建对象,但是用构造方法有一个缺陷就是构造方法的方法名必须是类名,但是有的类,又需要不同的构造方式,此时就只能用方法重载的方式构造,但是如果此时我们需要的两种构造方法的参数恰好相同,此时就无法构成重载,就采用工厂模式来解决上述问题
我们就使用普通的方法来构造对象,这样的方法名字就可以是任意的,在普通方法内部,再来new对象,因为普通方法的目的是为了创建出对象来,所以这样的方法一般是静态的
标准库中的线程池
- 使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
- 返回值类型为 ExecutorService
- 通过 ExecutorService.submit 可以注册一个任务到线程池中.
使用线程池的execute方法或submit方法提交任务给线程池执行。execute方法用于提交没有返回值的任务,而submit方法用于提交有返回值的任务。
public class Demo14 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
pool.submit(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("hello");
}
}
});
}
}
Executors 创建线程池的几种方式
newFixedThreadPool: 创建固定线程数的线程池
newCachedThreadPool: 创建线程数目动态增长的线程池.
newSingleThreadExecutor: 创建只包含单个线程的线程池.
newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer.
Executors 本质上是 ThreadPoolExecutor 类的封装.
ThreadPoolExecutor
ThreadPoolExecutor 是 Java 标准库中的一个类,它实现了 ExecutorService 接口,提供了一个可配置的线程池。相比于 Executors 类中提供的静态方法创建线程池,使用 ThreadPoolExecutor 可以更加灵活地配置线程池的参数。
构造方法
我们只需要明白最后一个,其他的就都可以理解
int corePoolSize 表示核心线程数 int maximumPoolSize表示最大线程数 ThreadPoolExecutor里面的线程数,并非是固定不变的,会根据当前任务情况动态变化,核心线程数表示至少得又这么多线程,哪怕线程池一点任务也没有,最大线程数表示,就算任务再多,也不能超过这么多线程
long keepAliveTime 表示那些超出核心线程数的那些线程,也就是临时的线程空闲状态下的最大保持时间,当这些线程超过这个时间没有执行任务,就会被销毁
TimeUnit表示keepAilveTime的单位
BlockingQueue< Runnable > workQueue表示一个阻塞队列,线程池内部有很对任务,这些任务可以使用阻塞队列来管理,线程池可以内置阻塞队列,也可以手动指定一个
ThreadFactory threadFactory 用于创建新线程的工厂类。可以自定义线程工厂来设置线程的名称、优先级等属性。
RejectExecutionHandler handler 表示拒绝策略
当任务无法被线程池执行时的处理策略。
常用的策略有
CallerRunsPolicy(使用提交任务的线程来执行该任务)、
AbortPolicy(默认策略,抛出 RejectedExecutionException 异常)
DiscardPolicy(丢弃无法执行的任务)DiscardOldestPolicy(丢弃队列中最旧的任务)
也可以自定义实现 RejectedExecutionHandler 接口来定义自己的拒绝策略。
实现线程池
class MyThreadPool{
//阻塞队列,用来存放任务,
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
//将任务放入线程池
public void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
}
//n 表示线程池里有几个线程
public MyThreadPool(int n){
for (int i = 0; i < n; i++) {
Thread t = new Thread(()->{
while (true){
try {
//从队列中取出任务
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
}