一、引言:事务管理在多线程环境下的挑战
1.1 事务的本质与线程安全
在spring框架中,事务管理是基于threadlocal实现的。threadlocal为每个线程提供了独立的变量副本,确保每个线程都能独立地操作自己的事务资源,而不会相互干扰。这种设计在单线程环境下工作得很好,但在多线程环境下却带来了挑战。
// threadlocal在spring事务管理中的应用示例
public abstract class transactionsynchronizationmanager {
private static final threadlocal<map<object, object>> resources =
new namedthreadlocal<>("transactional resources");
private static final threadlocal<set<transactionsynchronization>> synchronizations =
new namedthreadlocal<>("transaction synchronizations");
private static final threadlocal<string> currenttransactionname =
new namedthreadlocal<>("current transaction name");
private static final threadlocal<boolean> currenttransactionreadonly =
new namedthreadlocal<>("current transaction read-only status");
private static final threadlocal<integer> currenttransactionisolationlevel =
new namedthreadlocal<>("current transaction isolation level");
private static final threadlocal<boolean> actualtransactionactive =
new namedthreadlocal<>("actual transaction active");
}1.2 多线程事务管理的核心问题
在多线程环境下,事务管理面临以下主要挑战:
- 事务上下文隔离:主线程的事务上下文无法自动传播到子线程
- 资源竞争与死锁:多个线程同时访问共享数据资源
- 事务一致性保证:如何确保所有线程的操作要么全部成功,要么全部回滚
- 异常处理复杂度:一个线程的异常如何影响其他线程的事务状态
1.3 springboot事务管理架构概览
─────────────────────────────────────────────────────────────┐ │ spring transaction architecture │ ├─────────────────────────────────────────────────────────────┤ │ @transactional │ │ │ │ │ ▼ │ │ transactioninterceptor │ │ │ │ │ ▼ │ │ platformtransactionmanager │ │ │ │ │ ▼ │ │ datasourcetransactionmanager / jpatransactionmanager / etc │ │ │ │ │ ▼ │ │ jdbc connection / jpa entitymanager │ └─────────────────────────────────────────────────────────────┘
二、spring事务管理基础回顾
2.1 声明式事务管理
spring通过@transactional注解提供声明式事务管理,这是最常用的方式:
@service
public class userservice {
@autowired
private userrepository userrepository;
@transactional
public user createuser(user user) {
// 业务逻辑
return userrepository.save(user);
}
@transactional(readonly = true)
public user finduserbyid(long id) {
return userrepository.findbyid(id).orelse(null);
}
}2.2 编程式事务管理
对于更复杂的事务控制,spring提供了编程式事务管理:
@service
public class orderservice {
@autowired
private platformtransactionmanager transactionmanager;
@autowired
private transactiontemplate transactiontemplate;
public void processorder(long orderid) {
// 方式1:使用transactiontemplate
transactiontemplate.execute(status -> {
// 业务逻辑
return null;
});
// 方式2:使用platformtransactionmanager
transactiondefinition def = new defaulttransactiondefinition();
transactionstatus status = transactionmanager.gettransaction(def);
try {
// 业务逻辑
transactionmanager.commit(status);
} catch (exception e) {
transactionmanager.rollback(status);
throw e;
}
}
}2.3 事务传播行为详解
spring定义了7种事务传播行为,理解这些行为对于多线程事务管理至关重要:
| 传播行为 | 说明 | 适用场景 |
|---|---|---|
| required | 支持当前事务,如果不存在则创建新事务 | 默认设置,最常用 |
| supports | 支持当前事务,如果不存在则以非事务方式执行 | 查询方法 |
| mandatory | 支持当前事务,如果不存在则抛出异常 | 必须存在事务的方法 |
| requires_new | 创建新事务,暂停当前事务 | 独立事务操作 |
| not_supported | 以非事务方式执行,暂停当前事务 | 不需要事务支持的操作 |
| never | 以非事务方式执行,如果存在事务则抛出异常 | 禁止事务的方法 |
| nested | 如果存在事务,则在嵌套事务内执行 | 需要部分回滚的场景 |
@service
public class complexservice {
@transactional(propagation = propagation.required)
public void methoda() {
// 方法a的业务逻辑
methodb(); // 调用方法b
}
@transactional(propagation = propagation.requires_new)
public void methodb() {
// 方法b将在独立的事务中执行
// 即使methoda回滚,methodb的提交也不会被影响
}
}三、springboot线程池配置与使用
3.1 线程池基础配置
springboot提供了灵活的线程池配置选项:
@configuration
@enableasync
public class threadpoolconfig {
/**
* 核心业务线程池
*/
@bean("businessexecutor")
public threadpooltaskexecutor businessexecutor() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
// 核心线程数:线程池维护的最小线程数量
executor.setcorepoolsize(10);
// 最大线程数:线程池允许的最大线程数量
executor.setmaxpoolsize(50);
// 队列容量:当线程数达到核心线程数时,新任务会进入队列等待
executor.setqueuecapacity(100);
// 线程名前缀:方便日志追踪
executor.setthreadnameprefix("business-thread-");
// 拒绝策略:当线程池和队列都满了时的处理策略
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
// 线程空闲时间:非核心线程空闲存活时间
executor.setkeepaliveseconds(60);
// 等待所有任务完成后关闭线程池
executor.setwaitfortaskstocompleteonshutdown(true);
// 等待任务完成的超时时间
executor.setawaitterminationseconds(60);
// 初始化线程池
executor.initialize();
return executor;
}
/**
* 事务处理专用线程池
*/
@bean("transactionexecutor")
public threadpooltaskexecutor transactionexecutor() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
executor.setcorepoolsize(5);
executor.setmaxpoolsize(20);
executor.setqueuecapacity(50);
executor.setthreadnameprefix("transaction-thread-");
// 使用自定义拒绝策略
executor.setrejectedexecutionhandler(new rejectedexecutionhandler() {
@override
public void rejectedexecution(runnable r, threadpoolexecutor executor) {
// 记录日志
log.warn("transaction task rejected: {}", r.tostring());
// 尝试重新执行
if (!executor.isshutdown()) {
try {
executor.getqueue().put(r);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
}
}
});
executor.initialize();
return executor;
}
}3.2 异步任务执行
@service
public class asyncservice {
@async("businessexecutor")
public completablefuture<string> asyncmethodwithreturn(string param) {
log.info("async method started with param: {}", param);
try {
thread.sleep(1000); // 模拟耗时操作
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
return completablefuture.completedfuture("result for " + param);
}
@async("transactionexecutor")
public void asyncmethodwithoutreturn() {
log.info("async method without return value started");
// 执行业务逻辑
}
}3.3 线程池监控与管理
@component
public class threadpoolmonitor {
@autowired
@qualifier("businessexecutor")
private threadpooltaskexecutor businessexecutor;
@scheduled(fixedrate = 30000) // 每30秒监控一次
public void monitorthreadpool() {
threadpoolexecutor executor = businessexecutor.getthreadpoolexecutor();
log.info("====== thread pool monitor ======");
log.info("active threads: {}", executor.getactivecount());
log.info("pool size: {}", executor.getpoolsize());
log.info("core pool size: {}", executor.getcorepoolsize());
log.info("maximum pool size: {}", executor.getmaximumpoolsize());
log.info("queue size: {}", executor.getqueue().size());
log.info("completed tasks: {}", executor.getcompletedtaskcount());
log.info("total tasks: {}", executor.gettaskcount());
log.info("================================");
// 如果队列使用率过高,可以动态调整
double queueusage = (double) executor.getqueue().size() / executor.getqueue().remainingcapacity();
if (queueusage > 0.8) {
log.warn("thread pool queue usage is high: {}%", queueusage * 100);
}
}
}四、多线程环境下的事务挑战与解决方案
4.1 问题分析:为什么事务不能跨线程传播
@service
public class problematicservice {
@transactional
public void mainmethod() {
// 主线程事务开始
log.info("main thread transaction active: {}",
transactionsynchronizationmanager.isactualtransactionactive());
// 创建子线程
new thread(() -> {
// 子线程中无法访问主线程的事务上下文
log.info("child thread transaction active: {}",
transactionsynchronizationmanager.isactualtransactionactive());
// 这里会抛出异常:没有活动的事务
// 尝试数据库操作会失败
}).start();
}
}4.2 解决方案1:事务上下文传递
4.2.1 手动传递事务属性
@service
public class transactionpropagationservice {
@transactional
public void processwithcontextpropagation() {
// 获取当前事务的属性
transactionattribute transactionattribute =
transactionaspectsupport.currenttransactionstatus().gettransactionattribute();
// 获取当前事务的隔离级别、超时时间等属性
integer isolationlevel = transactionsynchronizationmanager.getcurrenttransactionisolationlevel();
integer timeout = transactionattribute.gettimeout();
boolean readonly = transactionsynchronizationmanager.iscurrenttransactionreadonly();
// 将事务属性传递给子线程
transactioncontext context = new transactioncontext();
context.setisolationlevel(isolationlevel);
context.settimeout(timeout);
context.setreadonly(readonly);
// 在子线程中重新创建事务
completablefuture.runasync(() -> {
executeinnewtransaction(context, () -> {
// 子线程的业务逻辑
log.info("child thread executing with transaction");
});
});
}
@data
private static class transactioncontext {
private integer isolationlevel;
private integer timeout;
private boolean readonly;
}
}4.2.2 使用inheritablethreadlocal(谨慎使用)
@component
public class transactioncontextholder {
// 注意:inheritablethreadlocal有内存泄漏风险,需要谨慎使用
private static final inheritablethreadlocal<map<string, object>> context =
new inheritablethreadlocal<>() {
@override
protected map<string, object> childvalue(map<string, object> parentvalue) {
// 深度拷贝,避免父子线程共享同一对象
return parentvalue != null ? new hashmap<>(parentvalue) : null;
}
};
public static void set(string key, object value) {
map<string, object> map = context.get();
if (map == null) {
map = new hashmap<>();
context.set(map);
}
map.put(key, value);
}
public static object get(string key) {
map<string, object> map = context.get();
return map != null ? map.get(key) : null;
}
public static void clear() {
context.remove();
}
}4.3 解决方案2:使用编程式事务管理
@service
public class programmatictransactionservice {
@autowired
private platformtransactionmanager transactionmanager;
@autowired
private jdbctemplate jdbctemplate;
public void processwithmultiplethreads(list<task> tasks) {
// 主线程事务
defaulttransactiondefinition definition = new defaulttransactiondefinition();
definition.setpropagationbehavior(transactiondefinition.propagation_required);
definition.setisolationlevel(transactiondefinition.isolation_read_committed);
transactionstatus mainstatus = transactionmanager.gettransaction(definition);
try {
// 主线程业务逻辑
executemainlogic();
// 创建子线程任务
list<completablefuture<void>> futures = new arraylist<>();
for (task task : tasks) {
completablefuture<void> future = completablefuture.runasync(() -> {
// 每个子线程有自己的事务
defaulttransactiondefinition childdefinition = new defaulttransactiondefinition();
childdefinition.setpropagationbehavior(transactiondefinition.propagation_requires_new);
childdefinition.setisolationlevel(transactiondefinition.isolation_read_committed);
transactionstatus childstatus = transactionmanager.gettransaction(childdefinition);
try {
executechildlogic(task);
transactionmanager.commit(childstatus);
} catch (exception e) {
transactionmanager.rollback(childstatus);
throw new runtimeexception("child thread transaction failed", e);
}
});
futures.add(future);
}
// 等待所有子线程完成
completablefuture.allof(futures.toarray(new completablefuture[0])).join();
// 提交主事务
transactionmanager.commit(mainstatus);
} catch (exception e) {
// 回滚主事务
transactionmanager.rollback(mainstatus);
throw new runtimeexception("main transaction failed", e);
}
}
private void executemainlogic() {
// 主线程业务逻辑实现
jdbctemplate.update("insert into main_table (data) values (?)", "main data");
}
private void executechildlogic(task task) {
// 子线程业务逻辑实现
jdbctemplate.update("insert into child_table (task_id, data) values (?, ?)",
task.getid(), task.getdata());
}
}五、主线程与子线程事务协调策略
5.1 策略一:主线程等待所有子线程提交
@service
public class coordinatedtransactionservice {
@autowired
private datasource datasource;
@autowired
private platformtransactionmanager transactionmanager;
/**
* 策略:主线程等待所有子线程事务成功后才提交
*/
public void coordinatedstrategy1(list<businesstask> tasks) {
// 用于收集子线程执行结果
list<completablefuture<boolean>> futures = new arraylist<>();
// 创建countdownlatch用于等待所有子线程完成
countdownlatch latch = new countdownlatch(tasks.size());
// 创建共享异常收集器
atomicreference<exception> sharedexception = new atomicreference<>();
// 启动主事务
defaulttransactiondefinition def = new defaulttransactiondefinition();
transactionstatus mainstatus = transactionmanager.gettransaction(def);
try {
// 执行主线程逻辑
executemainbusiness();
// 启动子线程
for (businesstask task : tasks) {
completablefuture<boolean> future = completablefuture.supplyasync(() -> {
try {
// 每个子线程使用独立的事务
return executechildtransaction(task);
} catch (exception e) {
sharedexception.set(e);
return false;
} finally {
latch.countdown();
}
});
futures.add(future);
}
// 等待所有子线程完成
boolean completed = latch.await(30, timeunit.seconds);
if (!completed) {
throw new timeoutexception("child threads timeout");
}
// 检查是否有子线程失败
if (sharedexception.get() != null) {
throw new runtimeexception("child thread failed", sharedexception.get());
}
// 检查所有子线程结果
for (completablefuture<boolean> future : futures) {
if (!future.get()) {
throw new runtimeexception("at least one child thread failed");
}
}
// 提交主事务
transactionmanager.commit(mainstatus);
} catch (exception e) {
// 回滚主事务
transactionmanager.rollback(mainstatus);
throw new runtimeexception("coordinated transaction failed", e);
}
}
private boolean executechildtransaction(businesstask task) {
// 子线程使用独立的事务
defaulttransactiondefinition def = new defaulttransactiondefinition();
def.setpropagationbehavior(transactiondefinition.propagation_requires_new);
transactionstatus status = transactionmanager.gettransaction(def);
try {
// 执行子线程业务逻辑
processtask(task);
transactionmanager.commit(status);
return true;
} catch (exception e) {
transactionmanager.rollback(status);
log.error("child transaction failed for task: {}", task.getid(), e);
return false;
}
}
}5.2 策略二:两阶段提交模式
@service
public class twophasecommitservice {
@autowired
private datasource datasource;
/**
* 简化的两阶段提交实现
*/
public void twophasecommitstrategy(list<runnable> tasks) {
// 第一阶段:准备阶段
list<completablefuture<boolean>> preparefutures = new arraylist<>();
list<transactionstatus> childstatuses = collections.synchronizedlist(new arraylist<>());
try {
// 主事务开始
connection mainconn = datasource.getconnection();
mainconn.setautocommit(false);
try {
// 主线程准备
preparemainphase(mainconn);
// 子线程准备
for (runnable task : tasks) {
completablefuture<boolean> future = completablefuture.supplyasync(() -> {
try {
connection childconn = datasource.getconnection();
childconn.setautocommit(false);
// 执行准备操作
boolean prepared = preparechildphase(childconn, task);
if (prepared) {
// 保存连接和状态,用于第二阶段
childstatuses.add(new transactionstatus(childconn));
}
return prepared;
} catch (sqlexception e) {
log.error("child prepare phase failed", e);
return false;
}
});
preparefutures.add(future);
}
// 等待所有准备阶段完成
completablefuture<void> allprepare = completablefuture.allof(
preparefutures.toarray(new completablefuture[0])
);
allprepare.get(10, timeunit.seconds);
// 检查所有子线程是否准备成功
boolean allprepared = preparefutures.stream()
.allmatch(f -> {
try {
return f.get();
} catch (exception e) {
return false;
}
});
if (!allprepared) {
// 有任何失败,执行回滚
rollbackall(mainconn, childstatuses);
throw new runtimeexception("prepare phase failed");
}
// 第二阶段:提交阶段
commitall(mainconn, childstatuses);
} catch (exception e) {
mainconn.rollback();
throw new runtimeexception("two-phase commit failed", e);
} finally {
mainconn.close();
}
} catch (sqlexception e) {
throw new runtimeexception("database connection error", e);
}
}
private void commitall(connection mainconn, list<transactionstatus> childstatuses) throws sqlexception {
try {
// 先提交所有子事务
for (transactionstatus status : childstatuses) {
status.getconnection().commit();
status.getconnection().close();
}
// 最后提交主事务
mainconn.commit();
} catch (sqlexception e) {
// 提交失败,尝试回滚所有
try {
mainconn.rollback();
} catch (sqlexception ex) {
log.error("failed to rollback main connection", ex);
}
throw e;
}
}
@data
private static class transactionstatus {
private final connection connection;
private final long threadid = thread.currentthread().getid();
private final localdatetime createtime = localdatetime.now();
}
}5.3 策略三:补偿事务模式
@service
@slf4j
public class compensationtransactionservice {
@autowired
private jdbctemplate jdbctemplate;
/**
* 补偿事务模式:记录所有操作,失败时执行补偿
*/
public void compensationstrategy(list<businessoperation> operations) {
// 用于记录需要补偿的操作
list<compensationaction> compensationactions = collections.synchronizedlist(new arraylist<>());
// 主事务开始
defaulttransactiondefinition maindef = new defaulttransactiondefinition();
platformtransactionmanager transactionmanager =
new datasourcetransactionmanager(objects.requirenonnull(jdbctemplate.getdatasource()));
transactionstatus mainstatus = transactionmanager.gettransaction(maindef);
try {
// 执行主操作
compensationaction mainaction = executemainoperation();
compensationactions.add(mainaction);
// 并行执行子操作
list<completablefuture<compensationaction>> futures = operations.stream()
.map(op -> completablefuture.supplyasync(() -> {
try {
return executechildoperation(op);
} catch (exception e) {
throw new completionexception(e);
}
}))
.collect(collectors.tolist());
// 等待所有子操作完成
list<compensationaction> childactions = futures.stream()
.map(f -> {
try {
return f.get();
} catch (exception e) {
throw new runtimeexception("child operation failed", e);
}
})
.collect(collectors.tolist());
compensationactions.addall(childactions);
// 所有操作成功,提交主事务
transactionmanager.commit(mainstatus);
// 记录成功日志
logcompensationsuccess(compensationactions);
} catch (exception e) {
// 回滚主事务
transactionmanager.rollback(mainstatus);
// 执行补偿操作
executecompensations(compensationactions);
throw new runtimeexception("transaction failed, compensation executed", e);
}
}
private compensationaction executemainoperation() {
// 执行业务操作,并返回补偿动作
string operationid = uuid.randomuuid().tostring();
try {
// 业务逻辑
jdbctemplate.update("insert into main_operations (id, data) values (?, ?)",
operationid, "main data");
// 返回补偿动作
return compensationaction.builder()
.operationid(operationid)
.operationtype("insert_main")
.compensationsql("delete from main_operations where id = ?")
.compensationparams(new object[]{operationid})
.build();
} catch (exception e) {
throw new runtimeexception("main operation failed", e);
}
}
private void executecompensations(list<compensationaction> actions) {
// 按照操作的反向顺序执行补偿
collections.reverse(actions);
for (compensationaction action : actions) {
try {
jdbctemplate.update(action.getcompensationsql(), action.getcompensationparams());
log.info("compensation executed for operation: {}", action.getoperationid());
} catch (exception e) {
log.error("failed to execute compensation for operation: {}",
action.getoperationid(), e);
// 继续执行其他补偿,不中断
}
}
}
@data
@builder
private static class compensationaction {
private string operationid;
private string operationtype;
private string compensationsql;
private object[] compensationparams;
private localdatetime operationtime;
}
}六、spring事务同步机制在多线程中的应用
6.1 使用transactionsynchronization
@service
public class transactionsynchronizationservice {
@autowired
private datasource datasource;
/**
* 使用transactionsynchronization协调多线程事务
*/
@transactional
public void processwithsynchronization(list<subtask> subtasks) {
// 注册事务同步器
transactionsynchronizationmanager.registersynchronization(
new customtransactionsynchronization(subtasks)
);
// 主线程业务逻辑
executemainbusiness();
// 注意:子线程操作将在事务提交前执行
// transactionsynchronization.beforecommit()中启动子线程
}
private class customtransactionsynchronization implements transactionsynchronization {
private final list<subtask> subtasks;
private final executorservice executorservice;
private final list<future<?>> futures;
public customtransactionsynchronization(list<subtask> subtasks) {
this.subtasks = subtasks;
this.executorservice = executors.newfixedthreadpool(subtasks.size());
this.futures = new arraylist<>();
}
@override
public void beforecommit(boolean readonly) {
log.info("transactionsynchronization.beforecommit called");
// 在事务提交前启动子线程
for (subtask task : subtasks) {
future<?> future = executorservice.submit(() -> {
try {
// 每个子线程使用独立连接和事务
executesubtaskinnewtransaction(task);
} catch (exception e) {
log.error("subtask execution failed", e);
throw new runtimeexception(e);
}
});
futures.add(future);
}
// 等待所有子线程完成
for (future<?> future : futures) {
try {
future.get(10, timeunit.seconds);
} catch (exception e) {
throw new runtimeexception("failed to complete subtasks", e);
}
}
executorservice.shutdown();
}
@override
public void aftercompletion(int status) {
log.info("transactionsynchronization.aftercompletion called with status: {}",
status == status_committed ? "committed" : "rolled_back");
if (status == status_rolled_back) {
// 事务回滚,需要清理子线程可能已经提交的操作
log.warn("main transaction rolled back, but child transactions may have been committed");
// 这里可以实现补偿逻辑
}
cleanup();
}
private void cleanup() {
if (!executorservice.isshutdown()) {
executorservice.shutdownnow();
}
}
}
}6.2 事务事件监听机制
@component
@slf4j
public class transactioneventlistenerservice {
@autowired
private applicationeventpublisher eventpublisher;
@transactionaleventlistener(phase = transactionphase.before_commit)
public void handlebeforecommit(transactionevent event) {
log.info("before commit event received");
// 在事务提交前执行操作
prepareforcommit();
}
@transactionaleventlistener(phase = transactionphase.after_completion)
public void handleaftercompletion(transactioncompletionevent event) {
log.info("transaction completed with status: {}",
event.gettransactionresult() == transactionresult.committed ? "committed" : "rolled_back");
if (event.gettransactionresult() == transactionresult.committed) {
// 事务提交后执行异步操作
executepostcommitoperations(event.getbusinessdata());
} else {
// 事务回滚后的清理操作
executerollbackcleanup(event.getbusinessdata());
}
}
@async
public void executepostcommitoperations(businessdata data) {
// 异步执行提交后的操作
log.info("executing post-commit operations asynchronously");
// 这里可以启动子线程进行后续处理
}
@data
public static class transactionevent {
private final string transactionid;
private final localdatetime eventtime;
private final businessdata businessdata;
}
@data
public static class transactioncompletionevent extends transactionevent {
private final transactionresult transactionresult;
}
public enum transactionresult {
committed,
rolled_back
}
}七、分布式事务在多线程场景下的应用
7.1 基于seata的分布式事务解决方案
@service
@slf4j
public class seatadistributedtransactionservice {
@autowired
private userservice userservice;
@autowired
private orderservice orderservice;
@autowired
private inventoryservice inventoryservice;
/**
* 使用seata at模式处理多线程分布式事务
* 注意:seata默认不支持多线程,需要特殊处理
*/
@globaltransactional(timeoutmills = 300000, name = "multi-thread-purchase")
public void purchasewithmultiplethreads(purchaserequest request) {
// 获取全局事务id
string xid = rootcontext.getxid();
log.info("global transaction started, xid: {}", xid);
// 用于收集子线程执行结果
list<completablefuture<boolean>> futures = new arraylist<>();
try {
// 任务1:扣减库存(异步执行)
completablefuture<boolean> inventoryfuture = completablefuture.supplyasync(() -> {
// 传播全局事务id到子线程
rootcontext.bind(xid);
try {
return inventoryservice.deduct(request.getproductid(), request.getquantity());
} finally {
rootcontext.unbind();
}
});
futures.add(inventoryfuture);
// 任务2:创建订单(异步执行)
completablefuture<boolean> orderfuture = completablefuture.supplyasync(() -> {
rootcontext.bind(xid);
try {
return orderservice.createorder(request);
} finally {
rootcontext.unbind();
}
});
futures.add(orderfuture);
// 任务3:更新用户信息(主线程执行)
boolean userupdated = userservice.updatepurchaseinfo(request.getuserid(), request.getamount());
if (!userupdated) {
throw new runtimeexception("failed to update user info");
}
// 等待所有异步任务完成
completablefuture<void> allfutures = completablefuture.allof(
futures.toarray(new completablefuture[0])
);
allfutures.get(30, timeunit.seconds);
// 检查所有任务结果
for (completablefuture<boolean> future : futures) {
if (!future.get()) {
throw new runtimeexception("one of the async tasks failed");
}
}
log.info("all distributed transactions completed successfully");
} catch (exception e) {
log.error("distributed transaction failed", e);
// seata会自动回滚所有分支事务
throw new runtimeexception("purchase failed", e);
} finally {
// 清理上下文
rootcontext.unbind();
}
}
}
// seata配置类
@configuration
public class seataconfig {
@bean
public globaltransactionscanner globaltransactionscanner() {
return new globaltransactionscanner("multi-thread-app", "my_test_tx_group");
}
/**
* 自定义datasourceproxy以支持多线程
*/
@bean
public datasource datasource(datasource druiddatasource) {
return new datasourceproxy(druiddatasource);
}
}7.2 基于消息队列的最终一致性方案
@service
@slf4j
public class mqbasedtransactionservice {
@autowired
private rabbittemplate rabbittemplate;
@autowired
private jdbctemplate jdbctemplate;
/**
* 基于消息队列的最终一致性方案
*/
@transactional
public void processwithmq(list<subtask> tasks) {
// 1. 主事务操作
executemaintransaction();
// 2. 发送准备消息(不投递)
list<string> messageids = new arraylist<>();
for (subtask task : tasks) {
string messageid = sendpreparemessage(task);
messageids.add(messageid);
}
// 3. 本地记录消息状态
savemessagestatus(messageids, messagestatus.prepared);
// 4. 提交主事务(消息仍未投递)
// 事务提交后,下面的代码才会执行
// 5. 事务提交后,确认投递消息
transactionsynchronizationmanager.registersynchronization(
new transactionsynchronization() {
@override
public void aftercommit() {
// 确认投递所有消息
for (string messageid : messageids) {
confirmmessagedelivery(messageid);
updatemessagestatus(messageid, messagestatus.confirmed);
}
// 异步执行子任务
executesubtasksasync(tasks);
}
@override
public void aftercompletion(int status) {
if (status == status_rolled_back) {
// 取消所有消息
for (string messageid : messageids) {
cancelmessage(messageid);
updatemessagestatus(messageid, messagestatus.cancelled);
}
}
}
}
);
}
private void executesubtasksasync(list<subtask> tasks) {
executorservice executor = executors.newfixedthreadpool(tasks.size());
list<completablefuture<void>> futures = tasks.stream()
.map(task -> completablefuture.runasync(() -> {
try {
// 每个子线程处理自己的任务
processsubtask(task);
} catch (exception e) {
log.error("subtask processing failed", e);
// 发送补偿消息
sendcompensationmessage(task);
}
}, executor))
.collect(collectors.tolist());
// 等待所有任务完成
completablefuture.allof(futures.toarray(new completablefuture[0]))
.thenrun(() -> {
executor.shutdown();
log.info("all subtasks completed");
})
.exceptionally(ex -> {
log.error("failed to complete all subtasks", ex);
executor.shutdownnow();
return null;
});
}
private enum messagestatus {
prepared,
confirmed,
cancelled,
completed
}
}八、性能优化与最佳实践
8.1 线程池优化配置
# application.yml 线程池配置
spring:
task:
execution:
pool:
# 公共线程池配置
common:
core-size: 10
max-size: 50
queue-capacity: 1000
keep-alive: 60s
thread-name-prefix: "common-task-"
# 事务处理专用线程池
transaction:
core-size: 5
max-size: 20
queue-capacity: 500
keep-alive: 30s
thread-name-prefix: "tx-task-"
allow-core-thread-timeout: true
# io密集型任务线程池
io-intensive:
core-size: 20
max-size: 100
queue-capacity: 2000
keep-alive: 120s
thread-name-prefix: "io-task-"
# 线程池监控配置
management:
endpoints:
web:
exposure:
include: "health,info,metrics,threadpool"
metrics:
export:
prometheus:
enabled: true8.2 事务优化策略
@configuration
@enabletransactionmanagement
public class transactionoptimizationconfig {
/**
* 事务管理器配置优化
*/
@bean
public platformtransactionmanager transactionmanager(datasource datasource) {
datasourcetransactionmanager transactionmanager = new datasourcetransactionmanager(datasource);
// 优化配置
transactionmanager.setnestedtransactionallowed(true); // 允许嵌套事务
transactionmanager.setvalidateexistingtransaction(true); // 验证已有事务
transactionmanager.setglobalrollbackonparticipationfailure(false); // 优化参与失败时的回滚行为
return transactionmanager;
}
/**
* 事务模板配置
*/
@bean
public transactiontemplate transactiontemplate(platformtransactionmanager transactionmanager) {
transactiontemplate template = new transactiontemplate(transactionmanager);
// 设置默认事务属性
template.setpropagationbehavior(transactiondefinition.propagation_required);
template.setisolationlevel(transactiondefinition.isolation_read_committed);
template.settimeout(30); // 30秒超时
// 只读事务优化
template.setreadonly(false);
return template;
}
/**
* 事务拦截器优化
*/
@bean
public transactioninterceptor transactioninterceptor(platformtransactionmanager transactionmanager) {
transactioninterceptor interceptor = new transactioninterceptor();
interceptor.settransactionmanager(transactionmanager);
// 配置事务属性源
namematchtransactionattributesource source = new namematchtransactionattributesource();
// 查询方法使用只读事务
rulebasedtransactionattribute readonlyattr = new rulebasedtransactionattribute();
readonlyattr.setreadonly(true);
readonlyattr.setpropagationbehavior(transactiondefinition.propagation_supports);
// 写操作使用读写事务
rulebasedtransactionattribute writeattr = new rulebasedtransactionattribute();
writeattr.setpropagationbehavior(transactiondefinition.propagation_required);
writeattr.setisolationlevel(transactiondefinition.isolation_read_committed);
writeattr.settimeout(30);
// 方法名模式匹配
source.addtransactionalmethod("get*", readonlyattr);
source.addtransactionalmethod("find*", readonlyattr);
source.addtransactionalmethod("query*", readonlyattr);
source.addtransactionalmethod("save*", writeattr);
source.addtransactionalmethod("update*", writeattr);
source.addtransactionalmethod("delete*", writeattr);
source.addtransactionalmethod("process*", writeattr);
interceptor.settransactionattributesource(source);
return interceptor;
}
}8.3 监控与告警
@component
@slf4j
public class transactionmonitor {
@autowired
private meterregistry meterregistry;
@autowired
private platformtransactionmanager transactionmanager;
private final map<string, atomicinteger> transactioncounters = new concurrenthashmap<>();
private final map<string, atomiclong> transactiondurations = new concurrenthashmap<>();
/**
* 事务监控aop
*/
@aspect
@component
public static class transactionmonitoringaspect {
private final threadlocal<long> starttime = new threadlocal<>();
private final transactionmonitor monitor;
public transactionmonitoringaspect(transactionmonitor monitor) {
this.monitor = monitor;
}
@around("@annotation(org.springframework.transaction.annotation.transactional)")
public object monitortransaction(proceedingjoinpoint joinpoint) throws throwable {
string methodname = joinpoint.getsignature().toshortstring();
string transactionname = extracttransactionname(methodname);
// 记录开始时间
starttime.set(system.currenttimemillis());
try {
// 增加事务计数器
monitor.incrementtransactioncounter(transactionname);
// 执行原方法
object result = joinpoint.proceed();
// 记录成功
monitor.recordtransactionsuccess(transactionname,
system.currenttimemillis() - starttime.get());
return result;
} catch (exception e) {
// 记录失败
monitor.recordtransactionfailure(transactionname,
system.currenttimemillis() - starttime.get(), e);
throw e;
} finally {
starttime.remove();
}
}
private string extracttransactionname(string methodname) {
// 简化的方法名提取逻辑
return methodname.replaceall(".*\\.", "").replaceall("\\(.*\\)", "");
}
}
public void incrementtransactioncounter(string transactionname) {
transactioncounters
.computeifabsent(transactionname, k -> new atomicinteger(0))
.incrementandget();
// 发布到监控系统
meterregistry.counter("transactions.total", "name", transactionname).increment();
}
public void recordtransactionsuccess(string transactionname, long duration) {
transactiondurations
.computeifabsent(transactionname, k -> new atomiclong(0))
.addandget(duration);
// 发布到监控系统
meterregistry.timer("transactions.duration", "name", transactionname, "status", "success")
.record(duration, timeunit.milliseconds);
// 检查性能阈值
if (duration > 1000) { // 超过1秒告警
log.warn("slow transaction detected: {} took {}ms", transactionname, duration);
}
}
/**
* 生成监控报告
*/
@scheduled(fixeddelay = 60000) // 每分钟生成一次报告
public void generatemonitoringreport() {
map<string, object> report = new hashmap<>();
transactioncounters.foreach((name, counter) -> {
long count = counter.getandset(0);
long totalduration = transactiondurations.getordefault(name, new atomiclong(0))
.getandset(0);
long avgduration = count > 0 ? totalduration / count : 0;
report.put(name, map.of(
"count", count,
"avgduration", avgduration,
"tps", count / 60.0
));
});
log.info("transaction monitoring report: {}", report);
// 发送到监控系统
sendtomonitoringsystem(report);
}
}九、常见问题与解决方案
9.1 问题一:事务不回滚
问题现象:子线程抛出异常,但主线程事务没有回滚。
原因分析:
- 子线程异常没有传播到主线程
- 事务传播行为配置不当
- 异常类型没有被spring事务管理器识别
解决方案:
@service
public class transactionrollbacksolution {
@transactional(rollbackfor = exception.class)
public void processwithrollbackcontrol(list<task> tasks) {
// 使用completablefuture收集异常
list<completablefuture<void>> futures = new arraylist<>();
completablefuture<throwable> errorfuture = new completablefuture<>();
for (task task : tasks) {
completablefuture<void> future = completablefuture.runasync(() -> {
try {
executetask(task);
} catch (exception e) {
// 将异常传递给错误future
errorfuture.complete(e);
throw new completionexception(e);
}
});
futures.add(future);
}
try {
// 等待所有任务完成或发生错误
completablefuture<void> allfutures = completablefuture.allof(
futures.toarray(new completablefuture[0])
);
// 设置超时时间
allfutures.get(30, timeunit.seconds);
// 检查是否有错误发生
if (errorfuture.isdone()) {
throw new runtimeexception("child thread failed", errorfuture.get());
}
} catch (timeoutexception e) {
throw new runtimeexception("operation timeout", e);
} catch (exception e) {
// 确保事务回滚
transactionaspectsupport.currenttransactionstatus().setrollbackonly();
throw new runtimeexception("process failed", e);
}
}
/**
* 另一种解决方案:使用transactioncallback
*/
public void processwithtransactioncallback(list<task> tasks) {
transactiontemplate.execute(new transactioncallbackwithoutresult() {
@override
protected void dointransactionwithoutresult(transactionstatus status) {
try {
// 并行执行任务
list<completablefuture<void>> futures = tasks.stream()
.map(task -> completablefuture.runasync(() -> {
// 每个子线程使用独立事务
executeinnewtransaction(task);
}))
.collect(collectors.tolist());
// 等待所有完成
completablefuture.allof(futures.toarray(new completablefuture[0]))
.exceptionally(ex -> {
// 标记事务为回滚
status.setrollbackonly();
return null;
})
.join();
} catch (exception e) {
status.setrollbackonly();
throw e;
}
}
});
}
}9.2 问题二:连接泄漏
问题现象:数据库连接数持续增长,最终耗尽连接池。
原因分析:
- 子线程没有正确关闭数据库连接
- 事务管理不当导致连接未释放
- 线程池配置不合理
解决方案:
@service
public class connectionleaksolution {
@autowired
private datasource datasource;
/**
* 使用connection包装器确保资源释放
*/
public void processwithconnectionmanagement(list<task> tasks) {
// 使用try-with-resources确保连接关闭
try (connectionholder connectionholder = new connectionholder(datasource)) {
list<completablefuture<void>> futures = tasks.stream()
.map(task -> completablefuture.runasync(() -> {
// 每个线程使用独立的连接
try (connection connection = datasource.getconnection()) {
connection.setautocommit(false);
try {
executetaskwithconnection(task, connection);
connection.commit();
} catch (exception e) {
connection.rollback();
throw new runtimeexception("task failed", e);
}
} catch (sqlexception e) {
throw new runtimeexception("connection error", e);
}
}))
.collect(collectors.tolist());
// 等待所有任务完成
completablefuture.allof(futures.toarray(new completablefuture[0])).join();
} catch (exception e) {
throw new runtimeexception("process failed", e);
}
}
/**
* 连接持有器,确保连接正确关闭
*/
private static class connectionholder implements autocloseable {
private final list<connection> connections = new arraylist<>();
private final datasource datasource;
public connectionholder(datasource datasource) {
this.datasource = datasource;
}
public connection getconnection() throws sqlexception {
connection connection = datasource.getconnection();
connections.add(connection);
return connection;
}
@override
public void close() {
for (connection connection : connections) {
try {
if (!connection.isclosed()) {
connection.close();
}
} catch (sqlexception e) {
log.error("failed to close connection", e);
}
}
}
}
/**
* 连接池监控
*/
@component
@slf4j
public static class connectionpoolmonitor {
@autowired
private datasource datasource;
@scheduled(fixedrate = 30000)
public void monitorconnectionpool() {
if (datasource instanceof hikaridatasource) {
hikaridatasource hikaridatasource = (hikaridatasource) datasource;
log.info("connection pool status: " +
"active: {}, " +
"idle: {}, " +
"total: {}, " +
"waiting: {}",
hikaridatasource.gethikaripoolmxbean().getactiveconnections(),
hikaridatasource.gethikaripoolmxbean().getidleconnections(),
hikaridatasource.gethikaripoolmxbean().gettotalconnections(),
hikaridatasource.gethikaripoolmxbean().getthreadsawaitingconnection());
// 连接泄漏检测
if (hikaridatasource.gethikaripoolmxbean().getactiveconnections() >
hikaridatasource.getmaximumpoolsize() * 0.8) {
log.warn("connection pool usage is high, possible connection leak");
}
}
}
}
}十、总结与最佳实践建议
10.1 核心原则总结
- 事务边界清晰:明确每个事务的边界,避免事务过长或过短
- 线程隔离:确保每个线程使用独立的事务上下文
- 资源管理:严格管理数据库连接等资源,避免泄漏
- 异常处理:设计完善的异常处理机制,确保事务一致性
- 监控告警:建立全面的监控体系,及时发现和处理问题
10.2 最佳实践建议
10.2.1 架构设计层面
/**
* 推荐的架构模式
*/
@component
public class transactionarchitecturepattern {
/**
* 模式1:主从事务模式
* 主线程负责协调,子线程执行具体任务
*/
public void masterslavepattern(list<task> tasks) {
// 1. 主线程开启事务,记录任务状态
recordtaskstart(tasks);
// 2. 子线程并行处理(各自独立事务)
list<completablefuture<result>> futures = processtasksinparallel(tasks);
// 3. 收集结果,更新状态
processresults(futures);
// 4. 主线程提交事务
}
/**
* 模式2:补偿事务模式
* 适用于需要最终一致性的场景
*/
public void compensationpattern(businessoperation operation) {
// 1. 执行主操作
operationresult result = executemainoperation(operation);
// 2. 记录操作日志(用于补偿)
recordoperationlog(operation, result);
// 3. 异步执行后续操作
executeasyncfollowup(operation, result);
// 4. 提供补偿接口
registercompensationcallback(operation);
}
/**
* 模式3:批量处理模式
* 适用于大批量数据处理
*/
public void batchprocessingpattern(list<dataitem> items) {
// 1. 分批处理
list<list<dataitem>> batches = partitionitems(items, 100);
// 2. 并行处理每个批次
batches.parallelstream().foreach(batch -> {
// 每个批次独立事务
processbatchintransaction(batch);
});
// 3. 汇总结果
summarizeresults();
}
}10.2.2 代码实现层面
- 使用模板方法减少重复代码:
public abstract class transactiontemplatepattern {
@autowired
protected platformtransactionmanager transactionmanager;
/**
* 执行带事务的异步任务
*/
protected <t> completablefuture<t> executeasyncintransaction(
supplier<t> task,
transactiondefinition definition) {
return completablefuture.supplyasync(() -> {
transactionstatus status = transactionmanager.gettransaction(definition);
try {
t result = task.get();
transactionmanager.commit(status);
return result;
} catch (exception e) {
transactionmanager.rollback(status);
throw new completionexception(e);
}
});
}
/**
* 执行带重试的事务
*/
protected <t> t executewithretry(
callable<t> task,
int maxretries,
long backoffdelay) {
int retrycount = 0;
while (retrycount <= maxretries) {
try {
return transactiontemplate.execute(status -> {
try {
return task.call();
} catch (exception e) {
throw new runtimeexception(e);
}
});
} catch (exception e) {
retrycount++;
if (retrycount > maxretries) {
throw e;
}
try {
thread.sleep(backoffdelay * retrycount);
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
throw new runtimeexception(ie);
}
}
}
throw new illegalstateexception("should not reach here");
}
}10.2.3 配置管理层面
- 环境特定的线程池配置:
@configuration
@profile({"dev", "test"})
public class devthreadpoolconfig {
@bean
public threadpooltaskexecutor taskexecutor() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
executor.setcorepoolsize(5);
executor.setmaxpoolsize(10);
executor.setqueuecapacity(50);
return executor;
}
}
@configuration
@profile("prod")
public class prodthreadpoolconfig {
@bean
public threadpooltaskexecutor taskexecutor() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
executor.setcorepoolsize(20);
executor.setmaxpoolsize(100);
executor.setqueuecapacity(1000);
executor.setallowcorethreadtimeout(true);
executor.setkeepaliveseconds(120);
return executor;
}
}10.3 未来发展趋势
- 响应式事务管理:随着响应式编程的普及,响应式事务管理将成为趋势
- 云原生事务:在微服务和云原生架构下,分布式事务管理将更加重要
- ai优化:利用ai技术自动优化事务参数和线程池配置
- 无服务器事务:在serverless架构下的新型事务管理模式
10.4 结语
springboot中使用线程池控制主线程和子线程的事务是一个复杂但重要的话题。通过合理的架构设计、正确的事务策略选择、完善的异常处理机制和全面的监控体系,我们可以构建出既高效又可靠的多线程事务处理系统。
到此这篇关于springboot中使用线程池控制主线程与子线程事务的全过程的文章就介绍到这了,更多相关springboot线程池控制主线程与子线程事务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论