多线程的模式--(阻塞队列,定时器,线程池)
多线程模式:
阻塞队列(线程安全)
重点是如何自己去实现这种数据结构:
编辑
定时器:
实现一个定时器:
线程池:
实现线程池
多线程模式:
软件开发中也有很多常见的 "问题场景". 针对这些问题场景, 大佬们总结出了一些固定的套路. 按照
这个套路来实现代码, 也不会吃亏。大佬们为我们操碎了心。
单例模型(某一个类,在进程中只有唯一一个实例)
分为:饿汉模式 ,懒汉模式
饿汉模式:就是将文件所有的内容都读到内存中,并显示。(小规模就好,太多了内存不够,所以懒汉模式)
class Singleton {
private static Singleton instance = new Singleton();
private Singleton() {}
public static Singleton getInstance() {
return instance;
}
}
被static修饰,该属性是类的属性(类对象上),JVM中,每个类的对象只有唯一一份,类对象的成员自然也是唯一一份。
private将new操作给静止掉,在类内部把实例创建好同事静止外部重新创建实例,此时,就可以保证单例的特性。
懒汉模式:只读取文件的一小部分。把当前屏幕填充上,如果用户翻页了,再读其他文件内容,如果不翻页,就可以节约运算资源。
单线程:核心思想,非必要,不创建。第一次使用的时候才创建实例
class Singleton {
private static Singleton instance = null;
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}
多线程:
上面的代码是不安全的,为什么?因为在多线程下,同时调用 getInstance 方法, 就可能导致
创建出多个实例。
加上 synchronized 可以改善这里的线程安全问题
class Singleton {
private static Singleton instance = null;
private Singleton() {}
public synchronized static Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}
加锁进行实例化对象,是很耗资源。其实一个实例创建后,在内存中已经存在,其他线程其实更多的是读操作。那么就没必要去进行加锁操作。
对上述代码改进:
class Singleton {
private static volatile Singleton instance = null;
private Singleton() {}
public static Singleton getInstance() {
//判断是否为空,诺不为空,就不需进行加锁实例化。
if (instance == null) {
synchronized (Singleton.class) {
//进行实例化判断
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
两个if所代表的意义有所不同。
1.加锁if:把if和new变为原子操作
2.双重 if:减少不必要的加锁操作
3.使用volatile 禁止指令重排序,保证后续线程肯定拿到的是完整对象。
单例模式:线程安全问题:
饿汉模式:天然就是安全的,只是读操作
懒汉模式:不安全的,有读也有写。
阻塞队列(线程安全)
本质是一个循环队列,但是它带有阻塞特性;
1.如果入队列为空,尝试出队列,就会阻塞等待。等待到队列不空为止。
2.如果队列满了,尝试入队,就会阻塞等待,等待到队列不满为止。
这个就有一个经典的模型进行解释--生产者消费者模型,什么是生产者消费者模型?其实简单理解为,生产效率与消费效率的比值。比如一个面包厂1小时生产4个面包,而此时有很多人等着吃面包。这个就是简单的生产者消费者模型。
这个数据结构的模式有几个好处:
1.可以让上下游模块之间,可以更好的“解耦和”。
队列与具体业务无关,队列中的某一个线程挂了,不影响其他线程,比如电脑有时候网页会卡,但是某些功能还在运行。
2.削峰填谷
不知道各位有没有打游戏,王者农药肯定都听过,其中有个事情,在游戏早期,它出了一款皮肤,这个皮肤很受玩家喜欢,再上线的那一刻,众多玩家,蹲点购买。使得当时的支付系统蹦了几分钟。为了应对这种情况,阻塞队列就可以减少这种风险。
在Java中提供了一个阻塞队列的数据的集合,BlockingQueue
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
重点是如何自己去实现这种数据结构:
主要分为三步:
1.先实现一个普通队列
2.加上线程安全
3.加上阻塞功能
class MyBlockingQueue{
//普通队列
private int [] items=new int[1000];
//规定head--tail的范围为有效范围
volatile private int head=0;
volatile private int tail=0;
volatile private int size=0;
//入队列
synchronized public void put(int elem) throws InterruptedException {
//队列元素满了
while (size==items.length){
this.wait();
}
items[tail]=elem;
tail++;
//判断是否到达末尾,队列中的元素没有满的情况下
if (tail==items.length){
tail=0;
}
//可读性下差,开发效率慢
//tail=tail%items.length;
this.notify();
size++;
}
//出队列
synchronized public Integer take() throws InterruptedException {
while (size==0){
this.wait();
}
int value=items[head];
head++;
if (head==items.length){
head=0;
}
this.notify();
size--;
return value;
}
}
public class test8 {
public static void main(String[] args) {
MyBlockingQueue queue=new MyBlockingQueue();
Thread t1=new Thread(()->{
while (true){
try {
int value= queue.take();
System.out.println("消费:"+value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2=new Thread(()->{
int value=0;
while (true) {
try {
System.out.println("生产:"+value);
queue.put(value);
Thread.sleep(1000);
value++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
入队成功后其他线程才能出队,出队成功后其他线程才能入队。
定时器:
定时器也是软件开发中的一个重要组件. 类似于一个 "闹钟". 达到一个设定的时间之后, 就执行某个指定好的代码。
在java中提供了Timer类。Timer 类的核心方法为 schedule ,其中包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行 (单位为毫秒)。
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello");
}
}, 3000);
实现一个定时器:
定时器的构成:
- 一个带优先级的阻塞队列(阻塞队列中的任务都有各自的执行时刻 delay. 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来.)
- 队列中的每个元素是一个 Task 对象.
- Task 中带有一个时间属性, 队首元素就是即将运行的。
- 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
class MyTask implements Comparable<MyTask>{
public Runnable runnable;
//为了方便后续,使用绝对的时间戳
public long time;
public MyTask(Runnable runnable,long delay){
this.runnable=runnable;
//获取当前时刻的时间戳+delay,作为任务的实际执行时间
this.time=System.currentTimeMillis()+delay;
}
@Override
public int compareTo(MyTask o) {
//设置比较器,构建优先级队列
return (int)(this.time-o.time);
}
}
class MyTimer{
//这个结构,带有优先级的阻塞对列,核心数据结构
//创建一个锁对象
private Object loker=new Object();
private PriorityBlockingQueue<MyTask> queue=new PriorityBlockingQueue<>();
//此处的dalay 是一个形如3000这样的数字(多长时间后执行)
public void schedule(Runnable runnable,long dalay){
//根据参数,构造MyTask,插入队列即可
MyTask myTask=new MyTask(runnable,dalay);
queue.put(myTask);
synchronized (loker){
loker.notify();
}
}
//构造线程
public MyTimer(){
Thread t=new Thread(()->{
while (true) {
try {
synchronized (loker){
MyTask myTask=queue.take();
long curTime=System.currentTimeMillis();
if (myTask.time <= curTime){
//时间到了,执行任务
myTask.runnable.run();
}else {
//时间还没到
//将刚刚取出的任务,重新塞回队列
queue.put(myTask);
loker.wait(myTask.time-curTime);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
public class test10 {
}
线程池:
(就是就是装有很多线程的仓库,使用线程从里面拿就好)
线程池最大的好处就是减少每次启动、销毁线程的损耗。
Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
返回值类型为 ExecutorService 通过 ExecutorService.submit 可以注册一个任务到线程池中.
submit放入线程
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
});
源代码:
- corePoolSize:核心线程数(不会消失)
- maximumPoolSize:最大线程数(核心线程数+零时线程)
- keepAliveTime: 临时线程允许的空闲时间.
- unit: keepaliveTime 的时间单位, 是秒, 分钟, 还是其他值.
- workQueue: 传递任务的阻塞队列
- threadFactory: 创建线程的工厂, 参与具体的创建线程工作.
- RejectedExecutionHandler: 拒绝策略, 如果任务量超出线程池的负荷了接下来怎么处理:
- AbortPolicy(): 超过负荷, 直接抛出异常.
- CallerRunsPolicy(): 调用者负责处理
- DiscardOldestPolicy(): 丢弃队列中最老的任务.
- DiscardPolicy(): 丢弃新来的任务
Executors 创建线程池的4种方式
- newFixedThreadPool: 创建固定线程数的线程池
- newCachedThreadPool: 创建线程数目动态增长的线程池.
- newSingleThreadExecutor: 创建只包含单个线程的线程池.
- newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的Timer.
实现线程池
- 核心操作为 submit, 将任务加入线程池中
- 使用 Worker 类描述一个工作线程. 使用 Runnable 描述一个任务.
- 使用一个 阻塞队列中组织所有的任务
- 每个 worker 线程要做的事情: 不停的从 阻塞队列中取任务并执行.
- 指定一下线程池中的最大线程数 maxWorkerCount,当前线程数超过这个最大值时, 就不再新增线程了。
class Worker extends Thread {
private LinkedBlockingQueue<Runnable> queue = null;
public Worker(LinkedBlockingQueue<Runnable> queue) {
super("worker");
this.queue = queue;
}
@Override
public void run() {
// try 必须放在 while 外头, 或者 while 里头应该影响不大
try {
while (!Thread.interrupted()) {
Runnable runnable = queue.take();
runnable.run();
}
} catch (InterruptedException e) {
}
}
}
public class MyThreadPool {
private int maxWorkerCount = 10;
private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue();
public void submit(Runnable command) {
if (queue.size() < maxWorkerCount) {
// 当前 worker 数不足, 就继续创建 worker
Worker worker = new Worker(queue);
worker.start();
}
// 将任务添加到任务队列中
queue.put(command);
}
public static void main(String[] args) throws InterruptedException {
MyThreadPool myThreadPool = new MyThreadPool();
myThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("吃饭");
}
});
Thread.sleep(1000);
}
}