ForkJoin
在JDK1.7,并行执行任务,提高效率,大数据量才会使用
特点:大任务拆分成小任务,工作窃取,里面维护的是双端队列
package com.kuang.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* 如何使用forkjoin
* 1.forkjoinpool通过它来执行
* 2.计算任务forkjoinpool.execute(ForkJoinTask task)
* 3.计算类要继承ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp = 10000L;//临界值
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start < temp) {
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {//forkjoin递归
long middle = (start + end) / 2;
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork();//拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();//拆分任务,把任务压入线程队列
return task1.join()+task2.join();
}
}
}
求和计算优化
package com.kuang.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();//5190
//test2();//3618
//test3();//162
}
//初级程序员
public static void test1() {
long begin = System.currentTimeMillis();
Long sum = 0L;//为什么使用long反而更快些
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "消耗时间" + (end - begin));
}
//中级程序员ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long begin = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);//异步提交有返回值
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "消耗时间" + (end - begin));
}
//高级程序员并行流(]左开右闭
public static void test3() {
long begin = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0L, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "消耗时间" + (end - begin));
}
}
异步回调
package com.kuang.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 异步调用:CompletableFuture
* // 异步执行
* // 成功回调
* // 失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的异步回调 runAsync
/*CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"=>void");
});
System.out.println("1111");
completableFuture.get();//获取阻塞执行结果*/
// 有返回值的异步回调 supplyAsync
// ajax 成功和失败的回调
// 返回错误信息
CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{//供给型接口 有返回值无参数
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
int i=10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {//消费型接口 有参数无返回值
System.out.println("t=>"+t);//正常返回结果 t=>null
System.out.println("u=>"+u);//错误信息:u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
}).exceptionally((e) -> {//函数型接口 有参数有返回值
System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
return 111;//可以获取错误的返回结果
}).get());
}
}
JMM
volatile关键字是JVM提供轻量级的同步机制
JMM是java内存模型,不存在东西,概念规定
关于JMM的一些约定
线程解锁前,必须把共享变量立即刷回主存
线程加锁前,必须读取主存中最新值到工作内存中
加锁和解锁是同一把锁
8种操作(assign赋值)
write和store反了
Volatile
特点:保证可见性、不保证原子性、禁止指令重排
保证可见性示例代码
package com.kuang.tvolatile;
import java.util.concurrent.TimeUnit;
public class JMMDemo {
// 不加volatile会死循环
private volatile static int num=0;
public static void main(String[] args) {
new Thread(()->{
while (num==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num=1;
System.out.println(num);
}
}
原子性:不可分割
线程A在执行任务的时候不能被打扰也不能被分割,要么同时成功,要么同时失败
不保证原子性示例代码:
package com.kuang.tvolatile;
public class VDemo02 {
private volatile static int num = 0;
public static void add() {
num++;
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
如果不加synchronized和lock,如何保证原子性,使用原子类解决原子性问题
这些类的底层都直接和操作系统挂钩,在内存中修改值,Unsafe特殊的类
package com.kuang.tvolatile;
import java.util.concurrent.atomic.AtomicInteger;
public class VDemo02 {
private volatile static AtomicInteger num=new AtomicInteger();
public static void add() {
//num++;//不是一个原子性操作
num.getAndIncrement();
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
禁止指令重排
指令重排:你写的程序,计算机并不是按照你写的那样去执行的
处理器在执行重排的时候需要考数据之间的依赖性
源代码-->编译器优化的重排-->指令并行也可能重排-->内存系统也会重排-->执行
volatile可以避免指令重排(由于内存屏障)
内存屏障就想象成我们的CPU指令,它有两个作用
可以保证特定的操作的执行顺序
可以保证某些变量的内存可见性(利用这些特性,volatile实现了可见性)
彻底玩转单例模式
package com.kuang.single;
// 饿汉式 (不想用也会把这些东西创建出来,造成浪费空间)
public class HungryMan {
// 可能会浪费空间
private byte[] data1=new byte[1024*1024];
private byte[] data2=new byte[1024*1024];
private byte[] data3=new byte[1024*1024];
// 无参构造
private HungryMan(){
}
private final static HungryMan hungryMan=new HungryMan();
public HungryMan getHungryMan(){
return hungryMan;
}
}
package com.kuang.single;
// 懒汉式
public class LazyMan {
private LazyMan(){
System.out.println(Thread.currentThread().getName()+"ok");
}
private volatile static LazyMan lazyMan;
//双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
// 加锁(加锁之前,可能会被2个线程或以上拿到,所以需要两次判断)
if (lazyMan==null){
// 锁class代表只锁一个
synchronized(LazyMan.class){
if (lazyMan==null){
return lazyMan=new LazyMan();//加锁之后极端情况也会出现问题 赋值不保证原子性
/**
* 分配内存空间
* 执行构造方法,初始化对象
* 把这个对象指向这个空间
*
* 期望123
* 132 线程A是允许的
* 线程B由于线程A已经对象指向这个空间了,lazyMan不等于null就return lazyMan但是此时这个lazyMan还没有完成构造
* 为了避免指令重排,需要在该对象上加上关键字volatile
*/
}
}
}
return lazyMan;
}
public static void main(String[] args) {
// 多线程并发 就有问题了(解决方案:加锁)
for (int i = 0; i < 10; i++) {
new Thread(()->{
LazyMan.getInstance();
}).start();
}
}
}
这里双重检测加锁是保证了操作原子性,只有一个线程能创建一个实例,其他线程无法创建第二个
volatile关键字是为了防止因为指令重排导致的多线程问题,有可能线程A创建一个实例,虚拟机只执行了分配空间,对象地址引用这两步,这时线程B过来发现对象已经被创建了,但是获取到的对象是还没有被初始化的
反射破坏单例一: 一个反射一个类的方法创建
// 反射破坏单例
public static void main(String[] args) throws Exception {
LazyMan instance = LazyMan.getInstance();
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
LazyMan instance2 = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(instance2);
}
解决破坏单例的方法添加异常 添加之后如果要破坏单例的话两个都使用反射创建就行了
package com.kuang.single;
import java.lang.reflect.Constructor;
// 懒汉式
public class LazyMan {
private LazyMan(){
synchronized (LazyMan.class){
if (lazyMan!=null)
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
private volatile static LazyMan lazyMan;
//双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
// 加锁(加锁之前,可能会被2个线程或以上拿到,所以需要两次判断)
if (lazyMan==null){
// 锁class代表只锁一个
synchronized(LazyMan.class){
if (lazyMan==null){
return lazyMan=new LazyMan();//加锁之后极端情况也会出现问题 赋值不保证原子性
/**
* 分配内存空间
* 执行构造方法,初始化对象
* 把这个对象指向这个空间
*
* 期望123
* 132 线程A是允许的
* 线程B由于线程A已经对象指向这个空间了,lazyMan不等于null就return lazyMan但是此时这个lazyMan还没有完成构造
* 为了避免指令重排,需要在该对象上加上关键字volatile
*/
}
}
}
return lazyMan;
}
public static void main(String[] args) throws Exception {
LazyMan instance = LazyMan.getInstance();//如果把这行注释,下面2行注释放开就又破坏单例了
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
//LazyMan instance1 = declaredConstructor.newInstance();
LazyMan instance2 = declaredConstructor.newInstance();
System.out.println(instance);
//System.out.println(instance1);
System.out.println(instance2);
}
}
反射破坏单例二:添加标志位需要重新设置就可以破坏了
解决破坏单例二:添加标志位即可不要修改就行
package com.kuang.single;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
// 懒汉式
public class LazyMan {
private static boolean flag=false;
private LazyMan(){
synchronized (LazyMan.class){
if (flag==false){//第一次执行无论通不通过反射,都会变为true
flag=true;
}else {
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
}
private volatile static LazyMan lazyMan;
//双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
// 加锁(加锁之前,可能会被2个线程或以上拿到,所以需要两次判断)
if (lazyMan==null){
// 锁class代表只锁一个
synchronized(LazyMan.class){
if (lazyMan==null){
return lazyMan=new LazyMan();//加锁之后极端情况也会出现问题 赋值不保证原子性
/**
* 分配内存空间
* 执行构造方法,初始化对象
* 把这个对象指向这个空间
*
* 期望123
* 132 线程A是允许的
* 线程B由于线程A已经对象指向这个空间了,lazyMan不等于null就return lazyMan但是此时这个lazyMan还没有完成构造
* 为了避免指令重排,需要在该对象上加上关键字volatile
*/
}
}
}
return lazyMan;
}
// 反射破坏单例
// 把下面三行注释放开就是破坏单例
public static void main(String[] args) throws Exception {
//Field flag = LazyMan.class.getDeclaredField("flag");
//flag.setAccessible(true);
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
LazyMan instance1 = declaredConstructor.newInstance();
//flag.set(instance1,false);
LazyMan instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
package com.kuang.single;
// 静态内部类
public class Holder {
private Holder(){
}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER=new Holder();
}
}
不能使用反射破坏枚举
package com.kuang.single;
import java.lang.reflect.Constructor;
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws Exception {
EnumSingle instance1= EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
反编译javap -p EnumSingle.class发现它是有参构造而不是无参构造
package com.kuang.single;
import java.lang.reflect.Constructor;
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws Exception {
EnumSingle instance1= EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
深入理解CAS(比较并交换)
当前工作内存中的值与主内存值进行比较,如果这个值是期望的,那么则执行操作,如果不是就一直循环
缺点:循环会耗时、一次性只能保证一个共享变量的原子性、引发ABA问题
package com.kuang.cas;
import java.util.concurrent.atomic.AtomicInteger;
/**
* CAS 比较并交换
*/
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger=new AtomicInteger(2020);
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger);
atomicInteger.getAndIncrement();
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger);
}
}
自旋锁
var1当前对象+var2偏移量如果是var5的话就执行操作var5+var4
原子引用解决ABA问题
package com.kuang.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 解决ABA问题,引入原子引用
*/
public class CASDemo {
public static void main(String[] args) {
//如果泛型是一个包装类,注意对象引用问题
AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(1,1);
new Thread(()->{
int stamp=atomicStampedReference.getStamp();
System.out.println("a1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(1,2,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("a2=>"+atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(2,1,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("a3=>"+atomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp=atomicStampedReference.getStamp();
System.out.println("b1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(1,6,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("b2=>"+atomicStampedReference.getStamp());
},"b").start();
}
}
带版本号的原子操作:Integer使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间
各种锁的理解
可重入锁(递归锁)
synchronized锁
package com.kuang.lock;
public class Demo01 {
public static void main(String[] args) {
Phone phone=new Phone();
new Thread(()->{
phone.sendMessage();
},"A").start();
new Thread(()->{
phone.sendMessage();
},"B").start();
}
}
class Phone{
public synchronized void sendMessage(){
System.out.println(Thread.currentThread().getName()+"=>"+"发短信");
call();//这里可以看做也有锁
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"=>"+"打电话");
}
}
Lock版
package com.kuang.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Demo02 {
public static void main(String[] args) {
Phone2 phone=new Phone2();
new Thread(()->{
phone.sendMessage();
},"A").start();
new Thread(()->{
phone.sendMessage();
},"B").start();
}
}
class Phone2{
Lock lock=new ReentrantLock();
public void sendMessage(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"=>"+"发短信");
call();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void call(){
// 细节问题:锁要配对
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"=>"+"打电话");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
自旋锁
package com.kuang.lock;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLockDemo {
AtomicReference<Thread> atomicReference=new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"=>myLock");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"=>myUnLock");
atomicReference.compareAndSet(thread,null);
}
}
package com.kuang.lock;
import java.util.concurrent.TimeUnit;
public class SpinLockTest {
public static void main(String[] args) throws InterruptedException {
SpinLockDemo lock = new SpinLockDemo();
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.myUnlock();
}
},"A").start();
new Thread(()->{
lock.myLock();
try {
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.myUnlock();
}
},"B").start();
}
}
死锁排查
package com.kuang.lock;
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String lockA="lockA";
String lockB="lockB";
new Thread(new MyThread(lockA,lockB),"lockA").start();
new Thread(new MyThread(lockB,lockA),"lockB").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"lock:"+lockA+"get" +lockB);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"lock:"+lockB+"get" +lockA);
}
}
}
}
使用jps -l定位进程号 接着使用jstack 进程号找到死锁问题
面试说排查死锁:日志+堆栈