一、线程池最佳实践
1.1 自定义线程池
@configuration
public class threadpoolconfig {
@bean("taskexecutor")
public executor taskexecutor() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
// 核心线程数
executor.setcorepoolsize(10);
// 最大线程数
executor.setmaxpoolsize(50);
// 队列容量
executor.setqueuecapacity(200);
// 线程活跃时间
executor.setkeepaliveseconds(60);
// 线程名前缀
executor.setthreadnameprefix("task-");
// 拒绝策略
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
// 优雅关闭
executor.setwaitfortaskstocompleteonshutdown(true);
executor.setawaitterminationseconds(60);
executor.initialize();
return executor;
}
@bean("ioexecutor")
public executor ioexecutor() {
// io 密集型任务使用更多线程
return executors.newfixedthreadpool(
runtime.getruntime().availableprocessors() * 2,
new threadfactorybuilder().setnameformat("io-pool-%d").build()
);
}
@bean("cpuexecutor")
public executor cpuexecutor() {
// cpu 密集型任务使用核心数线程
return executors.newfixedthreadpool(
runtime.getruntime().availableprocessors(),
new threadfactorybuilder().setnameformat("cpu-pool-%d").build()
);
}
}
1.2 线程池监控
@component
public class threadpoolmonitor {
@autowired
private threadpooltaskexecutor taskexecutor;
@scheduled(fixedrate = 60000)
public void monitor() {
threadpoolexecutor executor = taskexecutor.getthreadpoolexecutor();
log.info("thread pool status - " +
"active: {}, " +
"pool size: {}, " +
"core pool size: {}, " +
"max pool size: {}, " +
"task count: {}, " +
"completed task count: {}, " +
"queue size: {}",
executor.getactivecount(),
executor.getpoolsize(),
executor.getcorepoolsize(),
executor.getmaximumpoolsize(),
executor.gettaskcount(),
executor.getcompletedtaskcount(),
executor.getqueue().size()
);
}
}
二、completablefuture 异步编程
2.1 基础用法
@service
public class asyncorderservice {
@autowired
private inventoryservice inventoryservice;
@autowired
private paymentservice paymentservice;
@autowired
private notificationservice notificationservice;
// 异步创建订单
public completablefuture<order> createorderasync(orderrequest request) {
return completablefuture.supplyasync(() -> validaterequest(request))
.thenapplyasync(this::reserveinventory)
.thenapplyasync(this::processpayment)
.thenapplyasync(this::saveorder)
.whencomplete((order, ex) -> {
if (ex != null) {
log.error("order creation failed", ex);
// 补偿操作
compensate(order);
} else {
notificationservice.sendorderconfirmation(order);
}
});
}
// 并行处理
public completablefuture<orderdetails> getorderdetailsasync(string orderid) {
completablefuture<order> orderfuture = completablefuture
.supplyasync(() -> orderrepository.findbyid(orderid).orelsethrow());
completablefuture<list<orderitem>> itemsfuture = completablefuture
.supplyasync(() -> orderitemrepository.findbyorderid(orderid));
completablefuture<shippinginfo> shippingfuture = completablefuture
.supplyasync(() -> shippingservice.getshippinginfo(orderid));
// 等待所有完成
return completablefuture.allof(orderfuture, itemsfuture, shippingfuture)
.thenapply(v -> {
orderdetails details = new orderdetails();
details.setorder(orderfuture.join());
details.setitems(itemsfuture.join());
details.setshipping(shippingfuture.join());
return details;
});
}
// 带超时的异步操作
public completablefuture<order> createorderwithtimeout(orderrequest request) {
return createorderasync(request)
.ortimeout(30, timeunit.seconds)
.exceptionally(ex -> {
log.error("order creation timeout", ex);
throw new ordertimeoutexception("order creation timed out");
});
}
}
2.2 异常处理
@service
public class asyncexceptionhandlingservice {
public completablefuture<result> processwithfallback(string input) {
return completablefuture
.supplyasync(() -> riskyoperation(input))
.exceptionally(ex -> {
log.error("primary operation failed", ex);
return fallbackoperation(input);
})
.thenapply(this::transformresult);
}
// 组合异常处理
public completablefuture<result> processwithmultiplefallbacks(string input) {
return completablefuture
.supplyasync(() -> primaryoperation(input))
.handle((result, ex) -> {
if (ex != null) {
log.warn("primary failed, trying fallback 1", ex);
return fallbackoperation1(input);
}
return result;
})
.thencompose(result -> {
if (result == null) {
return completablefuture.supplyasync(() -> fallbackoperation2(input));
}
return completablefuture.completedfuture(result);
});
}
}
三、并发集合与工具类
3.1 concurrenthashmap 使用
@service
public class cacheservice {
private final concurrenthashmap<string, cacheentry> cache = new concurrenthashmap<>();
public object get(string key, supplier<object> loader) {
cacheentry entry = cache.get(key);
if (entry != null && !entry.isexpired()) {
return entry.getvalue();
}
// 使用 computeifabsent 保证原子性
return cache.computeifabsent(key, k -> {
object value = loader.get();
return new cacheentry(value, system.currenttimemillis() + 60000);
}).getvalue();
}
public void put(string key, object value, long ttlmillis) {
cache.put(key, new cacheentry(value, system.currenttimemillis() + ttlmillis));
}
// 批量操作
public map<string, object> getall(set<string> keys) {
return cache.entryset().stream()
.filter(e -> keys.contains(e.getkey()) && !e.getvalue().isexpired())
.collect(collectors.tomap(
map.entry::getkey,
e -> e.getvalue().getvalue()
));
}
// 定期清理
@scheduled(fixedrate = 60000)
public void cleanup() {
cache.entryset().removeif(e -> e.getvalue().isexpired());
}
}
3.2 其他并发工具
@component
public class concurrenttoolsdemo {
// countdownlatch - 等待多个任务完成
public void processwithlatch(list<task> tasks) throws interruptedexception {
countdownlatch latch = new countdownlatch(tasks.size());
for (task task : tasks) {
completablefuture.runasync(() -> {
try {
task.execute();
} finally {
latch.countdown();
}
});
}
latch.await(5, timeunit.minutes);
}
// cyclicbarrier - 多阶段任务同步
public void multiphaseprocessing(list<worker> workers) throws exception {
cyclicbarrier barrier = new cyclicbarrier(workers.size());
for (worker worker : workers) {
new thread(() -> {
try {
// 阶段 1
worker.phase1();
barrier.await();
// 阶段 2
worker.phase2();
barrier.await();
// 阶段 3
worker.phase3();
} catch (exception e) {
log.error("worker failed", e);
}
}).start();
}
}
// semaphore - 限流
public void limitedconcurrentoperation(list<task> tasks, int maxconcurrent) {
semaphore semaphore = new semaphore(maxconcurrent);
for (task task : tasks) {
completablefuture.runasync(() -> {
try {
semaphore.acquire();
task.execute();
} catch (interruptedexception e) {
thread.currentthread().interrupt();
} finally {
semaphore.release();
}
});
}
}
// exchanger - 线程间数据交换
public void exchangedata() {
exchanger<data> exchanger = new exchanger<>();
// 生产者
new thread(() -> {
try {
data data = producedata();
data processed = exchanger.exchange(data);
// 使用处理后的数据
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
}).start();
// 消费者
new thread(() -> {
try {
data data = exchanger.exchange(null);
data processed = processdata(data);
exchanger.exchange(processed);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
}).start();
}
}
四、原子类与 cas
@component
public class atomicoperationsdemo {
// 原子计数器
private final atomiclong counter = new atomiclong(0);
// 原子累加器(高并发下性能更好)
private final longadder longadder = new longadder();
// 原子引用
private final atomicreference<config> configref = new atomicreference<>(new config());
public long incrementandget() {
return counter.incrementandget();
}
public void add(long value) {
longadder.add(value);
}
public long getsum() {
return longadder.sum();
}
// cas 更新配置
public boolean updateconfig(config newconfig) {
config current;
do {
current = configref.get();
if (!shouldupdate(current, newconfig)) {
return false;
}
} while (!configref.compareandset(current, newconfig));
return true;
}
// longaccumulator - 自定义累加逻辑
private final longaccumulator maxaccumulator = new longaccumulator(long::max, 0);
public void updatemax(long value) {
maxaccumulator.accumulate(value);
}
public long getmax() {
return maxaccumulator.get();
}
}
五、fork/join 框架
@component
public class forkjoindemo {
// 递归任务示例
public long parallelsum(long[] array) {
forkjoinpool pool = new forkjoinpool();
try {
return pool.invoke(new sumtask(array, 0, array.length));
} finally {
pool.shutdown();
}
}
// 递归任务
private static class sumtask extends recursivetask<long> {
private static final int threshold = 10000;
private final long[] array;
private final int start;
private final int end;
sumtask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@override
protected long compute() {
if (end - start <= threshold) {
// 直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 拆分任务
int mid = (start + end) / 2;
sumtask left = new sumtask(array, start, mid);
sumtask right = new sumtask(array, mid, end);
left.fork();
long rightresult = right.compute();
long leftresult = left.join();
return leftresult + rightresult;
}
}
// 递归动作示例
public void parallelprocess(list<task> tasks) {
forkjoinpool.commonpool().invoke(new processtask(tasks, 0, tasks.size()));
}
private static class processtask extends recursiveaction {
private static final int threshold = 100;
private final list<task> tasks;
private final int start;
private final int end;
processtask(list<task> tasks, int start, int end) {
this.tasks = tasks;
this.start = start;
this.end = end;
}
@override
protected void compute() {
if (end - start <= threshold) {
for (int i = start; i < end; i++) {
tasks.get(i).execute();
}
return;
}
int mid = (start + end) / 2;
processtask left = new processtask(tasks, start, mid);
processtask right = new processtask(tasks, mid, end);
invokeall(left, right);
}
}
}
六、总结
java 并发编程的关键点:
- 线程池:合理配置核心参数,做好监控
- 异步编程:completablefuture 让异步代码更优雅
- 并发集合:选择合适的线程安全集合
- 原子操作:cas 实现无锁编程
- 任务分解:fork/join 处理大数据量任务
这其实可以更优雅一点。并发编程的核心是理解 happens-before 规则和内存模型。
参考资源:
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论