JDK19 - 虚拟线程详解

news2024/11/14 13:17:40

JDK19 - 虚拟线程详解

  • 前言
  • 一. Continuation 和 虚拟线程
    • 1.1 Continuation 案例
    • 1.2 Continuation 内的重要成员
    • 1.3 run() 执行/恢复执行
    • 1.4 yield() 暂停执行
    • 1.5 测试和小总结
  • 二. VirtualThread 解读
    • 2.1 VirtualThread 内的重要成员和构造
    • 2.2 VirtualThread 的首次执行
    • 2.3 结束阻塞再次调度
    • 2.4 跟着Debug走一遍

前言

之前在 Virtual Thread 虚拟线程探究 这篇文章里面主要讲了一下虚拟线程的使用和简要的介绍。这篇文章我们就来深入学习一下相关的原理。

虚拟线程的实现可以由两个部分组成:

  • Continuation:一种提供执行和暂停函数的服务类。
  • Scheduler:执行器。负责将虚拟线程挂载到平台线程上。底层交给ForkJoinPool执行。

一. Continuation 和 虚拟线程

虚拟线程的实现,底层重度依赖于Continuation这个类的实现。

  1. Loom的愿景是啥?write sync run async
  2. 那遇到同步阻塞(write sync)的时候怎么办?将底层切换为异步非阻塞(run async)。
  3. 异步事件处理完了之后怎么办?需要切回原先的代码点继续执行。

从上面可以发现,有两个重要的功能点就是:

  • 同步切异步:暂停执行。
  • 异步处理完毕时切同步:恢复执行。

虚拟线程会把调度任务包装到一个Continuation 实例中,在里面主要完成上面两件事情

  • 当任务需要阻塞挂起的时候,调用 Continuationyield操作进行阻塞。
  • 任务需要解除阻塞继续执行的时候,则调用 Continuationrun恢复执行。

1.1 Continuation 案例

我们用一个简单的案例,让大家直观的感受到Continuation的作用和神奇之处。不过在此之前,Continuation属于非常底层的一种API,常规情况下,我们无法直接调用,因此我们在编写测试用例的时候,需要添加相关的参数。我们给Java Compiler添加以下参数:

--add-exports java.base/jdk.internal.vm=ALL-UNNAMED

如图:
在这里插入图片描述
代码如下:

@org.junit.Test
public void testContinuation(){
    ContinuationScope scope = new ContinuationScope("scope");
    Continuation continuation = new Continuation(scope, () -> {
        System.out.println("Running before yield");
        Continuation.yield(scope);
        System.out.println("Running after yield");
    });
    System.out.println("First run");
    // 第一次执行Continuation.run
    continuation.run();
    System.out.println("Second run");
    // 第二次执行Continuation.run
    continuation.run();
    System.out.println("Done");
}

如果运行时候还是报错了:

java.lang.IllegalAccessError: class Test (in unnamed module @0x4d76f3f8) cannot access class jdk.internal.vm.ContinuationScope (in module java.base) because module java.base does not export jdk.internal.vm to unnamed module @0x4d76f3f8

我们给UT添加相关的VM参数即可:在这里插入图片描述
再次运行即可成功执行:在这里插入图片描述
从这个运行结果我们可以看出来:

  1. Continuation实例进行yield调用后进入阻塞。
  2. 再次调用其run方法就可以从yield的调用之处往下执行,从而实现了程序的中断和恢复。

1.2 Continuation 内的重要成员

public class Continuation {
    // 是否开启本地缓存
    private static final boolean PRESERVE_EXTENT_LOCAL_CACHE;
    // 主要用于对 Java 核心类库中的一些非公开方法和字段的访问
    private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
    // 实际运行的 runnable,外部传入
    private final Runnable target;
    // scope对象,使用同一scope的Continuation可以相互之间yield
    private final ContinuationScope scope;
    // 父节点
    private Continuation parent;
    // 子节点
    private Continuation child; 
    // 栈内存空间
    private StackChunk tail;
    // 当前Continuation是否已完成
    private boolean done;
    // 装载状态
    private volatile boolean mounted = false;
    // yield信息
    private Object yieldInfo;
    // 标记一个未挂载的Continuation是否通过强制抢占式卸载
	private boolean preempted;
	public Continuation(ContinuationScope scope, Runnable target) {
        this.scope = scope;
        this.target = target;
    }
}

我们从这个成员结构上可以发现几个重要的点:

  1. Continuation的数据结构是一个链表。有父节点和子节点。
  2. Continuation中唯一的构造函数有俩参数:一个是ContinuationScope(用于标识Continuation)。一个是Runnable:我们要执行的任务。

