java多线程这一篇就差不多了

news2024/7/4 5:28:05

java多线程这一篇就差不多了

什么是多线程?

一般被问你对多线程了解多少的时候,你可能不仅仅只需要知道线程怎么创建,你可能需要了解线程的几种创建方式,线程的生命周期,线程池相关,并发安全,原子类,锁机制…

总之本文了解一下基础的

基础概念

并发和并行

并发:指两个或多个事件在同⼀个时间段内发⽣。
并⾏:指两个或多个事件在同⼀时刻发⽣(同时发⽣)。

进程和线程

  • 进程:是指⼀个内存中运⾏的应⽤程序,每个进程都有⼀个独⽴的内存空间,⼀个应⽤程序可以同时运⾏多个进程;进程也是程序的⼀次执⾏过程,是系统运⾏程序的基本单位;系统运⾏⼀个程序即是⼀个进程从创建、运⾏到消亡的过程。
  • 线程:线程是进程中的⼀个执⾏单元,负责当前进程中程序的执⾏,⼀个进程中⾄少有⼀个线程。⼀个进程中是可以有多个线程的,这个应⽤程序也可以称之为多线程程序。简⽽⾔之:⼀个程序运⾏后⾄少有⼀个进程,⼀个进程中可以包含多个线程

线程的创建方式

三种创建线程的方式

public class MyThread extends Thread{

    public MyThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        for (int i = 0; i < 200; i++) {
            System.out.println(getName()+":"+i);
        }
    }
}

public class MyRunable implements Runnable{

    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println("Running " +  name );
        try {
            for(int i = 4; i > 0; i--) {
                System.out.println("Thread: " + name + ", " + i);
                // 让线程睡眠一会
                Thread.sleep(50);
            }
        }catch (InterruptedException e) {
            System.out.println("Thread " +  name + " interrupted.");
        }
        System.out.println("Thread " +  name + " exiting.");
    }
}

public class MyCallableThread implements Callable<Integer> {

    @Override  
    public Integer call() throws Exception  
    {  
        int i = 0;  
        for(;i<100;i++)  
        {  
            System.out.println(Thread.currentThread().getName()+" "+i);  
        }  
        return i;  
    }  
}

线程创建后使用示例

public class ThreadTest {
    public static void main(String[] args) {
        // 创建线程
        MyThread myThread = new MyThread("自定义线程");
        // 启动线程
        myThread.start();

        Thread runThread = new Thread(new MyRunable(), "runable线程");
        runThread.start();


        MyCallableThread myCallableThread = new MyCallableThread();
        FutureTask<Integer> ft = new FutureTask<>(myCallableThread);
        new Thread(ft, "有返回值的线程").start();

        try {
            System.out.println("子线程的返回值:" + ft.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

    }
}

关于传参和获取返回参数

传参的话一般是是内部类的形式,或者给类定义相关属性,

返回值的话,你也可以使用定义相关属性的方式获取,或者基于 FutureTask

        String str = "你好呀";
        Thread thread = new Thread((Runnable) () -> {
            System.out.println("str");
        }, "线程1");
        thread.start();

线程的常用方法

  • public void start()

    使该线程开始执行;Java 虚拟机调用该线程的 run 方法。

  • public void run()

    如果该线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,该方法不执行任何操作并返回。

  • public final void setName(String name)

    改变线程名称,使之与参数 name 相同。

  • public final void setPriority(int priority)

    更改线程的优先级。

  • public final void setDaemon(boolean on)

    将该线程标记为守护线程或用户线程。

  • public final void join(long millisec)

    等待该线程终止的时间最长为 millis 毫秒。

  • public void interrupt()

    中断线程。

  • public final boolean isAlive()

    测试线程是否处于活动状态。

start和run

start是开启一个新的线程,run相当于正常调用方法(在线程池概念中,调用run方法来执行具体任务)

setDaemon

守护线程,用于为其他线程提供服务,一般是虚拟机退出的时候停止,需要在start方法之前设置

用户进程: 默认创建的是用户进程,用户进程在任务执行结束后终止

setPriority

线程执行的优先级,这个是虚拟机中的概念,Thread.MIN_PRIORITY和Thread.MAX_PRIORITY两个常量定义了其最大和最小值,

当然并不是你设置了优先级更高就一定会先执行,而且还需要实际的系统支持优先级的设定,否则和没设置一样。似乎linux是不支持优先级这个概念的

join

主线程中创建了线程a,然后执行了线程a.join(),那么主线程会陷入阻塞状态,等线程a的任务执行完成或发生异常终止后才继续执行

这个方法支持毫秒数入参,意思大概是线程a终止或者等待多久后,主线程会继续执行

        // 创建线程
        MyThread myThread = new MyThread("自定义线程");

        // 启动线程
        myThread.start();
        boolean alive1 = myThread.isAlive();
        myThread.join();
        System.out.println("等待结束");
        System.out.println(alive1);

currentThread

获取当前线程

sleep

当前线程沉睡多少秒,会让出cpu的执行权

yield

当前线程会让线程从运行转就绪状态,和sleep不一样的是,不需要陷入阻塞状态,可以和其他线程争夺cpu的执行权

interrupt

NEW和TERMINATED对于中断操作几乎是屏蔽的,RUNNABLE和BLOCKED类似,对于中断操作只是设置中断标志位并没有强制终止线程,对于线程的终止权利依然在程序手中。WAITING/TIMED_WAITING状态下的线程对于中断操作是敏感的,他们会抛出异常并清空中断标志位。

        // 创建线程
        MyThread myThread = new MyThread("自定义线程");

        // 启动线程,线程中睡眠10s
        myThread.start();
        boolean alive1 = myThread.isAlive();

        // 主线程睡眠2s
        Thread.sleep(2000);
        // 设置myThread的中断状态为true,这个时候myThread会抛出java.lang.InterruptedException: sleep interrupted异常
        // 如果这个异常在myThread被捕获,那myThread将继续执行,否则停止
        myThread.interrupt();
        // 返回myThread这个线程的中断状态,由于myThread抛出异常后,中断状态马上会置为false,所以此处获取的还是false
        System.out.println(myThread.isInterrupted());
        // 返回当前线程的中断状态静态方法。这里是主线程
        System.out.println(Thread.interrupted());

当然还有一种是子线程处于运行状态,并且一直在执行任务,然后主线程就可以设置子线程的中断状态,来停止子线程的运行,由于子线程处于运行状态,所以此时不会抛出中断异常

while (!Thread.currentThread().isInterrupted() && count < 1000) {
  System.out.println("count = " + count++);
  }
  System.out.println("线程停止: stop thread");
  }

holdsLock(Object x)

当前线程是否持有某个对象的锁

线程异常处理器

如果需要在线程的外部获取线程抛出的异常

同时设置全局处理器和单个线程的处理器时,只有单个线程的处理器生效

public class ThreadTest5 {
    static {
        // 设置全局的梳理线程异常的类
        Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
    }
    public static void main(String[] args) throws InterruptedException {
        // 创建线程
        MyThread myThread = new MyThread("自定义线程");
        myThread.setUncaughtExceptionHandler((t,e) ->{
            System.out.println("异常的线程名称:"+t.getName());
            System.out.println(e.getMessage());
        });
        // 启动线程,线程中睡眠10s
        myThread.start();


    }
}
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("进入了全局的线程异常处理");
    }
}

LockSupport

LockSupport.park()休眠线程,LockSupport.unpark()唤醒线程,两个方法配合使用。

也可以通过LockSupport.parkNanos()指定休眠时间后,自动唤醒。
LockSupport.park()不会释放monitor锁。
线程被中断,LockSupport.park()不会抛出异常,也不会吞噬掉interrupt的状态,调用者可以获取interrupt状态,自行进行判断,线程是由于什么原因被唤醒了。
LockSupport.park()会是线程进入WAITING状态,而LockSupport.parkNanos(long nanos) 会进入TIMED_WAITING状态。
LockSupport.park(Object blocker)和LockSupport.getBlocker(t1)配合使用,可以进行自定义数据传输。

public class LockSupportTest {

    public static void main(String[] args) throws InterruptedException {

        Thread t = new Thread(() -> {
            // 睡眠1秒,避免unpark先执行
//            try {
//                TimeUnit.SECONDS. sleep(1);
//            } catch (InterruptedException e) {
//
//            }
            String a = "1";
            //使用LockSupport的park()方法阻塞当前线程t
            LockSupport.park(a);
            System.out.println("任务结束");
        });
        //启动当前线程t
        t.start();
        // 让park先执行
        TimeUnit.SECONDS.sleep(1);
        
        // 获取锁对象,用于传递数据,必须在park之后执行,只有park执行后而且unpark未执行才可以获取到,具体看源码
        Object blocker = LockSupport.getBlocker(t);

        //唤醒线程t,park方法实际上是获取一个许可证,而unpaark是提供一个许可证,可以比park先执行,先执行之后park就不会阻塞了
        LockSupport.unpark(t);


        System.out.println(blocker);
        System.out.println("释放锁");

    }
}

