在java 21中,引入了虚拟线程,这是一个非常非常重要的特性,之前一直苦苦寻找的java协程,终于问世了。在高并发以及io密集型的应用中,虚拟线程能极大的提高应用的性能和吞吐量。
什么是虚拟线程
先来看一下虚拟线程的概念。
虚拟线程概念
dk 21 引入了虚拟线程的支持,这是为了改善 java 应用程序在高并发场景下的性能。虚拟线程是一种轻量级线程,具有较小的内存占用,能够更高效地进行上下文切换,适用于 i/o 密集型的应用程序。
虚拟线程的工作原理
当应用程序启动一个虚拟线程时,jvm会将这个虚拟线程交给jvm底层的线程池去执行,这个底层的线程池是一个传统线程池,并且真正执行虚拟线程中任务的线程,也是传统线程(操作系统线程)。当虚拟线程遇到阻塞时,jvm会立刻将虚拟线程挂起,让其它虚拟线程执行。也就是说,开启一个虚拟线程,并不需要启用一个传统线程,一般一个传统线程,可以执行多个虚拟线程的任务。在执行过程中,可以把虚拟线程理解成任务task。
这里举一个列子,假设用户创建了1000个虚拟线程,jvm的执行虚拟线程的线程池线程数是10,那么当第一个虚拟线程v1需要执行时,jvm会将v1调度到传统线程t1上,以此类推,虚拟线程v2会被调度到传统线程t2上,那么v3->t3,v4->t4,… v10->t10。当执行到v11时,这里有三种情况:
如果v1~v10中有任何一个线程遇到阻塞,我们这里假设v3遇到阻塞,那么jvm会将v3挂起,此时t3线程可用,那么v11被t3执行。
如果v1~v10没有线程被阻塞,那么jvm根据划分的时间片,假设每个虚拟线程允许执行100ns,那么过了100ns后,这里v1最新执行,jvm则将v1挂起,让t1去执行v11。
如果以上两种情况都不满足,那么先将v11挂起,等待有可用的传统线程时,再执行v11。
对于被阻塞的线程,如v3,当io结束后,操作系统会通过事件,如epoll通知jvm,v3的io操作已结束,此时jvm重新唤醒v3,选择可用的传统线程,来执行v3的任务。
这里需要注意两点:
虚拟线程io执行完成后,会通过操作系统的事件通知机制,如epoll来通知jvm。这一点对于虚拟线程的高效调度至关重要,因为它确保了 阻塞的 i/o 操作 不会占用操作系统线程的时间片,避免了传统线程池的高资源消耗和效率低下。。
jvm在对虚拟线程进行上下文切换时,因为不涉及到操作系统级别的线程上下文切换,代价非常低,速度也非常快。
虚拟线程的调度
一般来说,程序员不需要对虚拟线程的调度进行管理,在jdk 21中,jvm默认启用了虚拟线程,并且会使用默认的forkjoinpool线程池来执行虚拟线程,并且线程池的大小,也会根据虚拟线程的数量,进行动态调整。如果需要手动管理执行虚拟线程的线程池大小,那么需要自定义线程池,并将虚拟线程交给自定义的线程池来执行,这样虽然可行,通常没有必要。
虚拟线程与传统线程区别
虚拟线程与传统线程的区别主要在于:
创建虚拟线程时,jvm不会创建一个操作系统线程,创建一个传统线程时,jvm会创建一个操作系统线程。一个传统线程,可以轮询执行多个虚拟线程。
虚拟线程是由传统线程来执行的,虚拟线程的调度由jvm控制,传统线程的执行和调度,由操作系统来控制。
虚拟线程的上下文切换是由jvm控制的,因为不涉及到操作系统级别线程的上下文切换,虚拟线程上下文切换速度非常快,可以满足高并发需求。
创建一个虚拟线程占用的内存非常小,相对而言,创建一个传统线程,占用的内存空间大。在应用中,可以创建大量的虚拟线程,一般支持到百万级,而创建传统线程,一般只能到几千,我们一般也不建议创建这么多传统线程。
虚拟线程类似于task,传统系统与操作系统线程对应,一个传统线程可以执行多个虚拟线程。虚拟线程与task的区别是,当传统线程执行虚拟线程时,遇到阻塞会挂起虚拟线程,当传统线程执行task时,遇到阻塞就真的阻塞了。当然传统中的task继承自runnable,虚拟线程继承自thread,他们属于不同的类,可调用的方法也不一样。
jdk也提供了虚拟线程池,可以通过下面方式得到一个虚拟线程池。
import java.util.concurrent.*;
public class virtualthreadpoolexample {
public static void main(string[] args) {
// 创建一个虚拟线程池
executorservice executor = executors.newvirtualthreadpertaskexecutor();
// 提交多个任务到线程池
for (int i = 0; i < 10; i++) {
final int taskid = i;
executor.submit(() -> {
system.out.println("task " + taskid + " running in " + thread.currentthread());
});
}
// 关闭线程池
executor.shutdown();
}
}上面代码中,提交给线程池的任务,jvm都会为其创建一个虚拟线程,然后以虚拟线程的方式执行。
与传统的线程池相比,虚拟线程池无法设置核心线程数、最大线程数、线程池大小、任务队列等参数,也不需要设置这些参数。
虚拟线程与传统线程的相同之处:
他们都继承自thread,用法一摸一样。也都支持线程池。
与传统一样,虚拟线程也有new,runnable,waiting,blocked,terminated等状态。
所有的锁,同步机制,对虚拟线程都适用,并且与传统线程一样,虚拟线程也会有资源争夺以及状态同步问题。并且也有上下文切换,虽然虚拟线程的上下文切换,代价非常小。
异常处理机制一样,如果遇到异常不处理,虚拟线程也会终止执行。
虚拟线程与协程的区别
协程是python中的异步编程技术,对于io密集型应用,协程可以发挥很大的优势。协程的异步工作原理与虚拟线程相似,也是遇到io就阻塞,让主线程继续执行其它任务,当io完成时,操作系统通过事件机制,如epoll,通知python进程,产生一个事件,放到event loop队列中,最后由主线程执行。
虚拟线程与协程的主要区别在于:
| 区别 | 虚拟线程 | 协程 |
|---|---|---|
| 并发/并行 | 虚拟线程是并行的,多个虚拟线程可以同时在多个cpu上运行,同一时刻,可以运行多个虚拟线程。从这个角度将,虚拟线程能支持更高的并发。 | 协程不是并行的,因为只有一个主线程执行任务事件,同一时刻,只有一个任务被处理。 |
| 资源争夺 | 虚拟线程中,存在资源争夺问题,以及状态同步问题,在编写代码时,需要考虑并发控制。甚至需要做合理的并发设计。 | 因为只有一个主线程在执行任务事件,没有并发问题,编程时也不需要考虑并发问题。 |
| 框架支持 | 虚拟线程是jdk 21的新特性,不需要任何框架支持。 | 需要框架支持,写异步代码和同步代码,使用的是两个完全不同的框架,另外学习异步编程,增加了学习成本。并且异步编程有些难度,debug也变得复杂些。 |
怎样使用虚拟线程
在jdk 21中,使用虚拟线程有两种方式:
- 直接创建并启动虚拟线程。
public class virtualthreadexample {
public static void main(string[] args) {
thread virtualthread = thread.ofvirtual().start(() -> {
system.out.println("hello virtual thread ");
});
try {
virtualthread.join(); // 等待虚拟线程完成
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}- 通过线程池执行虚拟线程。
import java.util.concurrent.*;
public class virtualthreadpoolexample {
public static void main(string[] args) {
// 创建一个虚拟线程池
executorservice executor = executors.newvirtualthreadpertaskexecutor();
// 提交多个任务到线程池
for (int i = 0; i < 10; i++) {
final int taskid = i;
executor.submit(() -> {
system.out.println("task " + taskid + " running in " + thread.currentthread());
});
}
// 关闭线程池
executor.shutdown();
}
}通过线程池执行任务时,无法对并发实现控制,容易造成oom,或耗尽服务方资源,可以自定义以下虚拟线程池,实现资源控制:
package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.lang.nonnull;
import java.util.arraylist;
import java.util.list;
import java.util.concurrent.*;
/*****
* 虚拟线程池,支持配置任务队列数和最大并发任务数
*/
public class virtualthreadexecutorservice extends abstractexecutorservice {
private volatile boolean shouldstop = false;
private final executorservice executor = executors.newvirtualthreadpertaskexecutor();
private final semaphore semaphore;
private final blockingqueue<runnable> taskqueue;
/******
* 构造函数
* @param taskqueuesize,任务队列大小,任务队列是一个阻塞队列,如果任务队列满了,那么调用execute方法会阻塞
* @param concurrencysize,并发任务大小,同时执行的io任务个数,防止并发过重,或者资源不够
*/
public virtualthreadexecutorservice(int taskqueuesize, int concurrencysize) {
this.semaphore = new semaphore(concurrencysize);
taskqueue = new linkedblockingqueue<>(taskqueuesize);
this.loopevent();
}
private void loopevent() {
thread.ofvirtual().name("virtualthreadexecutor").start(() -> {
while (!shouldstop) {
try {
runnable task = taskqueue.take();
semaphore.acquire();
executor.execute(() -> {
try {
try {
task.run();
} finally {
semaphore.release();
}
} catch (exception e) {
thread.currentthread().interrupt();
throw new runtimeexception(e);
}
});
} catch (interruptedexception e) {
thread.currentthread().interrupt();
if (shouldstop) break;
}
}
});
}
@override
public void shutdown() {
shouldstop = true;
executor.shutdown();
}
/**
* @return the task not executed
*/
@override
public list<runnable> shutdownnow() {
shouldstop = true;
list<runnable> remainingtasks = new arraylist<>(taskqueue);
taskqueue.clear();
executor.shutdownnow();
return remainingtasks;
}
@override
public boolean isshutdown() {
return shouldstop;
}
@override
public boolean isterminated() {
return shouldstop && executor.isterminated();
}
@override
public boolean awaittermination(long timeout, timeunit unit) throws interruptedexception {
return executor.awaittermination(timeout, unit);
}
@override
public void execute(runnable command) {
try {
taskqueue.put(command); // 阻塞直到队列有空间
} catch (interruptedexception e) {
thread.currentthread().interrupt();
throw new rejectedexecutionexception("task submission interrupted.", e);
}
}
}测试代码如下:
package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.apache.tomcat.util.threads.virtualthreadexecutor;
public class virtualthreadexecutorservicedemo {
public static void main(string[] args) throws interruptedexception {
virtualthreadexecutorservice executorservice = new virtualthreadexecutorservice(10, 2);
for (int i = 0; i < 100000; i++) {
final string threadname = "thread-" + i;
system.out.println(thread.currentthread() + ": try to create task " + threadname);
executorservice.submit(() -> {
system.out.println(thread.currentthread() + ": " + threadname + " created!");
try {
thread.sleep(2000);
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
system.out.println(thread.currentthread() + ": " + threadname + " finished!");
});
}
thread.sleep(5000000);
}
}哪些场景下可以应用虚拟线程
虚拟线程在io密集型的高并发应用中能发挥出巨大的威力,在所有io密集型应用中,具体来说,下列场景中,使用虚拟线程是比较合适的:
短时间需要完成的任务,且没有资源争夺或乱序问题,比如数据库写入,服务器 http 请求处理,远程 restful api 调用,rabbitmq 消息处理等应用场景。。
长时间运行的任务,但是对消息处理由顺序要求的任务。比如在电梯监控系统中,需要对每台电梯的数据进行处理,但是需要保证消息被处理的顺序。这时可以为每台电梯创建一个虚拟线程,这台电梯的数据交给专门的虚拟线程处理。因为应用中可以创建大量虚拟线程,并且虚拟线程一般都是异步处理任务,所以这个场景中,使用虚拟线程,可以满足高性能和高并发的要求。
api网关中,对多个上游api数据进行查询,组装合并,使用虚拟线程,相比传统线程,效果更佳。虚拟线程,也支持countdownlatch,semaphore等工具类。
事件驱动的架构中,使用虚拟线程,效果也很好。比如spring boot中的异步事件,默认使用的是传统线程池,如果将其改成虚拟线程池,并发处理能力可以极大提高。
那么哪些场景下不合适使用虚拟线程呢?
cpu密集型应用,比如大数据处理、图像处理、矩阵运算等。
如果应用有很高的并发资源争夺,或者状态同步,并且造成系统吞吐量低,需要考虑优化并发模型,这种场景下,不但传统线程不合适,虚拟线程也不合适。
虚拟线程实际应用场景举例
在一个spring boot项目中,有时候因为异步事件处理不过来,造成吞吐量下降,在jdk 21中,可以将事件改成虚拟线程来执行,代码如下:
package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.scheduling.annotation.enableasync;
import java.util.concurrent.executor;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.semaphore;
@configuration
@enableasync
public class asyncconfig {
@bean(name = "taskexecutor")
public executor taskexecutor() {
// 最大并行任务数
semaphore semaphore = new semaphore(100);
executorservice virtualthreadpool = executors.newvirtualthreadpertaskexecutor();
return runnable -> {
try {
// 控制并行任务数
semaphore.acquire();
virtualthreadpool.submit(() -> {
try {
runnable.run();
} finally {
semaphore.release();
}
});
} catch (interruptedexception e) {
thread.currentthread().interrupt();
throw new runtimeexception("task submission interrupted", e);
}
};
}
}事件发送和处理代码如下:
package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.context.applicationeventpublisher;
import org.springframework.context.event.eventlistener;
import org.springframework.scheduling.annotation.async;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
@requestmapping("/home")
public class homecontroller {
private final applicationeventpublisher eventpublisher;
public homecontroller(applicationeventpublisher eventpublisher) {
this.eventpublisher = eventpublisher;
}
@getmapping("/index")
public string index() {
for (int i = 0; i < 1000; i++) {
eventpublisher.publishevent("event " + i);
}
return "success";
}
@eventlistener
@async
public void handleevent(string event) {
system.out.println(thread.currentthread() + ": " + event);
try {
thread.sleep(100);
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
}
}输出结果如下:
virtualthread[#2031]/runnable@forkjoinpool-1-worker-4: event 976
virtualthread[#2039]/runnable@forkjoinpool-1-worker-1: event 980
virtualthread[#1064]/runnable@forkjoinpool-1-worker-1: event 983
virtualthread[#2047]/runnable@forkjoinpool-1-worker-2: event 984
virtualthread[#2049]/runnable@forkjoinpool-1-worker-9: event 985
virtualthread[#2057]/runnable@forkjoinpool-1-worker-2: event 989
virtualthread[#2059]/runnable@forkjoinpool-1-worker-3: event 990
virtualthread[#2061]/runnable@forkjoinpool-1-worker-6: event 991
virtualthread[#2063]/runnable@forkjoinpool-1-worker-10: event 992
virtualthread[#2065]/runnable@forkjoinpool-1-worker-10: event 993
virtualthread[#2071]/runnable@forkjoinpool-1-worker-3: event 996
virtualthread[#2069]/runnable@forkjoinpool-1-worker-2: event 995
virtualthread[#2075]/runnable@forkjoinpool-1-worker-7: event 998
virtualthread[#2077]/runnable@forkjoinpool-1-worker-10: event 999
上面输出结果中,每次并发执行100个任务,当虚拟线程池任务达到100之后,执行eventpublisher.publishevent("event " + i)代码时,代码阻塞,过100ms之后,100个任务执行完成,下一批任务被执行。
虚拟线程使用注意事项
搞清楚任务类型,是io密集型,还是cpu密集型
与传统线程结合使用
关注性能和资源,使用虚拟线程无法通过线程池等工具控制并发,需要借助semepha,countdownlatch等工具才能限流,如果不限流,容易造成oom,或对目标系统造成巨大流量冲击。
在异步框架中,关注隐藏的传统线程,比如在httpclient的异步请求中,每次异步请求都会创建一个httpclient回调线程。大量的传统线程被间接创建,也容易引起oom。
由synchronized关键字引起的pinned问题,看起来在jdk 21中,做了一些优化,即便虚拟线程pinned到传统线程,也只是性能退回到传统线程,无非是慢一点,反而不是太大问题。经过大量测试,发现基本只出现一次,之后不会再出现。不过使用reentrantlock,效果确实会好很多,将synchronized关键字改成lock.()和lock.unlock(),forkjoinpool中的线程数量会降低,并且任务分配均衡。
不要忽略软件设计,尤其在需要大量同步的应用中。
经过验证,虚拟线程在遇到io时,确实会让步,并且不消耗太多资源,核心特点是,让异步编程变得简单,并且不需要框架支持。但是容易因大的并发,造成oom,或者对目标系统造成冲击,追求高并发可用,但一定要做测试和验证。对于需要做状态同步,如需要加锁,或需要使用synchronize关键字的代码,需要优化设计,如果无法规避,那么,使用虚拟线程,和使用线程池,效果差不多。
虚拟线程存在的问题:
java virtual threads — some early gotchas to look out for
two pitfalls by moving to java virtual threads
java 21 virtual threads - dude, where’s my lock?
pitfalls to avoid when switching to virtual threads
do java 21 virtual threads address the main reason to switch to reactive single-thread frameworks?
pinning: a pitfall to avoid when using virtual threads in java
taming the virtual threads: embracing concurrency with pitfall avoidance
pitfalls you encounter with virtual threads
示例代码在gitee上同步
到此这篇关于java 虚拟线程 探索的文章就介绍到这了,更多相关java 虚拟线程内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论