如图:
在这里插入图片描述
我们再来说下ContinuationScopeContinuation的关系。

ContinuationScope是一个用于标识Continuation的作用域的类。它可以被看作是一个上下文环境,用于将Continuation与特定的执行环境相关联。在协程或轻量级线程的实现中,ContinuationScope通常用于区分不同的协程或线程,以便在不同的上下文中执行Continuation

Continuation是一个表示协程或轻量级线程的对象。它可以被看作是一个可以中断和恢复的执行单元。通过Continuation可以实现在不同的执行环境中暂停和恢复执行,从而实现协程的切换和轻量级线程的执行。

同时两者满足:

  • 一个Continuation必须绑定一个ContinuationScope上下文环境。
  • 一个ContinuationScope上下文环境可以绑定多个Continuation

1.3 run() 执行/恢复执行

public final void run() {
    while (true) {
        // 进行线程装载
        mount();
        JLA.setExtentLocalCache(extentLocalCache);
		// 如果这个任务已经执行完毕了,就抛异常
        if (done)
            throw new IllegalStateException("Continuation terminated");
		// 获取当前虚拟线程对应的运载线程
        Thread t = currentCarrierThread();
        // 如果parent和child都执行了yield,但是child先执行run。倘若当前线程和父
        if (parent != null) {
            if (parent != JLA.getContinuation(t))
                throw new IllegalStateException();
        } else
            this.parent = JLA.getContinuation(t);
        // 运载线程设置当前Continuation实例
        JLA.setContinuation(t, this);

        try {
            boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope());
            // 此处判断是否存在堆栈内存空间,如不存在则说明未开始
            if (!isStarted()) { 
                // 相当于执行我们的task任务了
                enterSpecial(this, false, isVirtualThread);
            } else {
                assert !isEmpty();
                // 如果执行过了,那么isContinue为true,代表继续执行。
                enterSpecial(this, true, isVirtualThread);
            }
        } finally {
            fence();
            try {
                // 清理
                assert isEmpty() == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this));
                // 当前Continuation执行完毕,那么重新将Continuation指向父节点,链表执行。
                JLA.setContinuation(currentCarrierThread(), this.parent);
                // 如果有父节点,那么清理一下子节点(说明这个子节点被执行过了)
                if (parent != null)
                    parent.child = null;
				// 进行后置的yield清理工作
                postYieldCleanup();
				// 进行unmount卸载操作
                unmount();
                // 判断是否需要保留当前线程的本地缓存并处理
                if (PRESERVE_EXTENT_LOCAL_CACHE) {
                    extentLocalCache = JLA.extentLocalCache();
                } else {
                    extentLocalCache = null;
                }
                JLA.setExtentLocalCache(null);
            } catch (Throwable e) { e.printStackTrace(); System.exit(1); }
        }
        // 到这里为止,我们就来到了父Continuation
        assert yieldInfo == null || yieldInfo instanceof ContinuationScope;
        // 两种可能
        // 情况一:执行完了,清除相关引用并结束死循环(返回)
        if (yieldInfo == null || yieldInfo == scope) {
            this.parent = null;
            this.yieldInfo = null;
            return;
        } else {
        	// 此时是子Continuation执行了yield,那么需要将控制权转义给父Continuation来进行yield操作
            parent.child = this;
            parent.yield0((ContinuationScope)yieldInfo, this);
            parent.child = null;
        }
    }
}

1.4 yield() 暂停执行

我们来看下相关代码:

// 将当前的延续挂起到给定范围
public static boolean yield(ContinuationScope scope) {
	// 获取当前运载线程的Continuation 
    Continuation cont = JLA.getContinuation(currentCarrierThread());
    Continuation c;
    // 基于当前的 Continuation 向父节点方向遍历寻找。直到找到一个节点的ContinuationScope(上下文环境) 
    // 和当前的上下文环境不一致的时候停止。也就是找到当前上下文环境里面,Continuation边界
    for (c = cont; c != null && c.scope != scope; c = c.parent)
        ;
    // 找不到就抛异常
    if (c == null)
        throw new IllegalStateException("Not in scope " + scope);
    // 找到了就调用yield函数,将当前执行权交给父Continuation 
    return cont.yield0(scope, null);
}