线程的生命周期

<img src=“https://cdn.jsdelivr.net/gh/cloudinwinter/myimage@master/blogImg/20221125/1image-20221203144314150.png” alt=“image-20221203144314150” style=“zoom:80%;” /"/>

  • 新建状态(New)

    当线程对象对创建后,即进⼊了新建状态,如: Thread t = new MyThread();

  • 就绪状态(Runnable)

    当调⽤线程对象的start()⽅法( t.start(); ),线程即进⼊就绪状态。处于就绪状态的线程,只是说明此线程已经做好了准备,随时等待CPU调度执⾏,并不是说执⾏了 t.start() 此线程⽴即就会执⾏;

  • 运⾏状态(Running)

    当CPU开始调度处于就绪状态的线程时,此时线程才得以真正执⾏,即进⼊到运⾏状态。注:就 绪

    状态是进⼊到运⾏状态的唯⼀⼊⼝,也就是说,线程要想进⼊运⾏状态执⾏,⾸先必须处于就绪状态

  • 阻塞状态(Blocked)

    处于运⾏状态中的线程由于某种原因,暂时放弃对CPU的使⽤权,停⽌执⾏,此时进⼊阻塞状态,直到其进⼊到就绪状态,才 有机会再次被CPU调⽤以进⼊到运⾏状态。根据阻塞产⽣的原因不同,

    阻塞状态⼜可以分为三种:

    1.等待阻塞:运⾏状态中的线程执⾏wait()⽅法,使本线程进⼊到等待阻塞状态;

    2.同步阻塞 – 线程在获取synchronized同步锁失败(因为锁被其它线程所占⽤),它会进⼊同步阻塞状

    态;

    3.其他阻塞 – 通过调⽤线程的sleep()或join()或发出了I/O请求时,线程会进⼊到阻塞状态。当sleep()

    状态超时、join()等待线程终⽌或者超时、或者I/O处理完毕时,线程重新转⼊就绪状态。

  • 死亡状态(Dead)

    线程执⾏完了或者因异常退出了run()⽅法,该线程结束⽣命周期。

锁池和等待池

wait和notify使用在后面介绍

**锁池:**假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。

等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前线程A就已经拥有了该对象的锁),同时线程A就进入到了该对象的等待池中。如果另外的一个线程调用了相同对象的notifyAll()方法,那么处于该对象的等待池中的线程就会全部进入该对象的锁池中,准备争夺锁的拥有权。如果另外的一个线程调用了相同对象的notify()方法,那么仅仅有一个处于该对象的等待池中的线程(随机)会进入该对象的锁池.

WAITING和TIME_WAITING

BLOCKED和WAITING,首先他们都是阻塞的一种状态,BLOCKED和WAITING两个状态最大的区别有两个:

  • BLOCKED是锁竞争失败后被被动触发的状态,WAITING是人为的主动触发的状态
  • BLCKED的唤醒时自动触发的,而WAITING状态是必须要通过特定的方法来主动唤醒

线程池

一般不允许显示的创建线程,而是通过线程池创建

线程池相关类

java.util.concurrent包下包含了线程安全相关的类,也包含了线程池相关的类

ExecutorService

线程池真正的接口,常用的实现类有ScheduledThreadPoolExecutor和ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
  • 第1个参数 :corePoolSize 表示常驻核心线程数。如果等于0,则任务执行完成后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。

  • 第2个参数:maximumPoolSize 表示线程池能够容纳同时执行的最大线程数。从上方的示例代码中第一处来看,必须大于或等于1。如果待执行的线程数大于此值,需要借助第5个参数的帮助。缓存在队列中。如果maximumPoolSize 与corePoolSize 相等,即是固定大小线程池。

  • 第3个参数: keepAliveTime
    表示线程池中的线程空闲时间,当空闲时间达到keepAliveSeconds值时,线程被销毁,直到剩下corePoolSize 个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程大于corePoolSize 时,keepAliveSeconds才会起作用。但是ThreadPoolExecutor的allowCoreThreadTimeOut 变量设置为ture时,核心线程超时后也会被回收。

  • TimeUnit : keepAliveSeconds

  • workQueue 表示缓存队列。如果线程池里的线程数大于corePoolSize ,就会放到缓存队列,缓存队列满了会创建新线程到maximumPoolsize;直到当请求的线程数大于maximumPoolSize时,会执行设定的策略,默认是拒绝创建策略。(注意:当线程池里的线程数大于corePoolSize且小于maximumPoolSize时,这时候再有请求的线程就会放到缓存队列,注意只是放到缓存队列但是不创建新的线程,直到请求的线程存满缓存队列时,才会开始创建新的线程,直到maxmunPoolSize就会拒绝创建或者执行提前设定的策略。

  • threadFactory: 创建线程的工厂

  • handler : 拒绝策略处理器

Executors

Executors是一个线程池的工厂类,预制以一些我们常用的线程池(就是不同的参数,实现不同功能的线程池),以下是几个常用的线程池,还有很多

  • newFixedThreadPool 创建⼀个固定⻓度的线程池,当到达线程最⼤数量时,线程池的规模将不再变化。
  • newCachedThreadPool 创建⼀个可缓存的线程池,如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加时,会增加线程数量;线程池规模⽆限制。
  • newSingleThreadPoolExecutor 创建⼀个单线程的Executor,确保任务对了,串⾏执⾏
  • newScheduledThreadPool :创建⼀个固定⻓度的线程池,⽽且以延迟或者定时的⽅式来执⾏,类似Timer;
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        ScheduledFuture<String> scheduledFuture = executorService.schedule(new Callable<String>() {
            public String call()  {
                return "call";
            }
        }, 10, TimeUnit.SECONDS);

        try {
            System.out.println(scheduledFuture.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

ThreadFactory

线程工厂一般我们会根据具体业务来指定创建线程的名称

示例,线程创建工厂

    public class CustomThreadFactory implements ThreadFactory {
        private final AtomicInteger index = new AtomicInteger(1);

        private CustomThreadFactory() {
            SecurityManager sm = System.getSecurityManager();
            group = (sm != null) ? sm.getThreadGroup()
                    : Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, "SocketStreamHandle-"
                    + index.getAndIncrement());
            t.setDaemon(true);
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

线程池大小和一些使用场景

来自:https://link.zhihu.com/?target=http%3A//ifeve.com/how-to-calculate-threadpool-size

一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)

  • 如果是CPU密集型应用,则线程池大小设置为N+1
  • 如果是IO密集型应用,则线程池大小设置为2N+1

如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。
但是,IO优化中,这样的估算公式可能更适合:
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
因为很显然,线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) CPU数目

刚刚说到的线程池大小的经验值,其实是这种公式的一种估算值。

// 获取cpu核数
Runtime.getRuntime().availableProcessors()

看了上面这段,你大概想要整个项目使用一个线程池,来避免cpu频繁的切换,而实际上一个应用中线程池的数量是不能控制的,比如你使用了很多第三方依赖,这些依赖里面就可能会根据具体的逻辑需要创建线程池,而且为了避免多个任务之间的相互影响,应该是针对每个业务创建一个线程池。而线程池的大小也不一定需要是cpu核数。

如果我们使用spring定时任务的话,可能会创建一个定时任务的线程池,避免一个任务因为等待另一个任务执行而演示

正常使用的话,比如我们接收第三方的数据后,可以效验格式后尽快响应,剩下的操作异步处理,这个时候你大致可以创建一个线程池,交由spring容器进行管理,

然后就是一些第三方jar包,比如rocketMq,不管是消息接收者,还是消息发送者都有设置线程池的大小方法,如果我们需要加快消息的处理,可以设置消息接收者的线程池的最大线程数的大小。

springboot中@EnableAsync和@Async,你需要异步执行某个方法,你也可以通过配置设置异步执行的线程池的大小,具体似乎是通过AsyncConfigurer 来设置线程池和线程异常处理,以下仅做参考

@Configuration
@EnableAsync
public class ExecutorConfig {

    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(5);
        //配置队列大小
        executor.setQueueCapacity(20);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //执行初始化
        executor.initialize();
        return executor;
    }
}
Async("asyncThreadPoolTaskExecutor")

线程池的销毁

线程池有shutdown方法

如果是springboot的话可以指定销毁方法,感觉似乎没啥必要,一般应该不需要销毁吧

@Bean(destroyMethod = "shutdown")

线程池的拒绝策略

拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
jdk默认提供了四种拒绝策略:

  • AbortPolicy - 抛出异常,中止任务。抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行
  • CallerRunsPolicy - 使用调用线程执行任务。当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
  • DiscardPolicy - 直接丢弃,其他啥都没有
  • DiscardOldestPolicy - 丢弃队列最老任务,添加新任务。当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

可以实现RejectedExecutionHandler,定制一些记录导数据库(空闲再提取出来),打印日志

线程池的一些使用方式

代码规范中已经不允许使用Executors来创建线程池。

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 无返回值
        executor.execute(()->{
            System.out.println("1");
        });

        Future<String>  future = executor.submit(() -> "3");
        //Future的默认实现是FutureTask,还是阻塞获取结果,get方法支持配置阻塞的时间
        String str = future.get();
        System.out.println(str);

        List<Callable<String>> threadList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int a = i+1;
            threadList.add(()-> "数字:" + a);
        }
        // 批量执行,只支持Callable类型的入参
        List<Future<String>> futures = executor.invokeAll(threadList);
        for (Future<String> future1 : futures) {
            System.out.println(future1.get());
        }
        
    }

