八、异步编程

news2025/1/10 17:45:09

文章目录

  • 异步编程
  • FutureTask应用&源码分析
    • FutureTask介绍
    • FutureTask应用
    • FutureTask源码分析
      • FutureTask中的核心属性
      • FutureTask的run方法
      • FutureTask的set&setException方法
      • FutureTask的cancel方法
      • FutureTask的get方法
      • FutureTask的finishCompletion方法
  • CompletableFuture应用&源码分析
    • CompletableFuture介绍
    • CompletableFuture应用
      • supplyAsync
      • runAsync
      • thenApply、thenApplyAsync
      • thenAccept、thenAcceptAsync
      • thenRun、thenRunAsync
      • thenCombine、thenAcceptBoth、runAfterBoth
      • applyToEither、acceptEither、runAfterEither
      • exceptionally、henCompose、handle
      • allOf,anyOf
    • CompletableFuture源码分析
      • 当前任务执行方式
      • 任务编排的存储&执行方式
      • 任务编排流程
      • 查看后置任务执行时机
    • CompletableFuture执行流程图

异步编程

FutureTask应用&源码分析

FutureTask介绍

FutureTask是一个可以取消异步任务的类。FutureTask对Future做的一个基本实现。可以调用方法区开始和取消一个任务。
一般是配合Callable去使用。
异步任务启动之后,可以获取一个绑定当前异步任务的FutureTask。
可以基于FutureTask的方法去取消任务,查看任务是否结果,以及获取任务的返回结果。
FutureTask内部的整体结构中,实现了RunnableFuture的接口,这个接口又继承了Runnable, Future这个两个接口。所以FutureTask也可以作为任务直接交给线程池去处理。

FutureTask应用

大方向是FutureTask对任务的控制:

  • 任务执行过程中状态的控制
  • 任务执行完毕后,返回结果的获取
    FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。
    public static void main(String[] args) throws InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("任务开始执行......");
            Thread.sleep(2000);
            System.out.println("任务执行完毕......");
            return "OK!";
        });
        // 构建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        // 构建线程池
        threadPool.execute(futureTask);

        // futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法
        // run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。
        // 如果希望任务可以反复被执行,需要去调用runAndReset方法
        futureTask.run();

        // 对返回结果的获取,类似阻塞队列的poll方法
        // 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
//        try {
//            String s = futureTask.get(3000, TimeUnit.MICROSECONDS);
//            System.out.println("返回结果:" + s);
//        } catch (Exception e) {
//            System.out.println("异常返回:" + e.getMessage());
//            e.printStackTrace();
//        }

        // 对返回结果的获取,类似阻塞队列的take方法,死等结果
//        try {
//            String s = futureTask.get();
//            System.out.println("任务结果:" + s);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        } catch (ExecutionException e) {
//            e.printStackTrace();
//        }

        // 对任务状态的控制
        System.out.println("任务结束了么?:" + futureTask.isDone());
        Thread.sleep(1000);
        System.out.println("任务结束了么?:" + futureTask.isDone());
        Thread.sleep(1000);
        System.out.println("任务结束了么?:" + futureTask.isDone());
    }

FutureTask源码分析

看FutureTask的源码,要从几个方向去看:

  • 先查看FutureTask中提供的一些状态
  • 再查看任务的执行过程

FutureTask中的核心属性

清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。

/**
FutureTask的核心属性
FutureTask任务的状态流转
* NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回 
* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常 
* NEW -> CANCELLED 任务被取消
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** 需要执行任务,会被赋值到这个属性 */
private Callable<V> callable;
/** 任务的任务结果要存储在这几个属性中 */
private Object outcome;
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待返回结果的线程Node对象, */
private volatile WaitNode waiters;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

FutureTask的run方法

任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理。