private boolean yield0(ContinuationScope scope, Continuation child) {
    // 代码将`preempted`变量设置为false,表示当前的Continuation对象没有被抢占。
    preempted = false;
    // 检查当前传入的scope(上下文)和当前Continuation的scope是否已只,若不相等,说明需要切换到不同的scope
    // 那么就将传入的scope赋值给当前Continuation对象的yieldInfo信息中,表示要在父Continuation中进行yield操作
    // 这里和run函数的最后处理做对其(else分支)
    if (scope != this.scope)
        this.yieldInfo = scope;
    // 进行yield操作
    int res = doYield();
    U.storeFence(); // needed to prevent certain transformations by the compiler

    assert scope != this.scope || yieldInfo == null : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res;
    assert yieldInfo == null || scope == this.scope || yieldInfo instanceof Integer : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res;
    // 若child不是null,说明当前Continuation对象是子Continuation,那么需要把结果传递给父Continuation
    if (child != null) { // TODO: ugly
        if (res != 0) {
            child.yieldInfo = res;
        } else if (yieldInfo != null) {
            assert yieldInfo instanceof Integer;
            child.yieldInfo = yieldInfo;
        } else {
            child.yieldInfo = res;
        }
        this.yieldInfo = null;
    } else {
    	// 若当前是父Continuation,那么根据yield结果做不同处理
        if (res == 0 && yieldInfo != null) {
            res = (Integer)yieldInfo;
        }
        this.yieldInfo = null;

        if (res == 0)
        	// 续执行前回调
            onContinue();
        else
        	// Continuation固定在运载线程前回调
            onPinned0(res);
    }
    assert yieldInfo == null;

    return res == 0;
}

1.5 测试和小总结

其实对上面的流程进行一个简要的总结就是。

针对 run() 执行/恢复执行:

  1. 先进行装载。把Continuation实例和运载线程进行绑定。
  2. 判断是否存在堆栈内存空间,若存在,说明之前已经执行过一部分调用栈了。那么继续执行(倘若执行过调用栈,那么会把相关数据信息存储到堆内存中)。
  3. 若不存在,则完整的执行一遍调用栈即可。
  4. 当前Continuation执行完毕,卸载。然后更新Continuation指向为父Continuation
  5. yieldInfo就是当前的scope环境或者为null,说明执行完毕,退出死循环。
  6. 否则,说明是子Continuation执行了yield函数,那么此时需要将控制权交给父Continuation

针对 yield() 暂停执行:

  1. 会从当前Continuation实例开始向父节点遍历寻找scope边界处的Continuation。将控制权交给最顶层的Continuation(前提是同一个scope上下文)
  2. 进行yield操作进入阻塞。
  3. 如果当前是子Continuation,将结果传递给父Continuation
  4. 如果当前是父Continuation,那么针对yield结果做不同处理。比如是否要继续执行当前Continuation

案例如下:

@org.junit.Test
public void test4(){
    ContinuationScope scope1 = new ContinuationScope("scope1");

    ContinuationScope scope2 = new ContinuationScope("scope2");

    ContinuationScope scope3 = new ContinuationScope("scope3");

    Continuation child2 = new Continuation(scope3, () -> {
        System.out.println("before scope yield");
        Continuation.yield(scope1);
        System.out.println("after scope yield");
    });

    Continuation child1 = new Continuation(scope2, () -> {
        System.out.println("before child2 run");
        child2.run();
        System.out.println("after child2 run");
    });

    Continuation continuation = new Continuation(scope1, () ->  {
        System.out.println("before child1 run");
        child1.run();
        System.out.println("after child1 run");
    });

    System.out.println("before run");
    continuation.run();
    System.out.println("before run again");
    continuation.run();
    System.out.println("end");
}

最终输出结果如下:
在这里插入图片描述

执行yield的时候,我们传入的是scope1,但是当前的上下文却是scope3。发现两者并不一致,因此根据代码逻辑,就会将控制权交给scope1本身。此时scope1进入阻塞。
在这里插入图片描述
用泳道图表示如下:
在这里插入图片描述

二. VirtualThread 解读

我们从案例出发:

private static int sendHttpRequest() {
    try {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://www.google.com/"))
                .build();
        HttpResponse<String> httpResponse = client.send(request, HttpResponse.BodyHandlers.ofString());
        return httpResponse.statusCode();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

@org.junit.Test
public void testVT()throws Exception{
    Runnable sendHttpTask = () -> {
        System.out.println(new Random().nextInt());
        // 虚线程执行过程中,同步阻塞式发送Http请求
        System.out.println(sendHttpRequest());
        System.out.println(new Random().nextInt());
    };
    // 1. 创建虚线程
    Thread virtualThread = Thread.ofVirtual().unstarted(sendHttpTask);
    // 2. 启动虚线程
    virtualThread.start();
    // 等待虚线程执行完毕
    virtualThread.join();
}

2.1 VirtualThread 内的重要成员和构造

虚拟线程VirtualThread构造:

private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;
private volatile int state;
private static final int NEW      = 0;
private static final int STARTED  = 1;
private static final int RUNNABLE = 2;     // runnable-unmounted
private static final int RUNNING  = 3;     // runnable-mounted
private static final int PARKING  = 4;
private static final int PARKED   = 5;     // unmounted
private static final int PINNED   = 6;     // mounted
private static final int YIELDING = 7;     // Thread.yield
private static final int TERMINATED = 99;  // final state
private static final int SUSPENDED = 1 << 8;
private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);
private volatile boolean parkPermit;
private volatile Thread carrierThread;
private volatile CountDownLatch termination;

挑几个重点:

  • DEFAULT_SCHEDULER :默认的调度器。底层是ForkJoinPool
  • UNPARKER :调度线程实例,用于唤醒带超时阻塞的虚拟线程实例。用于sleep的唤醒操作。
  • TRACE_PINNING_MODEpined thread的跟踪模式。
  • contContinuation实例。主要负责虚拟线程的阻塞和继续执行操作。
  • runContinuationContinuation实例中包装的Runnable实例。
  • state:虚拟线程的状态。由JVM直接访问和修改。
  • parkPermitpark操作的许可。
  • carrierThread:运载线程实例。
  • termination:一个栅栏值,用于join操作。

再来看下它的唯一构造器:

VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
    super(name, characteristics, /*bound*/ false);
    Objects.requireNonNull(task);

    // 若调度器非空,直接使用,这个分支不会走到。
    if (scheduler == null) {
        Thread parent = Thread.currentThread();
        // 如果父线程是虚拟线程,那么使用父虚拟线程的调度器
        if (parent instanceof VirtualThread vparent) {
            scheduler = vparent.scheduler;
        } else {
        	// 否则使用默认的调度器
            scheduler = DEFAULT_SCHEDULER;
        }
    }

    this.scheduler = scheduler;
    this.cont = new VThreadContinuation(this, task);
    this.runContinuation = this::runContinuation;
}

主要做了这么几件事:

  1. 给定一个默认的调度器ForkJoinPool
  2. 初始化一个Continuation。在Continuation的基础上封装成一个VThreadContinuation
  3. 封装一下ContinuationRunnable,最终将它提交给调度器来执行。

我们看一下VThreadContinuation,它继承于Continuation

private static class VThreadContinuation extends Continuation {
    VThreadContinuation(VirtualThread vthread, Runnable task) {
    	// 父类Continuation构造,传入一个Scope(上下文环境)和一个VirtualThread.run()函数。
        super(VTHREAD_SCOPE, () -> vthread.run(task));
    }
    // 同时重写了onPinned函数,基于跟踪模式决定pinned线程栈的打印策略
    @Override
    protected void onPinned(Continuation.Pinned reason) {
        if (TRACE_PINNING_MODE > 0) {
            boolean printAll = (TRACE_PINNING_MODE == 1);
            PinnedThreadPrinter.printStackTrace(System.out, printAll);
        }
    }
}

而我们上面创建虚拟线程时调用的代码,实际上就是调用构造函数进行了一系列的初始化动作:

Thread virtualThread = Thread.ofVirtual().unstarted(sendHttpTask);

紧接着执行了start函数:

virtualThread.start();
↓↓↓↓↓↓↓↓↓↓  VirtualThread.start()  ↓↓↓↓↓↓↓↓↓↓
@Override
void start(ThreadContainer container) {
    // 尝试将虚拟线程的状态改为 STARTED
    if (!compareAndSetState(NEW, STARTED)) {
        throw new IllegalThreadStateException("Already started");
    }
    // bind thread to container
    setThreadContainer(container);
    // start thread
    boolean started = false;
    container.onStart(this); // may throw
    try {
        // extent locals may be inherited
        inheritExtentLocalBindings(container);
        // 平台代码完成这段代码的执行
        submitRunContinuation();
        started = true;
    } finally {
        if (!started) {
            setState(TERMINATED);
            container.onExit(this);
            afterTerminate(/*executed*/ false);
        }
    }
}
↓↓↓↓↓↓↓↓↓↓  VirtualThread.submitRunContinuation()  ↓↓↓↓↓↓↓↓↓↓
private void submitRunContinuation() {
    submitRunContinuation(false);
}
↓↓↓↓↓↓↓↓↓↓  VirtualThread.submitRunContinuation()  ↓↓↓↓↓↓↓↓↓↓
private void submitRunContinuation(boolean lazySubmit) {
    try {
    	// 是否开启延迟提交
        if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } else {
        	// 倘若不开启,就把runContinuation任务提交。
            scheduler.execute(runContinuation);
        }
    } catch (RejectedExecutionException ree) {
        // ...省略
    }
}
↓↓↓↓↓↓↓↓↓↓  VirtualThread.runContinuation()  ↓↓↓↓↓↓↓↓↓↓
private void runContinuation() {
    // 从这段代码可以发现,到目前为止的执行任务都是交给平台线程来执行的。
    if (Thread.currentThread().isVirtual()) {
        throw new WrongThreadException();
    }
    boolean firstRun;
    int initialState = state();
    // 如果虚拟线程的状态处于刚启动,那么就把他改为执行中,并且标记为首次执行
    if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
        // first run
        firstRun = true;
    } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
        // 否则,说明这个线程已经处于执行状态了,那么就设置park的许可,并标记为非首次执行
        setParkPermit(false);
        firstRun = false;
    } else {
        // not runnable
        return;
    }

    // notify JVMTI before mount
    if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);

    try {
    	// 执行Continuation的run函数。
        cont.run();
    } finally {
        // 如果执行完毕了,就做后续的清理工作。
        if (cont.isDone()) {
            afterTerminate(/*executed*/ true);
        } else {
        	// 倘若没有执行完成,说明某个地方调用了Continuation.yield()函数,或者pin到运载线程中进行了park操作
        	// 例如LockSupport的park操作
            afterYield();
        }
    }
}

总结下就是:(注意,这里都是平台线程来完成)

  1. 虚拟线程VirtualThreadContinuation的基础上,封装了下run函数。主要做了虚拟线程状态state的维护工作。刚启动的时候改为STARTED
  2. 以及虚拟线程发生park的时候(阻塞挂起),调用afterYield()函数,也是做了状态的维护的动作。
  3. 而真正的run/yield逻辑则交给底层的Continuation来实现。

2.2 VirtualThread 的首次执行

在经历了VirtualThread的一层封装之后(维护了虚拟线程的状态变化),最后会调用实际的Continuation对象的run函数。


1.当首次执行Continuation.run函数的时候,会先执行VirtualThread.run方法。主要目的就是将当前的虚拟线程装载到载体线程上。

↓↓↓↓↓↓↓↓↓↓  VirtualThread.run()  ↓↓↓↓↓↓↓↓↓↓
@ChangesCurrentThread
private void run(Runnable task) {
    assert state == RUNNING;
    boolean notifyJvmti = notifyJvmtiEvents;

    // first mount
    mount();
    if (notifyJvmti) notifyJvmtiMountEnd(true);

    // emit JFR event if enabled
    if (VirtualThreadStartEvent.isTurnedOn()) {
        var event = new VirtualThreadStartEvent();
        event.javaThreadId = threadId();
        event.commit();
    }

    try {
        task.run();
    } catch (Throwable exc) {
        dispatchUncaughtException(exc);
    } finally {
        try {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();
            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }

        } finally {
            // last unmount
            if (notifyJvmti) notifyJvmtiUnmountBegin(true);
            unmount();

            // final state
            setState(TERMINATED);
        }
    }
}

这段代码看起来比较长,但是再把它精简一点,核心的三个步骤就是:

mount();
try{
	task.run();
}finally{
	unmount();
}
  1. VirtualThread装载到CarrierThread上。
  2. 调用真正的Task任务,本文的案例就是sendHttpTask
  3. 从载体上卸载这个虚拟线程。返回时,当前线程就是当前平台。

2.本文的案例中,sendHttpTask这个任务存在IO阻塞。而Loom会重写所有可能的同步阻塞。一旦出现阻塞点,最终就会调用VirtualThread.park()方法。这里是我的调用链:
在这里插入图片描述

VirtualThread.park()方法做了啥:

@Override
void park() {
    assert Thread.currentThread() == this;

    // complete immediately if parking permit available or interrupted
    if (getAndSetParkPermit(false) || interrupted)
        return;

    // park the thread
    setState(PARKING);
    try {
        if (!yieldContinuation()) {
            // park on the carrier thread when pinned
            parkOnCarrierThread(false, 0);
        }
    } finally {
        assert (Thread.currentThread() == this) && (state() == RUNNING);
    }
}

核心做了两件事情:

  1. 把虚拟线程状态置为PARKING
  2. 调用yieldContinuation函数,也是VirtualThread里面对原生Continuation.yield()函数的一层封装。
