Java Concurrency in Practice: From Thread Pools to CompletableFuture

April 18, 2026 | Java

1. Thread Pool Best Practices

1.1 Custom Thread Pools

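import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import com.google.common.util.concurrent.ThreadFactoryBuilder; // Guava
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolConfig {

    // Declared with its concrete type so that it can be injected as a
    // ThreadPoolTaskExecutor (the monitor in 1.2 relies on this).
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // Core pool size
        executor.setCorePoolSize(10);

        // Maximum pool size (only reached once the queue is full)
        executor.setMaxPoolSize(50);

        // Queue capacity
        executor.setQueueCapacity(200);

        // Keep-alive time, in seconds, for idle threads above the core size
        executor.setKeepAliveSeconds(60);

        // Thread name prefix
        executor.setThreadNamePrefix("task-");

        // Rejection policy: run the task on the submitting thread when pool and queue are full
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Graceful shutdown: let queued tasks finish, waiting up to 60 seconds
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);

        executor.initialize();
        return executor;
    }

    // Returning ExecutorService (rather than Executor) keeps shutdown() reachable.
    @Bean("ioExecutor")
    public ExecutorService ioExecutor() {
        // I/O-bound tasks spend most of their time waiting, so use more threads than cores
        return Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2,
            new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build()
        );
    }

    @Bean("cpuExecutor")
    public ExecutorService cpuExecutor() {
        // CPU-bound tasks: one thread per available core
        return Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build()
        );
    }
}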
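The two fixed pools above use "cores" and "cores * 2" threads. One commonly cited sizing heuristic, described in Goetz's book Java Concurrency in Practice, is threads = cores * target utilization * (1 + wait time / compute time). The sketch below only illustrates that formula; the class name `PoolSizing`, the method name, and the parameter names are made up for this example, and the wait/compute ratio has to be measured for a real workload.

```java
final class PoolSizing {

    private PoolSizing() {
    }

    /**
     * Suggests a thread count from the heuristic
     * cores * targetUtilization * (1 + waitToComputeRatio).
     * A ratio of 0 models purely CPU-bound work.
     */
    static int suggestedThreads(int cores, double targetUtilization, double waitToComputeRatio) {
        if (cores < 1 || targetUtilization <= 0.0 || targetUtilization > 1.0 || waitToComputeRatio < 0.0) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        long threads = Math.round(cores * targetUtilization * (1.0 + waitToComputeRatio));
        return (int) Math.max(1L, threads);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound: " + suggestedThreads(cores, 1.0, 0.0));
        System.out.println("I/O-bound, wait == compute: " + suggestedThreads(cores, 1.0, 1.0));
    }
}
```

With a wait/compute ratio of 1 the formula gives twice the core count, which matches the I/O pool above; tasks that wait much longer than they compute would justify a larger pool.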

1.2 Monitoring the Thread Pool

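import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class ThreadPoolMonitor {

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    // Logs a snapshot once a minute; scheduling must be enabled with @EnableScheduling.
    @Scheduled(fixedRate = 60000)
    public void monitor() {
        ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();

        log.info("thread pool status - " +
            "active: {}, " +
            "pool size: {}, " +
            "core pool size: {}, " +
            "max pool size: {}, " +
            "task count: {}, " +
            "completed task count: {}, " +
            "queue size: {}",
            executor.getActiveCount(),
            executor.getPoolSize(),
            executor.getCorePoolSize(),
            executor.getMaximumPoolSize(),
            executor.getTaskCount(),
            executor.getCompletedTaskCount(),
            executor.getQueue().size()
        );
    }
}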
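To read these metrics it helps to know the order in which a `ThreadPoolExecutor` grows: core threads first, then the queue, then extra threads up to the maximum. The following is a small standalone sketch, without Spring, that makes this visible with a deliberately tiny pool (1 core thread, 2 maximum, queue capacity 1). The class name `PoolSnapshotDemo` and the sizes are chosen only for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSnapshotDemo {

    /** Returns {poolSize, activeCount, queueSize, completedTaskCount after shutdown}. */
    static long[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            started.countDown();
            try {
                release.await(); // keep the worker busy until the snapshot is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // starts the single core thread
            pool.execute(blocker); // core thread is busy, so this task waits in the queue
            pool.execute(blocker); // queue is full, so a second (non-core) thread starts
            started.await();       // two tasks are running now

            long poolSize = pool.getPoolSize();
            long active = pool.getActiveCount();
            long queued = pool.getQueue().size();

            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return new long[] {poolSize, active, queued, pool.getCompletedTaskCount()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

In a monitor like the one above, a queue size that stays near its capacity while the pool size is at its maximum is the signal that the rejection policy is about to take effect.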

2. Asynchronous Programming with CompletableFuture

2.1 Basic Usage

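import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Domain types and helpers (validateRequest, reserveInventory, processPayment,
// saveOrder, compensate) are not shown in this article.
@Service
public class AsyncOrderService {

    private static final Logger log = LoggerFactory.getLogger(AsyncOrderService.class);

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private NotificationService notificationService;

    // orderRepository, orderItemRepository and shippingService are assumed
    // to be injected in the same way.

    // Create an order asynchronously.
    // Without an explicit executor these stages run on ForkJoinPool.commonPool();
    // for blocking work, pass one of the pools from section 1 as the second argument.
    public CompletableFuture<Order> createOrderAsync(OrderRequest request) {
        return CompletableFuture.supplyAsync(() -> validateRequest(request))
            .thenApplyAsync(this::reserveInventory)
            .thenApplyAsync(this::processPayment)
            .thenApplyAsync(this::saveOrder)
            .whenComplete((order, ex) -> {
                if (ex != null) {
                    log.error("order creation failed", ex);
                    // Compensate. order is null when the pipeline failed,
                    // so the helper is assumed to work from the request.
                    compensate(request);
                } else {
                    notificationService.sendOrderConfirmation(order);
                }
            });
    }

    // Parallel processing
    public CompletableFuture<OrderDetails> getOrderDetailsAsync(String orderId) {
        CompletableFuture<Order> orderFuture = CompletableFuture
            .supplyAsync(() -> orderRepository.findById(orderId).orElseThrow());

        CompletableFuture<List<OrderItem>> itemsFuture = CompletableFuture
            .supplyAsync(() -> orderItemRepository.findByOrderId(orderId));

        CompletableFuture<ShippingInfo> shippingFuture = CompletableFuture
            .supplyAsync(() -> shippingService.getShippingInfo(orderId));

        // Wait for all three; join() does not block here because every future is already done
        return CompletableFuture.allOf(orderFuture, itemsFuture, shippingFuture)
            .thenApply(v -> {
                OrderDetails details = new OrderDetails();
                details.setOrder(orderFuture.join());
                details.setItems(itemsFuture.join());
                details.setShipping(shippingFuture.join());
                return details;
            });
    }

    // Asynchronous operation with a timeout (orTimeout requires Java 9+)
    public CompletableFuture<Order> createOrderWithTimeout(OrderRequest request) {
        return createOrderAsync(request)
            .orTimeout(30, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                // Upstream failures arrive wrapped in CompletionException
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
                if (cause instanceof TimeoutException) {
                    log.error("order creation timeout", cause);
                    throw new OrderTimeoutException("order creation timed out");
                }
                // Do not report other failures as timeouts
                throw new CompletionException(cause);
            });
    }
}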
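The service above depends on types that are not shown, so here is a self-contained sketch of the same two ideas: fanning out work on an explicit executor and collecting it with `allOf`, and bounding the wait with a timeout. The class name `AsyncCompositionDemo`, the `"item-" + id` lookup, and the `"default"` fallback are placeholders invented for this example. `completeOnTimeout` requires Java 9 or later.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

final class AsyncCompositionDemo {

    /** Runs one lookup per id on the given executor and returns the results in input order. */
    static List<String> loadAll(List<Integer> ids, Executor executor) {
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> "item-" + id, executor))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // every future is complete at this point, so join() returns immediately
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    /**
     * Completes the given future with "default" if it is still pending after the timeout.
     * Note that completeOnTimeout acts on the source future itself.
     */
    static String withDefault(CompletableFuture<String> source, long timeoutMillis) {
        return source.completeOnTimeout("default", timeoutMillis, TimeUnit.MILLISECONDS).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(loadAll(List.of(1, 2, 3), pool));
            System.out.println(withDefault(new CompletableFuture<>(), 50));
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing the executor explicitly keeps blocking lookups off the shared `ForkJoinPool.commonPool()`, which is what `supplyAsync` uses when no executor is given.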

2.2 Exception Handling

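import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

// Result and the operation helpers used below are not shown in this article.
@Service
public class AsyncExceptionHandlingService {

    private static final Logger log = LoggerFactory.getLogger(AsyncExceptionHandlingService.class);

    public CompletableFuture<Result> processWithFallback(String input) {
        return CompletableFuture
            .supplyAsync(() -> riskyOperation(input))
            .exceptionally(ex -> {
                log.error("primary operation failed", ex);
                return fallbackOperation(input);
            })
            .thenApply(this::transformResult);
    }

    // Chained fallbacks
    public CompletableFuture<Result> processWithMultipleFallbacks(String input) {
        return CompletableFuture
            .supplyAsync(() -> primaryOperation(input))
            .handle((result, ex) -> {
                if (ex != null) {
                    log.warn("primary failed, trying fallback 1", ex);
                    return fallbackOperation1(input);
                }
                return result;
            })
            // The second fallback runs only when the previous step produced null;
            // an exception thrown by fallbackOperation1 still propagates to the caller.
            .thenCompose(result -> {
                if (result == null) {
                    return CompletableFuture.supplyAsync(() -> fallbackOperation2(input));
                }
                return CompletableFuture.completedFuture(result);
            });
    }
}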
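One detail that is easy to miss when writing handlers like these: the exception passed to `exceptionally` or `handle` is usually a `CompletionException` that wraps the original failure, so the handler has to unwrap it before inspecting the type. A minimal sketch follows; the class name `ExceptionFlowDemo` and the messages are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionFlowDemo {

    /** Returns "OK" on success, or a description of the unwrapped failure. */
    static String recover(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                    return "ok";
                })
                .thenApply(String::toUpperCase) // skipped when the supplier failed
                .exceptionally(ex -> {
                    // The failure arrives wrapped; the real cause is one level down
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return "recovered from " + cause.getClass().getSimpleName()
                            + ": " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```

The two calls in `main` print `OK` and `recovered from IllegalStateException: boom`.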

3. Concurrent Collections and Utilities

3.1 Using ConcurrentHashMap

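import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

// CacheEntry (value plus expiry timestamp, with isExpired()) is not shown in this article.
@Service
public class CacheService {

    private final ConcurrentHashMap<String, CacheEntry> cache = new ConcurrentHashMap<>();

    public Object get(String key, Supplier<Object> loader) {
        CacheEntry entry = cache.get(key);

        if (entry != null && !entry.isExpired()) {
            return entry.getValue();
        }

        // compute() replaces a missing or expired entry atomically.
        // computeIfAbsent would return an expired entry as-is because the key is still present.
        // The loader runs while the map bin is locked, so it should be short
        // and must not modify this cache.
        return cache.compute(key, (k, existing) -> {
            if (existing != null && !existing.isExpired()) {
                return existing;
            }
            return new CacheEntry(loader.get(), System.currentTimeMillis() + 60000);
        }).getValue();
    }

    public void put(String key, Object value, long ttlMillis) {
        cache.put(key, new CacheEntry(value, System.currentTimeMillis() + ttlMillis));
    }

    // Bulk read: look up only the requested keys instead of scanning the whole cache
    public Map<String, Object> getAll(Set<String> keys) {
        Map<String, Object> result = new HashMap<>();
        for (String key : keys) {
            CacheEntry entry = cache.get(key);
            if (entry != null && !entry.isExpired()) {
                result.put(key, entry.getValue());
            }
        }
        return result;
    }

    // Periodic cleanup
    @Scheduled(fixedRate = 60000)
    public void cleanup() {
        cache.entrySet().removeIf(e -> e.getValue().isExpired());
    }
}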
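The cache relies on the per-key atomicity of `ConcurrentHashMap` methods. The same guarantee is what makes `merge` a safe way to keep counters under concurrent updates, where a separate `get` followed by `put` would lose increments. The following is a small standalone sketch; the class name `WordCountDemo` and its inputs are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class WordCountDemo {

    /** Counts the words using one task per word, spread over the given number of threads. */
    static Map<String, Integer> count(List<String> words, int threads) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Void>> jobs = new ArrayList<>();
            for (String word : words) {
                // merge() performs the read-modify-write for one key atomically
                jobs.add(CompletableFuture.runAsync(
                        () -> counts.merge(word, 1, Integer::sum), pool));
            }
            CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("a", "b", "a", "c", "a"), 4));
    }
}
```

Because each `merge` call is atomic for its key, the totals are exact regardless of how the tasks interleave.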

3.2 Other Concurrency Utilities

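import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

// Task, Worker, Data, produceData and processData are not shown in this article.
@Component
public class ConcurrentToolsDemo {

    private static final Logger log = LoggerFactory.getLogger(ConcurrentToolsDemo.class);

    // CountDownLatch: wait for several tasks to finish
    public void processWithLatch(List<Task> tasks) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());

        for (Task task : tasks) {
            CompletableFuture.runAsync(() -> {
                try {
                    task.execute();
                } finally {
                    latch.countDown();
                }
            });
        }

        // await returns false if the timeout elapsed before the count reached zero
        if (!latch.await(5, TimeUnit.MINUTES)) {
            log.warn("timed out with {} task(s) still pending", latch.getCount());
        }
    }

    // CyclicBarrier: synchronize workers between phases
    public void multiPhaseProcessing(List<Worker> workers) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(workers.size());

        for (Worker worker : workers) {
            new Thread(() -> {
                try {
                    // Phase 1
                    worker.phase1();
                    barrier.await();

                    // Phase 2
                    worker.phase2();
                    barrier.await();

                    // Phase 3
                    worker.phase3();
                } catch (Exception e) {
                    log.error("worker failed", e);
                }
            }).start();
        }
    }

    // Semaphore: limit concurrency
    public void limitedConcurrentOperation(List<Task> tasks, int maxConcurrent) {
        Semaphore semaphore = new Semaphore(maxConcurrent);

        for (Task task : tasks) {
            CompletableFuture.runAsync(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was acquired, so there is nothing to release
                }
                try {
                    task.execute();
                } finally {
                    semaphore.release();
                }
            });
        }
    }

    // Exchanger: swap data between two threads
    public void exchangeData() {
        Exchanger<Data> exchanger = new Exchanger<>();

        // Producer
        new Thread(() -> {
            try {
                Data data = produceData();
                exchanger.exchange(data);                  // first rendezvous: hand over the raw data
                Data processed = exchanger.exchange(null); // second rendezvous: receive the result
                // Use the processed data
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Consumer
        new Thread(() -> {
            try {
                Data data = exchanger.exchange(null);
                Data processed = processData(data);
                exchanger.exchange(processed);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}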
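To check that a semaphore really caps concurrency, one option is to record the peak number of tasks inside the guarded section. The sketch below does that with a hypothetical class `SemaphoreLimitDemo`; the 5 ms sleep merely stands in for real work. The permit is acquired before the `try` block so that `release()` only runs when a permit is actually held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreLimitDemo {

    /** Runs the tasks on a pool and returns the highest number seen inside the guarded section. */
    static int peakConcurrency(int tasks, int threads, int permits) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    semaphore.acquireUninterruptibly(); // acquire before try
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        sleepQuietly(5); // simulated work
                    } finally {
                        inFlight.decrementAndGet();
                        semaphore.release();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak = " + peakConcurrency(20, 8, 3));
    }
}
```

Even with 8 pool threads, the reported peak never exceeds the 3 permits. The pool threads that are waiting for a permit are still blocked, though, so a semaphore limits access to a resource but does not free up threads.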

4. Atomic Classes and CAS

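import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

import org.springframework.stereotype.Component;

// Config and shouldUpdate are not shown in this article.
@Component
public class AtomicOperationsDemo {

    // Atomic counter
    private final AtomicLong counter = new AtomicLong(0);

    // Atomic adder (scales better under heavy contention)
    private final LongAdder longAdder = new LongAdder();

    // Atomic reference
    private final AtomicReference<Config> configRef = new AtomicReference<>(new Config());

    public long incrementAndGet() {
        return counter.incrementAndGet();
    }

    public void add(long value) {
        longAdder.add(value);
    }

    public long getSum() {
        return longAdder.sum();
    }

    // Update the configuration with a CAS retry loop
    public boolean updateConfig(Config newConfig) {
        Config current;
        do {
            current = configRef.get();
            if (!shouldUpdate(current, newConfig)) {
                return false;
            }
            // Retry if another thread replaced the reference in the meantime
        } while (!configRef.compareAndSet(current, newConfig));
        return true;
    }

    // LongAccumulator: custom accumulation logic.
    // The identity for max must be Long.MIN_VALUE; with 0, negative inputs would be ignored.
    private final LongAccumulator maxAccumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);

    public void updateMax(long value) {
        maxAccumulator.accumulate(value);
    }

    public long getMax() {
        return maxAccumulator.get();
    }
}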
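Two properties of these classes are worth seeing in isolation: a `LongAdder` loses no increments under contention, and the identity value passed to a `LongAccumulator` acts as its starting point, so it must be neutral for the chosen function. The sketch below is illustrative only; the class `AdderDemo` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

final class AdderDemo {

    /** Increments a shared LongAdder from several threads and returns the final sum. */
    static long countWithAdder(int threads, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    adder.increment();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join makes the worker's updates visible before sum()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return adder.sum();
    }

    /** Computes a maximum with the given identity, to show why the identity matters. */
    static long maxOf(long identity, long... values) {
        LongAccumulator max = new LongAccumulator(Long::max, identity);
        for (long value : values) {
            max.accumulate(value);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAdder(8, 10_000));
        System.out.println(maxOf(Long.MIN_VALUE, -5, -2, -9));
        System.out.println(maxOf(0, -5, -2, -9));
    }
}
```

With `Long.MIN_VALUE` as the identity the maximum of -5, -2 and -9 is -2; with 0 as the identity the result is 0, a value that was never accumulated.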

5. The Fork/Join Framework

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

import org.springframework.stereotype.Component;

// Task is not shown in this article.
@Component
public class ForkJoinDemo {

    // Recursive task example.
    // A dedicated pool per call is for illustration only; in practice, reuse a shared pool.
    public long parallelSum(long[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    // Recursive task that returns a result
    private static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10000;

        private final long[] array;
        private final int start;
        private final int end;

        SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // Small enough: compute directly
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }

            // Split the task (overflow-safe midpoint)
            int mid = start + (end - start) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);

            left.fork();                          // run the left half asynchronously
            long rightResult = right.compute();   // compute the right half in this thread
            long leftResult = left.join();        // then wait for the left half

            return leftResult + rightResult;
        }
    }

    // Recursive action example
    public void parallelProcess(List<Task> tasks) {
        ForkJoinPool.commonPool().invoke(new ProcessTask(tasks, 0, tasks.size()));
    }

    // Recursive action: no result
    private static class ProcessTask extends RecursiveAction {
        private static final int THRESHOLD = 100;

        private final List<Task> tasks;
        private final int start;
        private final int end;

        ProcessTask(List<Task> tasks, int start, int end) {
            this.tasks = tasks;
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start <= THRESHOLD) {
                for (int i = start; i < end; i++) {
                    tasks.get(i).execute();
                }
                return;
            }

            int mid = start + (end - start) / 2;
            ProcessTask left = new ProcessTask(tasks, start, mid);
            ProcessTask right = new ProcessTask(tasks, mid, end);

            invokeAll(left, right);
        }
    }
}

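6. Summary

Key points of concurrent programming in Java:

  1. Thread pools: size the core parameters deliberately and monitor the pool.
  2. Asynchronous programming: CompletableFuture makes asynchronous code easier to compose.
  3. Concurrent collections: choose the thread-safe collection that fits the access pattern.
  4. Atomic operations: CAS enables lock-free updates.
  5. Task decomposition: Fork/Join splits work on large data sets into subtasks.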

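The examples above still leave room for refinement. At its core, concurrent programming depends on understanding the happens-before rules and the Java Memory Model.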
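As a concrete instance of a happens-before rule: a write to a `volatile` field happens-before every subsequent read of that field, so anything written before the volatile write is visible to a thread that has observed it. The sketch below uses that rule to publish a plain field safely. The class `VisibilityDemo` and its members are invented for this illustration, and `Thread.onSpinWait()` requires Java 9 or later.

```java
final class VisibilityDemo {

    private int payload;            // plain field, published through the volatile flag
    private volatile boolean ready; // volatile write/read creates the happens-before edge

    /** Writer side: set the data first, then raise the flag. */
    void publish(int value) {
        payload = value; // ordered before the volatile write below
        ready = true;
    }

    /** Reader side: spin until the flag is visible, then read the data. */
    int awaitPayload() {
        while (!ready) {
            Thread.onSpinWait();
        }
        return payload; // guaranteed to see the value written before ready = true
    }

    /** Publishes 42 from a second thread and returns what the calling thread reads. */
    static int run() {
        VisibilityDemo demo = new VisibilityDemo();
        Thread writer = new Thread(() -> demo.publish(42));
        writer.start();
        return demo.awaitPayload();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without `volatile` on `ready`, the memory model would allow the reader to spin forever or to see a stale `payload`. The locks, atomics and concurrent collections used throughout this article provide the same kind of ordering guarantees internally.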
