在springboot中,commandlinerunner
本身并不是一个直接支持异步任务的机制。
commandlinerunner
接口定义了一个在 spring boot 应用程序启动后立即同步执行的方法 run(string... args)
。
这意味着,通过实现 commandlinerunner
接口定义的任务将在主线程中顺序执行,而不会创建新的线程来异步执行这些任务。
然而,如果你希望在 commandlinerunner
中执行异步任务,你可以手动创建线程或使用 spring 的异步执行功能。
以下是一些实现异步任务的方法。
1.概要分析
1.1 手动创建线程
在 commandlinerunner
的 run
方法中,你可以直接创建并启动一个新的线程来执行异步任务。
这种方法简单直接,但需要注意线程管理和异常处理。
@component public class mycommandlinerunner implements commandlinerunner { @override public void run(string... args) throws exception { new thread(() -> { // 异步执行的代码 system.out.println("异步任务执行中..."); // 模拟耗时操作 try { thread.sleep(1000); } catch (interruptedexception e) { thread.currentthread().interrupt(); } system.out.println("异步任务完成"); }).start(); // 主线程继续执行,不会等待异步任务完成 system.out.println("commandlinerunner 执行完毕,主线程继续"); } }
1.2 使用 spring 的异步执行功能
如果你希望利用 spring 的异步支持来执行异步任务,你可以在 commandlinerunner
中注入一个使用 @async
注解的服务。但是,需要注意的是,由于 commandlinerunner
的 run
方法本身是在 spring 容器完全初始化之后同步执行的,因此即使你调用了一个异步服务方法,run
方法本身仍然会立即返回,不会等待异步任务完成。
首先,你需要在 spring boot 应用中启用异步支持,通过在启动类上添加 @enableasync
注解。
然后,你可以创建一个异步服务:
@service public class asyncservice { @async public void executeasynctask() { // 异步执行的代码 system.out.println("异步任务执行中..."); // 模拟耗时操作 try { thread.sleep(1000); } catch (interruptedexception e) { thread.currentthread().interrupt(); } system.out.println("异步任务完成"); } }
在 commandlinerunner
中注入并使用这个服务:
@component public class mycommandlinerunner implements commandlinerunner { @autowired private asyncservice asyncservice; @override public void run(string... args) throws exception { asyncservice.executeasynctask(); // 调用异步方法,但 commandlinerunner 的 run 方法会立即返回 system.out.println("commandlinerunner 执行完毕,主线程继续,不会等待异步任务完成"); } }
虽然 commandlinerunner
本身不支持异步执行,但你可以通过手动创建线程或使用 spring 的异步支持来在 commandlinerunner
中执行异步任务。然而,需要注意的是,commandlinerunner
的 run
方法本身仍然是同步执行的,它不会等待任何异步任务完成。
如果你的应用程序依赖于异步任务的结果,你可能需要采用其他机制(如 future
、completablefuture
或消息队列)来管理异步任务的执行和结果。
2.核心原理分析
org.springframework.boot.commandlinerunner
是 spring boot 框架中的一个核心接口,其原理分析可以从以下几个方面进行。
2.1 接口定义与功能
commandlinerunner
是一个函数式接口(functional interface),它只定义了一个抽象方法 run(string... args)
。
这个方法在 spring boot 应用程序启动完成后被调用,允许开发者执行一些初始化操作或启动后的任务。这些任务可能包括数据初始化、缓存预热、日志记录等。
2.2 执行时机
当 spring boot 应用程序启动时,spring 容器会完成一系列的初始化操作,包括 bean 的加载和依赖注入等。
在所有 spring bean 都初始化完成后,spring boot 会查找所有实现了 commandlinerunner
接口的 bean,并依次调用它们的 run
方法。
这意味着 commandlinerunner
的执行时机是在 spring 上下文准备好之后,但在应用程序对外提供服务之前。
2.3 命令行参数传递
commandlinerunner
的 run
方法接收一个 string... args
参数,这个参数包含了启动应用程序时传递给它的命令行参数。
这使得开发者可以根据命令行参数的不同来执行不同的初始化逻辑。
2.4 实现与注册
要使用 commandlinerunner
,开发者需要创建一个类并实现这个接口,然后重写 run
方法以定义自己的初始化逻辑。
为了让 spring 容器能够扫描到这个实现类并将其注册为一个 bean,通常会在类上添加 @component
或其他类似的注解(如 @service
、@repository
等)。此外,也可以通过编程方式在配置类中显式地注册这个 bean。
2.5 执行顺序
如果应用程序中有多个实现了 commandlinerunner
接口的类,那么它们的 run
方法将按照一定的顺序执行。
默认情况下,执行顺序取决于 spring 容器注册这些 bean 的顺序。但是,开发者可以通过 @order
注解或实现 ordered
接口来指定执行顺序。
@order
注解的值越小,优先级越高,相应的 run
方法就会越早执行。
2.6 与 applicationrunner 的区别
值得注意的是,spring boot 还提供了另一个类似的接口 applicationrunner
,它也用于在应用程序启动后执行初始化任务。与 commandlinerunner
不同的是,applicationrunner
的 run
方法接收一个 applicationarguments
参数而不是 string... args
。
applicationarguments
提供了对命令行参数的更高级别访问,包括选项和非选项参数等。此外,如果同时存在 commandlinerunner
和 applicationrunner
的实现,那么 commandlinerunner
的实现会先于 applicationrunner
的实现被调用。
2.7 应用场景
commandlinerunner
适用于需要在应用程序启动后立即执行的任务场景,如数据初始化、配置加载、缓存预热等。通过使用 commandlinerunner
,开发者可以确保这些任务在应用程序对外提供服务之前完成,从而提高应用程序的性能和用户体验。
综上所述,org.springframework.boot.commandlinerunner
是 spring boot 框架中用于执行启动后任务的强大机制,它通过简单的接口定义和灵活的注册方式,为开发者提供了方便、高效的初始化操作手段。
3.部分源码分析
3.1 启动spring boot应用程序
/** * 启动应用程序。 * * @param args 命令行参数 * @return configurableapplicationcontext 应用程序上下文 */ public configurableapplicationcontext run(string... args) { // 记录应用程序启动时间 long starttime = system.nanotime(); // 创建引导上下文 defaultbootstrapcontext bootstrapcontext = createbootstrapcontext(); configurableapplicationcontext context = null; // 配置无头模式属性 configureheadlessproperty(); // 获取启动监听器 springapplicationrunlisteners listeners = getrunlisteners(args); // 通知监听器应用程序即将启动 listeners.starting(bootstrapcontext, this.mainapplicationclass); try { // 创建并配置应用参数 applicationarguments applicationarguments = new defaultapplicationarguments(args); configurableenvironment environment = prepareenvironment(listeners, bootstrapcontext, applicationarguments); // 配置忽略beaninfo的设置 configureignorebeaninfo(environment); // 打印启动横幅 banner printedbanner = printbanner(environment); // 创建应用程序上下文 context = createapplicationcontext(); // 设置应用启动器 context.setapplicationstartup(this.applicationstartup); // 准备应用程序上下文 preparecontext(bootstrapcontext, context, environment, listeners, applicationarguments, printedbanner); // 刷新上下文,使配置生效 refreshcontext(context); // 启动后配置 afterrefresh(context, applicationarguments); // 计算应用程序启动时间 duration timetakentostartup = duration.ofnanos(system.nanotime() - starttime); // 如果启用了启动信息日志,则记录启动信息 if (this.logstartupinfo) { new startupinfologger(this.mainapplicationclass).logstarted(getapplicationlog(), timetakentostartup); } // 通知监听器应用程序已启动 listeners.started(context, timetakentostartup); // 调用应用程序运行者 callrunners(context, applicationarguments); } catch (throwable ex) { // 处理启动失败 handlerunfailure(context, ex, listeners); throw new illegalstateexception(ex); } try { // 计算应用程序就绪时间 duration timetakentoready = duration.ofnanos(system.nanotime() - starttime); // 通知监听器应用程序已就绪 listeners.ready(context, timetakentoready); } catch (throwable ex) { // 处理就绪失败 handlerunfailure(context, ex, null); throw new illegalstateexception(ex); } // 返回应用程序上下文 return context; }
3.2 调用所有的runner实现类
/** * 调用所有的runner实现类。 * * 本方法的目的是遍历applicationcontext中所有的runner实例,并根据它们的类型分别调用相应的方法。 * runner和commandlinerunner是spring boot提供的一组接口,用于在应用程序启动后执行一些自定义的初始化逻辑。 * 这里通过判断runner的类型,来决定是调用applicationrunner还是commandlinerunner的方法,从而实现对不同类型runner的兼容处理。 * * @param context spring应用上下文,用于获取beanprovider以获取runner实例。 * @param args 命令行参数,传递给每个runner的调用方法。 */ private void callrunners(applicationcontext context, applicationarguments args) { // 通过beanprovider获取所有runner类型的bean,并以有序的方式遍历它们 context.getbeanprovider(runner.class).orderedstream().foreach((runner) -> { // 如果runner是applicationrunner类型,则调用callrunner方法,并传入applicationrunner和命令行参数 if (runner instanceof applicationrunner) { callrunner((applicationrunner) runner, args); } // 如果runner是commandlinerunner类型,则同样调用callrunner方法,并传入commandlinerunner和命令行参数 if (runner instanceof commandlinerunner) { callrunner((commandlinerunner) runner, args); } }); }
4.典型的应用场景分析
seata中的tm(transaction manager,事务管理器)和rm(resource manager,资源管理器)是分布式事务框架中的关键角色,它们各自承担着特定的职责,以确保分布式事务的一致性和完整性。
4.1 tm(transaction manager,事务管理器)
4.1.1 定义与职责
- (1)tm负责定义全局事务的范围,即开始全局事务、提交或回滚全局事务。它是分布式事务的发起者和终结者,类似于本地事务中的begin…commit…rollback操作,但针对的是全局的分布式事务。
- (2)在seata框架中,tm与业务系统集成在一起,作为客户端存在。当业务操作需要跨多个服务或数据库时,tm会启动一个全局事务,并管理这个事务的生命周期。
4.1.2 工作流程
- (1)tm向tc(transaction coordinator,事务协调者)请求开启一个全局事务,tc生成一个全局唯一的事务id(xid)并返回给tm。
- (2)tm携带xid进行远程服务调用,xid在微服务调用链中传播,确保所有参与的分支事务都能被正确关联到全局事务中。
- (3)当业务操作完成后,tm根据业务逻辑向tc发起全局事务的提交或回滚请求。
- (4)tc根据各分支事务的执行结果,决定全局事务的提交或回滚,并通知所有rm进行相应的操作。
4.2 rm(resource manager,资源管理器)
4.2.1 定义与职责
- (1)rm负责管理分支事务处理的资源,如数据库连接、消息队列等。它是分布式事务中具体执行操作的组件。
- (2)rm与tc进行通信,注册分支事务、报告分支事务的状态,并根据tc的指令驱动分支事务的提交或回滚。
- (3)在seata框架中,rm同样与业务系统集成在一起,作为客户端存在。每个参与全局事务的服务或数据库操作,都会有一个对应的rm来管理。
4.2.2 工作流程
- (1)在执行具体业务操作之前,rm会向tc注册分支事务,并将其纳入全局事务的管辖范围。
- (2)rm执行本地事务操作,如数据库更新、消息发送等,并确保这些操作是可回滚和持久化的。
- (3)rm将分支事务的执行结果(提交或回滚)上报给tc。
- (4)当tc收到tm的全局事务提交或回滚请求时,它会根据各分支事务的状态决定全局事务的结果,并通知所有rm进行相应的提交或回滚操作。
4.3 seata rm和tm与seata server之间的rpc通信
在seata中,tm(transaction manager,事务管理器)与seata server(即tc,transaction coordinator,事务协调者)之间的通信是通过rpc(remote procedure call,远程过程调用)实现的。
rpc是一种允许程序在网络上调用远程计算机上程序的技术,就像调用本地计算机上的程序一样。
4.3.1 tm与seata server之间的rpc通信
(1)建立连接
- tm在启动时会尝试与seata server建立长连接。这个连接是通过netty等网络通信框架实现的,netty提供了高效、异步的网络通信能力。
- 在建立连接的过程中,tm会向seata server发送注册请求,包括应用id、事务组名称等信息,以便seata server能够识别和管理该tm。
(2)事务管理
- 一旦连接建立,tm就可以通过rpc调用seata server提供的服务来管理全局事务。
- 例如,当业务操作需要跨多个服务或数据库时,tm会向seata server请求开启一个全局事务,并获取一个全局唯一的事务id(xid)。
- 在执行远程服务调用时,tm会将xid携带在调用中,以便参与的rm能够识别并将分支事务注册到全局事务中。
- 当业务操作完成后,tm会根据业务逻辑向seata server发起全局事务的提交或回滚请求。
(3)通信协议
- seata使用自定义的通信协议来进行rpc通信,该协议支持事务的创建、提交、回滚等操作。
- 通信过程中,seata还实现了心跳检测、超时重试等机制来确保通信的可靠性和稳定性。
(4)性能优化
- 为了提高rpc通信的性能,seata采用了多种优化策略,如使用netty的主从reactor多线程模型来处理并发请求、采用批量发送请求来减少网络开销等。
4.3.2 rm与seata server之间的rpc通信
在seata中,rm负责管理分支事务处理的资源,如数据库连接等。
当rm执行分支事务时,它需要与seata server进行通信,以注册分支事务、报告分支事务的状态,并根据seata server的指令驱动分支事务的提交或回滚。
这种通信是通过rpc机制实现的,它允许rm远程调用seata server上的服务。
(1)建立连接
- 当rm启动时,它会根据配置尝试与seata server建立长连接。这个连接是通过netty等网络通信框架实现的,netty提供了高效、异步的网络通信能力。
- 在建立连接的过程中,rm会向seata server发送注册请求,包括应用id、事务组名称等信息,以便seata server能够识别和管理该rm。
(2)分支事务注册
- 当rm执行一个分支事务时,它会向seata server注册该分支事务。注册过程中,rm会携带全局事务id(xid)等信息,以便seata server能够将该分支事务关联到相应的全局事务中。
- seata server在收到注册请求后,会为该分支事务分配一个唯一的分支事务id,并将其注册到全局事务中。
(3)状态报告与指令执行
- 在分支事务执行过程中,rm会定期向seata server报告分支事务的状态,如正在执行、已提交、已回滚等。
- 当全局事务需要提交或回滚时,seata server会根据各分支事务的状态和结果,向rm发送相应的提交或回滚指令。
- rm在收到指令后,会执行相应的操作,如提交本地事务、回滚本地事务等,并将执行结果报告给seata server。
(4)心跳检测与异常处理
- 为了保持连接的活跃状态,rm会定期向seata server发送心跳消息。
- 如果seata server在一段时间内没有收到rm的心跳消息,它可能会认为rm已经离线,并采取相应的异常处理措施,如重试连接、记录日志等。
seata rm与seata server之间的rpc通信是seata分布式事务框架中的重要组成部分。通过高效的rpc通信机制,rm能够远程调用seata server提供的服务来管理分支事务,确保分布式事务的一致性和完整性。同时,seata还通过多种优化策略来提高rpc通信的性能和可靠性。
4.4 seata server利用springboot commandlinerunner启动服务端通信渠道
在seata server中,serverrunner
类是一个重要的组件,它继承自spring boot的commandlinerunner
接口。
这意味着在spring boot应用启动后,serverrunner
的run()
方法会被自动执行。这种方法通常用于在应用启动后立即执行一段特定的代码,比如初始化资源、启动服务等。
serverrunner
类的主要职责是初始化netty通信渠道,即nettyremotingserver
。
netty是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。
在seata中,netty用于节点间的通信,包括事务协调器(tc)、事务管理器(tm)和资源管理器(rm)之间的通信。
以下是serverrunner
类中run()
方法可能包含的逻辑的一个简单示例:
@override public void run(string... args) throws exception { // 初始化netty通信服务器 nettyremotingserver nettyremotingserver = new nettyremotingserver(); nettyremotingserver.setport(serverport); // 设置服务器端口 nettyremotingserver.sethost(serverhost); // 设置服务器主机地址 nettyremotingserver.start(); // 启动服务器 // 其他初始化逻辑,比如注册服务等 }
在这个例子中,run()
方法首先创建了一个nettyremotingserver
实例,并设置了服务器的主机地址和端口号。然后,它调用start()
方法来启动netty服务器,这样seata server就可以监听来自其他节点的请求了。
总的来说
serverrunner
类在seata server中扮演着重要的角色,它负责初始化netty通信渠道,为seata节点间的通信提供基础设施。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论