virtualthread 是 java 实现轻量级并发(也称为纤程或协程)的关键。与传统的平台线程(直接映射到操作系统线程)不同,虚拟线程由 jvm 管理和调度,可以在少量平台线程上运行大量的虚拟线程,从而提高应用程序的吞吐量。
流程概要
核心组件:
scheduler (executor): 每个虚拟线程都有一个调度器,它是一个 executor 实例。
这个调度器负责安排虚拟线程在载体线程(通常是平台线程)上执行。
如果没有显式指定,它会使用默认的 forkjoinpool (default_scheduler) 或者继承父虚拟线程的调度器。
- virtualthread.java
// scheduler and continuation private final executor scheduler; private final continuation cont; private final runnable runcontinuation;
在构造函数中,如果 scheduler 为 null,会进行选择:
- virtualthread.java
// ... existing code ...
virtualthread(executor scheduler, string name, int characteristics, runnable task) {
super(name, characteristics, /*bound*/ false);
objects.requirenonnull(task);
// choose scheduler if not specified
if (scheduler == null) {
thread parent = thread.currentthread();
if (parent instanceof virtualthread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = default_scheduler;
}
}
this.scheduler = scheduler;
this.cont = new vthreadcontinuation(this, task);
this.runcontinuation = this::runcontinuation;
}
// ... existing code ...cont (continuation): 这是虚拟线程的核心。continuation 是 jdk 内部的一个机制,允许代码执行被挂起(yield)和恢复。每个虚拟线程都包装了一个 continuation 实例。
vthreadcontinuation 是一个内部类,它继承自 continuation,并包装了用户提供的 runnable 任务。
- virtualthread.java
// ... existing code ...
/**
* the continuation that a virtual thread executes.
*/
private static class vthreadcontinuation extends continuation {
vthreadcontinuation(virtualthread vthread, runnable task) {
super(vthread_scope, wrap(vthread, task));
}
@override
protected void onpinned(continuation.pinned reason) {
}
private static runnable wrap(virtualthread vthread, runnable task) {
return new runnable() {
@hidden
@jvmtihideevents
public void run() {
vthread.notifyjvmtistart(); // notify jvmti
try {
vthread.run(task);
} finally {
vthread.notifyjvmtiend(); // notify jvmti
}
}
};
}
}
// ... existing code ...runcontinuation (runnable): 这是一个 runnable,其 run() 方法(即 this::runcontinuation 指向的 virtualthread#runcontinuation() 方法)负责实际执行或继续执行虚拟线程的任务。它处理虚拟线程的挂载(mount)到载体线程、运行 continuation、以及卸载(unmount)。
状态管理: virtualthread 内部维护了一系列状态常量(如 new, started, running, parked, terminated 等)和一个 volatile int state 字段来跟踪其生命周期。状态之间的转换是精心设计的,以处理各种场景,如启动、运行、暂停、阻塞和终止。
- virtualthread.java
// ... existing code ... /* * virtual thread state transitions: * * new -> started // thread.start, schedule to run * started -> terminated // failed to start * started -> running // first run * running -> terminated // done * * running -> parking // thread parking with locksupport.park // ... many more states ... * yielded -> running // continue execution after thread.yield */ private static final int new = 0; private static final int started = 1; private static final int running = 2; // runnable-mounted // untimed and timed parking private static final int parking = 3; private static final int parked = 4; // unmounted // ... other state constants ... private static final int terminated = 99; // final state // ... existing code ...
carrierthread (thread): 表示当前承载该虚拟线程执行的平台线程。当虚拟线程被挂起(unmounted)时,它不占用平台线程。当它需要运行时,调度器会将其调度到某个可用的载体线程上执行。
启动流程 (start() 方法):
- virtualthread.java
// ... existing code ...
@override
void start(threadcontainer container) {
if (!compareandsetstate(new, started)) {
throw new illegalthreadstateexception("already started");
}
// bind thread to container
assert threadcontainer() == null;
setthreadcontainer(container);
// start thread
boolean addedtocontainer = false;
boolean started = false;
try {
container.add(this); // may throw
addedtocontainer = true;
// scoped values may be inherited
inheritscopedvaluebindings(container);
// submit task to run thread, using externalsubmit if possible
externalsubmitruncontinuationorthrow();
started = true;
} finally {
if (!started) {
afterdone(addedtocontainer);
}
}
}
@override
public void start() {
start(threadcontainers.root());
}
// ... existing code ...- 当调用虚拟线程的
start()方法时,它首先将状态从new原子地更新为started。 - 然后,它将自身添加到一个线程容器 (
threadcontainer) 中。 - 最关键的一步是调用
externalsubmitruncontinuationorthrow()(或类似的提交方法) 将runcontinuation任务提交给其scheduler执行。
执行与挂起 (runcontinuation() 和 yieldcontinuation()):
- virtualthread.java
// ... existing code ...
@changescurrentthread // allow mount/unmount to be inlined
private void runcontinuation() {
// the carrier must be a platform thread
if (thread.currentthread().isvirtual()) {
throw new wrongthreadexception();
}
// set state to running
int initialstate = state();
if (initialstate == started || initialstate == unparked
|| initialstate == unblocked || initialstate == yielded) {
// newly started or continue after parking/blocking/thread.yield
if (!compareandsetstate(initialstate, running)) {
return;
}
// consume permit when continuing after parking or blocking. if continue
// after a timed-park or timed-wait then the timeout task is cancelled.
if (initialstate == unparked) {
canceltimeouttask();
setparkpermit(false);
} else if (initialstate == unblocked) {
canceltimeouttask();
blockpermit = false;
}
} else {
// not runnable
return;
}
mount();
try {
cont.run();
} finally {
unmount();
if (cont.isdone()) {
afterdone();
} else {
afteryield();
}
}
}
// ... existing code ...- virtualthread.java
// ... existing code ...
@hidden
private boolean yieldcontinuation() {
notifyjvmtiunmount(/*hide*/true);
try {
return continuation.yield(vthread_scope);
} finally {
notifyjvmtimount(/*hide*/false);
}
}
// ... existing code ...runcontinuation():
- 检查当前线程是否为平台线程(载体线程不能是虚拟线程)。
- 原子地更新状态到
running。 - 调用
mount()方法,将虚拟线程“挂载”到当前平台线程上。这包括设置carrierthread字段,并使thread.currentthread()返回该虚拟线程实例。 - 执行
cont.run(),这会实际运行或恢复continuation中包装的用户任务。 - 执行完毕或
continuation挂起后,调用unmount()方法,将虚拟线程从平台线程上“卸载”。这包括清除carrierthread,并使thread.currentthread()返回平台线程本身。 - 根据
continuation是否完成 (isdone()),调用afterdone()或afteryield()。 yieldcontinuation(): 当虚拟线程需要挂起时(例如,执行了locksupport.park(),或者i/o操作会阻塞),它会调用continuation.yield(vthread_scope)。这会导致continuation的执行暂停,runcontinuation()方法中的cont.run()调用会返回。此时,载体平台线程可以被释放去执行其他任务。afteryield(): 当yieldcontinuation()成功挂起后,此方法被调用。它根据虚拟线程挂起前的原因(如parking,yielding,blocking,waiting)更新状态,并可能安排后续操作(如设置定时器唤醒、重新提交到调度器等)。
parking (park() 和 parknanos()):
- virtualthread.java
// ... existing code ...
@override
void park() {
assert thread.currentthread() == this;
// complete immediately if parking permit available or interrupted
if (getandsetparkpermit(false) || interrupted)
return;
// park the thread
boolean yielded = false;
setstate(parking);
try {
yielded = yieldcontinuation();
} catch (outofmemoryerror e) {
// park on carrier
} finally {
assert (thread.currentthread() == this) && (yielded == (state() == running));
if (!yielded) {
assert state() == parking;
setstate(running);
}
}
// park on the carrier thread when pinned
if (!yielded) {
parkoncarrierthread(false, 0);
}
}
// ... existing code ...- 当虚拟线程调用
locksupport.park()或locksupport.parknanos()时,会调用virtualthread内部的park()或parknanos()方法。 - 这些方法会尝试通过
yieldcontinuation()来挂起虚拟线程。 - 如果
yieldcontinuation()成功(即虚拟线程没有被固定在载体线程上),虚拟线程的状态会变为parked或timed_parked,并且载体线程被释放。 - 如果
yieldcontinuation()失败(例如,虚拟线程在 jni 调用中或synchronized块中被固定),它会退而求其次,在当前的载体线程上实际执行u.park()(通过parkoncarrierthread方法),此时载体线程会被阻塞,状态变为pinned或timed_pinned。
调度 (submitruncontinuation 系列方法):
- virtualthread.java
// ... existing code ...
private void submitruncontinuation(executor scheduler, boolean retryonoome) {
boolean done = false;
while (!done) {
try {
// pin the continuation to prevent the virtual thread from unmounting
// when submitting a task. for the default scheduler this ensures that
// the carrier doesn't change when pushing a task. for other schedulers
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentthread().isvirtual()) {
continuation.pin();
try {
scheduler.execute(runcontinuation);
} finally {
continuation.unpin();
}
} else {
scheduler.execute(runcontinuation);
}
done = true;
} catch (rejectedexecutionexception ree) {
submitfailed(ree);
throw ree;
} catch (outofmemoryerror e) {
if (retryonoome) {
u.park(false, 100_000_000); // 100ms
} else {
throw e;
}
}
}
}
// ... existing code ...- 有多种提交
runcontinuation任务到调度器的方法,如submitruncontinuation(),lazysubmitruncontinuation(),externalsubmitruncontinuation()。 - 这些方法处理了向调度器提交任务的细节,包括在特定情况下(如在虚拟线程内部提交任务给调度器)需要固定
continuation(continuation.pin()) 以防止在提交过程中虚拟线程被意外卸载。
总而言之,virtualthread 的实现巧妙地结合了 continuation 机制和 executor 调度框架。continuation 提供了挂起和恢复执行流的能力,使得虚拟线程在阻塞操作时可以释放底层的平台线程。调度器则负责在虚拟线程准备好运行时,将其分配给一个可用的平台线程执行。通过这种方式,少量的平台线程就可以支持大量并发的虚拟线程,大大降低了线程创建和上下文切换的开销。状态管理确保了虚拟线程在各种并发场景下的正确行为。
调度器继承
instanceof 模式匹配语法 (parent instanceof virtualthread vparent)
这是一种 java 的语法特性,称为 模式匹配 (pattern matching) for instanceof。它从 java 14 开始作为预览特性引入,并在 java 16 中正式发布。
在以前的 java 版本中,如果想检查一个对象的类型,并且如果类型匹配则将其转换为该类型并使用,你通常会这样写:
if (parent instanceof virtualthread) {
virtualthread vparent = (virtualthread) parent;
// 然后使用 vparent
scheduler = vparent.scheduler;
}模式匹配 instanceof 简化了这个过程。
if (parent instanceof virtualthread vparent) 这行代码做了两件事:
- 类型检查: 它检查
parent对象是否是virtualthread的一个实例。 - 条件声明和赋值: 如果
parent确实是virtualthread的实例,那么它会声明一个新的局部变量vparent(类型为virtualthread),并将parent自动转换 (cast) 为virtualthread类型后赋值给vparent。这个vparent变量只在if语句块为真(即类型匹配成功)的作用域内有效。
这是一种更简洁、更安全的写法,避免了显式的类型转换和引入额外的变量声明步骤。
这里的“父子关系”并不是指操作系统层面严格的父子进程或线程关系,而是指创建者与被创建者的关系。
thread parent = thread.currentthread();:这行代码获取的是当前正在执行 virtualthread 构造函数的线程。这个线程就是新虚拟线程的“创建者”或“父”线程。
逻辑:
当创建一个新的 virtualthread 时,可以显式地给它传递一个 scheduler。
如果调用者没有提供 scheduler (即 scheduler == null),那么虚拟线程的构造逻辑会尝试确定一个默认的调度器。
这时,它会检查创建这个新虚拟线程的线程 (parent):
- 如果
parent本身也是一个virtualthread(parent instanceof virtualthread vparent):那么新的虚拟线程将继承其创建者虚拟线程的调度器 (scheduler = vparent.scheduler;)。 - 如果
parent是一个平台线程 (platform thread) 或者其他非virtualthread类型: 那么新的虚拟线程将使用默认的调度器 (scheduler = default_scheduler;),这个默认调度器通常是一个forkjoinpool。
这种设计体现了一种“上下文感知”的默认行为。如果你的代码已经在某个特定的虚拟线程(它使用着特定的调度器)中运行,当你从这个虚拟线程中再创建一个新的虚拟线程时,让新的虚拟线程默认使用与创建者相同的调度器通常是合理的。这有助于:
- 资源管理: 如果你为一组相关的任务配置了特定的调度器(例如,具有特定线程池大小或优先级的调度器),那么从这个组内派生的新虚拟线程默认使用相同的调度器可以保持资源使用的一致性。
- 行为一致性: 任务的执行特性(如并发级别)可以更容易地在相关的虚拟线程间保持一致。
简单来说,如果一个虚拟线程a创建了另一个虚拟线程b,并且没有为b指定调度器,那么b就会默认使用a的调度器。如果是一个平台线程创建了虚拟线程b,并且没有为b指定调度器,那么b就会使用全局默认的调度器。
continuation 类深度分析
continuation 类是 java 实现轻量级线程(虚拟线程)的基石。它代表了一种一次性(one-shot)的分界延续(delimited continuation)。简单来说,它封装了一段计算(一个 runnable 任务),这段计算可以被挂起(yield),并在之后从挂起点恢复执行。
核心能力与特性:
封装计算单元:
- 每个
continuation对象都关联一个runnable任务 (this.target) 和一个continuationscope(this.scope)。 continuationscope用于界定yield操作的范围。当调用continuation.yield(scope)时,执行会从当前continuation向上回溯,直到找到匹配该scope的continuation实例,然后挂起。
执行与挂起 (run() 和 yield()):
run(): 这是启动或恢复 continuation 执行的入口点。
- 它首先会尝试“挂载”(mount)
continuation到当前的载体线程(carrier thread)。挂载意味着将continuation的执行上下文(主要是栈帧)与载体线程关联起来。 - 通过
jla.setcontinuation(t, this)将当前continuation设置为载体线程的活动continuation。 - 调用本地方法
enterspecial(this, iscontinue, isvirtualthread)来实际进入或恢复continuation的执行。这个本地方法是 jvm 实现continuation魔法的核心,它处理栈的切换和管理。 enterspecial内部会调用enter(),进而调用target.run()来执行用户代码。- 当
continuation内部调用continuation.yield(scope)时,enterspecial(或其调用的更深层本地代码) 会保存当前执行状态(栈帧等),然后“返回”到run()方法中enterspecial调用的地方。 run()方法的finally块负责“卸载”(unmount)continuation,清理状态,并将载体线程的活动continuation恢复为其父continuation(如果存在)。
yield(continuationscope scope) (静态方法):
- 这是
continuation主动让出执行权的方式。 - 它会找到当前线程上与指定
scope匹配的最内层continuation。 - 然后调用该
continuation实例的yield0(scope, child)方法(这是一个非静态的内部方法,最终会触发本地代码doyield()),将执行权交还给其父continuation或调度器。 yieldinfo字段用于在yield时传递信息,决定是彻底返回还是在父continuation中继续yield。
栈管理 (stackchunk):
continuation的执行栈不是直接使用平台线程的完整栈,而是由一系列stackchunk对象来管理。当continuation挂起时,它的活动栈帧被保存在这些stackchunk中。恢复时,这些栈帧被重新加载。private stackchunk tail;字段指向continuation栈的当前末端。isempty()方法检查所有stackchunk是否都为空,用于判断continuation是否执行完毕。
状态管理:
done: 标记continuation的runnable是否已经执行完毕。mounted: 一个volatile标志,表示continuation当前是否挂载在某个载体线程上。mount()和unmount()方法以及compareandsetmounted()原子地更新此状态。scopedvaluecache: 用于支持scopedvalue,在continuation挂载和卸载时保存和恢复作用域值缓存。
父子关系 (parent, child):
continuation可以形成一个层级结构。当一个continuation(父) 内部的某个点创建并运行另一个continuation(子) 时,它们之间就建立了父子关系。jla.getcontinuation(currentcarrierthread())用于获取当前载体线程上活动的continuation。run()方法中会处理parent和child的设置,确保在yield和恢复时能正确地在continuation层级间导航。
pinning (固定):
pinned枚举(native,monitor,critical_section,exception)定义了continuation可能被“固定”在载体线程上而无法安全yield的原因。例如,如果continuation的执行进入了本地方法(jni),或者持有一个对象监视器锁(synchronized块),它就可能被固定。vthreadcontinuation(在virtualthread.java中定义) 的onpinned()方法是一个回调,当continuation被固定时会被调用。虚拟线程的实现会根据这个信息决定是阻塞载体线程还是采取其他策略。
与 jvm 的深度集成:
@intrinsiccandidate注解的本地方法如doyield()和enterspecial()表明这些方法的实现是由 jvm 高度优化的,它们直接操纵线程栈和执行状态。jdk.internal.access.javalangaccess(jla) 和jdk.internal.access.sharedsecrets用于在java.base模块内部访问java.lang.thread等类的包私有或私有成员,这是实现continuation与线程状态紧密集成的关键。
continuation 提供的核心能力: 它提供了一种机制,使得一段java代码的执行可以在不阻塞底层平台线程的情况下被暂停,其状态(主要是调用栈)被保存起来,之后可以在相同的或不同的平台线程上从暂停点恢复执行。这是实现用户态线程(如虚拟线程)的基础。
实现多个虚拟线程的 jvm 级别调度还需要什么?
仅仅有 continuation 是不够的,还需要一个完整的框架来管理和调度它们,这正是 java.lang.virtualthread 所做的事情。关键组件包括:
虚拟线程的表示 (virtualthread 类):
- 每个虚拟线程实例内部包装一个
continuation。 - 管理虚拟线程的生命周期状态(
new,started,running,parked,terminated等)。 - 处理中断、加入 (
join) 等线程操作。
调度器 (executor):
virtualthread需要一个调度器(通常是forkjoinpool,如default_scheduler)来执行它的continuation。- 调度器负责将准备好运行的虚拟线程(其
continuation)提交给一个可用的平台线程(载体线程)来执行。 - 当虚拟线程
yield时,它会从载体线程上卸载,载体线程可以被调度器用于执行其他虚拟线程或任务。
阻塞操作的适配:
- 标准库中的阻塞操作(如
locksupport.park(),object.wait(), 大部分同步 i/o 操作)需要被适配,以便在虚拟线程中调用时,它们能够触发continuation.yield()而不是阻塞载体线程。 - 例如,
virtualthread.park()方法会尝试yieldcontinuation()。如果成功,虚拟线程挂起,载体线程释放。如果失败(因为continuation被固定),则会退化为在载体线程上实际park。
与平台线程的交互(挂载/卸载):
virtualthread.mount(): 当虚拟线程的continuation开始在载体线程上运行时,需要将虚拟线程设置为thread.currentthread()的返回值,并记录载体线程。virtualthread.unmount(): 当continuationyield或执行完毕时,需要从载体线程上卸载,恢复thread.currentthread()指向载体线程本身。
固定 (pinning) 的处理:
- 需要有机制检测
continuation何时被固定(例如,在 jni 调用中或持有监视器锁)。 - 当虚拟线程被固定时,如果它执行了阻塞操作,那么载体线程本身可能会被阻塞,因为
continuation无法yield。这是虚拟线程的一个重要性能考量点。
线程局部变量和作用域值:
- 需要确保线程局部变量(
threadlocal)对于虚拟线程按预期工作(即每个虚拟线程有自己的副本)。 scopedvalue是一个更现代的替代方案,与虚拟线程和continuation结合得更好。continuation类中的scopedvaluecache字段就是为此服务的。
如果自己设计虚拟线程调度应该怎么做?
这是一个非常复杂的系统工程,深度依赖于 jvm 的底层支持。但从概念上讲,可以设想以下组件:
mycontinuation:
- 核心: 能够保存和恢复执行上下文(调用栈、程序计数器、寄存器)。这部分是最难的,需要 jvm 指令集层面的支持,或者像
libcontext这样的库(但 java 没有直接使用这个)。在 jdk 中,这是通过stackchunk和大量本地代码实现的。 - 接口:
void init(runnable task),boolean resume(),void yield(),boolean isdone()。 - 状态:
initial,suspended,running,done。
myvirtualthread:
- 属性: id, 名称, 状态 (
new,runnable,blocked,terminated), 优先级 (可选)。 - 包含: 一个
mycontinuation实例,一个runnable任务。 - 方法:
start(),interrupt(),join(),getstate()。
myscheduler (调度器):
- 核心: 一个或多个平台线程(称为工作线程或载体线程)。
- 队列: 一个或多个用于存放
myvirtualthread的就绪队列。
调度循环:
- 工作线程从就绪队列中取出一个
myvirtualthread。 - 设置
thread.currentthread()指向这个myvirtualthread(逻辑上的)。 - 调用
myvirtualthread.getcontinuation().resume()。 - 如果
resume()返回是因为yield()被调用: - 根据
yield的原因(例如,等待 i/o,等待锁),将myvirtualthread放入相应的等待结构中,或者如果只是普通的yield,放回就绪队列。 - 如果
resume()返回是因为任务完成 (isdone()为 true): - 更新
myvirtualthread状态为terminated,处理join等待。 - 恢复
thread.currentthread()指向平台工作线程。 - 工作线程继续从队列取下一个任务。
同步原语和 i/o 适配:
锁 (mylock): 当 myvirtualthread 尝试获取一个已被持有的锁时,它不应阻塞平台工作线程。而是:
- 将
myvirtualthread放入该锁的等待队列。 - 调用其
mycontinuation.yield()。 - 当锁被释放时,调度器将等待队列中的一个
myvirtualthread移回就绪队列。
i/o: 对于非阻塞 i/o (nio):
- 发起非阻塞 i/o 操作。
- 将
myvirtualthread和一个回调(当 i/o 完成时调用)注册到selector。 - 调用
mycontinuation.yield()。 - 当
selector检测到 i/o 事件完成时,执行回调,回调将对应的myvirtualthread重新放入调度器的就绪队列。
pinning 处理:
- 需要一种方法来标记代码段(如 jni 调用,
synchronized块)是不可yield的。 - 如果一个
myvirtualthread在这种不可yield的代码段中尝试执行一个会yield的操作(如获取mylock),调度器可能不得不阻塞当前平台工作线程,或者抛出异常,或者有其他备用策略。
挑战:
- 栈操作: 安全、高效地保存和恢复调用栈是最大的挑战,这需要 jvm 的深度配合。
- 与现有 java 生态的兼容性: 大量现有库依赖于
thread.currentthread()的行为和阻塞原语。 - 调试和监控: 调试跨越多个
continuation片段的代码会更复杂。
jdk 中的 continuation 和 virtualthread 在 jvm 层面解决了这些核心挑战。自己从头设计一个类似的系统将是一项艰巨的任务,但理解其基本原理有助于更好地使用这些高级并发特性。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论