// 当线程池执行FutureTask任务时,会调用的方法
public void run() {
	// 如果当前任务状态不是NEW,直接return告辞
    if (state != NEW ||
        // 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程
		// 如果CAS失败,直接return告辞
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
    	// 将要执行的任务拿到
        Callable<V> c = callable;
        // 健壮性判断,保证任务不是null
        // 再次判断任务的状态是NEW(DCL)
        if (c != null && state == NEW) {
        	// 执行任务
			// result:任务的返回结果
			// ran:如果为true,任务正常结束。 如果为false,任务异常结束。
            V result;
            boolean ran;
            try {
                // 执行任务
                result = c.call();
                // 正常结果,ran设置为true
                ran = true;
            } catch (Throwable ex) {
            	// 如果任务执行期间出了异常 
            	// 返回结果置位null
                result = null;
                // ran设置为false
                ran = false;
                // 封装异常结果
                setException(ex);
            }
            if (ran)
                // 封装正常结果
                set(result);
        }
    } finally {
        // 将执行任务的线程置位null
        runner = null;
        // 拿到任务的状态
        int s = state;
        // 如果状态大于等于INTERRUPTING
        if (s >= INTERRUPTING)
            // 进来代表任务中断,做一些后续处理
            handlePossibleCancellationInterrupt(s);
    }
}

FutureTask的set&setException方法

任务执行完毕后,修改任务的状态以及封装任务的结果。

// 没有异常的时候,正常返回结果
protected void set(V v) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将返回结果赋值给 outcome 属性
        outcome = v;
        // 将任务状态变为NORMAL,正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {
	// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将异常信息封装到 outcome 属性
        outcome = t;
        // 将任务状态变为EXCEPTIONAL,异常结束
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

FutureTask的cancel方法

任务取消的一个方式

  • 任务直接从NEW状态转换为CANCEL
  • 任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED
// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {
    // 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning 
    // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // 如果mayInterruptIfRunning为true 
        // 就需要中断线程
        if (mayInterruptIfRunning) {
            try {
                // 拿到任务线程
                Thread t = runner;
                if (t != null)
                    // 如果线程不为null,直接interrupt
                    t.interrupt();
            } finally { // final state
                // 将任务状态设置为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 任务结束后的一些处理~~ 一会看~~
        finishCompletion();
    }
    return true;
}

FutureTask的get方法

这个是线程获取FutureTask任务执行结果的方法

// 拿任务结果
public V get() throws InterruptedException, ExecutionException {
    // 获取任务的状态
    int s = state;
    // 要么是NEW,任务还没执行完
	// 要么COMPLETING,任务执行完了,结果还没封装好。
    if (s <= COMPLETING)
        // 让当前线程阻塞,等待结果
        s = awaitDone(false, 0L);
    // 最终想要获取结果,需要执行report方法
    return report(s);
}

// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 针对get方法传入了等待时长时,需要计算等到什么时间点
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 声明好需要的Node,queued:放到链表中了么?
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 查看线程是否中断,如果中断,从等待链表中移除,甩个异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        // 拿到状态
        int s = state;
        // 到这,说明任务结束了。
        if (s > COMPLETING) {
            if (q != null)
                // 如果之前封装了WaitNode,现在要清空
                q.thread = null;
            return s;
        }
        // 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 如果还没初始化WaitNode,初始化
        else if (q == null)
            q = new WaitNode();
        // 没放队列的话,直接放到waiters的前面
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 准备挂起线程,如果timed为true,挂起一段时间
        else if (timed) {
            // 计算出最多可以等待多久
            nanos = deadline - System.nanoTime();
            // 如果等待的时间没了
            if (nanos <= 0L) {
                // 移除当前的Node,返回任务状态
                removeWaiter(q);
                return state;
            }
            // 等一会
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 死等
            LockSupport.park(this);
    }
}

// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {
    // 拿到outcome 返回结果
    Object x = outcome;
    // 如果任务状态是NORMAL,任务正常结束,返回结果
    if (s == NORMAL)
        return (V)x;
    // 如果任务状态大于等于取消
    if (s >= CANCELLED)
        // 直接抛出异常
        throw new CancellationException();
    // 到这就是异常结束
    throw new ExecutionException((Throwable)x);
}

FutureTask的finishCompletion方法

只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法。
而这个方法其实就是唤醒那些执行get方法等待任务结果的线程。