线程池原理

通过已经创建的线程调用相关类的run方法,而不是start方法

工作队列说明

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小;

LinkedBlockingQueue是一个无界(没有大小限制)缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待,在使用此阻塞队列时maximumPoolSizes就相当于无效了。LinkedBlockingQueue也可以设置大小。 示例参考LinkedBlockingQueue

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。参考 newCachedThreadPool

注意

线程池工作队列的长度不要设置太长,应该考虑合理设置线程池的参数来保证任务的处理速度,过长的工作队列会导致内存溢出

线程安全

什么情况下会有线程安全问题

大致就是存在共享的资源(成员变量)的情况下,多个线程对同一个共享变量进行操作,然后导致线程安全问题

以下是测试代码,我想要将num一直减小到0,但是在多线程的情况下 num距离0还差了很多

        ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        List<Callable<String>> threadList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int a = i+1;
            threadList.add(()-> sub());
        }

        // 批量执行,只支持Callable类型的入参
        List<Future<String>> futures = executor.invokeAll(threadList);
        for (Future<String> future1 : futures) {
            System.out.println(future1.get());
        }

    }
    static int num = 100;

    private  static String sub(){
        num = num - 1;
        // 让测试效果更明显
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {

        }
        return Thread.currentThread().getName() +":"+ num;
    }

处理的方式也很简单,直接在sub方法上添加synchronized关键字

怎么保证线程安全

总之就是保证多线程下业务代码的原子性,可见性和顺序性

首先你需要了解一下jvm会将我们的字节码转换成指令,会对指令进行重排序,了解一个cpu的多级缓存(怎么读取一个变量的),a ++ 转换成指令其实是两个操作,一个相加,一个赋值。

原子性: 多个指令同时执行成功或同时执行失败。

可见性:线程啊对变量的修改对于其他线程是可见的,其他线程会读取到修改的最新值

顺序性:多个操作按顺序执行

就实际使用而言,加锁,使用线程安全相关的类,都是保证线程安全的方法,当然你需要先了解这些概念,才能正确使用

synchronized

synchronized底层原理

synchronized是JVM内置锁,基于Monitor机制实现,依赖底层操作系统的互斥原语Mutex(互斥量),它是一个重量级锁,性能较低。当然,JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight
Locking)、偏向锁(Biased Locking)、自适应自旋(Adaptive Spinning)等技术来减少锁操作的开销,内置锁的并发性能已经基本与Lock持平

在讲原理前,我们先讲一下Java对象的构成。在JVM中,对象在内存中分为三块区域:对象头,实例数据和对齐填充。如图所示:

对象头

  • Mark Word,用于存储对象自身运行时的数据,如哈希码(Hash Code),GC分代年龄,锁状态标志,偏向线程ID、偏向时间戳等信息,它会根据对象的状态复用自己的存储空间。它是实现轻量级锁和偏向锁的关键
  • 类型指针,对象会指向它的类的元数据的指针,虚拟机通过这个指针确定这个对象是哪个类的实例。
  • Array length,如果对象是一个数组,还必须记录数组长度的数据。

下面锁的说明来自:https://zhuanlan.zhihu.com/p/343305760

自适应性自旋锁

在说自适应自旋锁之前,先讲自旋锁。上面已经讲过,当线程没有获得monitor对象的所有权时,就会进入阻塞,当持有锁的线程释放了锁,当前线程才可以再去竞争锁,但是如果按照这样的规则,就会浪费大量的性能在阻塞和唤醒的切换上,特别是线程占用锁的时间很短的话。

为了避免阻塞和唤醒的切换,在没有获得锁的时候就不进入阻塞,而是不断地循环检测锁是否被释放,这就是自旋。在占用锁的时间短的情况下,自旋锁表现的性能是很高的。

但是又有问题,由于线程是一直在循环检测锁的状态,就会占用cpu资源,如果线程占用锁的时间比较长,那么自旋的次数就会变多,占用cpu时间变长导致性能变差,当然我们也可以通过参数-XX:PreBlockSpin设置自旋锁的自旋次数,当自旋一定的次数(时间)后就挂起,但是设置的自旋次数是多少比较合适呢?

如果设置次数少了或者多了都会导致性能受到影响,而且占用锁的时间在业务高峰期和正常时期也有区别,所以在JDK1.6引入了自适应性自旋锁。

自适应性自旋锁的意思是,自旋的次数不是固定的,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。

表现是如果此次自旋成功了,很有可能下一次也能成功,于是允许自旋的次数就会更多,反过来说,如果很少有线程能够自旋成功,很有可能下一次也是失败,则自旋次数就更少。这样能最大化利用资源,随着程序运行和性能监控信息的不断完善,虚拟机对锁的状况预测会越来越准确,也就变得越来越智能。

锁消除

锁消除是一种锁的优化策略,这种优化更加彻底,在JVM编译时,通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁。这种优化策略可以消除没有必要的锁,节省毫无意义的请求锁时间。比如StringBuffer的append()方法,就是使用synchronized进行加锁的。

public synchronized StringBuffer append(String str) {
    toStringCache = null;
    super.append(str);
    return this;
}

如果在实例方法中StringBuffer作为局部变量使用append()方法,StringBuffer是不可能存在共享资源竞争的,因此会自动将其锁消除。例如:

public String add(String s1, String s2) {
    //sb属于不可能共享的资源,JVM会自动消除内部的锁
    StringBuffer sb = new StringBuffer();
    sb.append(s1).append(s2);
    return sb.toString();
}
锁粗化

如果一系列的连续加锁解锁操作,可能会导致不必要的性能损耗,所以引入锁粗话的概念。意思是将多个连续加锁、解锁的操作连接在一起,扩展成为一个范围更大的锁。

偏向锁

偏向锁是JDK1.6引入的一个重要的概念,JDK的开发人员经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得。也就是说在很多时候我们是假设有多线程的场景,但是实际上却是单线程的。所以偏向锁是在单线程执行代码块时使用的机制。

原理是什么呢,我们前面提到锁的争夺实际上是Monitor对象的争夺,还有每个对象都有一个对象头,对象头是由Mark Word和Klass pointer 组成的。一旦有线程持有了这个锁对象,标志位修改为1,就进入偏向模式,同时会把这个线程的ID记录在对象的Mark Word中,当同一个线程再次进入时,就不再进行同步操作,这样就省去了大量的锁申请的操作,从而提高了性能。

一旦有多个线程开始竞争锁的话呢?那么偏向锁并不会一下子升级为重量级锁,而是先升级为轻量级锁。

轻量级锁

如果获取偏向锁失败,也就是有多个线程竞争锁的话,就会升级为JDK1.6引入的轻量级锁,Mark Word 的结构也变为轻量级锁的结构。

执行同步代码块之前,JVM会在线程的栈帧中创建一个锁记录(Lock Record),并将Mark Word拷贝复制到锁记录中。然后尝试通过CAS操作将Mark Word中的锁记录的指针,指向创建的Lock Record。如果成功表示获取锁状态成功,如果失败,则进入自旋获取锁状态。

自旋锁的原理在上面已经讲过了,如果自旋获取锁也失败了,则升级为重量级锁,也就是把线程阻塞起来,等待唤醒。

重量级锁

重量级锁就是一个悲观锁了,但是其实不是最坏的锁,因为升级到重量级锁,是因为线程占用锁的时间长(自旋获取失败),锁竞争激烈的场景,在这种情况下,让线程进入阻塞状态,进入阻塞队列,能减少cpu消耗。所以说在不同的场景使用最佳的解决方案才是最好的技术。synchronized在不同的场景会自动选择不同的锁,这样一个升级锁的策略就体现出了这点。

小结

偏向锁:适用于单线程执行。

轻量级锁:适用于锁竞争较不激烈的情况。

重量级锁:适用于锁竞争激烈的情况。

ReentrantLock与synchronized

