virtualthread
是 java 实现轻量级并发(也称为纤程或协程)的关键。与传统的平台线程(直接映射到操作系统线程)不同,虚拟线程由 jvm 管理和调度,可以在少量平台线程上运行大量的虚拟线程,从而提高应用程序的吞吐量。
流程概要
核心组件:
scheduler
(executor): 每个虚拟线程都有一个调度器,它是一个 executor
实例。
这个调度器负责安排虚拟线程在载体线程(通常是平台线程)上执行。
如果没有显式指定,它会使用默认的 forkjoinpool
(default_scheduler
) 或者继承父虚拟线程的调度器。
- virtualthread.java
// scheduler and continuation private final executor scheduler; private final continuation cont; private final runnable runcontinuation;
在构造函数中,如果 scheduler
为 null
,会进行选择:
- virtualthread.java
// ... existing code ... virtualthread(executor scheduler, string name, int characteristics, runnable task) { super(name, characteristics, /*bound*/ false); objects.requirenonnull(task); // choose scheduler if not specified if (scheduler == null) { thread parent = thread.currentthread(); if (parent instanceof virtualthread vparent) { scheduler = vparent.scheduler; } else { scheduler = default_scheduler; } } this.scheduler = scheduler; this.cont = new vthreadcontinuation(this, task); this.runcontinuation = this::runcontinuation; } // ... existing code ...
cont
(continuation): 这是虚拟线程的核心。continuation
是 jdk 内部的一个机制,允许代码执行被挂起(yield)和恢复。每个虚拟线程都包装了一个 continuation
实例。
vthreadcontinuation
是一个内部类,它继承自 continuation
,并包装了用户提供的 runnable
任务。
- virtualthread.java
// ... existing code ... /** * the continuation that a virtual thread executes. */ private static class vthreadcontinuation extends continuation { vthreadcontinuation(virtualthread vthread, runnable task) { super(vthread_scope, wrap(vthread, task)); } @override protected void onpinned(continuation.pinned reason) { } private static runnable wrap(virtualthread vthread, runnable task) { return new runnable() { @hidden @jvmtihideevents public void run() { vthread.notifyjvmtistart(); // notify jvmti try { vthread.run(task); } finally { vthread.notifyjvmtiend(); // notify jvmti } } }; } } // ... existing code ...
runcontinuation
(runnable): 这是一个 runnable
,其 run()
方法(即 this::runcontinuation
指向的 virtualthread#runcontinuation()
方法)负责实际执行或继续执行虚拟线程的任务。它处理虚拟线程的挂载(mount)到载体线程、运行 continuation
、以及卸载(unmount)。
状态管理: virtualthread
内部维护了一系列状态常量(如 new
, started
, running
, parked
, terminated
等)和一个 volatile int state
字段来跟踪其生命周期。状态之间的转换是精心设计的,以处理各种场景,如启动、运行、暂停、阻塞和终止。
- virtualthread.java
// ... existing code ... /* * virtual thread state transitions: * * new -> started // thread.start, schedule to run * started -> terminated // failed to start * started -> running // first run * running -> terminated // done * * running -> parking // thread parking with locksupport.park // ... many more states ... * yielded -> running // continue execution after thread.yield */ private static final int new = 0; private static final int started = 1; private static final int running = 2; // runnable-mounted // untimed and timed parking private static final int parking = 3; private static final int parked = 4; // unmounted // ... other state constants ... private static final int terminated = 99; // final state // ... existing code ...
carrierthread
(thread): 表示当前承载该虚拟线程执行的平台线程。当虚拟线程被挂起(unmounted)时,它不占用平台线程。当它需要运行时,调度器会将其调度到某个可用的载体线程上执行。
启动流程 (start()
方法):
- virtualthread.java
// ... existing code ... @override void start(threadcontainer container) { if (!compareandsetstate(new, started)) { throw new illegalthreadstateexception("already started"); } // bind thread to container assert threadcontainer() == null; setthreadcontainer(container); // start thread boolean addedtocontainer = false; boolean started = false; try { container.add(this); // may throw addedtocontainer = true; // scoped values may be inherited inheritscopedvaluebindings(container); // submit task to run thread, using externalsubmit if possible externalsubmitruncontinuationorthrow(); started = true; } finally { if (!started) { afterdone(addedtocontainer); } } } @override public void start() { start(threadcontainers.root()); } // ... existing code ...
- 当调用虚拟线程的
start()
方法时,它首先将状态从new
原子地更新为started
。 - 然后,它将自身添加到一个线程容器 (
threadcontainer
) 中。 - 最关键的一步是调用
externalsubmitruncontinuationorthrow()
(或类似的提交方法) 将runcontinuation
任务提交给其scheduler
执行。
执行与挂起 (runcontinuation()
和 yieldcontinuation()
):
- virtualthread.java
// ... existing code ... @changescurrentthread // allow mount/unmount to be inlined private void runcontinuation() { // the carrier must be a platform thread if (thread.currentthread().isvirtual()) { throw new wrongthreadexception(); } // set state to running int initialstate = state(); if (initialstate == started || initialstate == unparked || initialstate == unblocked || initialstate == yielded) { // newly started or continue after parking/blocking/thread.yield if (!compareandsetstate(initialstate, running)) { return; } // consume permit when continuing after parking or blocking. if continue // after a timed-park or timed-wait then the timeout task is cancelled. if (initialstate == unparked) { canceltimeouttask(); setparkpermit(false); } else if (initialstate == unblocked) { canceltimeouttask(); blockpermit = false; } } else { // not runnable return; } mount(); try { cont.run(); } finally { unmount(); if (cont.isdone()) { afterdone(); } else { afteryield(); } } } // ... existing code ...
- virtualthread.java
// ... existing code ... @hidden private boolean yieldcontinuation() { notifyjvmtiunmount(/*hide*/true); try { return continuation.yield(vthread_scope); } finally { notifyjvmtimount(/*hide*/false); } } // ... existing code ...
runcontinuation()
:
- 检查当前线程是否为平台线程(载体线程不能是虚拟线程)。
- 原子地更新状态到
running
。 - 调用
mount()
方法,将虚拟线程“挂载”到当前平台线程上。这包括设置carrierthread
字段,并使thread.currentthread()
返回该虚拟线程实例。 - 执行
cont.run()
,这会实际运行或恢复continuation
中包装的用户任务。 - 执行完毕或
continuation
挂起后,调用unmount()
方法,将虚拟线程从平台线程上“卸载”。这包括清除carrierthread
,并使thread.currentthread()
返回平台线程本身。 - 根据
continuation
是否完成 (isdone()
),调用afterdone()
或afteryield()
。 yieldcontinuation()
: 当虚拟线程需要挂起时(例如,执行了locksupport.park()
,或者i/o操作会阻塞),它会调用continuation.yield(vthread_scope)
。这会导致continuation
的执行暂停,runcontinuation()
方法中的cont.run()
调用会返回。此时,载体平台线程可以被释放去执行其他任务。afteryield()
: 当yieldcontinuation()
成功挂起后,此方法被调用。它根据虚拟线程挂起前的原因(如parking
,yielding
,blocking
,waiting
)更新状态,并可能安排后续操作(如设置定时器唤醒、重新提交到调度器等)。
parking (park()
和 parknanos()
):
- virtualthread.java
// ... existing code ... @override void park() { assert thread.currentthread() == this; // complete immediately if parking permit available or interrupted if (getandsetparkpermit(false) || interrupted) return; // park the thread boolean yielded = false; setstate(parking); try { yielded = yieldcontinuation(); } catch (outofmemoryerror e) { // park on carrier } finally { assert (thread.currentthread() == this) && (yielded == (state() == running)); if (!yielded) { assert state() == parking; setstate(running); } } // park on the carrier thread when pinned if (!yielded) { parkoncarrierthread(false, 0); } } // ... existing code ...
- 当虚拟线程调用
locksupport.park()
或locksupport.parknanos()
时,会调用virtualthread
内部的park()
或parknanos()
方法。 - 这些方法会尝试通过
yieldcontinuation()
来挂起虚拟线程。 - 如果
yieldcontinuation()
成功(即虚拟线程没有被固定在载体线程上),虚拟线程的状态会变为parked
或timed_parked
,并且载体线程被释放。 - 如果
yieldcontinuation()
失败(例如,虚拟线程在 jni 调用中或synchronized
块中被固定),它会退而求其次,在当前的载体线程上实际执行u.park()
(通过parkoncarrierthread
方法),此时载体线程会被阻塞,状态变为pinned
或timed_pinned
。
调度 (submitruncontinuation
系列方法):
- virtualthread.java
// ... existing code ... private void submitruncontinuation(executor scheduler, boolean retryonoome) { boolean done = false; while (!done) { try { // pin the continuation to prevent the virtual thread from unmounting // when submitting a task. for the default scheduler this ensures that // the carrier doesn't change when pushing a task. for other schedulers // it avoids deadlock that could arise due to carriers and virtual // threads contending for a lock. if (currentthread().isvirtual()) { continuation.pin(); try { scheduler.execute(runcontinuation); } finally { continuation.unpin(); } } else { scheduler.execute(runcontinuation); } done = true; } catch (rejectedexecutionexception ree) { submitfailed(ree); throw ree; } catch (outofmemoryerror e) { if (retryonoome) { u.park(false, 100_000_000); // 100ms } else { throw e; } } } } // ... existing code ...
- 有多种提交
runcontinuation
任务到调度器的方法,如submitruncontinuation()
,lazysubmitruncontinuation()
,externalsubmitruncontinuation()
。 - 这些方法处理了向调度器提交任务的细节,包括在特定情况下(如在虚拟线程内部提交任务给调度器)需要固定
continuation
(continuation.pin()
) 以防止在提交过程中虚拟线程被意外卸载。
总而言之,virtualthread
的实现巧妙地结合了 continuation
机制和 executor
调度框架。continuation
提供了挂起和恢复执行流的能力,使得虚拟线程在阻塞操作时可以释放底层的平台线程。调度器则负责在虚拟线程准备好运行时,将其分配给一个可用的平台线程执行。通过这种方式,少量的平台线程就可以支持大量并发的虚拟线程,大大降低了线程创建和上下文切换的开销。状态管理确保了虚拟线程在各种并发场景下的正确行为。
调度器继承
instanceof
模式匹配语法 (parent instanceof virtualthread vparent
)
这是一种 java 的语法特性,称为 模式匹配 (pattern matching) for instanceof
。它从 java 14 开始作为预览特性引入,并在 java 16 中正式发布。
在以前的 java 版本中,如果想检查一个对象的类型,并且如果类型匹配则将其转换为该类型并使用,你通常会这样写:
if (parent instanceof virtualthread) { virtualthread vparent = (virtualthread) parent; // 然后使用 vparent scheduler = vparent.scheduler; }
模式匹配 instanceof
简化了这个过程。
if (parent instanceof virtualthread vparent)
这行代码做了两件事:
- 类型检查: 它检查
parent
对象是否是virtualthread
的一个实例。 - 条件声明和赋值: 如果
parent
确实是virtualthread
的实例,那么它会声明一个新的局部变量vparent
(类型为virtualthread
),并将parent
自动转换 (cast) 为virtualthread
类型后赋值给vparent
。这个vparent
变量只在if
语句块为真(即类型匹配成功)的作用域内有效。
这是一种更简洁、更安全的写法,避免了显式的类型转换和引入额外的变量声明步骤。
这里的“父子关系”并不是指操作系统层面严格的父子进程或线程关系,而是指创建者与被创建者的关系。
thread parent = thread.currentthread();
:这行代码获取的是当前正在执行 virtualthread
构造函数的线程。这个线程就是新虚拟线程的“创建者”或“父”线程。
逻辑:
当创建一个新的 virtualthread
时,可以显式地给它传递一个 scheduler
。
如果调用者没有提供 scheduler
(即 scheduler == null
),那么虚拟线程的构造逻辑会尝试确定一个默认的调度器。
这时,它会检查创建这个新虚拟线程的线程 (parent
):
- 如果
parent
本身也是一个virtualthread
(parent instanceof virtualthread vparent
):那么新的虚拟线程将继承其创建者虚拟线程的调度器 (scheduler = vparent.scheduler;
)。 - 如果
parent
是一个平台线程 (platform thread) 或者其他非virtualthread
类型: 那么新的虚拟线程将使用默认的调度器 (scheduler = default_scheduler;
),这个默认调度器通常是一个forkjoinpool
。
这种设计体现了一种“上下文感知”的默认行为。如果你的代码已经在某个特定的虚拟线程(它使用着特定的调度器)中运行,当你从这个虚拟线程中再创建一个新的虚拟线程时,让新的虚拟线程默认使用与创建者相同的调度器通常是合理的。这有助于:
- 资源管理: 如果你为一组相关的任务配置了特定的调度器(例如,具有特定线程池大小或优先级的调度器),那么从这个组内派生的新虚拟线程默认使用相同的调度器可以保持资源使用的一致性。
- 行为一致性: 任务的执行特性(如并发级别)可以更容易地在相关的虚拟线程间保持一致。
简单来说,如果一个虚拟线程a创建了另一个虚拟线程b,并且没有为b指定调度器,那么b就会默认使用a的调度器。如果是一个平台线程创建了虚拟线程b,并且没有为b指定调度器,那么b就会使用全局默认的调度器。
continuation 类深度分析
continuation
类是 java 实现轻量级线程(虚拟线程)的基石。它代表了一种一次性(one-shot)的分界延续(delimited continuation)。简单来说,它封装了一段计算(一个 runnable
任务),这段计算可以被挂起(yield),并在之后从挂起点恢复执行。
核心能力与特性:
封装计算单元:
- 每个
continuation
对象都关联一个runnable
任务 (this.target
) 和一个continuationscope
(this.scope
)。 continuationscope
用于界定yield
操作的范围。当调用continuation.yield(scope)
时,执行会从当前continuation
向上回溯,直到找到匹配该scope
的continuation
实例,然后挂起。
执行与挂起 (run()
和 yield()
):
run()
: 这是启动或恢复 continuation
执行的入口点。
- 它首先会尝试“挂载”(mount)
continuation
到当前的载体线程(carrier thread)。挂载意味着将continuation
的执行上下文(主要是栈帧)与载体线程关联起来。 - 通过
jla.setcontinuation(t, this)
将当前continuation
设置为载体线程的活动continuation
。 - 调用本地方法
enterspecial(this, iscontinue, isvirtualthread)
来实际进入或恢复continuation
的执行。这个本地方法是 jvm 实现continuation
魔法的核心,它处理栈的切换和管理。 enterspecial
内部会调用enter()
,进而调用target.run()
来执行用户代码。- 当
continuation
内部调用continuation.yield(scope)
时,enterspecial
(或其调用的更深层本地代码) 会保存当前执行状态(栈帧等),然后“返回”到run()
方法中enterspecial
调用的地方。 run()
方法的finally
块负责“卸载”(unmount)continuation
,清理状态,并将载体线程的活动continuation
恢复为其父continuation
(如果存在)。
yield(continuationscope scope)
(静态方法):
- 这是
continuation
主动让出执行权的方式。 - 它会找到当前线程上与指定
scope
匹配的最内层continuation
。 - 然后调用该
continuation
实例的yield0(scope, child)
方法(这是一个非静态的内部方法,最终会触发本地代码doyield()
),将执行权交还给其父continuation
或调度器。 yieldinfo
字段用于在yield
时传递信息,决定是彻底返回还是在父continuation
中继续yield
。
栈管理 (stackchunk
):
continuation
的执行栈不是直接使用平台线程的完整栈,而是由一系列stackchunk
对象来管理。当continuation
挂起时,它的活动栈帧被保存在这些stackchunk
中。恢复时,这些栈帧被重新加载。private stackchunk tail;
字段指向continuation
栈的当前末端。isempty()
方法检查所有stackchunk
是否都为空,用于判断continuation
是否执行完毕。
状态管理:
done
: 标记continuation
的runnable
是否已经执行完毕。mounted
: 一个volatile
标志,表示continuation
当前是否挂载在某个载体线程上。mount()
和unmount()
方法以及compareandsetmounted()
原子地更新此状态。scopedvaluecache
: 用于支持scopedvalue
,在continuation
挂载和卸载时保存和恢复作用域值缓存。
父子关系 (parent
, child
):
continuation
可以形成一个层级结构。当一个continuation
(父) 内部的某个点创建并运行另一个continuation
(子) 时,它们之间就建立了父子关系。jla.getcontinuation(currentcarrierthread())
用于获取当前载体线程上活动的continuation
。run()
方法中会处理parent
和child
的设置,确保在yield
和恢复时能正确地在continuation
层级间导航。
pinning (固定):
pinned
枚举(native
,monitor
,critical_section
,exception
)定义了continuation
可能被“固定”在载体线程上而无法安全yield
的原因。例如,如果continuation
的执行进入了本地方法(jni),或者持有一个对象监视器锁(synchronized
块),它就可能被固定。vthreadcontinuation
(在virtualthread.java
中定义) 的onpinned()
方法是一个回调,当continuation
被固定时会被调用。虚拟线程的实现会根据这个信息决定是阻塞载体线程还是采取其他策略。
与 jvm 的深度集成:
@intrinsiccandidate
注解的本地方法如doyield()
和enterspecial()
表明这些方法的实现是由 jvm 高度优化的,它们直接操纵线程栈和执行状态。jdk.internal.access.javalangaccess
(jla) 和jdk.internal.access.sharedsecrets
用于在java.base
模块内部访问java.lang.thread
等类的包私有或私有成员,这是实现continuation
与线程状态紧密集成的关键。
continuation
提供的核心能力: 它提供了一种机制,使得一段java代码的执行可以在不阻塞底层平台线程的情况下被暂停,其状态(主要是调用栈)被保存起来,之后可以在相同的或不同的平台线程上从暂停点恢复执行。这是实现用户态线程(如虚拟线程)的基础。
实现多个虚拟线程的 jvm 级别调度还需要什么?
仅仅有 continuation
是不够的,还需要一个完整的框架来管理和调度它们,这正是 java.lang.virtualthread
所做的事情。关键组件包括:
虚拟线程的表示 (virtualthread
类):
- 每个虚拟线程实例内部包装一个
continuation
。 - 管理虚拟线程的生命周期状态(
new
,started
,running
,parked
,terminated
等)。 - 处理中断、加入 (
join
) 等线程操作。
调度器 (executor
):
virtualthread
需要一个调度器(通常是forkjoinpool
,如default_scheduler
)来执行它的continuation
。- 调度器负责将准备好运行的虚拟线程(其
continuation
)提交给一个可用的平台线程(载体线程)来执行。 - 当虚拟线程
yield
时,它会从载体线程上卸载,载体线程可以被调度器用于执行其他虚拟线程或任务。
阻塞操作的适配:
- 标准库中的阻塞操作(如
locksupport.park()
,object.wait()
, 大部分同步 i/o 操作)需要被适配,以便在虚拟线程中调用时,它们能够触发continuation.yield()
而不是阻塞载体线程。 - 例如,
virtualthread.park()
方法会尝试yieldcontinuation()
。如果成功,虚拟线程挂起,载体线程释放。如果失败(因为continuation
被固定),则会退化为在载体线程上实际park
。
与平台线程的交互(挂载/卸载):
virtualthread.mount()
: 当虚拟线程的continuation
开始在载体线程上运行时,需要将虚拟线程设置为thread.currentthread()
的返回值,并记录载体线程。virtualthread.unmount()
: 当continuation
yield
或执行完毕时,需要从载体线程上卸载,恢复thread.currentthread()
指向载体线程本身。
固定 (pinning) 的处理:
- 需要有机制检测
continuation
何时被固定(例如,在 jni 调用中或持有监视器锁)。 - 当虚拟线程被固定时,如果它执行了阻塞操作,那么载体线程本身可能会被阻塞,因为
continuation
无法yield
。这是虚拟线程的一个重要性能考量点。
线程局部变量和作用域值:
- 需要确保线程局部变量(
threadlocal
)对于虚拟线程按预期工作(即每个虚拟线程有自己的副本)。 scopedvalue
是一个更现代的替代方案,与虚拟线程和continuation
结合得更好。continuation
类中的scopedvaluecache
字段就是为此服务的。
如果自己设计虚拟线程调度应该怎么做?
这是一个非常复杂的系统工程,深度依赖于 jvm 的底层支持。但从概念上讲,可以设想以下组件:
mycontinuation
:
- 核心: 能够保存和恢复执行上下文(调用栈、程序计数器、寄存器)。这部分是最难的,需要 jvm 指令集层面的支持,或者像
libcontext
这样的库(但 java 没有直接使用这个)。在 jdk 中,这是通过stackchunk
和大量本地代码实现的。 - 接口:
void init(runnable task)
,boolean resume()
,void yield()
,boolean isdone()
。 - 状态:
initial
,suspended
,running
,done
。
myvirtualthread
:
- 属性: id, 名称, 状态 (
new
,runnable
,blocked
,terminated
), 优先级 (可选)。 - 包含: 一个
mycontinuation
实例,一个runnable
任务。 - 方法:
start()
,interrupt()
,join()
,getstate()
。
myscheduler
(调度器):
- 核心: 一个或多个平台线程(称为工作线程或载体线程)。
- 队列: 一个或多个用于存放
myvirtualthread
的就绪队列。
调度循环:
- 工作线程从就绪队列中取出一个
myvirtualthread
。 - 设置
thread.currentthread()
指向这个myvirtualthread
(逻辑上的)。 - 调用
myvirtualthread.getcontinuation().resume()
。 - 如果
resume()
返回是因为yield()
被调用: - 根据
yield
的原因(例如,等待 i/o,等待锁),将myvirtualthread
放入相应的等待结构中,或者如果只是普通的yield
,放回就绪队列。 - 如果
resume()
返回是因为任务完成 (isdone()
为 true): - 更新
myvirtualthread
状态为terminated
,处理join
等待。 - 恢复
thread.currentthread()
指向平台工作线程。 - 工作线程继续从队列取下一个任务。
同步原语和 i/o 适配:
锁 (mylock
): 当 myvirtualthread
尝试获取一个已被持有的锁时,它不应阻塞平台工作线程。而是:
- 将
myvirtualthread
放入该锁的等待队列。 - 调用其
mycontinuation.yield()
。 - 当锁被释放时,调度器将等待队列中的一个
myvirtualthread
移回就绪队列。
i/o: 对于非阻塞 i/o (nio):
- 发起非阻塞 i/o 操作。
- 将
myvirtualthread
和一个回调(当 i/o 完成时调用)注册到selector
。 - 调用
mycontinuation.yield()
。 - 当
selector
检测到 i/o 事件完成时,执行回调,回调将对应的myvirtualthread
重新放入调度器的就绪队列。
pinning 处理:
- 需要一种方法来标记代码段(如 jni 调用,
synchronized
块)是不可yield
的。 - 如果一个
myvirtualthread
在这种不可yield
的代码段中尝试执行一个会yield
的操作(如获取mylock
),调度器可能不得不阻塞当前平台工作线程,或者抛出异常,或者有其他备用策略。
挑战:
- 栈操作: 安全、高效地保存和恢复调用栈是最大的挑战,这需要 jvm 的深度配合。
- 与现有 java 生态的兼容性: 大量现有库依赖于
thread.currentthread()
的行为和阻塞原语。 - 调试和监控: 调试跨越多个
continuation
片段的代码会更复杂。
jdk 中的 continuation
和 virtualthread
在 jvm 层面解决了这些核心挑战。自己从头设计一个类似的系统将是一项艰巨的任务,但理解其基本原理有助于更好地使用这些高级并发特性。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论