// 任务结束后触发
private void finishCompletion() {
    // assert state > COMPLETING;
    // 在任务结束后,需要唤醒
    for (WaitNode q; (q = waiters) != null;) {
        // 第一步直接以CAS的方式将WaitNode置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 拿到了Node中的线程
                Thread t = q.thread;
                // 如果线程不为null
                if (t != null) {
                    // 第一步先置位null
                    q.thread = null;
                    // 直接唤醒这个线程
                    LockSupport.unpark(t);
                }
                // 拿到当前Node的next
                WaitNode next = q.next;
                // next为null,代表已经将全部节点唤醒了吗,跳出循环
                if (next == null)
                    break;
                // 将next置位null
                q.next = null; // unlink to help gc
                // q的引用指向next
                q = next;
            }
            break;
        }
    }
    // 任务结束后,可以基于这个扩展方法,记录一些信息
    done();
    // 任务执行完,把callable具体任务置位null
	callable = null;        // to reduce footprint
}

CompletableFuture应用&源码分析

CompletableFuture介绍

平时多线程开发一般就是使用Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor 这些内容和并发编程息息相关。相对来对来说成本都不高,多多使用是可以熟悉这些内容。这些内容组合在一起去解决一些并发编程的问题时,很多时候没有办法很方便的去完成异步编程的操作。
Thread + Runnable:执行异步任务,但是没有返回结果。
Thread + Callable + FutureTask:完整一个可以有返回结果的异步任务。

  • 获取返回结果,如果基于get方法获取,线程需要挂起在WaitNode里。
  • 获取返回结果,也可以基于isDone判断任务的状态,但是这里需要不断轮询

上述的方式都是有一定的局限性的。
比如说任务A,任务B,还有任务C。其中任务B还有任务C执行的前提是任务A先完成,再执行任务B和任务C。
如果任务的执行方式逻辑比较复杂,可能需要业务线程导出阻塞等待,或者是大量的任务线程去编一些任务执行的业务逻辑。对开发成本来说比较高。
CompletableFuture就是帮你处理这些任务之间的逻辑关系,编排好任务的执行方式后,任务会按照规划好的方式一步一步执行,不需要让业务线程去频繁的等待。

CompletableFuture应用

CompletableFuture应用还是需要一点点的成本的。
首先对CompletableFuture提供的函数式编程中三个函数有一个掌握。

Supplier<U> // 生产者,没有入参,有返回结果 
Consumer<T> // 消费者,有入参,但是没有返回结果 
Function<T,U>// 函数,有入参,又有返回结果

supplyAsync

CompletableFuture如果不提供线程池的话,默认使用的ForkJoinPool,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。

public static void main(String[] args) {
    // 生产者,可以指定返回结果
    CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {
        System.out.println("异步任务开始执行");
        System.out.println("异步任务执行结束");
        return "返回结果";
    });

    String result1 = firstTask.join();
    String result2 = null;

    try {
        result2 = firstTask.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    System.out.println(result1 + "," + result2);
}

runAsync

当前方式既不会接收参数,也不会返回任何结果,非常基础的任务编排方式。

public static void main(String[] args) throws IOException {
    CompletableFuture.runAsync(()->{
        System.out.println("任务go");
        System.out.println("任务done");
    });
    System.in.read();
}

thenApply、thenApplyAsync

有任务A,还有任务B。
任务B需要在任务A执行完毕后再执行。
而且任务B需要任务A的返回结果。
任务B自身也有返回结果。
thenApply可以拼接异步任务,前置任务处理完之后,将返回结果交给后置任务,然后后置任务再执行。
thenApply提供了带有Async的方法,可以指定每个任务使用的具体线程池。

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    /*
    CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
        String id = UUID.randomUUID().toString();
        System.out.println("执行任务A:" + id);
        return id;
    });
    CompletableFuture<String> taskB = taskA.thenApply(result -> {
        System.out.println("任务B获取到任务A结果:" + result);
        result = result.replace("-", "");
        return result;
    });
    System.out.println("main线程拿到结果:" + taskB.join());
    */

    CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
        String id = UUID.randomUUID().toString();
        System.out.println("执行任务A:" + id + "," + Thread.currentThread().getName());
        return id;
    }).thenApplyAsync(result -> {
        System.out.println("任务B获取到任务A结果:" + result + "," + Thread.currentThread().getName());
        result = result.replace("-", "");
        return result;
    }, executor);
    System.out.println("main线程拿到结果:" + taskB.join());
}