@ChangesCurrentThread
private boolean yieldContinuation() {
    boolean notifyJvmti = notifyJvmtiEvents;

    // unmount
    if (notifyJvmti) notifyJvmtiUnmountBegin(false);
    unmount();
    try {
        // 利用Continuation的yield函数,停止继续执行,等待外部调用Continuation.run之后恢复执行
        return Continuation.yield(VTHREAD_SCOPE);
    } finally {
        // 倘若执行到这里,说明外部已经调用了Continuation.run函数,此时重新将虚拟线程进行挂载,利用虚拟线程执行后续代码
        mount();
        if (notifyJvmti) notifyJvmtiMountEnd(false);
    }
}
  1. yieldContinuation()方法中,先进行unmount卸载。那么运载线程此刻就可以不再执行虚拟线程的任务了,就可以干别的事情了。
  2. 然后执行Continuationyield函数,实现真正的“暂停”。等待外部调用Continuation.run之后恢复执行。

这里可以看出来,此时虚拟线程已经进入了阻塞状态。由于运载线程已经和虚拟线程解除了绑定,因此运载线程可以做自己想做的事情。所以并没有“真正的阻塞”(真正的阻塞指OS Thread的阻塞,而运载线程是绑定了OS Thread的,而VirtualThread是不会直接绑定OS Thread的,它依靠运载线程执行代码)

2.3 结束阻塞再次调度

当网络的IO请求处理完毕之后,就会调用VirtualThread.unpark方法:

@Override
@ChangesCurrentThread
void unpark() {
    Thread currentThread = Thread.currentThread();
    if (!getAndSetParkPermit(true) && currentThread != this) {
        int s = state();
        if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
            if (currentThread instanceof VirtualThread vthread) {
                Thread carrier = vthread.carrierThread;
                carrier.setCurrentThread(carrier);
                try {
                    submitRunContinuation();
                } finally {
                    carrier.setCurrentThread(vthread);
                }
            } else {
                submitRunContinuation();
            }
        } else if (s == PINNED) {
            // unpark carrier thread when pinned.
            synchronized (carrierThreadAccessLock()) {
                Thread carrier = carrierThread;
                if (carrier != null && state() == PINNED) {
                    U.unpark(carrier);
                }
            }
        }
    }
}

主要做了两件事情:

  1. 将虚拟线程的状态置为RUNNABLE
  2. 再次调用submitRunContinuation函数(这里就和第一次执行时候的逻辑大致一样了),将任务交给线程池来调度。同样地submitRunContinuation函数最终把任务交给runContinuation

只不过执行runContinuation函数的时候,走的不再是第一个分支了,如图:
在这里插入图片描述

然后开始调用cont.run(),借助Continuation来完成任务栈的恢复调用。最终Continuation执行完毕,将虚拟线程的状态置为TERMINATED

2.4 跟着Debug走一遍

1.启动Test,创建了一个虚拟线程(未启动),做一些初始化操作(调度器、VirtualThread自己封装的Continuation等)虚拟线程状态:New
在这里插入图片描述
2.紧接着准备启动虚拟线程了:
在这里插入图片描述
此时是我们第一次启动该虚拟线程,调用栈为:

  1. VirtualThread.start():虚拟线程状态:STARTED
  2. 调用submitRunContinuation–>交给调度器执行runContinuation任务(默认底层ForkJoinPool)。
  3. runContinuation里面,主要是对原生Continuation的一层封装,维护了虚拟线程的状态,此时状态由STARTED改为RUNNING,并标记为第一次执行。
    在这里插入图片描述

3.然后执行Continuation.run();,倘若是第一次执行,则还会调用一遍VirtualThread.run方法。将当前虚拟线程装载到运载线程上。运载完毕后调用真实的task任务:
在这里插入图片描述
调用真实的task任务了
在这里插入图片描述
4.直到执行sendHttpRequest这个任务的时候,发现存在IO阻塞,一旦出现阻塞点,就会调用VirtualThread.park函数:此时状态改为PARKING
在这里插入图片描述
5.紧接着调用yieldContinuation(同Continuation,都是VirtualThread进行的一层封装)
在这里插入图片描述
先进行unmount卸载,解放运载线程,让他可以作别的事情,然后在调用底层的Continuation.yield进行暂停。此时虚拟线程状态为:PARKED

6.当IO结束阻塞之后,调用unpark()函数:如果状态是PARKED,就把他改为RUNNABLE
在这里插入图片描述
最终再次调用了submitRunContinuation函数,根据上面的逻辑,最终走到第二个if分支:
在这里插入图片描述
最终再次借助底层的Continuation.run()完成任务的恢复执行(这一部分debug不出来)