我们看一下他们的区别:

  • synchronized是Java语法的一个关键字,加锁的过程是在JVM底层进行。Lock是一个类,是JDK应用层面的,在JUC包里有丰富的API。
  • synchronized在加锁和解锁操作上都是自动完成的,Lock锁需要我们手动加锁和解锁。
  • Lock锁有丰富的API能知道线程是否获取锁成功,而synchronized不能。
  • synchronized能修饰方法和代码块,Lock锁只能锁住代码块。
  • Lock锁有丰富的API,可根据不同的场景,在使用上更加灵活。
  • synchronized是非公平锁,而Lock锁既有非公平锁也有公平锁,可以由开发者通过参数控制。

个人觉得在锁竞争不是很激烈的场景,使用synchronized,语义清晰,实现简单,JDK1.6后引入了偏向锁,轻量级锁等概念后,性能也能保证。而在锁竞争激烈,复杂的场景下,则使用Lock锁会更灵活一点,性能也较稳定。

总之就是synchronized在大量线程争抢锁的情况下,按照其原理,可能会有大量线程陷入重量级锁的状况,然后就会有用户台和内核态的切换,所以大量的并发情况下使用ReentrantLock,在并发量不大的情况下,使用synchronized更简单

synchronized使用示例

主要是分为类锁和对象锁,其中类锁的话,

类锁示例
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        List<Callable<String>> threadList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int a = i+1;
            threadList.add(()-> sub());
        }

        // 批量执行,只支持Callable类型的入参
        List<Future<String>> futures = executor.invokeAll(threadList);
        for (Future<String> future1 : futures) {
            System.out.println(future1.get());
        }

    }
    static int num = 100;

    private synchronized static String sub(){
        num = num - 1;
        // 让测试效果更明显
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {

        }
        return Thread.currentThread().getName() +":"+ num;
    }

效果等同

    private static String sub(){
        synchronized (safeTest2.class){
            num = num - 1;
            // 让测试效果更明显
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {

            }
            return Thread.currentThread().getName() +":"+ num;
        }
    }
实例锁实例
public class safeTest3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 这个对象必须放在循环的外部,否则无效果
        safeTest3 safeTest3 = new safeTest3();
        for (int i = 0; i < 100; i++) {
            int a = i + 1;
            executor.execute(()->safeTest3.sub());
        }
    }
    int num = 100;

    private synchronized void sub() {

        num = num - 1;
        // 让测试效果更明显
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {

        }
        System.out.println(Thread.currentThread().getName() + ":" + num);

    }
}

效果等同

    private void sub() {
        synchronized(this){
            num = num - 1;
            // 让测试效果更明显
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {

            }
            System.out.println(Thread.currentThread().getName() + ":" + num);
        }
    }
字符串作为锁对象

字符串在堆中是有一个统一的常量池,同一个字符串是堆中的同一个对象,所以我们对同一个字符串加锁,所有线程共享这把锁

    private void sub() {
        synchronized("你好"){
            num = num - 1;
            // 让测试效果更明显
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {

            }
            System.out.println(Thread.currentThread().getName() + ":" + num);
        }
    }
锁的使用说明

这个不是看是不是调用同一个方法,而是判定是不是获取的同一个锁对象,

1、锁对象中类对象和实例对象是不一样的,类对象在堆中只有一个,字符串在堆中也只有一个,所以只要是类锁或者字符串包裹的代码,都只有一个线程可以进入执行

2、而实例对象锁,由于一个类可以创建多个实例对象,你懂得。。。。。(抱歉,解释不清了)

wait和notify

