文章目录
- 1.概念
- 1.1 什么是JUC
- 1.2 线程与进程
- 1.3 线程的几种状态
- 1.4 守护线程
- 1.5 死锁与活锁
- 1.6 乐观锁与悲观锁
- 1.7 自旋锁
- 2.Lock
- 2.1 使用Lock
- 2.2 Lock与Synchronized
- 2.3 虚假唤醒
- 3.八锁问题
- 3.1 创建一个Phone实例多线程调用两个方法
- 3.2 创建一个Phone实例多线程调用两个方法,其中第一个线程调用的方法中加延迟
- 3.3 创建一个Phone实例多线程调用两个方法,其中一个是普通方法,而且该线程位置靠后
- 3.4 创建两个Phone实例多线程调用两个方法
- 3.5 创建一个Phone实例多线程调用两个方法,两个方法都有static修饰
- 3.6 创建两个Phone实例多线程调用两个方法,两个方法都有static修饰
- 3.7 创建一个Phone实例多线程调用两个方法,其中一个有static修饰,而且调用线程在前面
- 3.8 创建两个Phone实例多线程调用两个方法,其中一个有static修饰,而且调用线程在前面
- 4.线程安全集合
- 5.Callable
- 6.三大辅助工具类
- 6.1 CountDownLatch
- 6.2 CyclicBarrier
- 6.3 Semaphore
- 7.读写锁
- 8.阻塞队列与同步队列
- 8.1 阻塞队列
- 8.2 同步队列
- 9.线程池
- 9.1 三大方法
- 9.2 七大参数
- 9.3 四种拒绝策略
- 10.四大函数式接口
- 11.ForkJoin
- 12.异步回调
- 12.1 没有返回值的runAsync异步回调
- 12.2 有返回值的supplyAsync异步回调
- 13.Volatile
- 13.1 保证可见性
- 13.2 不保证原子性
- 13.3 避免指令重排
- 14.CAS
- 15.Condition
1.概念
1.1 什么是JUC
-
JUC是java.util.concurrent包的缩写
-
主要涉及的三个类
java.util.concurrent、 java.util.concurrent.automic、 java.util.concurrent.locks
这几个包下基本都是关于高并发的内容。也就是我们常说的多线程。
1.2 线程与进程
如果学过操作系统的话,这个应该都了解过。
线程是最小的调度单位,进程是最小的资源分配单位。
举个例子来说:
- 进程:一个程序,QQ.EXE Music.EXE;数据+代码+pcb
- 线程:开了一个进程Typora,写字,等待几分钟会进行自动保存(线程负责的)
不过我们必须知道,Java并没有权限去开启线程,操作硬件,如果我们跟进源码的话,可以发现,java的底层是使用native标识的方法来开启线程,其实是使用了C++来开启线程。
1.3 线程的几种状态
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
//运行
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
//运行
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
//阻塞
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
//等待
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
//超时等待
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
//终止
TERMINATED;
}
这里的阻塞实际上与等待状态没多大区别。
1.4 守护线程
我们可以使用setDaemon()
方法将一个普通线程转换成守护线程。
守护线程目的是为了给其他线程提供服务,比如GC垃圾回收线程。如果只剩下守护线程的时候,JVM就会退出。
1.5 死锁与活锁
- 死锁:是指两个或两个以上的进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
- 活锁:线程A和B都需要过桥(都需要使用进程),而都礼让不走(那到的系统优先级相同,都认为不是自己优先级高),就这么僵持下去.(很绅士,互相谦让)
1.6 乐观锁与悲观锁
- 悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
- 乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。
两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。
1.7 自旋锁
自旋锁其实就是循环等待的锁。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
2.Lock
JUC包下的Lock接口所有已知实现类:
- ReentrantLock(可重入式锁)
- ReentrantReadWriteLock.ReadLock(可重入式读锁)
- ReentrantReadWriteLock.WriteLock(可重入式写锁)
根据公平性,我们还可以将锁分为公平锁和非公平锁。所谓的公平锁就类似FIFO,不允许插队,非公平锁就是可以插队。
公平锁会严重影响性能,一般都是使用非公平锁。
如果跟进ReentrantLock的源码我们可以发现,可重入式锁默认都是生成非公平锁,我们也可以手动设置参数生成公平锁。
2.1 使用Lock
public class SaleTicketDemo02 {
public static void main(String[] args) {
//多线程操作
//并发:多线程操作同一个资源类,把资源类丢入线程
Ticket2 ticket = new Ticket2();
new Thread(()->{for(int i=0;i<40;i++) ticket.sale(); },"A").start();
new Thread(()->{for(int i=0;i<40;i++) ticket.sale(); },"B").start();
new Thread(()->{for(int i=0;i<40;i++) ticket.sale(); },"C").start();
}
}
//lock三部曲
//1、 Lock lock=new ReentrantLock();
//2、 lock.lock() 加锁
//3、 finally=> 解锁:lock.unlock();
class Ticket2{
private int number=50;
Lock lock=new ReentrantLock();
//卖票的方式
// 使用Lock 锁
public void sale(){
//加锁
lock.lock();
try {
//业务代码
if(number>=0){
System.out.println(Thread.currentThread().getName()+" 卖出了第"+number+" 张票,剩余:"+number+" 张票");
number--;
}
}catch (Exception e) {
e.printStackTrace();
}
finally {
//解锁
lock.unlock();
}
}
}
2.2 Lock与Synchronized
-
Synchronized 内置的Java关键字,Lock是一个Java类
-
Synchronized 无法判断获取锁的状态,Lock可以判断
-
Synchronized 会自动释放锁,lock必须要手动加锁和手动释放锁!可能会遇到死锁
-
Synchronized 线程1(获得锁->阻塞)、线程2(等待);lock就不一定会一直等待下去,lock会有一个trylock去尝试获取锁,不会造成长久的等待。
-
Synchronized 是可重入锁,不可以中断的,公平的;Lock,可重入的,可以判断锁,并且可以自己设置公平锁和非公平锁,默认是非公平锁;
-
Synchronized 适合锁少量的代码同步问题,Lock适合锁大量的同步代码;
2.3 虚假唤醒
先展示什么样的代码会造成虚假唤醒:
//定义Resource作为线程需要的资源
public class Resource {
//当前资源的数量
int num=0;
//当前资源的上限
int size=1;
//消费资源
public synchronized void remove(){
if(num==0){
try {
System.out.println("消费者等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println("消费者线程为:"+Thread.currentThread().getName()+"资源数量"+num);
notifyAll();
}
//生产资源
public synchronized void put(){
if(num==size){
try {
System.out.println("生产者等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.println("生产者线程为:"+Thread.currentThread().getName()+"资源数量"+num);
notifyAll();
}
}
首先我先用文字描述一下。
理论情况下,正常的执行顺序应该是生产者线程1/2将size+1,然后消费者线程1/2将size-1,size不会超过1。但是如果产生虚假唤醒,那么就会出现size大于1的情况发生。
假如我们有两个消费者线程,两个生产者线程。然后我们的生产者线程1优先启动,size加1以后,唤醒其他所有线程,假如生产者线程2启动,他会判断size==1然后发生阻塞,消费者线程任意一个启动后判断资源大于1,消耗资源然后就会唤醒其他所有线程,这个时候生产者线程1直接将资源+1,然后同时生产者线程2会从阻塞的地方执行,他不会判断资源数是否已经等于1,直接进行size+1结果就是资源数大于1。这个时候就发生了虚假唤醒。
解决办法就是将if(num == size)换成while(num == size)。
如果还是看不懂可以去看这个老哥的博客,图文并茂。https://juejin.cn/post/7001511752131149832
3.八锁问题
初始场景
- Phone资源类有
void sendMessage() 、void call()
两个方法 - main方法开启 多线程场景 进行测试,8种情况可以提出8个问题,即8锁问题
- JUC下线程sleep方法:
timeUnit.SECONDS.sleep(1);
package eight_locks;
public class Demo1 {
public static void main(String[] args) {
}
}
class Phone {
public synchronized void sendMessage(){
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
3.1 创建一个Phone实例多线程调用两个方法
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo1 {
public static void main(String[] args) {
Phone p = new Phone();
// 发短信
new Thread(()->{
p.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
p.call();
}).start();
}
}
class Phone {
public synchronized void sendMessage(){
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
输出:
发短信
打电话
3.2 创建一个Phone实例多线程调用两个方法,其中第一个线程调用的方法中加延迟
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo1 {
public static void main(String[] args) {
Phone p = new Phone();
// 发短信
new Thread(()->{
p.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
p.call();
}).start();
}
}
class Phone {
public synchronized void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
输出:
发短信
打电话
Phone是一个资源类,谁先拿到这个锁,谁就先执行,如果某一个线程就算代码里面休眠5s,那么他并没有执行完,锁也不会释放。所以第二个线程就是要等到第一个线程执行完释放锁才能够执行。
3.3 创建一个Phone实例多线程调用两个方法,其中一个是普通方法,而且该线程位置靠后
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo3 {
public static void main(String[] args) {
Phone3 p = new Phone3();
// 发短信
new Thread(()->{
p.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
p.watchMovie();
}).start();
}
}
class Phone3 {
public synchronized void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
public void watchMovie(){
System.out.println("看电影");
}
}
输出:
看电影
发短信
这里的watchMovie()方法,是普通方法,所以并不会受到锁的影响,不过这里发短信在看电影之后主要还是因为在sendMessage方法中,线程休眠了3s。
3.4 创建两个Phone实例多线程调用两个方法
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo4 {
public static void main(String[] args) {
Phone4 one = new Phone4();
Phone4 two = new Phone4();
// 发短信
new Thread(()->{
one.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
two.call();
}).start();
}
}
class Phone4 {
public synchronized void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
输出:
打电话
发短信
这里创建了两个Phone对象,每个Phone对象都拥有一把锁,所以两个线程间并不影响。这里打电话优先输出也是因为发短信方法中让线程休眠了。
3.5 创建一个Phone实例多线程调用两个方法,两个方法都有static修饰
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo5 {
public static void main(String[] args) {
Phone5 one = new Phone5();
// 发短信
new Thread(()->{
one.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
one.call();
}).start();
}
}
class Phone5 {
public synchronized static void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized static void call(){
System.out.println("打电话");
}
}
输出:
发短信
打电话
这里我们要注意,如果一个方法使用synchronized和static修饰,那么这里的synchronized关键字锁的就是Class。比如这里锁的就是Phone.class。所以这里同样的第二个线程要等第一个线程释放锁以后才能执行。
3.6 创建两个Phone实例多线程调用两个方法,两个方法都有static修饰
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo6 {
public static void main(String[] args) {
Phone6 one = new Phone6();
Phone6 two = new Phone6();
// 发短信
new Thread(()->{
one.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
two.call();
}).start();
}
}
class Phone6 {
public synchronized static void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized static void call(){
System.out.println("打电话");
}
}
输出:
发短信
打电话
这里虽然生成了两个Phone对象,但是synchronized加static关键字锁的是Class,两个Phone对象锁的是一个Class模板,所以同上,第二个线程要等第一个线程释放class的锁才能执行。
3.7 创建一个Phone实例多线程调用两个方法,其中一个有static修饰,而且调用线程在前面
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo7 {
public static void main(String[] args) {
Phone7 one = new Phone7();
// 发短信
new Thread(()->{
one.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
one.call();
}).start();
}
}
class Phone7 {
public synchronized static void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
输出:
打电话
发短信
这里打电话先输出的原因是 Phone实例 和 Phone.Class 分别被锁,两个线程之间并无影响,因为线程延迟的原因所以打电话先输出。
3.8 创建两个Phone实例多线程调用两个方法,其中一个有static修饰,而且调用线程在前面
package eight_locks;
import java.util.concurrent.TimeUnit;
public class Demo7 {
public static void main(String[] args) {
Phone8 one = new Phone8();
Phone8 two = new Phone8();
// 发短信
new Thread(()->{
one.sendMessage();
}).start();
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打电话
new Thread(()->{
two.call();
}).start();
}
}
class Phone7 {
public synchronized static void sendMessage(){
// JUC下的线程延时方法
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
输出:
打电话
发短信
原理同7。两个线程分别锁的是 Phone8.Class 和 Phone8实例,因为线程延迟,打电话 才会优先输出。
4.线程安全集合
多线程操作常见的集合时,可能会因为两个或者多个线程插入数据破坏原本集合的数据结构。
我们可以使用JUC包下的线程安全的集合:
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
5.Callable
- Interface Callable
Callable
接口类似于Runnable
,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,Runnable
不返回结果,也不能抛出被检查的异常。 这里的泛型V用来指定返回数据的类型。
- Class FutureTask
在实际操作中我们都是使用FutureTask
结合Callable使用。因为Thread的入参是Runable接口,而FutureTask实现了RunnableFuture<V>
接口,RunnableFuture<V>
接口实现了Future
接口和Runable
接口。这里的V要和Callable保持一致,都是返回值类型。
具体使用我就不在这里赘述了,就是资源类实现Callable接口,然后将资源类对象作为参数放进FutureTask的构造方法,最后将FutureTask作为参数放进Thead()就好了。返回值使用FutureTask类的get方法就可以获得。
6.三大辅助工具类
6.1 CountDownLatch
-
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。简单理解就是计数,一个场景就是全部线程执行完毕之后才执行后边的代码,常用于必须执行的任务。
-
CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
-
CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。
-
CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程结束可以通过。
上面这一段如果你看不懂的话,或者太长了不想看直接看代码吧,挺好理解的。
package three_help_classes;
import java.util.concurrent.CountDownLatch;
public class Demo1_CountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 创建计数器类
CountDownLatch countDownLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
new Thread(()-> {
System.out.println(Thread.currentThread().getName() +"===>执行完毕");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
// 上米面的代码只是开启线程,当上面线程中的业务代码全部执行完毕,才执行性下面代码
countDownLatch.await();
System.out.println("我是最后执行的~");
}
}
输出:
1===>执行完毕
2===>执行完毕
3===>执行完毕
0===>执行完毕
我是最后执行的~
6.2 CyclicBarrier
- 允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
- public CyclicBarrier(int parties, Runnable barrierAction) 绑定达到技计数目标的任务
- public CyclicBarrier(int parties)简单理解就是 达到指定目标个数线程执行完毕,才执行绑定的任务
package three_help_classes;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Demo2_CyclicBarrier {
public static void main(String[] args) {
/**
* 构造函数有两个
* - public CyclicBarrier(int parties, Runnable barrierAction) 绑定达到技计数目标的任务
* - public CyclicBarrier(int parties)
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{
System.out.println("6个线程全部执行完才输出!!");
});
for (int i = 1; i <= 8; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "===>执行完毕~~");
try {
// 等待设置目标线程数达标,执行绑定的方法
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(temp)).start();
}
}
}
6.3 Semaphore
- 一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。 但是,没有使用实际的许可证对象;Semaphore只保留可用数量的计数,并相应地执行。
- 简单理解类似限流(有限资源保持有秩序)思想,假如只有3个停车位,6辆车,无空车位之前其他三辆车需要等待。
package three_help_classes;
import java.util.Timer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Demo3_Semaphore {
public static void main(String[] args) {
// 可以理解为只有4个车位
Semaphore semaphore = new Semaphore(4);
for (int i = 1; i <= 10; i++) {
new Thread(()->{
try {
// 占位
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "===>拿到了车位!");
// 停车时间
TimeUnit.SECONDS.sleep(3);
// 离开车位
System.out.println(Thread.currentThread().getName() + "====>离开了!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 最后告诉等待的人有1个空位了
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
输出:
1===>拿到了车位!
4===>拿到了车位!
3===>拿到了车位!
2===>拿到了车位!
3====>离开了!
2====>离开了!
4====>离开了!
1====>离开了!
6===>拿到了车位!
5===>拿到了车位!
7===>拿到了车位!
8===>拿到了车位!
8====>离开了!
7====>离开了!
5====>离开了!
6====>离开了!
10===>拿到了车位!
9===>拿到了车位!
9====>离开了!
10====>离开了!
7.读写锁
ReentrantReadWriteLock
是ReentrantLock
的进一步改进,他将锁更细分为读锁和写锁,粒度更细。如果是关于读写操作的话,使用读写锁具有更高的性能。
如果我们的程序中包含了读写操作,我们有没有使用锁之类的东西的话,很容易造成数据不可靠的问题。
所以我们引入ReadWriteLock 。
- public interface ReadWriteLock
ReentrantReadWriteLock实现了这个接口。
ReadWriteLock
维护一对关联的locks
,一个用于只读操作,一个用于写入。readlock
可以由多个阅读器线程同时进行,只要没有作者。writelock
是独家的。
更多信息可以自行查看官方api文档,下面展示一下使用。
package com.ogj.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache_ReadWriteLock mycache = new MyCache_ReadWriteLock();
//开启5个线程 写入数据
for (int i = 1; i <=5 ; i++) {
int finalI = i;
new Thread(()->{
mycache.put(String.valueOf(finalI),String.valueOf(finalI));
}).start();
}
//开启10个线程去读取数据
for (int i = 1; i <=10 ; i++) {
int finalI = i;
new Thread(()->{
String o = mycache.get(String.valueOf(finalI));
}).start();
}
}
}
class MyCache_ReadWriteLock{
private volatile Map<String,String> map=new HashMap<>();
//使用读写锁
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
//普通锁
private Lock lock=new ReentrantLock();
public void put(String key,String value){
//加锁
readWriteLock.writeLock().lock();
try {
//写入
//业务流程
System.out.println(Thread.currentThread().getName()+" 线程 开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName()+" 线程 写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock(); //解锁
}
}
public String get(String key){
//加锁
String o="";
readWriteLock.readLock().lock();
try {
//得到
System.out.println(Thread.currentThread().getName()+" 线程 开始读取");
o = map.get(key);
System.out.println(Thread.currentThread().getName()+" 线程 读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
return o;
}
}
8.阻塞队列与同步队列
8.1 阻塞队列
-
Interface BlockingQueue
-
父接口:
All Superinterfaces Collection <E>, Iterable <E>, Queue <E>
-
实现接口:
All Known Subinterfaces:BlockingDeque <E>, TransferQueue <E>
-
实现子类: ArrayBlockingQueue , DelayQueue , LinkedBlockingDeque , LinkedBlockingQueue , LinkedTransferQueue , PriorityBlockingQueue , SynchronousQueue
阻塞队列一般都是使用于线程池。
基本的操作有:
方式 | 抛出异常 | 不会抛出异常,有返回值 | 阻塞 等待 | 超时 等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(timenum,timeUnit) |
移除 | remove | poll | take | poll(timenum,timeUnit) |
判断队列首 | element | peek | - | - |
8.2 同步队列
- public class SynchronousQueue
- 实现的父接口:
All Implemented Interfaces: Serializable , Iterable <E>, Collection <E>, BlockingQueue <E>, Queue <E>
同步队列没有任何内部容量,甚至没有一个容量。 简单理解就是入队一个元素必须出队之后其他元素才能接着入队。操作同阻塞队列。
9.线程池
池化技术
- 程序的运行会占用系统的资源,使用池化技术可以优化资源的使用
- 线程池、连接池、内存池、对象池等
线程池优点
- 线程复用、控制最大并发数、管理线程
- 降低资源的消耗
- 提高响应速度
- 方便管理
9.1 三大方法
- ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
- ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
- ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的
不过阿里巴巴开发手册中写过,为了方便其他开发人员,最好是直接new ThreadPoolExecutor,如果你跟进三大构造方法的源码,其实底层也就是new ThreadPoolExecutor,只不过是参数的不同罢了。
9.2 七大参数
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大的线程池大小
long keepAliveTime, //超时了没有人调用就会释放
TimeUnit unit, //超时单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动
RejectedExecutionHandler handler //拒绝策略
)
这里要额外说一下maximunPoolSize的设置,也就是最大线程数。
- CPU密集型:和本机核心数保持一致,CPU性能最高。
获取CPU核心数:System.out.println("查看本机核心数:" + Runtime.getRuntime().availableProcessors());
- IO密集型:根据程序中大型IO耗时线程,保证大于等于。
9.3 四种拒绝策略
-
new ThreadPoolExecutor.AbortPolicy(): 该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常。如果超出最大承载(队列容量大小+maxPoolSize),就会抛出异常
-
new ThreadPoolExecutor.CallerRunsPolicy(): 该拒绝策略为:哪来的去哪里 main线程进行处理
-
new ThreadPoolExecutor.DiscardPolicy() :该拒绝策略为:队列满了,丢掉异常,不会抛出异常。
-
new ThreadPoolExecutor.DiscardOldestPolicy():该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常
10.四大函数式接口
所谓的函数式接口其实说的就是只包含一个方法的接口。
比如:
public interface Runnable {
public abstract void run();
}
- public interface Function<T, R>:函数型接口,T为入参类型,R为返回值类型
- public interface Predicate<T>:断言性接口,有一个输入参数,返回值只能是布尔值
- public interface Consumer<T>:消费者接口,没有返回值,只有输入值
- public interface Supplier<T> :生产者接口,没有参数,只有返回值
具体可以看API手册。
11.ForkJoin
- 大致思想:将一个大任务拆分为多个子任务,然后子任务处理的结果再合并为最终的结果
- 特点:工作窃取,简单理解就是 若有两个线程执行任务,线程1已经执行完全部的任务,线程2未执行完,这时候线程1会窃取线程2未执行完的任务去执行。内部实现的数据结构为 双端队列 。
============ MyRecursiveTask ========
package study_fokejoin;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 计算 1 - 10_0000_0000的累和
* 使用分支合并
*/
public class MyRecursiveTask extends RecursiveTask<Long> {
private Long start = 1L;
private Long end;
// 临界值,判断选择分支合并 还是 一般方法
private Long temp = 10000L;
long sum = 0L;
public MyRecursiveTask(Long start, Long end) {
this.start = start;
this.end = end;
}
// 分支合并计算方法
@Override
protected Long compute() {
// 一般方法
if((end - start) < temp){
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} // ForkJoin方法
else{
long mid = (start + end) /2;
// 拆分任务,将任务压入线程队列
MyRecursiveTask task1 = new MyRecursiveTask(start,mid);
task1.fork();
MyRecursiveTask task2 = new MyRecursiveTask(mid+1,end);
task2.fork();
// 合并子任务结果
long sum = task1.join() + task2.join();
return sum;
}
}
}
============ Test ===============
package study_fokejoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 使用FokrJoin计算
MyRecursiveTask task = new MyRecursiveTask(1L,10_0000_0000L);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> res = forkJoinPool.submit(task);
System.out.println(res.get());
// 使用并行Stream流
System.out.println(LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(0, Long::sum));
}
}
12.异步回调
如果说到异步,我们就会联想到AJAX,这个是属于前端发送异步请求的一种技术。
在java中使用Future\<V>
实现异步。
java.util.concurrent包下的 接口Future<V>
,有
实现子类:CompletableFuture , CountedCompleter , ForkJoinTask , FutureTask , RecursiveAction , RecursiveTask , SwingWorker
大多数情况下都是使用CompletableFuture。
12.1 没有返回值的runAsync异步回调
public static void main(String[] args) throws ExecutionException, InterruptedException
{
// 发起 一个 请求
System.out.println(System.currentTimeMillis());
System.out.println("---------------------");
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
//发起一个异步任务
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+".....");
});
System.out.println(System.currentTimeMillis());
System.out.println("------------------------------");
//输出执行结果
System.out.println(future.get()); //获取执行结果
}
12.2 有返回值的supplyAsync异步回调
CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
int i=1/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
//success 回调
System.out.println("t=>" + t); //正常的返回结果
System.out.println("u=>" + u); //抛出异常的 错误信息
}).exceptionally((e) -> {
//error回调
System.out.println(e.getMessage());
return 404;
}).get());
13.Volatile
Volatile 是 JVM 提供的轻量级的同步机制,使用该关键字可以保证单个变量被同步访问。
使用Volatile可以有以下效果:
- 保证可见性
- 不保证原子性
- 禁止指令重排
13.1 保证可见性
public class JMMDemo01 {
// 如果不加volatile 程序会死循环
// 加了volatile是可以保证可见性的
private volatile static Integer number = 0;
public static void main(String[] args) {
//main线程
//子线程1
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//子线程2
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在这里即使我们修改了number的值,上面的两个线程依然会陷入死循环
number=1;
System.out.println(number);
}
}
13.2 不保证原子性
如果说一个操作具有原子性,意思就是说这个操作不可分割,只有成功或者失败,没有成功了50%这种说法。
如果字段被Volatile修饰,这并不能保证这个操作具有原子性。
比如说num++
这个自加操作,在底层上实际上并不是一次完成的,需要计算、分配内存、赋值。
如果想要保证线程安全,就使用java.util.concurrent.atomic
13.3 避免指令重排
我们写的程序,计算机并不是按照我们自己写的那样去执行的,而是这样一个过程:
源代码–>编译器优化重排–>指令并行也可能会重排–>内存系统也会重排–>执行
int x=1; //1
int y=2; //2
x=x+5; //3
y=x*x; //4
//我们期望的执行顺序是 1_2_3_4 可能执行的顺序会变成2134 1324
//可不可能是 4123? 不可能的
处理器在进行指令重排的时候,并不是无脑重新排列,而是会考虑数据之间的依赖性。
volatile中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。
14.CAS
CAS(Compare-and-Swap),即比较并替换,是一种实现并发算法时常用到的技术,Java并发包中的很多类都使用了CAS技术。
例如AtomicInteger中的compareAndSet:
public class casDemo {
//CAS : compareAndSet 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//boolean compareAndSet(int expect, int update)
//期望值、更新值
//如果实际值 和 我的期望值相同,那么就更新
//如果实际值 和 我的期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
//因为期望值是2020 实际值却变成了2021 所以会修改失败
//CAS 是CPU的并发原语
atomicInteger.getAndIncrement(); //++操作
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
这里其实使用到了自旋锁。
15.Condition
Conditon就是监听器类。
监听器类只包含私有字段,每个监听器都有一个相关的Lock。
使用监听器我们可以实现精准唤醒:
/**
* A 执行完 调用B
* B 执行完 调用C
* C 执行完 调用A
*/
public class C {
// 开启线程
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{
for(int i=0;i<10;i++){
data3.printA();
}
},"A").start();
new Thread(()->{
for(int i=0;i<10;i++){
data3.printB();
}
},"B").start();
new Thread(()->{
for(int i=0;i<10;i++){
data3.printC();
}
},"C").start();
}
}
class Data3{
//资源类
private Lock lock=new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int number = 1; //1A 2B 3C
public void printA(){
lock.lock();
try {
//业务 判断 -> 执行 -> 通知
while(number!=1){
//等待
condition1.await();
}
//操作
System.out.println(Thread.currentThread().getName()+",AAAAA");
//唤醒指定的线程
number=2;
condition2.signal(); // 唤醒2
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
//业务 判断 -> 执行 -> 通知
while (number!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+",BBBBB");
//唤醒3
number=3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
//业务 判断 -> 执行 -> 通知
while(number!=3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+",CCCCC");
//唤醒1
number=1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}