最终虚拟线程任务执行完毕,将状态改为TERMINATED
在这里插入图片描述

下面在debug的时候是看不到的,因为state的维护都是交给JVM来完成的,看注释。
在这里插入图片描述

不仅如此。源码中涉及mount,unmount,notifyJvmtiMount,notifyJvmtiUnmount处,涉及线程 虚线程 的装载与卸载,其之前或之后的代码可能无法debug到。

用流程图表示如下:
在这里插入图片描述

最后,关于虚拟线程使用还有一定的局限性,如果我们代码块包含了一些synchronized关键字,虚拟线程就无法在阻塞操作期间卸载,因为它被固定到其执行线程上。不仅如此,还存在着ThreadLocal的使用相关问题,这类问题会另外写一篇文章去总结分享。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/832901.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kubernetes高可用集群二进制部署(二)ETCD集群部署

Kubernetes概述 使用kubeadm快速部署一个k8s集群 Kubernetes高可用集群二进制部署&#xff08;一&#xff09;主机准备和负载均衡器安装 Kubernetes高可用集群二进制部署&#xff08;二&#xff09;ETCD集群部署 Kubernetes高可用集群二进制部署&#xff08;三&#xff09;部署…

problem(2):快速访问Github

访问GitHub慢&#xff0c;这是所有程序员都遇到的问题&#xff0c;今天给大家推荐一款软件&#xff0c;让我们浏览GitHub和浏览gitee一样快&#xff0c;这个开源软件就是FastGithub。 github加速神器&#xff0c;解决github打不开、用户头像无法加载、releases无法上传下载、g…

【MATLAB第64期】基于MATLAB的无目标函数SOBOL等全局敏感性分析法模型合集(SOBOL,PAWN,GSA,GSUA,GSAT等) 【更新中】

【MATLAB第64期】基于MATLAB的无目标函数SOBOL等全局敏感性分析法模型合集(SOBOL,PAWN,GSA,GSUA,GSAT等) 【更新中】 引言 在前面几期&#xff0c;介绍了局部敏感性分析法&#xff0c;本期来介绍全局敏感性分析模型&#xff0c;因还在摸索中&#xff0c;所以更新较慢&#xf…

复现原型链污染

目录 原型链污染是什么 例1 复现 例2 复现 原型链污染是什么 第一章中说到&#xff0c;foo.__proto__指向的是Foo类的prototype。那么&#xff0c;如果我们修改了foo.__proto__中的值&#xff0c;是不是就可以修改Foo类呢&#xff1f; 做个简单的实验&#xff1a; // foo是一个…

【Linux】揭秘:提升dd命令效率的秘密武器!

红帽RHCE试听课程&#xff1a;如何快速实现对服务器密码爆破&#xff1f;https://mp.weixin.qq.com/s/JUpf8G86jvnNwvKLUfWcLQ 红帽RHCE试听课程&#xff1a;linux系统下&#xff0c;用这个命令可以提高60%的工作效率https://mp.weixin.qq.com/s/pZVjMI1PLJzrA8hoPzkgMA 大家好…

LNMP及论坛搭建(第一个访问,单节点)

LNMP&#xff1a;目前成熟的一个企业网站的应用模式之一&#xff0c;指的是一套协同工作的系统和相关软件 能够提供静态页面服务&#xff0c;也可以提供动态web服务&#xff0c;LNMP是缩写 L&#xff1a;指的是Linux操作系统。 N&#xff1a;指的是nginx&#xff0c;nginx提…

MS17-010永恒之蓝漏洞复现

一&#xff0c;认识永恒之蓝 1&#xff0c;简介 永恒之蓝&#xff0c;代号MS17-010。爆发于2017年&#xff0c;其通过控制用户主机&#xff0c;利用SMB协议的漏洞来获取系统的最高权限&#xff0c;进而可以窃取信息&#xff0c;偷窥隐私&#xff0c;甚至使系统瘫痪。曾爆发覆盖…

性能测试遇到问题怎么办?学会分析流程就不怕!

一、内存溢出 1、堆内存溢出 现象&#xff1a; &#xff08;1&#xff09;压测执行一段时间后&#xff0c;系统处理能力下降。这时用JConsole、JVisualVM等工具连上服务器查看GC情况&#xff0c;每次GC回收都不彻底并且可用堆内存越来越少。 &#xff08;2&#xff09;压测持续…