wait和notify都是Object的方法,使用的对象必须持有锁,上面线程声明周期中的等待池和锁池的概念

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 这个对象必须放在循环的外部,否则无效果
        safeTest5 safeTest3 = new safeTest5();
        for (int i = 0; i < 5; i++) {
            int a = i + 1;
            executor.execute(()->safeTest3.sub());
        }

        executor.execute(()->{
            synchronized ("你好呀"){
                "你好呀".notifyAll();
            }
        });

    }
    int num = 100;

    private void sub() {
        synchronized("你好呀"){
            try {
                "你好呀".wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            num = num - 1;
            // 让测试效果更明显
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {

            }
            System.out.println(Thread.currentThread().getName() + ":" + num);
        }
    }

锁的分类

死锁

大致是两个线程独获取到了对方需要的锁,导致这两个线程都处在阻塞状态

public class DeadLock implements Runnable {
    public int flag = 1;  
    //静态对象是类的所有对象共享的  
    private static Object o1 = new Object(), o2 = new Object();  
    @Override 
    public void run() {  
        System.out.println("flag=" + flag);  
        if (flag == 1) {  
            synchronized (o1) {  
                try {  
                    Thread.sleep(500);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
                synchronized (o2) {  
                    System.out.println("1");  
                }  
            }  
        }  
        if (flag == 0) {  
            synchronized (o2) {  
                try {  
                    Thread.sleep(500);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
                synchronized (o1) {  
                    System.out.println("0");  
                }  
            }  
        }  
    }  
   
    public static void main(String[] args) {  
           
        DeadLock td1 = new DeadLock();  
        DeadLock td2 = new DeadLock();  
        td1.flag = 1;  
        td2.flag = 0;  
        //td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。  
        //td2的run()可能在td1的run()之前运行  
        new Thread(td1).start();  
        new Thread(td2).start();  

    }
}

可重入锁

这部分内容来自:https://blog.csdn.net/NullpointerExcep/article/details/127352527

可重入锁,也叫做递归锁,指的是同一线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响。

在JAVA环境下ReentrantLock和synchronized都是可重入锁。

synchronized是一个可重入锁。在一个类中,如果synchronized方法1调用了synchronized方法2,方法2是可以正常执行的,这说明synchronized是可重入锁。否则,在执行方法2想获取锁的时候,该锁已经在执行方法1时获取了,那么方法2将永远得不到执行。

可重入锁在什么场景使用呢?

可重入锁主要用在线程需要多次进入临界区代码时,需要使用可重入锁。具体的例子,比如上文中提到的一个synchronized方法需要调用另一个synchronized方法时。

可重入锁的实现原理是怎么样的?

加锁时,需要判断锁是否已经被获取。如果已经被获取,则判断获取锁的线程是否是当前线程。如果是当前线程,则给获取次数加1。如果不是当前线程,则需要等待。

释放锁时,需要给锁的获取次数减1,然后判断,次数是否为0了。如果次数为0了,则需要调用锁的唤醒方法,让锁上阻塞的其他线程得到执行的机会。

volatile

让多个线程对同一变量的修改,对其他线程可见。

1、可以实现更新操作的原子性,这里特别说明i++ 是+1和赋值两个操作,所以不能用volatile来保证原子性

2、volatile是CAS机制的基础。

<img src=“https://cdn.jsdelivr.net/gh/cloudinwinter/myimage@master/blogImg/20221205/18d5494eef01f3a2934c1d4d26f74ef365c607c7d.png@f_auto”/"/>

CAS机制

原理

深入一点大致就是CPU存在多级缓存,同一个变量的值在主内存的数据和cpu高速缓存中的数据可能不一致,然后存在一个对比修改的过程。

  1. 程序以及数据被加载到主内存
  2. 指令和数据被加载到CPU的高速缓存
  3. CPU执行指令,把结果写到高速缓存
  4. 高速缓存中的数据写回主内存

<img src=“https://cdn.jsdelivr.net/gh/cloudinwinter/myimage@master/blogImg/20221205/18669504-667b424d7e3aec3d.png” alt=“img” style=“zoom: 50%;” /"/>

核心方法说明

java中CAS似乎都是调用sun.misc.Unsafe的相关方法来实现,这个方法很多地方都用了,比容原子类,AQS相关实现等

 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

看了上面的原理部分知道第三个参数是旧址,第四个参数是新值

然后第一个和第二个参数用于定位你需要修改的数据的内存地址,

var1:要修改的对象起始地址 如:0x00000111
var2:需要修改的具体内存地址 如100 。0x0000011+100 = 0x0000111就是要修改的值的地址
注意没有var3
var4:期望内存中的值,拿这个值和0x0000111内存中的中值比较,如果为true,则修改,返回ture,否则返回false,等待下次修改。
var5:如果上一步比较为ture,则把var5更新到0x0000111其实的内存中。
原子操作,直接操作内存。

ConcurrentHashMap

HashMap

数据结构

1、JDK8的HashMap是数组+链表+红黑树实现的

数组的大小必须是2的幂次方数

2、通过hashcode & (table.length - 1)计算在数组中的位置

1.与&:遇0则0

2.或 |:遇1则1

所以保证每一位都是1,按位与的计算结果为0到数组大小,当然我们设置大小的时候似乎不用太在意,里面有相关的转换来帮我们达到特定大小,比如我们初始化的是31或者31,那么初始化的数组大小是64,具体里面的位运算太多,头晕不看了

put方法流程

ConcurrentHashMap

原理和jdk8代码

数据结构:ReentrantLock+Segment+HashEntry,一个Segment中包含一个HashEntry数组,每个
HashEntry又是一个链表结构
元素查询:二次hash,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部
锁:Segment分段锁 Segment继承了ReentrantLock,锁定操作的Segment,其他的Segment不受影
响,并发度为segment个数,可以通过构造函数指定,数组扩容不会影响其他的segment
get方法无需加锁,volatile保证

jdk8:
数据结构:synchronized+CAS+Node+红黑树,Node的val和next都用volatile修饰,保证可见性
查找,替换,赋值操作都使用CAS
锁:锁链表的head节点,不影响其他元素的读写,锁粒度更细,效率更高,扩容时,阻塞所有的读写
操作、并发扩容
读操作无锁:
Node的val和next使用volatile修饰,读写线程对该变量互相可见
数组用volatile修饰,保证扩容时被读线程感知

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 1、遍历node数组,node是key-value结构
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)// 初始化node数组
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 判断如果数组当前位置为null,就使用cas机制设置数组这个位置的值,设置失败结束当前循环(说明其他线程设置了)
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                // 帮助扩容的逻辑
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 对于单个node的操作进行加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

使用场景

定时任务计算时存储一下每次计算都需要用到的数据。

建立websoket连接时用于保存客户端id

@Slf4j
@Component
@ServerEndpoint("/websocket/incident")
public class WebSocket {

    /**
     *  与某个客户端的连接对话,需要通过它来给客户端发送消息
     */
    private Session session;

     /**
     * 标识当前连接客户端的用户名
     */
    private String uuid;

    /**
     *  用于存所有的连接服务的客户端,这个对象存储是安全的
     */
    private static ConcurrentHashMap<String,WebSocket> webSocketSet = new ConcurrentHashMap<>();


    @OnOpen
    public void OnOpen(Session session){

        this.session = session;
        String id = IdGenerator.randomUUID();
        this.uuid = id;
        // name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过name来区分
        webSocketSet.put(id,this);
        log.info("[WebSocket] 连接成功,当前连接人数为:={}",webSocketSet.size());
    }


    @OnClose
    public void OnClose(){
        webSocketSet.remove(this.uuid);
        log.info("[WebSocket] 退出成功,当前连接人数为:={}",webSocketSet.size());
    }

    @OnMessage
    public void OnMessage(String message){
        log.info("[WebSocket] 收到消息:{}",message);
    }


    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("websocket客户端连接异常:",error);
    }
    /**
     * 群发
     * @param message
     */
    public void GroupSending(String message){
        for (WebSocket webSocket : webSocketSet.values()){
            try {
                webSocket.session.getBasicRemote().sendText(message);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

CopyOnWriteArrayList

原理

读不加锁,写加锁

关键属性 array 使用 volatile修饰,其修改添加方法中都先拷贝一个 array副本,然后对副本进行操作,操作完成后将副本赋值给 array属性。

    private transient volatile Object[] array;  

public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

AQS

AQS原理

AQS:AbstractQuenedSynchronizer抽象的队列式同步器。是除了java自带的synchronized关键字之外的锁机制。
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包

AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系

AQS具备的特性: 阻塞等待队列 、共享/独占、 公平/非公平 、可重入 、允许中断

CLH队列说明

AbstractQuenedSynchronizer中的Node内部类,Node中的属性prev、next等用来指向上一个Node和下一个Node, AbstractQuenedSynchronizer中又定义了head和tail用于获取一整个Node队列的头部和尾部。

       +------+  prev +-----+       +-----+
  head |      | <---- |     | <---- |     |  tail
       +------+       +-----+       +-----+

在获取锁失败后,会进入队列的尾部,释放锁的时候会设置头部的下一个节点为新的头部,这个看我下面文档中的ReentrantLock中非公平锁加锁和解锁代码流程

Node的waitStatus

五种状态:

​ 1.初始状态 EXCLUSIVE

​ 2.CANCELLED = 1:说明节点已经 取消获取 lock 了(一般是由于 interrupt 或 timeout 导致的)很多时候是在cancelAcquire 里面进行设置这个标识

​ 3.SIGNAL = -1:表示当前节点的后继节点需要唤醒

​ 4.CONDITION = -2:当前节点在 Condition Queue 里面

​ 5.PROPAGATE = -3:当前节点获取到 lock 或进行 release lock 时, 共享模式的最终状态是 PROPAGATE(PS: 有可能共享模式的节点变成 PROPAGATE 之前就被其后继节点抢占 head 节点, 而从Sync Queue中被踢出掉)

        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

条件等待队列

AbstractQueuedSynchronizer.ConditionObject,这个待定吧,不太了解

相关方法说明

State三种访问方式: getState() setState() compareAndSetState()

AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

不同的自定义同步器竞争共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

条件等待队列相关方法

  • 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
  • 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
  • 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。

AQS相关实现

似乎

AQS

ReentrantLock

基本使用方式

public class ReentrantLockTest {
    private static final ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        for (int i = 0; i < 100; i++) {
            int a = i + 1;
            executor.execute(()->sub());
        }


    }
    static int num = 100;

    private static void sub() {
        lock.lock();
        try {
            num = num - 1;
            // 让测试效果更明显
            Thread.sleep(1);
            System.out.println(Thread.currentThread().getName() + ":" + num);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 必须手动解锁
            lock.unlock();
        }

    }
}

条件队列基本使用方式

更多示例可以参考AbstractQueuedSynchronizer.ConditionObject的调用位置

@Slf4j
public class ConditionTest {

    public static void main(String[] args) {

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                log.debug(Thread.currentThread().getName() + " 开始处理任务1");
                // 释放当前持有的锁,并且阻塞当前线程,同时向Condition队列尾部添加一个节点,持有锁的对象就是condition这个实例对象
                condition.await();
                log.debug(Thread.currentThread().getName() + " 结束处理任务1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {
            lock.lock();
            try {
                log.debug(Thread.currentThread().getName() + " 开始处理任务2");
                Thread.sleep(2000);
                // 唤醒因调用Condition#await方法而阻塞的线程,必须持有锁才能调用
                condition.signal();
                log.debug(Thread.currentThread().getName() + " 结束处理任务2");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();
    }

}
21:13:12.384 [Thread-0] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-0 开始处理任务1
21:13:12.386 [Thread-1] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-1 开始处理任务2
21:13:14.394 [Thread-1] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-1 结束处理任务2
21:13:14.394 [Thread-0] DEBUG cn.sry1201.thread.test2.ConditionTest - Thread-0 结束处理任务1

加锁源码分析

建议先了解一下CAS

非公平锁

1、包含公平锁和非公平锁两个内部类,都继承自AbstractQuenedSynchronizer

// 默认是非公平锁    
public ReentrantLock() {
        sync = new NonfairSync();
    }

2、非公平锁加锁方法

        final void lock() {
            // 尝试将AbstractQueuedSynchronizer#state设置为已经加锁,方法里面的stateOffset用于定位state的内存地址
            if (compareAndSetState(0, 1))
                //AbstractOwnableSynchronizer#exclusiveOwnerThread 锁的拥有者设置为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 没有获取到锁进行下一步
                acquire(1);
        }

    public final void acquire(int arg) {
    	// 尝试加锁,加锁失败为true
        if (!tryAcquire(arg) &&
        	// addwaiter将当前程组装成node对象,然后添加到队列的尾部,之前的尾部向前移动,并且返回一个node(可能是尾部的node)
        	// 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            
            // 这里就实现了每一个节点只判断上一个节点的状态,来确定自身是不是可以获取锁了
            for (;;) {
                //获取传入节点的上一个节点
                final Node p = node.predecessor();
                // 判断节点是不是头节点,是的话尝试加锁
                if (p == head && tryAcquire(arg)) {
                    // 头结点加锁成功,设置下一个节点为头结点
                    setHead(node);
                    //去除头节点和下一个节点的关联关系
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 将节点循环向前移动, 判断节点状态waitStatus来确定是否需要跳过,新加入的节点会被设置成SIGNAL
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // waitStatus是默认状态才进入这里,里面会调用LockSupport.park将当前线程设置成阻塞状态
                    parkAndCheckInterrupt())
                    // 返回为true后,当前线程会被设置成中断状态
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

非公平锁的tryLock方法最终调用到这,lock方法的获取锁的逻辑也调用到这里,

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
CLH队列加锁部分设置逻辑
// 将新的节点置为最后一个节点,并且设置新节点和之前的最后一个节点的关联关系
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
 
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    // 最后一个节点是null,那么就将队列头部设置成尾部,
                    tail = head;
            } else {
                node.prev = t;
                // 如果最新的节点加入和尾部的节点不一致,那么将新节点设置成尾部节点的下一个节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
Node状态设置逻辑
// 传入的是节点的上一个节点  和  节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
			// SIGNAL表示这个节点的后续节点需要暂停
            return true;
        if (ws > 0) {
			// 表示当前节点的上一个节点是cancelled,可以越过这个节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
			// 将节点置为SIGNAL,表示等待上一个节点被唤醒
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
释放锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
// 释放锁会将AbstractOwnableSynchronizer#exclusiveOwnerThread 锁的拥有者设置为null
protected final boolean tryRelease(int releases) {
    		// 由于锁的可重入,这里知道减到0才表示释放成功
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
    		// 设置锁的状态 
            setState(c);
            return free;
        }

公平锁

看完了非公平锁的源码,公平锁似乎就是 去除了 线程一进来就获取锁,获取锁失败才加入CLH队列的逻辑。也就是说非公平锁已经在同步队列中的还是按照先后顺序执行

ReentrantReadWriteLock

读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它内部,维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁,描述如下:

线程进入读锁的前提条件:

  • 没有其他线程的写锁
  • 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。

线程进入写锁的前提条件:

  • 没有其他线程的读锁
  • 没有其他线程的写锁

而读写锁有以下三个重要的特性:

  • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
  • 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
  • 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。

使用示例

public class ReadWriteLockTest {
    static Map<String, String> map = new HashMap<>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock r = readWriteLock.readLock();
    private Lock w = readWriteLock.writeLock();

    // 读操作上读锁
    public String get(String key) {
        r.lock();
        try {
            System.out.println("进入读锁");
            TimeUnit.SECONDS.sleep(5);
            // TODO 业务逻辑
            return map.get(key);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            r.unlock();
        }
        return null;
    }

    // 写操作上写锁
    public void put(String key, String value) {
        w.lock();
        try {
            System.out.println("进入写锁");
            TimeUnit.SECONDS.sleep(10);
            // TODO 业务逻辑
            map.put(key,value);
        } catch (InterruptedException e) {

        } finally {
            w.unlock();
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
        // 验证写写互斥,读写互斥,读和读不影响, 主要是看日志的打印时间
        executor.execute(()-> readWriteLockTest.put("1","2"));
        executor.execute(()-> System.out.println(readWriteLockTest.get("1")));
        executor.execute(()-> System.out.println(readWriteLockTest.get("1")));
        executor.execute(()-> readWriteLockTest.put("1","3"));
        executor.execute(()-> readWriteLockTest.put("1","4"));
    }
}

源码分析

。。。。。

Semaphore

介绍

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。

Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

构造方法

默认非公平锁

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  • permits 表示许可证的数量(资源数)
  • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

常用方法

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

原理

使用的是AQS的共享锁,加锁时用了tryAcquireShared,其中代表锁状态的字段AbstractQueuedSynchronizer#state在构造方法中设置为令牌的数量,获取锁成功则减一,然后小于0时就不能再获取锁了

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

其他线程阻塞和添加到同步队列中的代码和ReentrantLock中一致,反正都是调用到了AbstractQueuedSynchronizer中继承的方法

使用示例

    // 定义一个只能允许5个线程同时执行的 信号量
    private static final Semaphore semaphore = new Semaphore(5);


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());


        for (int i = 0; i < 100; i++) {
            int a = i + 1;
            executor.execute(()-> {
                try {
                    // 获取到令牌才能往后执行,semaphore支持获取多个令牌,默认是非公平锁,具体查看详细的构造方法
                    semaphore.acquire();
                    // 测试结果应该是每隔1s打印出 5行
                    System.out.println("这是第" + a + "个执行的线程");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {

                }finally {
                    // 将令牌返还,应该获取几个令牌就返还几个令牌
                    semaphore.release();
                }
            });

        }

    }

使用场景

比如客服是有限的,所以只有当有客服处于空闲状态,才能接通电话。。。。。

CountDownLatch

介绍

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

初始化多个令牌,countDown获取领取令牌,一直到令牌的数量为0,countDownLatch.await()获取到令牌为0后就会继续执行。需要说明的是令牌不能重复使用,所以一个CountDownLatch实例只能使用一次。

使用示例

public class CountDownLatchTest2 {
    public static void main(String[] args) throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
                    System.out.println(Thread.currentThread().getName() + " finish task" + index);

                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
        countDownLatch.await();
        System.out.println("主线程:在所有任务运行完成后,进行结果汇总");

    }
}

CountDownLatch实现原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。

而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

CyclicBarrier

CyclicBarrier介绍

更多:https://www.jianshu.com/p/043ac5689002

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

构造方法和重要方法

// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
 public CyclicBarrier(int parties)
 // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环  通过reset()方法可以进行重置

使用示例

public class CyclicBarrierTest3 {

    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(6);


    public static void main(String[] args) throws InterruptedException {

        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                12, 12, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 12; i++) {
            if(i == 5){
                // 让日志效果更明显
                TimeUnit.SECONDS.sleep(20);
            }
            threadPoolExecutor.submit(() ->{
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);

                try {
                    Thread.sleep(sleepMills);
                    System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                    // 线程到达屏障后等待其他选后就位
                    cyclicBarrier.await();
						System.out.println("开始了");	
                } catch (InterruptedException e) {
                   e.printStackTrace();
                    //其他线程调用CyclicBarrier.reset(),会触发此异常
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

            });
        }
    }
}

Atomic原子类

基于CAS机制,最终还是使用了sun.misc.Unsafe中相关方法

在java.util.concurrent.atomic包里提供了一组原子操作类:

**基本类型:**AtomicInteger、AtomicLong、AtomicBoolean;

**引用类型:**AtomicReference、AtomicStampedRerence、AtomicMarkableReference;

**数组类型:**AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

AtomicInteger

说明

基于CAS机制,重要属性是value,其内所有的cas都是操作这个value的内存地址。一般每个独立的业务都需要一个AtomicInteger实例对象。

重要方法

//以原子的方式将实例中的原值加1,返回的是自增前的旧值;
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
 
//getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;
public final boolean getAndSet(boolean newValue) {
    boolean prev;
    do {
        prev = get();
    } while (!compareAndSet(prev, newValue));
    return prev;
}
 
//incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果;
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
 
//addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;

使用示例

自加一的方法演示,多线程安全

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        AtomicInteger atomicInteger = new AtomicInteger();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            int a = i + 1;
            executor.execute(() -> {
                countDownLatch.countDown();
                atomicInteger.getAndIncrement();
                System.out.println(atomicInteger);
            });
        }

        countDownLatch.await();
        System.out.println("最终结果:" + atomicInteger.get());
    }

AtomicIntegerArray

说明

还是基于CAS,会动态计算数组中某个元素的内存地址,然后进行操作

重要方法

//addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}
 
//getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;
public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}
 
//compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);

AtomicReference

使用示例

似乎没啥好说的,就是cas替换的对象变成的引用类型

public class AtomicReferenceTest {

    public static void main(String[] args) {
        User user1 = new User("张三", 23);
        User user2 = new User("李四", 25);
        User user3 = new User("王五", 20);

        //初始化为 user1
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(user1);

        //把 user2 赋给 atomicReference
        atomicReference.compareAndSet(user1, user2);
        System.out.println(atomicReference.get());

        //把 user3 赋给 atomicReference
        atomicReference.compareAndSet(user1, user3);
        System.out.println(atomicReference.get());

    }

}

AtomicIntegerFieldUpdater

public class AtomicIntegerFieldUpdaterTest {

    public static class Candidate {

        volatile int score = 0;

        AtomicInteger score2 = new AtomicInteger();
    }

    public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    public static AtomicInteger realScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        final Candidate candidate = new Candidate();

        Thread[] t = new Thread[10000];
        for (int i = 0; i < 10000; i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        candidate.score2.incrementAndGet();
                        scoreUpdater.incrementAndGet(candidate);
                        realScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
        }
        for (int i = 0; i < 10000; i++) {
            t[i].join();
        }
        System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
        System.out.println("AtomicInteger Score=" + candidate.score2.get());
        System.out.println("realScore=" + realScore.get());

    }
}

对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下:

(1)字段必须是volatile类型的,在线程之间共享变量时保证立即可见.eg:volatile int value = 3

(2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。

(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。

(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。

(5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

LongAdder/DoubleAdder

原理

AtomicInteger等基于cas的操作在高并发场景下,大量线程处于自旋状态,占用cpu资源,

    public final long getAndAddLong(Object var1, long var2, long var4) {
        long var6;
        do {
            var6 = this.getLongVolatile(var1, var2);
        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;
    }

所以可以使用LongAdder,

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

    
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        // cells数组已经不为null或者cas失败
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                // getProbe()是当前线程获取一个int值, & m是达到求模一样的效果,是线程可能落在数组的任何一个索引上
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

LongAccumulator

LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。具体再说吧。。。。。。。。

BlockingQueue

Queue接口

队列结构,可以从头部和尾部添加和移除元素

public interface Queue<E> extends Collection<E> {
    //添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
    boolean add(E e);
    //添加一个元素,添加成功返回true, 如果队列满了,返回false
    boolean offer(E e);
    //返回并删除队首元素,队列为空则抛出异常
    E remove();
    //返回并删除队首元素,队列为空则返回null
    E poll();
    //返回队首元素,但不移除,队列为空则抛出异常
    E element();
    //获取队首元素,但不移除,队列为空则返回null
    E peek();

BlockingQueue

BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:

  • 支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空

BlockingQueue的容量都是固定的,否则无法形成队列满了阻塞的操作

方法介绍

BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。

入队:

(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)

(2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false

(3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置

出队:

(1)poll():如果有数据,出队,如果没有数据,返回null (不阻塞)

(2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null

(3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据

BlockingQueue常用方法示例

当队列满了无法添加元素,或者是队列空了无法移除元素时:

  1. 抛出异常:add、remove、element
  2. 返回结果但不抛出异常:offer、poll、peek
  3. 阻塞:put、take
方法抛出异常返回特定值阻塞阻塞特定时间
入队add(e)offer(e)put(e)offer(e, time, unit)
出队remove()poll()take()poll(time, unit)
获取队首元素element()peek()不支持不支持

是否有界

阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

常见阻塞队列

BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

队列描述
ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue基于链表结构实现的一个有界阻塞队列
PriorityBlockingQueue支持按优先级排序的无界阻塞队列
DelayQueue基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列
SynchronousQueue不存储元素的阻塞队列
LinkedTransferQueue基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque基于链表结构实现的一个双端阻塞队列

LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

ArrayBlockingQueue

ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。

使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

ConcurrentLinkedQueue

java8中似乎也是使用CAs机制实现,反正我没看lock锁

其他好像也没啥说的,算了

二义性

一般多线程都不允许设置为null的元素,比如ConcurrentLinkedQueue不允许添加null,ConcurrentHashMap似乎key和value都不允许为null

假定ConcurrentHashMap也可以存放value为null的值。那不管是HashMap还是ConcurrentHashMap调用map.get(key)的时候,如果返回了null,那么这个null都有两重含义:

  • 1.这个key从来没有在map中映射过。
  • 2.这个key的value在设置的时候,就是null。

至于为啥不支持key为null,这个待定

Future和CompletableFuture

Future使用

上文中介绍Callable和线程池的时候使用了Future获取线程任务的执行结果

**Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。**必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

  • boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
  • boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
  • boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
  • V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
  • V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException

CompletionService

Callable+Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

CompletionService原理

内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

使用示例

//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
//异步向电商S1询价
cs.submit(() -> getPriceByS1());
//异步向电商S2询价
cs.submit(() -> getPriceByS2());
//异步向电商S3询价
cs.submit(() -> getPriceByS3());
//将询价结果异步保存到数据库
for (int i = 0; i < 3; i++) {
    Integer r = cs.take().get();
    executor.execute(() -> save(r));
    
    }

CompletableFuture

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。

CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

相关方法

描述依赖关系:

  1. thenApply() 把前面异步任务的结果,交给后面的Function
  2. thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回

描述and聚合关系:

  1. thenCombine:任务合并,有返回值
  2. thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
  3. runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。

描述or聚合关系:

  1. applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
  2. acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
  3. runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。

并行执行:

CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行

创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法区别在于:

  • runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞
  • 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
  • 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

runAsync&supplyAsync

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");
        CompletableFuture.runAsync(runnable);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行有返回值的异步任务");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello World";
        });
        // 阻塞等待返回
        String result = future.get();
        System.out.println(result);
    }

获取结果

join&get

join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)

结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  • Action的类型是BiConsumer,它可以处理正常的计算结果,或者异常情况。
  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
  • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
public class FutureTaskTest2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 测试异常情况,正常情况测试注释这一行
            int i = 12 / 0;
            System.out.println("执行结束!");
            return "test";
        });
        // 获取任务的执行劫夺,这里应该可以对结果进行修改
        future.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String t, Throwable action) {
                if(action != null){
                    System.out.println("发生异常");
                }else {
                    System.out.println(t + " 执行完成!");
                }

            }
        });

        // 任务抛出异常后处理
        future.exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable t) {
                System.out.println("执行失败:" + t.getMessage());
                return "异常xxxx";
            }
        });
        // 获取结果必须写在结果处理和异常处理的后面,否则结果处理和异常处理不会执行
        System.out.println(future.get());
    }
}