thenAccept、thenAcceptAsync

套路和thenApply一样,都是任务A和任务B的拼接。
前置任务需要有返回结果,后置任务会接收前置任务的结果,返回后置任务没有返回值。

public static void main(String[] args) throws IOException {
    CompletableFuture.supplyAsync(() -> {
        System.out.println("任务A");
        return "abcdefg";
    }).thenAccept(result -> {
        System.out.println("任务b,拿到结果处理:" + result);
    });
    System.in.read();
}

thenRun、thenRunAsync

套路和thenApply,thenAccept一样,都是任务A和任务B的拼接。
前置任务没有返回结果,后置任务不接收前置任务结果,后置任务也会有返回结果。

public static void main(String[] args) throws IOException {
    CompletableFuture.runAsync(() -> {
        System.out.println("任务A!!");
    }).thenRun(() -> {
        System.out.println("任务B!!");
    });
    System.in.read();
}

thenCombine、thenAcceptBoth、runAfterBoth

比如有任务A,任务B,任务C。任务A和任务B并行执行,等到任务A和任务B全部执行完毕后,再执行任务C。
A+B ------ C
基于前面thenApply,thenAccept,thenRun知道了一般情况三种任务的概念。
thenCombine以及thenAcceptBoth还有runAfterBoth的区别是一样的。

public static void main(String[] args) throws IOException {
    CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务A");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 78;
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        System.out.println("任务B");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 66;
    }), (resultA, resultB) -> {
        System.out.println("任务C");
        int resultC = resultA + resultB;
        return resultC;
    });

    System.out.println(taskC.join());
    System.in.read();
}

applyToEither、acceptEither、runAfterEither

比如有任务A,任务B,任务C。任务A和任务B并行执行,只要任务A或者任务B执行完毕,开始执行任务C。
A or B ----- C
applyToEither,acceptEither,runAfterEither三个方法拼接任务的方式都是一样的。
区别依然是,可以接收结果并且返回结果,可以接收结果没有返回结果,不接收结果也没返回结果。

public static void main(String[] args) throws IOException {
    CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务A");
        return 78;
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        System.out.println("任务B");
        return 66;
    }), resultFirst -> {
        System.out.println("任务C");
        return resultFirst;
    });
    System.out.println(taskC.join());
    System.in.read();
}

exceptionally、henCompose、handle

exceptionally
这个也是拼接任务的方式,但是只有前面业务执行时出现异常了,才会执行当前方法来处理。
只有异常出现时,CompletableFuture的编排任务没有处理完时,才会触发。
thenCompose,handle
这两个也是异常处理的套路,可以根据方法描述发现,他的功能方向比exceptionally要更加丰富。
thenCompose可以拿到返回结果同时也可以拿到出现的异常信息,但是thenCompose本身是Consumer不能返回结果。无法帮你捕获异常,但是可以拿到异常返回的结果。
handle可以拿到返回结果同时也可以拿到出现的异常信息,并且也可以指定返回托底数据。可以捕获异常的,异常不会抛出去。

public static void main(String[] args) throws IOException {
    CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务A");
        // int i = 1 / 0;
        return 78;
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        System.out.println("任务B");
        return 66;
    }), resultFirst -> {
        System.out.println("任务A");
        return resultFirst;
    }).handle((r, ex) -> {
        System.out.println("handle:" + r);
        System.out.println("handle:" + ex);
        return -1;
    });
    /*
    .exceptionally(ex -> {
        System.out.println("exceptionally:" + ex);
        return -1;
    })*/
    /*
    .whenComplete((r, ex) -> {
        System.out.println("whenComplete:" + r);
        System.out.println("whenComplete:" + ex);
    });
     */
    System.out.println(taskC.join());
    System.in.read();
}

