一)Semphore:限流器用我就对了
Java中信号量Semphore是把操作系统原生的信号量封装了一下,本质就是一个计数器,描述了 可用资源的个数,主要涉及到两个操作
如果计数器为
0
了,继续Р操作,就会出现阻塞等待的情况
P
操作:申请一个可用资源,计数器-1V
操作:释放一个可用资源,计数器+1停车场门口有一个灯牌,会显示停车位还剩余多少个,每进去一辆车,显示的停车位数量就-1,就相当于进行了一次P操作,每出去一辆车, 显示的停车位数量就+1,就相当于进行了一次V操作,而当停车场的剩余车位为0时,显示的停车位数量就为0了;
1)创建
Semaphore
示例, 初始化为4
, 表示有4
个可用资源.
2)acquire
方法表示申请资源(P
操作),release
方法表示释放资源(V
操作)public class Main{ public static void main(String[] args) { Semaphore semaphore=new Semaphore(10); Runnable runnable=new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"开始申请资源"); try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"已经获取到资源了"); semaphore.release(); System.out.println(Thread.currentThread().getName()+"开始释放资源了"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }; for(int i=0;i<10;i++){ Thread t=new Thread(runnable); t.start(); } } }
public class Main{ public static int count=0; public static void main(String[] args) throws InterruptedException { Semaphore semaphore=new Semaphore(1); Thread t1=new Thread(()-> { for (int i = 0; i < 10000; i++) { try { semaphore.acquire(); count++; semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2=new Thread(()->{ for(int i=0;i<10000;i++){ try { semaphore.acquire(); count++; semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }}); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } }
基于Semphore可以实现限流器:
什么是限流:比如说某一个广场,他的日载流量是6W,那么如果说假设有一天来了10W人,但是只能进去6W人,这个时候就只能排队入园了,因为工作人员始终会将人数控制在6W人
咱们再从生活中的事例回到程序当中,假设一个程序只能为 10W 人提供服务,突然有一天因为某个热点事件,造成了系统短时间内的访问量迅速增加到了 50W,那么导致的直接结果是系统崩溃,任何人都不能用系统了,显然只有少人数能用远比所有人都不能用更符合预期,因此这个时候要使用限流了
Semphore本身是依靠计数器的思想来进行实现的,它可以控制对于共享资源的访问数量,当线程需要访问该资源的时候,他必须先进行获取一个许可,就是从计数器中获取到资源
当计数器本身大于0的时候,线程可以获取到这个可用资源并且能够继续执行
当计数器本身等于0的时候,线程将会被阻塞,直到有其他的线程释放资源
Semphore本身有两个重要的操作,acquire()和realse()操作
1)当线程需要访问共享资源的时候,它会调用acquire方法来获取资源,如果计数器的值大于0,那么acquire()方法会将计数器的值减1,并且允许线程继续运行,如果计数器的值等于0,那么acquire()方法会使得线程阻塞,知道有其他线程释放资源
2)当线程使用完成共享资源以后,该线程可以调用realse方法来释放资源,realse()方法会使得计数器的值+1,表示有一个资源可以使用,其他被阻塞的线程可以有机会获得可用资源并且+1;
关于公平模式和非公平模式:
在这里面所谓的公平模式就是说线程调用acquire的先后顺序来获取到这个可用资源的,公平模式遵循先进先出原则,所以非公平模式是抢占式的,也就是说有可能一个新的获取线程恰好在一个许可证释放以后得到了这个许可证,但是这个已经获取许可证的线程前面还存在着一些其他的线程,当然在这里面非公平模式的性能比较高;
假设说,当有时候需要等待某一些线程执行完成了之后,再来执行主线程的代码,此时应该怎么做呢?可能有人会说,简单,用 join() 方法等待线程执行完成之后再执行主线程就行了,当然,如果使用的是 Thread 来执行任务,那这种写法也是可行的。然而真实的(编码)环境中我们是不会使用 Thread 来执行多任务的,而是会使用线程池来执行多任务,这样可以避免线程重复启动和销毁所带来的性能开销;
二)CountDownLatch:别急,等人齐了在开团
撞线:调用latch.countDown()
比赛结束,统计成绩:latch.await(),只要还存在着有任意的一个选手不进行撞线,那么比赛就无法结束,只有说所有的选手比赛撞了线,那么最终的比赛才可以结束
public class Main { public static void main(String[] args) throws InterruptedException { CountDownLatch latch=new CountDownLatch(10); for(int i=0;i<10;i++){ Thread t=new Thread(()->{ System.out.println("线程"+Thread.currentThread().getName()+"开始起跑"); try { Thread.sleep(new Random().nextInt(10000)); System.out.println("线程"+Thread.currentThread().getName()+"开始撞线"); latch.countDown(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); t.start(); } latch.await(); System.out.println("比赛完成"); } }
1)使用CountDownLatch可以实现等待所有任务执行完成以后再来执行主任务的功能,他就是类似于说好像比赛中等待所有运动员都完成比赛以后再来公布排名一样,当然咱们在大玩着荣耀的时候也是一样,只有说所有人集合完毕以后在开团
2)而CountDownLatch就是通过计数器来实现等待功能的,当创建CountDownLatch的时候会创建一个大于0的计数器,每一次调用countDown()方法的时候计数器的值会减1,直到计数器的值变成0以后,等待的任务就可以继续执行了
3)countDownLatch在底层实现的时候是依靠内部创建并维护了一个voltaile的计数器,当调用countDown()方法的时候,会尝试将整数计数器-1,CountDownLatch 在创建的时候需要传入一个整数,在这个整数“倒数”到 0 之前,主线程需要一直挂起等待,直到其他的线程都执行之后,主线才能继续执行
public static void main(String[] args) throws InterruptedException { //创建CountDownLatch实现两个计数器 CountDownLatch latch=new CountDownLatch(2); //创建线程池执行任务 ExecutorService service= Executors.newFixedThreadPool(2); service.submit(new Runnable() { @Override public void run() { System.out.println("我是线程池提交的第一个任务"); latch.countDown(); } }); service.submit(new Runnable() { @Override public void run() { System.out.println("我是线程池提交的第二个任务"); latch.countDown(); } }); latch.await(); System.out.println("线程池中的任务已经全部执行完成"); }
三)循环栅栏(Cycbarrier):人齐了老司机就可以发车了
循环栅栏实现一个可以循环利用的屏障
https://img-blog.csdnimg.cn/img_convert/f10e1adb034e3ebaa2c02b11596386ee.gif
1)CycliBarrier作用是让一组线程之间可以相互等待,当到达一个共同点的时候,所有之前等待的线程会冲破栅栏,一起向下执行
2)现在举个例子来说:伟哥要做末班车回家,公交车站的司机会等待车上面的所有乘客坐满以后再来发车,还有比如说王者荣耀,得等待5个队友游戏都加载完了才可以进入到游戏
3)本质上来说是让多个线程共同相互等待,知道说当所有的线程都到达了屏障点以后,之前的所有线程才可以继续向下执行,CycBarrier本身就象老司机开车一样,如果车上面还有空闲的座位,那么司机就得等着,只有说当作为坐满以后,老司机才发车
public static void main(String[] args) { CyclicBarrier barrier=new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("现在司机上面的人都到齐了开始进行发车"); System.out.println("当前线程池中的任务都已经执行完成了"); } }); ExecutorService service=Executors.newFixedThreadPool(10); for(int i=0;i<10;i++){ service.submit(new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(5000)); System.out.println("当前乘客开始上车"+Thread.currentThread().getName()); barrier.await();//当前判断线程池中的任务执行完成可以执行多次 System.out.println("当前线程下车"+Thread.currentThread().getName()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }); } } }
在CycliBarrier底层是基于计数器来实现的,当count不为0的时候,每一个线程在到达屏障点以后会先进行调用await()方法将自己阻塞,此时计数器会减1,此时这个线程会阻塞在这个屏障处,当循环栅栏的计数器被减为0的时候,所有调用await()的线程就会被唤醒,就会冲破栅栏,一起执行,CountDownLatch和CycliBarrier在底层都是依靠计数器来实现的,但是CountDownLatch只能使用一次,但是CycliBarrier却可以使用多次,这就是两者最大的区别
总结:CycliBarrier在底层是依靠ReentranLock来实现计数器的原子性更新的,CycliBarrier最常使用的就是await()方法,使用该方法就会将计数器的值减1,并判断当前的计数器是否为0,如果不是0就阻塞等待,并且当计数器变成0以后,该线程也就是阻塞在循环栅栏的线程才可以继续执行剩余任务;
三)线程池的状态:
1)Running状态:运行状态,线程池创建完成以后就进入到这个状态,如果不手动调用关闭方法,那么线程池在整个程序运行过程中都是这个状态;
2)ShutDown状态:关闭状态,线程池本身不再接受新任务的提交,但是会有先将线程池中已经存在的任务处理完成
3)Stop停止状态:不再接受新任务的提交,并且会中断正在执行的任务,放弃任务队列中已经存在的任务
4)tidying状态:整理状态,所有的任务都执行完成以后,也包括任务队列中的任务执行完成,当前线程池中的活动线程数降为0的状态,到达此状态以后会调用线程池的terminated方法
5)terminated状态:销毁状态,当调用线程池的terminated方法以后会进入到这个状态
ThreadPoolExecutor executor=new ThreadPoolExecutor(10, 10, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }){ @Override protected void terminated() { super.terminated(); System.out.println("线程池终止"); } };
1)当进行调用shutDown方法的时候,线程池中的状态会由Running状态到达shutDown状态,最后在到达tidying状态,最后到达terminated状态
2)当进行调用shutDownNow方法的时候,线程池中的状态会由running状态到达stop状态,最后在到达tidying状态,最后到达terminated状态
3)进行调用terminated方法,线程池会直接从tidying状态到达terminated状态,可以在阻塞队列的时候重写此方法,默认来说这个方法是空的
四)如何判断线程池中的任务都已经执行完成了?
1)在很多场景下,都希望等待线程池中的所有任务都执行完,然后再来执行下一步操作,对于Thread类来说,这样的实现是很简单的,加上一个join方法就解决了,但是对于线程池的判断就比较麻烦了
2)从上面的执行结果可以看出来,程序先打印了任务执行完成,再来继续打印并执行线程池的任务,这种执行顺序混乱的结果不是我们想要看到的,我们期望的结果就是等到鲜橙汁中的所有任务都执行完成了,再来进行打印线程池执行完成的信息;
3)产生少数问题的原因就是主线程main和线程池是并发执行的,所以说当线程池还没有执行完main现成的打印结果就已经执行了,想要解决这个问题就需要在打印结果之前,先判断线程池中的任务是否已经执行完成,如果没有执行完成就等到任务执行完成再来打印结果
public static void main(String[] args) { ExecutorService service=Executors.newFixedThreadPool(10); for(int i=0;i<10;i++){ service.submit(new Runnable() { @Override public void run() { System.out.println("开始执行线程池中的任务"); } }); } System.out.println("线程池中的所有任务执行完成"); }
1)使用isTerminated()方法来判断:
1)使用线程池的终止状态来进行判断线程池中的任务是否已经全部执行完成,但是如果想要让线程之中的状态改变就需要调用shutDown()方法,不然线程池会一直处于Running运行状态那么就没有办法来进行判断是否处于终止状态来判断线程池中的任务是否已经全部执行
2)shutdown方法是启动线程池有序关闭的方法,它在关闭之前会执行完成所有已经提交的任务,并且不会再进行接收新的任务,当线程池中的所有任务都执行完成以后,线程池就处于终止状态了,此时isTerminated()方法返回的结果也就是true了;
缺点:需要关闭线程池
ThreadPoolExecutor executor=new ThreadPoolExecutor(10, 10, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }){ @Override protected void terminated() { super.terminated(); System.out.println("线程池终止"); } }; executor.submit(new Runnable() { @Override public void run() { System.out.println("执行任务1"); } }); executor.submit(new Runnable() { @Override public void run() { System.out.println("执行任务2"); } }); executor.shutdown(); while(!executor.isTerminated()){ } System.out.println("线程池中的任务已经执行完成了");
2)判断getCompletedTaskCount和getTaskCount是否相等
getTaskCount()返回执行计划任务的总数,但是因为本身任务和线程的状态都在不断地发生变化,因此返回的值是一个近似值;
getCompetedTaskCount()返回完成执行的任务总数,但是因为本身任务和线程的状态都在不断地发生变化,因此返回的值是一个近似值,但是在连续的调用中并不会减少
虽然不需要关闭线程池,但是可能会造成一定的误差
3)调用countDownLatch和CycliBarrier
需要注意的是countDownLatch中的countDown()方法和CycliBarrier中的await()方法需要在线程池的run方法的最后调用
4)使用FutureTask
FutureTask中的优势就是判断比较精准,调用每一个线程的FutureTask的get方法就是等待该任务执行完成的,需要使用submit进行提交:
public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor executor=new ThreadPoolExecutor(10,10,0,TimeUnit.SECONDS,new LinkedBlockingDeque<>(100)); FutureTask<Integer> task1=new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { int a=10; a++; System.out.println("a++完成"); return a; } }); FutureTask<Integer> task2=new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { int b=11; b++; System.out.println("b++完成"); return b; } }); executor.submit(task1); executor.submit(task2); Integer result1= task1.get(); Integer result2=task2.get(); System.out.println("线程池中的任务都已经执行完成"); }
五)submit和execute的区别:
1)接收到的参数不同:submit方法只能接受到runnable接口的任务,但是submit方法及可以接受到runnable方法的任务,也可以接收到callable,futureTask类型的任务,前者没有返回值,后者可以后返回值;
2)execute()的返回值是void,线程提交后不能得到线程的返回值,submit()的返回值是Future,通过Future的get()方法可以获取到线程执行的返回值,get()方法是同步的,执行get()方法时,如果线程还没执行完,会同步等待,直到线程执行完成
注意:虽然submit()方法可以提交Runnable类型的参数,但执行Future方法的get()时,线程执行完会返回null,不会有实际的返回值,这是因为Runable本来就没有返回值