结果转换

所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

thenApply

thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

public class FutureTaskTest3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int result = 100;
            System.out.println("一阶段:" + result);
            return result;
        }).thenApply(number -> {
            int result = number * 3;
            System.out.println("二阶段:" + result);
            return result;
        });
        // 阻塞等待返回,获取的是二阶段的返回结果
        Integer result = future.get();
        System.out.println(result);
    }
}

thenCompose

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public class FutureTaskTest4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        int number = new Random().nextInt(30);
                        System.out.println("第一阶段:" + number);
                        return number;
                    }
                })
                .thenCompose(new Function<Integer, CompletionStage<Integer>>() {
                    @Override
                    public CompletionStage<Integer> apply(Integer param) {
                        return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                            @Override
                            public Integer get() {
                                int number = param * 2;
                                System.out.println("第二阶段:" + number);
                                return number;
                            }
                        });
                    }
                });
        System.out.println(future.get());
    }
}
thenApply 和 thenCompose的区别
  • thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;
  • thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。

结果消费

与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。

根据对结果的处理方式,结果消费函数又分为:

  • thenAccept系列:对单个结果进行消费
  • thenAcceptBoth系列:对两个结果进行消费
  • thenRun系列:不关心结果,只对结果执行Action

thenAccept

通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> {
            int number = new Random().nextInt(10);
            System.out.println("第一阶段:" + number);
            return number;
        }).thenAccept(number ->
                System.out.println("第二阶段:" + number * 5));

