使用中介者模式轻松实现命令查询职责分离,构建高内聚、低耦合的应用系统
一、知识点回顾
1. 什么是cqrs?
cqrs是command query responsibility segregation的缩写,一般称作命令查询职责分离。从字面意思理解,就是将命令(写入)和查询(读取)的责任划分到不同的模型中。
对比一下常用的 crud 模式(创建-读取-更新-删除),通常我们会让用户界面与负责所有四种操作的数据存储交互。而 cqrs 则将这些操作分成两种模式,一种用于查询(又称 "r"),另一种用于命令(又称 "cud")。

2. cqrs的作用是什么?
cqrs将系统的写操作(命令)和读操作(查询)分离到不同的模型和数据存储中,从而实现读写分离,提高系统的性能、可扩展性和安全性,并使复杂业务逻辑(写端)和高效查询(读端)各自得到优化,降低系统复杂性。它允许为写操作设计严谨的领域模型,为读操作设计简单、只关注查询效率的数据模型(如专用视图或报表数据库),并可通过事件等机制保持最终一致性。
3. cqrs 的优点
- 独立缩放。 cqrs 使读取模型和写入模型能够独立缩放。 此方法可帮助最大程度地减少锁争用并提高负载下的系统性能。
- 优化的数据架构。 读取操作可以使用针对查询进行优化的模式。 写入操作使用针对更新优化的模式。
- 安全性。 通过分隔读取和写入,可以确保只有适当的域实体或操作有权对数据执行写入操作。
- 关注点分离。 分离读取和写入责任会导致更简洁、更易于维护的模型。 写入端通常处理复杂的业务逻辑。 读取端可以保持简单且专注于查询效率。
- 更简单的查询。 在读取数据库中存储具体化视图时,应用程序可以在查询时避免复杂的联接。
二、关于pipelinr
项目地址
项目开发者在github的介绍不多,关键是最后一句话:it's similar to a popular mediatr .net library. 意思就是这个项目是参考着一个叫mediatr的.net库写的。关于mediatr我之前有两篇文章专门介绍过。
pipelinr(包括mediatr)提供了一种cqrs的实现方式,基于中介者模式实现进程内消息传递,用于解耦应用中的各个组件,支持请求/响应(一对一,有返回值)和发布/订阅(一对多,无返回值)两种消息模式。它们在内部提供管道行为 (pipeline behaviors),用于在消息处理前后插入自定义逻辑,如日志、验证、异常处理等。
需要提醒的是,pipelinr并不是一个完整的cqrs框架,它只是一个中介者模式的具体实现方式,将调用方和处理方进行了解耦,而这种模式恰好可以用来在一个单体应用(或者是微服务的服务内部)中实现简单的cqrs。
三、依赖安装和配置
1. maven安装
<dependency> <groupid>net.sizovs</groupid> <artifactid>pipelinr</artifactid> <version>0.11</version> </dependency>
2. gradle安装
dependencies {
compile 'net.sizovs:pipelinr:0.11'
}在spring项目中配置pipelinr
@configuration
public class pipelinrconfiguration {
@bean
pipeline pipeline(objectprovider<command.handler> commandhandlers, objectprovider<notification.handler> notificationhandlers, objectprovider<command.middleware> middlewares) {
return new pipelinr()
.with(commandhandlers::stream)
.with(notificationhandlers::stream)
.with(middlewares::orderedstream);
}
}四、核心组件
pipeline/pipelinr:pipeline是消息和处理器之间的中介者,调用方向pipeline发送消息,pipeline收到消息后通过注册到pipeline的中间件进行层层传递并最终抵达匹配的消息处理器进行处理。pipelinr是pipeline的默认实现。command<r>:用于约定请求/响应模式的消息类型,泛型参数r是返回值的类型,如果不需要返回值,可以将r指定为voidy。notification:用于约定发布/订阅模式的消息类型,没有返回值,消息可以有多个处理器。middleware:管道中间件,command和notification都定义了各自的中间件接口。pipeline接收到的消息,在到达最终的处理器之前,会经过所有注册到pipeline的中间。可以使用middleware实现诸如日志记录、数据验证、开启事务等一系列操作。
五、请求/响应模式实现
请求/响应模式需要用到command接口。
1. 定义command
command代表一个请求,需要实现net.sizovs.pipelinr.command接口。泛型参数指定返回值类型。
// 定义一个创建用户的命令
public class createusercommand implements command<userresponse> {
private string username;
private string email;
public createusercommand(string username, string email) {
this.username = username;
this.email = email;
}
public string getusername() {
return username;
}
public string getemail() {
return email;
}
}
// 返回值类型
public class userresponse {
private long userid;
private string username;
private string email;
public userresponse(long userid, string username, string email) {
this.userid = userid;
this.username = username;
this.email = email;
}
// getters
}2. 定义command handler
创建该command对应的处理器,实现net.sizovs.pipelinr.command.handler接口。
@component
public class createusercommandhandler implements command.handler<createusercommand, userresponse> {
@autowired
private userrepository userrepository;
@override
public userresponse handle(createusercommand command) {
// 业务逻辑处理
user user = new user();
user.setusername(command.getusername());
user.setemail(command.getemail());
user saveduser = userrepository.save(user);
return new userresponse(saveduser.getid(), saveduser.getusername(), saveduser.getemail());
}
}3. 在业务代码中使用
通过注入pipeline实例,发送command并获取响应。
@service
public class userservice {
@autowired
private pipeline pipeline;
public userresponse createuser(string username, string email) {
createusercommand command = new createusercommand(username, email);
userresponse response = pipeline.send(command);
return response;
}
}4. 添加command中间件
中间件可以在command处理前后执行一些操作,如验证、日志、事务管理等。
@component
public class loggingmiddleware implements command.middleware {
private static final logger logger = loggerfactory.getlogger(loggingmiddleware.class);
@override
public <r, c extends command<r>> r invoke(c command, chain<r> chain) {
logger.info("executing command: {}", command.getclass().getsimplename());
try {
r result = chain.proceed(command);
logger.info("command executed successfully");
return result;
} catch (exception e) {
logger.error("command execution failed", e);
throw e;
}
}
}
@component
public class validationmiddleware implements command.middleware {
@autowired
private validator validator;
@override
public <r, c extends command<r>> r invoke(c command, chain<r> chain) {
set<constraintviolation<c>> violations = validator.validate(command);
if (!violations.isempty()) {
throw new constraintviolationexception("validation failed", violations);
}
return chain.proceed(command);
}
}
@component
@order(1) // 指定中间件执行顺序
public class transactionmiddleware implements command.middleware {
@autowired
private platformtransactionmanager transactionmanager;
@override
public <r, c extends command<r>> r invoke(c command, chain<r> chain) {
transactionstatus status = transactionmanager.gettransaction(new defaulttransactiondefinition());
try {
r result = chain.proceed(command);
transactionmanager.commit(status);
return result;
} catch (exception e) {
transactionmanager.rollback(status);
throw e;
}
}
}六、发布/订阅模式实现
发布/订阅模式使用notification接口,用于一对多的消息分发,没有返回值。
1. 定义notification
notification代表一个事件通知,需要实现net.sizovs.pipelinr.notification接口。
// 定义一个用户创建成功的事件通知
public class usercreatednotification implements notification {
private long userid;
private string username;
private string email;
private localdatetime createdtime;
public usercreatednotification(long userid, string username, string email) {
this.userid = userid;
this.username = username;
this.email = email;
this.createdtime = localdatetime.now();
}
// getters
}2. 定义notification handler
notification可以有多个处理器,每个处理器实现net.sizovs.pipelinr.notification.handler接口。
@component
public class sendwelcomeemailhandler implements notification.handler<usercreatednotification> {
private static final logger logger = loggerfactory.getlogger(sendwelcomeemailhandler.class);
@autowired
private emailservice emailservice;
@override
public void handle(usercreatednotification notification) {
logger.info("sending welcome email to user: {}", notification.getusername());
emailservice.sendwelcomeemail(notification.getemail(), notification.getusername());
}
}
@component
public class logusercreationhandler implements notification.handler<usercreatednotification> {
private static final logger logger = loggerfactory.getlogger(logusercreationhandler.class);
@autowired
private userauditlogrepository auditlogrepository;
@override
public void handle(usercreatednotification notification) {
logger.info("logging user creation: {}", notification.getusername());
userauditlog auditlog = new userauditlog();
auditlog.setuserid(notification.getuserid());
auditlog.setoperation("create");
auditlog.settimestamp(notification.getcreatedtime());
auditlogrepository.save(auditlog);
}
}
@component
public class updateuserstatisticshandler implements notification.handler<usercreatednotification> {
private static final logger logger = loggerfactory.getlogger(updateuserstatisticshandler.class);
@autowired
private userstatisticsrepository statisticsrepository;
@override
public void handle(usercreatednotification notification) {
logger.info("updating statistics for new user: {}", notification.getusername());
userstatistics stats = statisticsrepository.findorcreate();
stats.incrementtotalusers();
statisticsrepository.save(stats);
}
}3. 发送notification
在command处理完成后,可以发送notification通知所有相关的处理器。
@component
public class createusercommandhandler implements command.handler<createusercommand, userresponse> {
@autowired
private userrepository userrepository;
@autowired
private pipeline pipeline;
@override
public userresponse handle(createusercommand command) {
// 业务逻辑处理
user user = new user();
user.setusername(command.getusername());
user.setemail(command.getemail());
user saveduser = userrepository.save(user);
// 发送事件通知
usercreatednotification notification = new usercreatednotification(
saveduser.getid(),
saveduser.getusername(),
saveduser.getemail()
);
pipeline.send(notification);
return new userresponse(saveduser.getid(), saveduser.getusername(), saveduser.getemail());
}
}4. 添加notification中间件
类似command,notification也支持中间件。
@component
public class notificationloggingmiddleware implements notification.middleware {
private static final logger logger = loggerfactory.getlogger(notificationloggingmiddleware.class);
@override
public <n extends notification> void invoke(n notification, chain chain) {
logger.info("publishing notification: {}", notification.getclass().getsimplename());
try {
chain.proceed(notification);
logger.info("notification published successfully");
} catch (exception e) {
logger.error("notification publishing failed", e);
throw e;
}
}
}
@component
public class notificationerrorhandlingmiddleware implements notification.middleware {
private static final logger logger = loggerfactory.getlogger(notificationerrorhandlingmiddleware.class);
@override
public <n extends notification> void invoke(n notification, chain chain) {
try {
chain.proceed(notification);
} catch (exception e) {
logger.error("error handling notification: {}", notification.getclass().getsimplename(), e);
// 可以选择吞掉异常或重新抛出,取决于业务需求
// throw e;
}
}
}七、总结
核心收获
通过本文的介绍,我们了解了如何在java应用中使用pipelinr框架实现cqrs模式。核心要点总结如下:
1. cqrs的价值
- 读写分离:通过command处理写操作,notification处理事件响应,实现职责的明确划分
- 独立优化:读端和写端可以独立优化,不同的数据模型适应不同的场景需求
- 系统解耦:中介者模式解耦了调用方和处理方,提高了系统的可维护性和可扩展性
2. pipelinr的核心特性
- 轻量级实现:相比完整的cqrs框架,pipelinr更轻便,学习成本低
- 灵活的管道机制:通过中间件可以方便地植入横切关注点(如日志、验证、事务等)
- 支持两种消息模式:command用于请求/响应,notification用于发布/订阅
3. 最佳实践建议
- 合理使用中间件:通过@order注解控制中间件执行顺序,但要避免中间件层级过多导致性能问题
- 异常处理:根据场景选择合适的异常处理策略,notification可考虑不中断其他处理器的错误隔离
- 事件驱动设计:充分利用notification实现事件驱动架构,解耦不同的业务流程
- 代码组织:按照command、handler、middleware的划分方式组织代码,保持结构清晰
实施建议
适用场景
- 中等复杂度的业务系统,需要良好的代码结构和可维护性
- 业务逻辑相对复杂,需要事件驱动的系统设计
- 团队具备良好的ddd设计理念和架构意识
注意事项
- 学习曲线:虽然pipelinr本身简单,但要理解cqrs的设计理念需要一定时间
- 适度使用:cqrs不是银弹,过度设计会增加系统复杂度,要根据实际需求决定是否引入
- 团队协作:cqrs的有效实施对团队的整体架构意识和编码规范要求较高
- 性能考虑:虽然使用了中介者模式会引入少量额外开销,但对大多数应用来说可以忽略不计
结论
pipelinr提供了一种轻量级、简洁的cqrs实现方案。它特别适合那些想要在不过度复杂化系统的前提下,引入ddd思想和事件驱动设计的项目。通过合理运用command和notification,结合恰当的中间件设计,开发者可以构建出高内聚、低耦合、易于维护和扩展的应用系统。
关键是要把握好"度"——既要充分发挥cqrs和pipelinr的优势,又要避免为了追求"高大上"的架构而过度设计,最终的目标是为业务的快速迭代和长期维护提供支撑。
到此这篇关于在java中实现cqrs架构的全过程的文章就介绍到这了,更多相关java cqrs架构内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论