一.阻塞队列
阻塞队列,也是特殊的队列,虽然也是先进先出的,但是带有特殊功能。
阻塞:1.如果队列为空,执行出队列操作,就会阻塞,阻塞到另一个线程往队列中添加元素(队列不空为止)。
2.如果队列满了,执行入队列操作,也会阻塞,阻塞到另一个线程从队列中取走元素为止(队列不满)。
二.消息队列,也是特殊的队列,相当于是在阻塞队列的基础上,加了个“消息的类型”,按照制定类别进行先进先出。
给大家举一个比较形象的例子:医院里超声科这个科室会有很多人来看病,有来看胃的,有来看心脏的,有来看宝宝的,
我们用长方形代表看胃的,圆代表看宝宝的,三角形代表看心脏的,医生这时如果叫一个看心脏的病人来就诊,那么应该是第一个三角形病人去就诊(先进先出),而不是后面的三角形病人。
三.“生产者消费者模型”
举个简单的例子:过年包饺子。
两种典型的包法:1.每个人都进行擀饺子皮,然后包
一个人擀饺子皮,另外三个人包
这两种方法显然是第二种方法效率高一些。因为第一种方法,大家都会竞争擀面杖就会产生阻塞等待,影响效率。
这种方式就称为“生产者消费者模型”,此时负责 擀饺子皮的就是生产者,包饺子的就是消费者,盖帘就是阻塞队列,如果擀饺子皮慢了,包饺子的就得等,如果包饺子的慢了,那么擀饺子皮的就得等会。
生产者消费者模型,给程序带来的两个非常重要的好处
实现了发送方和接受方之间的“解耦”,降低耦合的过程,就叫做解耦
典型场景:服务器之间的相互调用
此时A把请求转发给B处理,B处理完了,把结果反馈给A此时就是A调用了B,上述场景中,A和B之间的耦合是比较高的,A要调用B,A务必知道B的存在,如果B挂了,很容易引起A的bug,另外如果再加上一个C服务器,此时也需要对A修改不少代码,重新测试,重新发布,会很麻烦
针对上述场景,使用生产者消费者模型,可以有效地降低耦合。
此时A和B之间的耦合就降低很多了,A的代码中没有任何一行与代码有关,B的代码中,也没有任何一行代码和A相关,如果B挂了,对于A没有任何影响,因为队列还好着,A仍然可以给队列插入元素如果队列满了,就先阻塞。如果A挂了,对于B也没有影响,因为队列还好着,B仍然可以从队列中获取元素,如果队列为空,先阻塞等待.A B任何一方挂了不会对对方造成影响,当我们新增C时,也是同样的道理。
生产者消费者第二个模型,第二个好处,可以做到削峰填谷,保证系统的稳定性。
当上游水很多时,三峡大坝会起到一个存水的作用,防止上游水过多造成对下游的冲击过大。
同理当客户端发送过多的请求时,会把请求放在阻塞队列里,然后服务器慢慢地去接受,这样就可以防止客户端请求过多,把服务器给压垮。
三.阻塞队列:
标准库提供的阻塞队列
阻塞队列的方法主要有两个:put()放入元素,take()取出元素,这两种方法是带有阻塞功能的。
下面我们具体来看一下基于链表实现的阻塞队列的一些方法
public static void main(String[] args) throws InterruptedException{
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
//put方法是指将阻塞队列中放入元素,如果此时队列中元素满了,此时就会阻塞等待
blockingQueue.put(1);
//blockingQueue.put(2);
//take 方法是从阻塞队列中取出元素,如果此时队列中元素为空,此时就会阻塞等待
Integer a=blockingQueue.take();
System.out.println(a);
//当我们第二次取b的时候,因为此时队列中已经没有了元素,此时就会阻塞等待
Integer b=blockingQueue.take();
System.out.println(b);
}
public static void main(String[] args) {
BlockingQueue<Integer>blockingQueue=new LinkedBlockingQueue<>();
Thread producer =new Thread(()->
{
int count=0;
while(true)
{
try {
blockingQueue.put(count);
System.out.println("生产者:"+count);
count++;
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread consumer=new Thread(()->{
while(true)
{
try {
Integer c=blockingQueue.take();
System.out.println("消费者:"+c);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
consumer.start();
}
自己实现个阻塞队列
//自己实现一个阻塞队列
思路:先用数组实现一个循环队列,然后再加上阻塞的功能
/*public class MyblockingQueue {
public int[] arr = new int[1000];
int start = 0;//标记循环队列的头
int end = 0;//标记循环队列的尾
public int size = 0;
public void put(int a) {
synchronized (this) {
//判断队列元素是否满了
while (size == arr.length) {
try {
如果队列元素满了就阻塞等待一下
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
arr[end] = a;
end++;
if (end >= arr.length) {
end = 0;
}
size++;
一旦增加元素,就通知一下正在阻塞等待取元素的线程
this.notify();
}
}
public int take() {
synchronized (this) {
//判断队列元素是否为空
while (size == 0) {
try {
//如果为空就阻塞等待一下
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
int a = arr[start];
start++;
if (start >= arr.length) {
start = 0;
}
size--;
//一旦有元素被取出,就通知一下正在阻塞等待放元素的线程
this.notify();
return a;
}
}
}
class text91{
public static void main(String[] args) {
MyblockingQueue myblockingQueue=new MyblockingQueue();
Thread producer=new Thread(()->{
int count=0;
while(true)
{
myblockingQueue.put(count);
System.out.println("生产者:"+count);
count++;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread consumer=new Thread(()->{
while(true)
{
int a=myblockingQueue.take();
System.out.println("消费者:"+a);
}
});
producer.start();
consumer.start();
}
}
计时器:
这里的定时器不是用来提醒的,而是执行一个实现准备好的方法代码,这个是开发中一个常用的组件,尤其是网络编程的时候,很容易出现“卡了”,“连不上了”,就可以使用定时器来及时止损
public class Thread69 {
public static void main(String[] args) {
//Timer 类是标准库的定时器
Timer timer=new Timer();
//往定时器里注册任务1 run()方法里面是具体的任务,1000表示任务在1000毫秒后执行
//schedule()方法用来注册任务
timer.schedule(new TimerTask(){
public void run()
{
System.out.println("任务1");
}
},1000);
//往定时器里注册任务2
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("任务2");
}
},500);
}
}
自己实现一个定时器:
实现一个定时器我们的思路是:先用一个优先级阻塞队列来保存任务,然后,再设计一个扫描线程来看一下任务是否达到了要执行的时间,为啥要用优先级阻塞队列来保存任务呢?因为我们拿出来的第一个任务是时间最早的,只需判定它到没到执行时间,如果它没到,后面的肯定都没到,这样就不用扫描线程去扫描了。
class Mytask implements Comparable<Mytask>
{
Runnable runnable;
long nowtime;
public Mytask(Runnable runnable, long nowtime) {
this.runnable = runnable;
this.nowtime = nowtime;
}
@Override
public int compareTo(Mytask o) {
return (int)(o.nowtime-this.nowtime);
}
public void getrun()
{
runnable.run();
}
public long gettime()
{
return nowtime;
}
}
class Mytimer{
//创建一个扫描线程
public Thread t=null;
//创建一个优先级队列来存储任务
public PriorityBlockingQueue<Mytask>m=new PriorityBlockingQueue<>();
public Mytimer()
{
t=new Thread(()->{
while(true)
{
try {
//取出当前队列的首元素
synchronized (this) {
Mytask mytask = m.take();
//计算当前的时间戳
long curenttime = System.currentTimeMillis();
//当前时间和任务执行的时间进行比较,如果还没到任务执行的时间,就将任务放回去, 阻塞等待一下,如果到了就执行
if (curenttime < mytask.gettime()) {
m.put(mytask);
this.wait(mytask.gettime() - curenttime);
} else {
mytask.getrun();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
public void schedule(Runnable runnable,long after)
{
//time 表示的是任务执行的时间
long time=System.currentTimeMillis()+after;
Mytask mytask=new Mytask(runnable,time);
//将任务放到队列当中
m.put(mytask);
synchronized (this) {
this.notify();
}
}
}
public class Thread79 {
public static void main(String[] args) {
Mytimer mytimer=new Mytimer();
mytimer.schedule(new Runnable(){
public void run()
{
System.out.println("任务1");
}
},1000);
mytimer.schedule(new Runnable(){
public void run()
{
System.out.println("任务2");
}
},2000);
}
}
大家想一下46行和71行,我们要加上wait,和notify呢?那是因为如果当前取出的任务没有到达要执行的时间,那么就会一直重复取出来这个任务,然后再放回去,这种现象称之为”忙等“,按理来说等待是要释放CPU资源的,让CPU干别的事情,但是忙等,即进行了等待,又占用了CPU资源。
我们大家再来想一下,如果当前是13.00,队首元素是14.00就等1个小时就行了。但是如果使用sleep,一个新的任务一旦来了,新的任务是13.30,此时仍按照sleep一小时来等待,就会错过新任务的执行时间。使用wait更合适,方便随时唤醒,使用wait等待,一旦有新任务来了,就notify一下,将线程唤醒,重新检查时间,重新计算要等待的时间。
我们再来看一下这段代码
这里有同学可能会问,我们这里的锁只加在wait上不就行了,为啥要将取出来的任务和wait一起包括在里面呢?我们可以想一个问题,假设synchronized只加在wait上,当线程一旦刚执行完m.put,这个线程从cpu上调度走了,这时假设来了一个新任务,这个新任务的执行时间是1.30,notify也被执行了,当线程回来之后,进行wait操作,此时wait时间已经算好了,假设gettime()是2.00,curenttime是1.00,此时需要等待1小时,这时线程回来了,此时就在这里等了一个小时,那么这个新的任务到点了也没有执行,因此我们必须保证获取到新任务和wait操作是原子的,这样notify就不会空打,也就不会出现错过任务执行这种问题了,因此synchronized必须将获取新任务和wait操作一起包括在里面。
线程池:
线程存在的意义:使用进程来实现并发编程,太重了,此时引入了线程,线程也叫做“轻量级线程”,创建线程比创建进程更高效,销毁线程比销毁进程更高效,调度线程比调度进程更高效,此时使用多线程可以代替进程实现并发编程了,但是随着并发程度的提高,对于性能要求标准的提高,当我们频繁创建销毁线程的时候,其实开销也很大。
我们可以使用线程池,来降低创建、销毁线程的开销,我们可以事先把使用的线程创建好,放到“池”中,后面需要使用的时候,直接从池里获取,如果用完了也还给池,这比创建、销毁更高效。
创建线程、销毁线程是由操作系统内核完成的,从池子里获取/还给池,是自己用户代码就能实现,不必交给内核操作。
给大家举个简单的例子:
在银行大厅里,用户都是自主的,用户想干啥就干啥,就像程序中的用户态,用户态执行的是程序员自己写的代码,想干啥,怎么干都是由程序员代码自己决定的。但是有些操作,需要在银行柜台后完成,需要通过银行的工作人员来间接完成,就像程序中的“内核态”,内核态进行的操作都是在操作系统内核中完成的,内核会给程序提供一些api成为系统调用,程序可以调用系统调用,驱使内核完成一部分工作,系统调用里面的内容是直接和内核的代码相关的,这一部分工作不受程序员自身控制,都是内核自行完成的。
相比于内核来说,用户态,程序执行的行为是可控的,想要做某个工作,会很快的完成(例如从池子里获取线程/还给池子线程),如果要是通过内核,从系统里创建个线程,就需要通过系统调用,让内核来执行,此时内核可能会有很多任务(内核不只是给一个应用程序服务,而是给所有的程序都提供服务)因此当使用系统调用的,执行内核代码的时候,无法确定内核都要做哪些工作,整体过程“不可控”。因此当我们用用户的代码创建的线程池来获取线程、回放线程会更加高效。
二.java标准库,也提供了线程的线程池,可以直接使用
工厂模式:使用普通的方法来代替构造方法,创建对象
那为啥构造方法要被替代,因为构造方法只构造一种对象,好办,但是要构造多种不同情况的对象,就难搞了
给大家举个例子:
我们创建一个类:平面上有一个点
class Point
{
public Point(double x,double y) //使用笛卡尔坐标系提供的坐标,来构造点
{
}
public Point(double r,double a)//使用笛卡尔积来构造点
{
}
}
多个构造方法应该是重载的,重载要求方法名相同,参数个数或者类型不同,显然这代码是存在问题的
因此我们可以采用工厂模式:
class PointFactory
{
public static Point makePointxy(double x,double y)
{
}
public static Point makePointry(double r,double y)
{
}
}
Point p=PointFactory.makePointxy(10,20);
普通方法,方法名字没有限制,因此有多种方式构造,使用不同的方法名即可。
我们来具体了解一下标准库里的线程池
public static void main(String[] args) {
ExecutorService pool= Executors.newFixedThreadPool(10);
//构造出一个含有10个线程的线程池干活
for(int i=0;i<1000;i++)
{
int ret=i;//为啥这里要用一个ret变量来记录i呢, i是主线程里面的局部变量(在主线程的栈上)
//随着主线程这里的代码块执行结束就销毁了,很可能主线程这里for执行完了
//当前run的任务在线程池里还没排到,i就要销毁了,于是就有了变量捕获,让run方法把刚才主线程
//里的i给当前的run的栈上拷贝一份。
//submit方法,给线程池提供若干个方法
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println(ret);
}
});
}
}
}
另外我们运行程序之后发现,main线程结束了,但是整个进程没结束,这是因为线程池中的线程都是前台线程,此时会阻止进程结束
我们注意此时线程池里放了1000个任务,一共有10个线程干活,差不多一个线程干100个,因为每个任务执行的时间差不多,每个线程都执行完一个任务后,再领取下一个任务,1000个任务在队列中排队,这10个线程,依次来取队列中的任务,取一个就执行一个,执行完了再执行下一个。
new CachedThreadPool()这里面的线程数量是变化的,任务多了,线程数量就多一些,任务少了,线程数量就少一些
newSingleThreadExecutor()里面只有一个线程
newScheduledThreadPool()类似于定时器,也是让任务延迟执行,只不过是不是由扫描线程来执行,而是让线程池来执行。
上述的这几个线程池,本质上都是通过包装ThreadPoolExecutor来实现的
下面我们来具体了解一下
我们看一下里面的具体参数:
(1)corePoolSize 核心线程数
(2) maximumPoolSize 最大线程数
ThreadPoolExecutor相当于把里面的线程分成两类:一类是正式员工(核心线程)一类是临时工/实习生,这两个之和是最大线程数,允许正式员工摸鱼,不允许实习生摸鱼,如果实习生摸鱼摸的太久了,就会被销毁(开除了),如果任务多,需要更多的线程,但是任务少,此时线程还那么多,就不合适了,需要对一些线程进行淘汰,整体的策略是:正式员工保底,临时工动态调节。
那么实际开过程中,线程池里放多少线程合适呢?不同程序的特点不同,此时要设置的线程数是不同的,考虑两个极端情况:1.CPU密集型,每个线程执行的任务都是狂转CPU(进行一系列算数运算)此时线程池里的线程数,做多不应该超过CPU核数,cpu密集型任务要一直占用cpu,那么多线程,cpu核数被占用完了2.IO密集型:每个线程干的工作就是等待IO(读写硬盘,读写网卡,等待用户输入),不吃cpu,此时这样的线程处于阻塞状态,不参与cpu调度,这样的线程不受制于cpu核数。真实的程序,一部分吃cpu,一部分要等待IO,那么多少线程数量合适,其实最好的方法是在自己的机子上试试,看看系统资源的占用情况。
(3)long keepAliveTime
TimeUnit unit 这两个描述了临时工摸鱼的最大时间
(4)BlockingQueue<Runnable>workQueue
线程池的任务队列
(5)ThreadFactory threadFactory
用于创建线程,线程池也是需要创建线程的
(6)RejectedExecutionHandler handler
描述了线程池的“拒绝策略”也是一种特殊的对象,如果当线程池任务队列满了,继续添加任务会有什么行为?
四种拒绝策略:
给大家举个具体的例子:我这一天安排了很多事情,第一件事情就是要做学习笔记,这时一个朋友让我去给他拿快递,第一种策略:我哇一下直接哭出来了,第二种策略:我会说自己的快递自己去拿。
第三种拒绝策略放下手中的学习笔记,去拿快递,第四种:拒绝拿快递,照常去做学习笔记