thenAcceptBoth

thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果

CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        int number = new Random().nextInt(3) + 1;
        try {
            TimeUnit.SECONDS.sleep(number);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("第一阶段:" + number);
        return number;
    }
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        int number = new Random().nextInt(3) + 1;
        try {
            TimeUnit.SECONDS.sleep(number);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("第二阶段:" + number);
        return number;
    }
});

futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
    @Override
    public void accept(Integer x, Integer y) {
        System.out.println("最终结果:" + (x + y));
    }
}

thenRun

thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    int number = new Random().nextInt(10);
    System.out.println("第一阶段:" + number);
    return number;
}).thenRun(() ->
        System.out.println("thenRun 执行"));

结果组合

thenCombine

thenCombine 方法,合并两个线程任务的结果,并进一步处理。

CompletableFuture<Integer> future1 = CompletableFuture
        .supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int number = new Random().nextInt(10);
                System.out.println("第一阶段:" + number);
                return number;
            }
        });
CompletableFuture<Integer> future2 = CompletableFuture
        .supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int number = new Random().nextInt(10);
                System.out.println("第二阶段:" + number);
                return number;
            }
        });
CompletableFuture<Integer> result = future1
        .thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer x, Integer y) {
                return x + y;
            }
        });

任务交互

所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。

applyToEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

.....

future1.applyToEither(future2, new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer number) {
        System.out.println("最快结果:" + number);
        return number * 2;
    }

acceptEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

future1.acceptEither(future2, new Consumer<Integer>() {
    @Override
    public void accept(Integer number) {
        System.out.println("最快结果:" + number);
    }

runAfterEither

两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

future1.runAfterEither(future2, new Runnable() {
    @Override
    public void run() {
        System.out.println("已经有一个任务完成了");
    }
}).join();

runAfterBoth

两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。

anyOf

anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。

allOf

allOf方法用来实现多 CompletableFuture 的同时返回。

注意事项

需要使用自定义线程池:https://www.cnblogs.com/blackmlik/p/16098938.html

CompletableFuture常用方法总结

使用案例:实现最优的“烧水泡茶”程序

著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。

public class CompletableFutureDemo2 {

    public static void main(String[] args) {

        //任务1:洗水壶->烧开水
        CompletableFuture<Void> f1 = CompletableFuture
                .runAsync(() -> {
                    System.out.println("T1:洗水壶...");
                    sleep(1, TimeUnit.SECONDS);

                    System.out.println("T1:烧开水...");
                    sleep(15, TimeUnit.SECONDS);
                });
        //任务2:洗茶壶->洗茶杯->拿茶叶
        CompletableFuture<String> f2 = CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("T2:洗茶壶...");
                    sleep(1, TimeUnit.SECONDS);

                    System.out.println("T2:洗茶杯...");
                    sleep(2, TimeUnit.SECONDS);

                    System.out.println("T2:拿茶叶...");
                    sleep(1, TimeUnit.SECONDS);
                    return "龙井";
                });
        //任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶叶:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });
        //等待任务3执行结果
        System.out.println(f3.join());
    }

    static void sleep(int t, TimeUnit u) {
        try {
            u.sleep(t);
        } catch (InterruptedException e) {
        }
    }
}