allOf,anyOf

allOf的方式是让内部编写多个CompletableFuture的任务,多个任务都执行完后,才会继续执行你后续拼接的任务。
allOf返回的CompletableFuture是void,没有返回结果。

public static void main(String[] args) throws IOException {
    CompletableFuture.allOf(
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务A");
            }),
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务B");
            }),
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务C");
            })
    ).thenRun(() -> {
        System.out.println("任务D");
    });

    System.in.read();
}

anyOf是基于多个CompletableFuture的任务,只要有一个任务执行完毕就继续执行后续,最先执行完的任务做作为返回结果的入参。

public static void main(String[] args) throws IOException {
    CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务A");
                return "A";
            }),
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务B");
                return "B";
            }),
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务C");
                return "C";
            })).thenAccept(r -> {
        System.out.println("任务D执行," + r + "先执行完毕的");
    });
    System.in.read();
}

CompletableFuture源码分析

CompletableFuture的源码内容特别多。不需要把所有源码都看了,更多的是要掌握整个CompletableFuture的源码执行流程,以及任务的执行时机。
从CompletableFuture中比较简单的方法作为分析的入口,从而掌握整体执行的流程。

当前任务执行方式

将任务和CompletableFuture封装到一起,再执行封住好的具体对象的run方法即可。

 // 提交任务到CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    // asyncPool:执行任务的线程池
	// runnable:具体任务。
    return asyncRunStage(asyncPool, runnable);
}

// 内部执行的方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    // 对任务做非空校验
    if (f == null) throw new NullPointerException();
    // 直接构建了CompletableFuture的对象,作为最后的返回结果
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // 将任务和CompletableFuture对象封装为了AsyncRun的对象
	// 将封装好的任务交给了线程池去执行
    e.execute(new AsyncRun(d, f));
    // 返回构建好的CompletableFuture
    return d;
}

// 封装任务的AsyncRun类信息
static final class AsyncRun extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    // 声明存储CompletableFuture对象以及任务的成员变量
    CompletableFuture<Void> dep;
    Runnable fn;
    // 将传入的属性赋值给成员变量
    AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return true; }

	// 当前对象作为任务提交给线程池之后,必然会执行当前方法
    public void run() {
        // 声明局部变量
        CompletableFuture<Void> d; Runnable f;
        // 将成员变量赋值给局部变量,并且做非空判断
        if ((d = dep) != null && (f = fn) != null) {
            // help GC,将成员变量置位null,只要当前任务结束后,成员变量也拿不到引用。
            dep = null; fn = null;
            // 先确认任务没有执行。
            if (d.result == null) {
                try {
                    // 直接执行任务
                    f.run();
                    // 当前方法是针对Runnable任务的,不能将结果置位null 
                    // 要给没有返回结果的Runnable做一个返回结果
                    d.completeNull();
                } catch (Throwable ex) {
                    // 异常结束!
                    d.completeThrowable(ex);
                }
            }
            d.postComplete();
        }
    }
}

任务编排的存储&执行方式

首先如果要在前继任务处理后,执行后置任务的话。
有两种情况:

  • 前继任务如果没有执行完毕,后置任务需要先放在stack栈结构中存储。
  • 前继任务已经执行完毕了,后置任务就应该直接执行,不需要在往stack中存储了。

如果单独采用thenRun在一个任务后面指定多个后继任务,CompletableFuture无法保证具体的执行顺序,而影响执行顺序的是前继任务的执行时间,以及后置任务编排的时机。

任务编排流程

// 编排任务,前继任务搞定,后继任务再执行
public CompletableFuture<Void> thenRun(Runnable action) {
    // 执行了内部的uniRunStage方法, 
    // null:线程池,现在没给。
	// action:具体要执行的任务
    return uniRunStage(null, action);
}

