1)阻塞队列:是一个线程安全的队列,是可以保证线程安全的
1.1)如果当前队列为空,尝试出队列,进入阻塞状态,一直阻塞到队列里面的元素不为空
1.2)如果当前队列满了,尝试入队列,也会产生阻塞,一直阻塞到队列中的元素不为满为止
1.3)所以在Java的标准库中内置了一个BlockingQueue(是一个接口)这样的类来实现阻塞队列这样的功能,它的用法与普通的入队列和出队列很相似,没有取队首元素的操作;
1.4)Java.util.concurrent这个包里面包含了很多与多线程并发相关的组件操作,简称JUC
BlockingQueue<String> blockingQueue=new LinkedBlockingQueue<>();//基于链表来实现,可以指定阻塞队列的大小 blockingQueue.put("hello"); String str=blockingQueue.take();
阻塞队列的知识点补充:
1)add方法和offer方法可以将指定的元素放到BlockingQueue里面,此时阻塞队列可以容纳,那么直接返回true,否则直接返回false,不会使阻塞队列阻塞
2)但是put方法也是将我们制定的元素存放到blockingQueue里面,如果说这个阻塞队列没有空间那么调用该方法的线程会阻塞等待
3)poll(time):取出BlockingQueue排在首位的元素,如果不能立即取出,那么会等到time规定的时间内取,规定时间到还没有取到那么直接返回null;
4)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
5)BlockingQueue不接受null 元素,试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException,null 被用作指示poll 操作失败的警戒值;
BlockingQueue<String> queue1=new ArrayBlockingQueue<String>(10); BlockingQueue<String> queue2=new LinkedBlockingQueue<String>(10); BlockingQueue<String> queue3=new PriorityBlockingQueue<>(10);
1)手动实现一个阻塞队列:
在实现循环队列的时候,有一个重要的问题,如何判断使空队列,还是满的队列?
1)head==tail来进行判断,这是并不靠谱的
由于一直进行插入元素,导致的head==tail,就说明是队列满了
由于一直进行删除元素,导致的head==tail,就说明此时队列是空的
所以我们这么做:size=0就是空,size==数组长度就是满,在这里,必须要加一个锁对象,给谁加锁就锁哪一个对象
2)数组实现队列,就是一个循环队列,我们用[head,tail)这个范围来表示数组的一个有效元素范围
3)当head或者tail到达数组元素的末尾之后,就需要从头开始,重新进行循环
进行入队列:就是把新的元素放到tail位置上面,并且让tail++(元素不满)
进行出队列:就是把随手元素取出来,让head++(元素不为空)
1)可以浪费一个格子,直接浪费,head==tail认为是空,head=tail+1认为是满
2)可以是用一个变量来进行记录元素的个数,size==0认为是空size==array.length认为是满
public static void main(String[] args) { myqueue queue=new myqueue();//作为交易场所 Thread t1=new Thread(){//搞一个这样的线程作为生产者 public void run() { for(int i=0;i<1000;i++) { try{ queue.put(i); System.out.println("生产元素生产了"+i+"个"); sleep(1000);/每秒钟生产一个元素 }catch(InterruptedException e) { e.printStackTrace(); } } } }; t1.start(); Thread t2=new Thread() {//搞一个这样的线程作为消费者 public void run() { while (true) { //频繁取队首元素 int num = queue.take(); System.out.println("消费元素为" + num); } } }; t2.start(); } }
public class MyBlockingQueue { public int[] array=new int[10]; public int tail=0; public int head=0; public int count=0; Object object1=new Object(); Object object2=new Object(); public void put(int data) throws InterruptedException { synchronized (Object.class){ if(count==array.length){ object1.wait(); } array[tail]=data; tail++; count++; if(tail==array.length){ tail=0; } } } public int take() throws InterruptedException { int result=0; synchronized (Object.class){ if(count==0){ object2.wait(); } result=array[head]; head++; count--; if(head==array.length){ head=0; } object1.notify(); } return result; } }
数组实现队列,就是一个循环队列 入队列,就是把新的元素放到tail位置上,并且tail++; 出队列,就是把队首元素取出来,也就是说把head位置的元素返回回去,如果是引用数据类型,要手动置为空,并且head++; class MyQueue { //保存数据的本体 Object object=new Object(); private int []arr1; //队首元素下标 private int head=0; //队尾元素下标 private int tail=0; //有效数据元素的个数 private int count=0; MyQueue() { this.arr1 = new int[1000]; } public void put(int data) { synchronized (object) { if (count == arr1.length) { //此时队列中的值已经满了 //此时的条件,最好写成while 因为有可能会出现第一个线程放入元素后,第二个线程又继续放,就有会放满的情况,使用while的目的是为了让wait唤醒之后,再次去判断一下条件是否成立; try{ object.wait(); }catch(InterruptedException e) { e.printStackTrace(); } } arr1[tail] = data; tail++; //处于tail到达数组末尾的情况 if (tail == arr1.length) { tail = 0; } //上面的这个条件判定可以写成tail=tail%array.length count++; //我们put成功了,就可以进行唤醒take中的wait操作,因为此时队列一定是不为空的 object.notify(); } } public int take(){ synchronized (object) { if (count == 0) { //head==tail有一个元素,count=0一个元素都没有 try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int ret = arr1[head]; head++; if (head == arr1.length) { head = 0; } //我们take成功了,就可以唤醒put中的wait,因为此时队列一定不为满 count--; object.notify(); return ret; } } }
1)要是想要这个队列支持线程安全,一定要保证在多线程环境下面调用这里面的put和take是没有任何问题的;
1.1)看了这里面的代码之后,put和take里面的每一行代码都是在操作公共的变量,可以给整个方法来进行加锁或者是通过同步代码块的方式,指定this来加锁,或者指定一个专门的锁对象,锁这个对象和调用wait都是这个对象,所以说要是想保证线程安全,就需要进行使用synchronized来将若干个非原子性的操作,打包成原子性的操作
如果说要是想精准唤醒某一个线程,就需要使用不同的锁对象:
1)要是想唤醒t1,我们就必须o1.notify(),让t1进行o1.wait()
2)要是想唤醒t2,我们就必须o2.notify(),让t2进行o2.wait()
2)要想实现阻塞效果,我们就需要搭配对象等待集来进行使用
2.1)我们使用哪一个对象来进行加锁,就需要使用哪一个对象来进行wait操作,如果是针对this加锁就使用this.wait();
2.2)对于put操作来说,阻塞条件就是队列为满
2.3)对于take操作来说,阻塞条件就是队列为空
2.4)put中的wait要靠take来进行唤醒,条件是队列为满,只要队列不为满,只要我们take成功取走了一个元素,队列不就不为满了吗,就可以唤醒了
2.5)take中的wait要靠put来进行唤醒,条件是队列为空,只要我们的队列不为空,也就是说只要我们put成功放进去了一个元素,队列不就不为空了吗,就可以唤醒了
2.6)当前代码中,put操作和take操作两种操作不会同时wait,等待条件是截然不同的
注意:tail=tail%array.length这种写法十分的不建议
1)这种写法非常的不直观
2)取%操作,这一种操作对于计算机来说的开销是非常大的,相当于除法操作,比较操作就是一个跳转指令;既不利于提高开发效率,也不能提高运行效率
1)生产者生产元素的速度是小于消费者消费的速度
2)put操作和take操作有可能都会出现阻塞的情况,但是此时由于这两个代码中的阻塞条件是对立的,因此我们两边的wait不会同时触发
put操作就会唤醒take的阻塞,put操作就破坏了take的阻塞条件
take操作就会唤醒put的阻塞,take操作也就破坏了put的阻塞条件
3)下面的if操作最好换成while操作,如果是多个线程出现阻塞等待的时候,万一同时唤醒了多个线程,就很有可能出现,第一个线程放入元素,第二个线程又放的时候,就会出现满的情况,所以我们使用while就是为了让wait被唤醒之后,再次确定一下条件是否成立
4)如果说有人等待,那么notify是可以唤醒的,如果说没有等待,那么notify没有任何副作用
二)关于生产者消费者模型的理解:
生产者消费者模型,拿一个包饺子的例子来说
第一种方式:每一个人,擀完一个饺子一个包一个饺子,擀一个饺子包一个饺子,但是擀面杖只有一个,就会导致锁的冲突比较激烈,况且提高了门槛,我们只有先获取到这把锁才能进行擀饺子皮,其他人获取不到这把锁,就会阻塞等待,整体的效率并不会很高
第二种方式:一个人负责擀饺子,这个人擀出一堆皮,三个人负责取出这些皮,进行包饺子
1)生产者:擀皮的人;
2)消费者:包饺子的人;
3)交易场所:盖帘(放饺子皮的盖帘);擀饺子皮的人就是饺子皮的生产者,要进行源源不断地生成饺子皮
包饺子的人就是及饺子皮的消费者,我们要不断地进行使用和消耗饺子皮
上述模型中一般生产者也只有一个,盖帘是交易场所,消费者确实有很多个
在计算机中,生产者就是一组线程,消费者是另一组线程,阻塞队列就是生产者消费者模型中的交易场所
所以说在我们平时写代码的过程中,代码一定要高内聚,低耦合
高内聚的意思就是说:希望不同模块之间,联系要尽量的少, 所以说我们使用生产者消费者模型就可以降低这里面的耦合
最大的用处解耦合:写了两个代码,一个代码中的两个代码块的关联关系很复杂,这样耦合就比较高,两个模块的关联关系尽量小,简单,整体的代码是可以相互理解的,耦合比较低,
1)例如A要传输一定的数据给B,如果直接传输,此时就要求,要么是A向B传输数据,要么是B向A拉取数据,都是需要A和B进行相互交互的,A和B之间存在着一些关联关系,如果B挂了,A也就会有太大的影响;
2)在我们开发A代码的时候就必须充分了解B提供的一些接口,开发B代码的时候要充分了解A是怎么调用的;
3)未来如果需要进行扩展,扩展也搞一个C,让A也给C传输数据,这个改动就可能比较复杂,因为本来是A和B进行传输的,多了一个C,那么就是A想C传输数据,或者是C和B来向A拉取数据,改动比较复杂,就认为A和B的耦合比较高,B挂了,对A没啥影响
1)A把数据写到队列里面,B再从队列里面取出元素进行消费,
2)A不知道数据要发送给谁,只需要向队列中添加元素,B不知道这个数据是谁发送过来的,只需要从队列中取元素即可
3)如果在后面需要进行扩展(再来C),也不需要直接从A要元素,只需要在队列中取出元素即可,这个阻塞队列就好似于变成了中转站一样的东西;
4)这样我们就做到了,让生产者和消费者可以不知道他们彼此之间是谁,这个数据是谁生产的,是谁消费的,都不重要,能生产,能消费就可以了,这样还是我们的系统变得更加灵活,可以随意替换A,B,C的任意一个模块,修改更方便,耦合耕地,让代码程序的维护性变得更高;
2)销峰填谷:
我们来举一个三峡大坝的例子
汛期:如果没有大坝,那么到了雨季,那么下游的水就会很大,就会造成水灾可能会发生灾难
罕期:如果没有大坝,那么下游的水就很少,有可能就会发生旱灾
于是就有了大坝:
1)汛期:关闸蓄水,并不是全部关了,让水按照一定的速率向下流,有节奏,按照一定的速率来进行放水,避免突然一波把下游带走(当突然下暴雨的时候);
2)罕期:开闸放水,让水也按照一定的速率向下流,避免下游太缺水,避免造成旱灾;
3)首先我们让消费者消费得快一些,让生产者生产的慢一些,此时我们会看到,消费者线程会阻塞等待,每当有新的生产者的元素时,消费者才会执行;
4)但是如果消费者消费得慢一些,让生产者生产的快一些,就会出现,一开始大量的迸发出生产元素,等到生产了100多个才开始消费;(代码是第二种情况
1)互联网上面过来的请求数量,是多还是少,是不可控的;突然来了大量请求,如果没有入口服务器,这些什么商家服务器,直播服务器就有可能会挂掉,操作数据库,效率比较低,况且需要的系统资源可能会更多,如果主机的硬件不够,程序就有可能直接挂了,咱们的入口服务器一般是不会垮掉的,因为入口服务器是不会不会处理数据请求的
2)通过一个队列来进行缓存请求,建立一个生产者消费者模型,此时即使网络这边过来一大波请求,这些请求只是冲击了队列服务器,对于后续的业务服务器,仍然是按照固定的速率来消费数据,阻塞队列是没有什么计算量的,就单纯的存个数据,就能抗住更大的压力
3)实际上,网管是不可以直接与各个服务器进行进行相连的,通过一个队列来实现生产者消费者模型
1)现在即使说现在网络上面过来了一大波请求,此时这些请求指示冲击了队列服务器,但是对于后面的业务服务器,任然是以固定的速率来进行消费数据,如果互联网这边的请求少了,后面的这些服务器也不会闲着,就会把之前队列积压的数据,来取出来进行处理
2)这里此时的请求的压力直接给到了阻塞队列这里面,此时针对我们的队列的请求是暴涨的,但是我们的阻塞队列没有做过多的计算,没啥计算量,就是单纯的存储一个数据,它是可以承受住一定的压力的;
咱们在实际开发中使用到的阻塞队列并不是一个简单的数据结构,而是一组专门的服务器程序,它所提供的功能也并不仅仅是阻塞队列的功能,还会在这上面的队列中增加新的功能,比如说对于数据持久化存储,支持多个数据通道,支持多节点容灾冗余备份,支持管理面板,方便于配置参数,这样的队列又起了一个新的名字,叫做消息队列,但是本质上还是阻塞队列的功能,kafak,mq就是业界常见使用的消息队列