待补充

Disruptor

ForkJoinPool和ForkJoinTask

-,待定

分布式锁

什么是分布式锁?

分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

比如定时任务执行,如果部署了多个实例,需要保证只有一台机器在运行这个定时任务

实现方案:

你可使用redis自行实现

开源框架~Redisson 分布式锁的相关实现:https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8

关联信息

  • 关联的主题:
  • 上一篇:
  • 下一篇:
  • image: 20221021/1
  • 转载自:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/66898.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ADSP-21489的图形化编程详解(4:左右声道音量调节和多通道的输入输出详解)

左右声道音量调节 在直通的前提下&#xff0c;我们加入一个调音量的算法模块&#xff0c;来实现调节输出音量大小的功能。首先拖出来一个音量调节算法模块&#xff1a; 我们这里都是双通道&#xff0c;所以需要对这个音量调节模块进行配置&#xff1a; 连好程序&#xff0c;下…

acwing基础课——Dijkstra

由数据范围反推算法复杂度以及算法内容 - AcWing 常用代码模板3——搜索与图论 - AcWing 基本思想&#xff1a; 迪杰斯特拉&#xff08;dijkstra&#xff09;算法是单源最短路径问题的求解方法,它是一个按路径长度递增的次序产生最短路径的算法。单源最短路径就在给出一个固定…

Sqoop数据导出第2关:HDFS数据导出至Mysql内

为了完成本关任务,你需要掌握: 1、数据库( MySQL )建表。 2、HDFS 数据导出至 MySQL 中。 数据库( MySQL )建表 因为这边 Sqoop 不能够帮关系型数据库创建表,所以需要我们自己来创建表。 用命令进入 MySQL 客户端。 mysql -uroot -p123123 -h127.0.0.1 创建数据库…

备忘录模式(Memento)

参考&#xff1a; [备忘录设计模式 (refactoringguru.cn)](https://refactoringguru.cn/design-patterns/mediator) 文章目录一、什么是备忘录模式&#xff1f;二、实现三、优缺点优点缺点四、适用环境一、什么是备忘录模式&#xff1f; 在软件构建过程中&#xff0c;某些对象…

3.ORM实践

文章目录3.1 介绍Spring Data JPAJPA&#xff08;Java Persistence API&#xff09;标准HibernateSpring DataSpring Data JPA引入依赖3.2 定义JPA的实体对象常用JPA注解实体主键映射关系常用lombok注解3.3 SpringBucks线上咖啡馆实战项目&#xff08;1&#xff09;项目目标&am…

如何利用Java爬取网站数据?

1.Jsoup介绍 - 官网文档&#xff1a;https://jsoup.org - Jsoup 是一款Java 的HTML解析器&#xff0c;可直接解析某个URL地址、HTML文本内容。它提供了一套非常省力的API&#xff0c;可通过DOM&#xff0c;CSS以及类似于jQuery的操作方法来取出和操作数据。 2. Jsoup快速入门…

TCP--三次握手和四次挥手

原文网址&#xff1a;TCP--三次握手和四次挥手_IT利刃出鞘的博客-CSDN博客 简介 本文介绍TCP的三次握手和四次挥手。即&#xff1a;TCP建立连接和断开连接的过程。 三次握手 流程图 主机 A为客户端&#xff0c;主机B为服务端。 第一次握手 A 发送同步报文段&#xff08;SYN…

小程序初始创建

1. 注册小程序账号 官网&#xff1a; https://mp.weixin.qq.com/wxopen/waregister?actionstep1 2. 下载小程序 官网&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/devtools/download.html 百度网盘&#xff08;非最新版&#xff09; https://pan.baidu…

部署SNMP使网管与设备通信,配置关于TCP测试NQA的配置案例

一、部署SNMP 组网需求&#xff1a;通过部署RouterA由NMS网管设备管理用于监控网络是否畅通和业务是否正常。为了保证NMS和RouterA之间有一个数据传输安全、接入方式灵活、链路传输可靠的网络&#xff0c;并且可以实时监控设备的运行情况&#xff0c;网络中的RouterA通过网管实…

Databend 开源周报 #70

Databend 是一款强大的云数仓。专为弹性和高效设计&#xff0c;自由且开源。 即刻体验云服务&#xff1a;https://app.databend.com。 What’s New 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 Features & Improvements Format 更好地检查格…

PCB 二:AD 原理图绘制以及PCB绘制

PCB 二&#xff1a;AD 原理图绘制以及PCB绘制前言(一)资料总结(二)PCB前言 本文简单收集了AD软件在绘制PCB电路板的一些资料&#xff0c;还有遇到的一些问题&#xff0c;并记录一些常用的操作。 (一)资料总结 1【AD】Altium Designer 原理图的绘制 2【AD】Altium Designer P…

飞利浦CT的AI重建技术

原文&#xff1a;AI for significantly lower dose and improved image quality 飞利浦医疗CT的深度学习重建技术。 人工智能可显着降低剂量并提高图像质量概述背景飞利浦CT智能工作流Precise Image 如何训练神经网络深入了解深度学习训练神经网络验证神经网络推断法可以实现快…

Android 11及以上授予文件管理权限

背景 安卓11改变了此前安卓系统对于文件管理的规则,在安卓11上,文件读写变成了特殊权限。应用默认只能读写自己的目录/android/data/包名 gradle配置 Android11系统对应用写入权限做了严格的限制。本文介绍如何获取文件读写权限。 项目中 build.gradle 的targetSdkVersion …

YOLOv5 模型结构及代码详细讲解(一)

王旭*&#xff0c;沈啸彬 *, 张钊* (淮北师范大学计算机科学与技术学院&#xff0c;淮北师范大学经济与管理学院&#xff0c;安徽 淮北) *These authors contributed to the work equllly and should be regarded as co-first authors. &#x1f31e;欢迎来到深度学习的世界 …

window本地编译Spring源码并运行

1. Gradle 下载地址 https://services.gradle.org/distributions/ 2. Jdk下载地址&#xff1a; https://www.oracle.com/java/technologies/downloads/#java11-windows 3. Spring源码下载地址 https://github.com/spring-projects/spring-framework/tags 4. 注意事项 以…

微服务守护神-Sentinel-流控规则

引言 书接上篇 微服务守护神-Sentinel-概念 &#xff0c;上面介绍了Sentinel相关概念&#xff0c;本篇就来看下Sentinel的流控规则。 流控规则 流量控制&#xff0c;其原理是监控应用流量的QPS(每秒查询率) 或并发线程数等指标&#xff0c;当达到指定的阈值时 对流量进行控…

OSG开发-使用VisualStudio2019创建CMake项目方式开发HelloOSG

本文保证你使用VS2019&#xff0c;可以把这个OSG程序开发出来&#xff0c;看到那个蓝色的带有纹理的地球。 大概步骤如下&#xff1a; 下载已经编译好的OSG的库和头文件新建一个CMake项目编辑make.txt编写main.cpp运行。下载已经编译好的OSG的库和头文件 由于我们需要用OSG的…

blender 常用修改器

文章目录前置.阵列.倒角.布尔.精简.拆边.镜像.螺旋.实体化表面细分.三角化.体积到网格.焊接修改器.蒙皮.线框.铸型.曲线修改器.置换修改器.晶格修改器.缩裹修改器.简易修改.表面形变修改 .多级精度修改器.前置. 注意&#xff0c;修改器未应用前&#xff0c;只能操作原物体 阵…

自动驾驶:2022 apollo day 观后感(三)

自动驾驶&#xff1a;2022 apollo day 观后感&#xff08;三&#xff09;TOPIC THREE&#xff1a; 文心大模型在自动驾驶感知中的落地应用&#xff08;王井东&#xff09;多传感器融合autolabeling的发展&#xff1a;大模型数据闭环大模型&#xff0c;已经成为自动驾驶能力提升…

差动驱动机器人轨迹-CoCube

轨迹博客&#xff1a; 玫瑰线轨迹如何规划&#xff1f;&#xff08;desmosROS2turtlesim……&#xff09; ROS1云课→23turtlesim绘制小结&#xff08;数学和编程&#xff09; 如上所涉及的机器人假定模型都是差动驱动机器人。 许多移动机器人使用一种称为差动驱动的驱动机构…