// 内部编排任务方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
    // 后继任务不能为null,健壮性判断
    if (f == null) throw new NullPointerException();
    // 创建CompletableFuture对象d,与后继任务f绑定
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // 如果线程池不为null,代表异步执行,将任务压栈
	// 如果线程池是null,先基于uniRun尝试下,看任务能否执行
    if (e != null || !d.uniRun(this, f, null)) {
        // 如果传了线程池,这边需要走一下具体逻辑 
        // e:线程池
		// d:后继任务的CompletableFuture
		// this:前继任务的CompletableFuture
		// f:后继任务
        UniRun<T> c = new UniRun<T>(e, d, this, f);
        // 将封装好的任务,push到stack栈结构
		// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中
		// 放入栈中可能会失败
        push(c);
        // 无论压栈成功与否,都要尝试执行以下。
        c.tryFire(SYNC);
    }
    // 无论任务执行完毕与否,都要返回后继任务的CompletableFuture
    return d;
}

查看后置任务执行时机

任务在编排到前继任务时,因为前继任务已经结束了,这边后置任务会主动的执行。

// 后置任务无论压栈成功与否,都需要执行tryFire方法
static final class UniRun<T> extends UniCompletion<T,Void> {
    Runnable fn;
    // executor:线程池
	// dep:后置任务的CompletableFuture
	// src:前继任务的CompletableFuture
	// fn:具体的任务
    UniRun(Executor executor, CompletableFuture<Void> dep,
           CompletableFuture<T> src, Runnable fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<Void> tryFire(int mode) {
        // 声明局部变量
        CompletableFuture<Void> d; CompletableFuture<T> a;
        // 赋值局部变量
		// (d = dep) == null:赋值加健壮性校验
        if ((d = dep) == null ||
            // 调用uniRun。
			// a:前继任务的CompletableFuture
			// fn:后置任务
			// 第三个参数:传入的是this,是UniRun对象
            !d.uniRun(a = src, fn, mode > 0 ? null : this))
            // 进到这,说明前继任务没结束,等!
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

// 是否要主动执行任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
    // 方法要么正常结束,要么异常结束
    Object r; Throwable x;
    // a == null:健壮性校验
	// (r = a.result) == null:判断前继任务结束了么? 
	// f == null:健壮性校验
    if (a == null || (r = a.result) == null || f == null)
        // 到这代表任务没结束。
        return false;
    // 后置任务执行了没? == null,代表没执行
    if (result == null) {
        // 如果前继任务的结果是异常结束。如果前继异常结束,直接告辞,封装异常结果
        if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
            completeThrowable(x, r);
        else
            // 到这,前继任务正常结束,后置任务正常执行
            try {
                // 如果基于tryFire(SYNC)进来,这里的C不为null,执行c.claim 
                // 如果是因为没有传递executor,c就是null,不会执行c.claim
                if (c != null && !c.claim())
                    // 如果返回false,任务异步执行了,直接return false
                    return false;
                // 如果claim没有基于线程池运行任务,那这里就是同步执行
				// 直接f.run了。
                f.run();
                // 封装Null结果
                completeNull();
            } catch (Throwable ex) {
                // 封装异常结果
                completeThrowable(ex);
            }
    }
    return true;
}

// 异步的线程池处理任务
final boolean claim() {
    Executor e = executor;
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        // 只要有线程池对象,不为null
        if (e == null)
            return true;
        executor = null; // disable
        // 基于线程池的execute去执行任务
        e.execute(this);
    }
    return false;
}

前继任务执行完毕后,基于嵌套的方式执行后置。

// A:嵌套了B+C, B:嵌套了D+E
// 前继任务搞定,遍历stack执行后置任务
// A任务处理完,解决嵌套的B和C
final void postComplete() {
    // f:前继任务的CompletableFuture
	// h:存储后置任务的栈结构
    CompletableFuture<?> f = this; Completion h;
    // (h = f.stack) != null:赋值加健壮性判断,要确保栈中有数据
    while ((h = f.stack) != null ||
           // 循环一次后,对后续节点的赋值以及健壮性判断,要确保栈中有数据
           (f != this && (h = (f = this).stack) != null)) {
        // t:当前栈中任务的后续任务
        CompletableFuture<?> d; Completion t;
        // 拿到之前的栈顶h后,将栈顶换数据
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // 执行tryFire方法,
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

// 回来了 NESTED == -1
final CompletableFuture<V> tryFire(int mode) {
    CompletableFuture<V> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniHandle(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    // 内部会执行postComplete,运行B内部嵌套的D和E
    return d.postFire(a, mode);
}

CompletableFuture执行流程图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380116.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DevOps 学习笔记(一) | DevOps 简介及环境搭建

1. 环境配置 本次实验需要三台服务器CI/CD 服务器、应用服务器和Harbor 服务器 DevOps 步骤 程序员将代码 push 到代码仓库Jenkins 根据触发条件拉取代码到CI/CD 服务器Jenkins 使用 Maven 将代码 build 成 jar 包Jenkins 使用 jar 包通过 Dockerfile 和 docker-compose.yml…

HBase JMX 指标学习

名词解释&#xff1a; JMX&#xff1a;Java Management Extensions&#xff0c;用于用于Java程序扩展监控和管理项。 GC&#xff1a;Garbage Collection&#xff0c;垃圾收集&#xff0c;垃圾回收机制。 1、概述 说到对Hadoop和 HBase的集群监控&#xff0c;大家知道的和用…

YOLOv8详解 【网络结构+代码+实操】

文章目录YOLOv8 概述模型结构Loss 计算训练数据增强训练策略模型推理过程网络模型解析卷积神经单元&#xff08;model.py&#xff09;Yolov8实操快速入门环境配置数据集准备模型的训练/验证/预测/导出使用CLI使用python多任务支持检测实例分割分类配置设置操作类型训练预测验证…

FastDDS-4.RTPS层

4. RTPS层 eprosima Fast DDS的较低层RTPS层是RTPS标准协议的实现。与DDS层相比&#xff0c;该层提供了对通信协议内部的更多控制&#xff0c;因此高级用户可以更好地控制库的功能。 4.1 与DDS层的关系 该层的元素与DDS层的元素一一对应&#xff0c;并添加了一些元素。该对应…

【使用两个栈实现队列】

文章目录一、栈和队列的基本特点二、基本接口函数的实现1.栈的接口2.创建队列骨架3.入队操作4.取出队列元素5.返回队首元素6.判断队列是否为空7.销毁队列总结一、栈和队列的基本特点 栈的特点是后进先出&#xff0c;而队列的特点是先进先出。 使用两个栈实现队列&#xff0c;必…

【DataX】数据同步到PG时遇到的分区不存在问题

数据同步到PG时遇到的分区不存在问题前言正文问题分析解决方法结语前言 大概说下这个问题牵扯出来的背景&#xff0c;一个外场项目&#xff0c;选型用PG存业务数据&#xff0c;然后客户要求保存保留一年的数据&#xff0c;运行到现在服务器5个T的磁盘已经有点扛不住了&#xf…

内存的管理

取指令——译码——执行——返存 计组课我们学过cpu真正读指令并非是从内存中读入&#xff0c;而是从cache读和存&#xff0c;再由cache进行取指或返存&#xff0c;因为cpu指令周期比内存周期速度快很多&#xff0c;cpu若要取指或返存都需要等待内存完成他的动作才可以进行下一…

python爬虫:如何定义内容提取器

项目背景 在python 即时网络爬虫项目启动说明中我们讨论一个数字&#xff1a;程序员浪费在调测内容提取规则上的时间&#xff0c;从而我们发起了这个项目&#xff0c;把程序员从繁琐的调测规则中解放出来&#xff0c;投入到更高端的数据处理工作中。 解决方案 为了解决这个问题…

微信小程序使用scss编译wxss文件的配置步骤

文章目录1、在 vscode 中搜索 easysass 插件并安装2、在微信开发工具中导入安装的easysass插件3、修改 spook.easysass-0.0.6/package.json 文件中的配置4、重启开发者工具&#xff0c;就可用使用了微信小程序开发者工具集成了 vscode 编辑器&#xff0c;可以使用 vscode 中众多…

C++修炼之练气期三层——函数重载

目录 1.引例 2.函数重载的概念 3.C支持函数重载的原理 1.引例 倘若现在要实现一个加法计算器&#xff0c;用C语言实现的话我们会选择这样的方式&#xff1a; int Add_int(int a, int b) {return a b; }double Add_double(double a, double b) {return a b; } 在使用加…

Exposure2023专业摄影RAW格式大师专业滤镜特效

Exposure2023是一款专为摄影艺术设计的图像编辑器。新的 Exposure2023结合了专业级的照片调整、庞大的华丽照片库和令人愉悦的高效设计。可以提供最大&#xff0c;最准确的电影外观选择。Exposure的创意外观不仅限于电影模拟&#xff0c;从干净优雅的现代风格到引人注目的色彩变…

SpringBoot+Nacos+OpenFeign环境搭建

目录 1.boot方式nacos与openFeign集成 1.引入依赖 2.添加配置 3.测试接口调用 4.常见问题&#xff1a; 1.版本依赖 2.nacos客户端 2.cloud方式nacos与openFeign集成 1.引入依赖 2.添加配置 3.接口定义 4.开启FeignClients客户端 5.远程接口测试 6.Nacos配置中心 1…

Java - 数据结构,二叉树

一、什么是树 概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。它具有以下的特点&#xff1a; 1、有…

SAP ERP系统MM模块常用增强之四:采购申请输入字段的校验检查

在SAP/ERP项目的实施中采购管理模块&#xff08;MM&#xff09;的创建和修改采购申请一般都会有输入字段校验检查的需求&#xff0c;来防止业务人员录入错误或少录入数据&#xff0c;这方面需求部分是可以通过配置实现&#xff0c;比如一些字段是否必输&#xff0c;是否显示等&…

WebRTC拥塞控制算法——GCC介绍

网络拥塞是基于IP协议的数据报交换网络中常见的一种网络传输问题&#xff0c;它对网络传输的质量有严重的影响&#xff0c; 网络拥塞是导致网络吞吐降低&#xff0c; 网络丢包等的主要原因之一&#xff0c; 这些问题使得上层应用无法有效的利用网络带宽获得高质量的网络传输效果…

C++——智能指针1

目录 RAII auto_ptr模拟实现 智能指针拷贝问题 唯一指针 shared_ptr&#xff08;可以拷贝&#xff09; shared_ptr模拟实现 完整代码 循环引用 weak_ptr模拟实现 定制删除器 shared_ptr定制删除器模拟实现 内存泄漏 RAII RAII&#xff08;Resource Acquisit…

SkyWalking使用案例

SkyWalking监控java项目Halo博客 Halo是一个开源的博客项目&#xff0c;使用java编写&#xff0c;官网地址&#xff1a;https://halo.run/ 安装java环境&#xff0c;Halo对java版本有限制&#xff0c;1.4.3版本以上需要使用java11以上 apt -y install openjdk-11-jdk java -…

matplotlib常用操作

文章目录1 matplotlib绘图1.1 绘图步骤2 matplotlib基本元素2.1 matplotlib 画布2.2 设置坐标轴长度和范围2.3 设置图形的线型和颜色2.4 设置图形刻度范围、刻度标签和坐标轴标签等2.4.1 设置刻度范围2.4.2 设置坐标轴刻度2.5 文本标签图例3 matplotlib的ax对象绘图4 绘制子图5…

2.3 黑群晖驱动:开启nvme缓存、将nvme缓存作为存储盘 教程

黑群晖驱动安装工具下载&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1CMLl6waOuW-Ys2gKZx7Jgg?pwdchct提取码&#xff1a;chct一、开启NVME缓存在群辉->控制面板->终端机和SNMP->终端机中 勾选“启动SSH”功能&#xff0c;并点击应用&#xff1b;下载安装P…

HACKTHEBOX——Teacher

nmapnmap -sV -sC -p- -T4 -oA nmap 10.10.10.153nmap只发现了对外开放了80端口&#xff0c;从http-title看出可能是某个中学的官网http打开网站确实是一个官网&#xff0c;查看每个接口看看有没有可以利用的地方发现了一个接口&#xff0c;/images/5.png&#xff0c;但是响应包…