数据库数据恢复-Oracle数据库文件出现坏块的数据恢复案例

Oracle数据库故障&初检&分析&#xff1a; 打开Oracle数据库时报错&#xff0c;报错信息&#xff1a;“system01.dbf需要更多的恢复来保持一致性&#xff0c;数据库无法打开”。用户急需恢复zxfg用户下的数据。 出现上述报错的可能原因包括&#xff1a;控制文件损坏、数…

【零基础学Rust | 基础系列 | 数据结构】元组,数组,向量,字符串,结构体

文章标题 简介&#xff1a;一&#xff0c;元组&#xff1a;1&#xff0c;定义元组&#xff1a;2&#xff0c;访问元组元素&#xff1a;3&#xff0c;元组解构&#xff1a;4&#xff0c;元组在函数中的应用&#xff1a; 二&#xff0c;数组&#xff1a;1&#xff0c;数组的声明和…

核心交换机新增了一个网段,现在下面PC可以获取地址访问内网 ,访问外网说DNS有问题不通

环境: SANGFOR AF 8.0.75 SANGFOR AC 13.0.47 H3C S6520-26Q-SI 问题描述: 1.在核心交换机上新规划了一个网段192.168.200.0/24,现在下面PC可以正常获取IP地址和DNS,正常访问内网服务和其它地址段IP ,访问外网说DNS有问题不通打不开网页 2.DNS解析失败,ping dns服务…

C++初阶 - 7.STL简介

目录 1.什么是STL 2.STL的版本 3.STL的六大组件 4.STL的重要性 5.如何学习STL 6.STL的缺陷 1.什么是STL STL(standard template libiary-标准模板库)&#xff1a;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组件库&#xff0c;而且是一个包罗数据结构与算法…

Django实现音乐网站 ⑷

使用Python Django框架制作一个音乐网站&#xff0c;在系列文章3的基础上继续开发&#xff0c; 本篇主要是后台歌曲类型表、歌单表模块功能开发。 目录 表结构设计 歌曲类型表结构 歌单表结构 创建表模型 创建表 后台注册表模型 引入表模型 后台自定义 总结 表结构设计…

在线考试系统ssm学生线上答疑问答试卷管理java jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 在线考试系统ssm 系统有1权限&#xff1a;管理员 二…

Docker Compose构建lnmp

目录 Compose的优点 编排和部署 Compose原理 Compose应用案例 安装docker-ce 阿里云镜像加速器 安装docker-compose docker-compose用法 Yaml简介 验证LNMP环境 Compose的优点 先来了解一下我们平时是怎么样使用docker的&#xff1f;把它进行拆分一下&#xff1a; 1…

低代码已经发展到什么水平了?

在数字化转型的浪潮下&#xff0c;企业和组织迫切需要更快速、高效的应用开发方式来满足日益复杂的业务需求。而低代码开发作为一种创新的开发方式&#xff0c;正在引领着应用开发的新潮流。低代码开发允许开发者以可视化的方式快速构建应用&#xff0c;减少了繁琐的代码编写&a…

微服务——elasticsearch

初识ES——什么是elasticsearch elasticsearch的发展 初识ES——正向索引和倒排索引 初识ES——es与mysql的概念对比 类比到mysql中是表结构约束 概念对比 初始ES——安装es和kibana 1.部署单点es 1.1创建网络 要安装es容器和kibana容器并让他们之间相连&#xff0c;这里…

编辑接口和新增接口的分别调用

在后台管理系统中,有时候会碰到新增接口和编辑接口共用一个弹窗的时候. 一.场景 在点击新增或者编辑的时候都会使用这个窗口,新增直接调用接口进行增加即可&#xff0c;编辑则是打开这个窗口显示当前行的数据,然后调用编辑接口。 二.处理方法 在默认的情况下,这个窗口用来处理…

AOP的实战(统一功能处理模块)

一、用户登录权限效验 用户登录权限的发展从之前每个方法中自己验证用户登录权限&#xff0c;到现在统一的用户登录验证处理&#xff0c;它是一个逐渐完善和逐渐优化的过程。 1.1 最初用户登录验证 我们先来回顾一下最初用户登录验证的实现方法&#xff1a; RestController…

Android入门教程||Android 架构||Android 应用程序组件

Android 架构 Android 操作系统是一个软件组件的栈&#xff0c;在架构图中它大致可以分为五个部分和四个主要层。 Linux内核 在所有层的最底下是 Linux - 包括大约115个补丁的 Linux 3.6。它提供了基本的系统功能&#xff0c;比如进程管理&#xff0c;内存管理&#xff0c;设…