java forkjoinpool线程池的使用之并行计算数组求和
package com.zhangxueliang.juc; import java.io.ioexception; import java.util.arrays; import java.util.random; import java.util.concurrent.forkjoinpool; import java.util.concurrent.recursiveaction; import java.util.concurrent.recursivetask; public class forkjoinpooldemo { static int[] nums = new int[1000000]; static final int max_num = 50000; static random r = new random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextint(100); } system.out.println("---" + arrays.stream(nums).sum()); //stream api } static class addtask extends recursiveaction { int start, end; addtask(int s, int e) { start = s; end = e; } @override protected void compute() { if(end-start <= max_num) { long sum = 0l; for(int i=start; i<end; i++) sum += nums[i]; system.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; addtask subtask1 = new addtask(start, middle); addtask subtask2 = new addtask(middle, end); subtask1.fork(); subtask2.fork(); } } } static class addtaskret extends recursivetask<long> { private static final long serialversionuid = 1l; int start, end; addtaskret(int s, int e) { start = s; end = e; } @override protected long compute() { if(end-start <= max_num) { long sum = 0l; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; addtaskret subtask1 = new addtaskret(start, middle); addtaskret subtask2 = new addtaskret(middle, end); subtask1.fork(); subtask2.fork(); return subtask1.join() + subtask2.join(); } } public static void main(string[] args) throws ioexception { /*forkjoinpool fjp = new forkjoinpool(); addtask task = new addtask(0, nums.length); fjp.execute(task);*/ forkjoinpooldemo temp = new forkjoinpooldemo(); forkjoinpool fjp = new forkjoinpool(); addtaskret task = new addtaskret(0, nums.length); fjp.execute(task); long result = task.join(); system.out.println(result); //system.in.read(); } }
forkjoinpool 示例代码解析
这段代码演示了 java 中 forkjoinpool
框架的使用,展示了两种不同的任务分割方式:
recursiveaction
(无返回值)recursivetask
(有返回值)
代码结构分析
1. 初始化部分
static int[] nums = new int[1000000]; // 创建包含100万个元素的数组 static final int max_num = 50000; // 任务分割的阈值 static random r = new random(); // 随机数生成器 // 静态初始化块:填充数组并计算总和 static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextint(100); // 每个元素赋值为0-99的随机数 } system.out.println("---" + arrays.stream(nums).sum()); // 使用stream api计算总和作为验证基准 }
2. recursiveaction 实现(无返回值)
static class addtask extends recursiveaction { int start, end; addtask(int s, int e) { start = s; end = e; } @override protected void compute() { if(end-start <= max_num) { // 如果任务足够小,直接计算 long sum = 0l; for(int i=start; i<end; i++) sum += nums[i]; system.out.println("from:" + start + " to:" + end + " = " + sum); } else { // 否则分割任务 int middle = start + (end-start)/2; addtask subtask1 = new addtask(start, middle); addtask subtask2 = new addtask(middle, end); subtask1.fork(); // 异步执行子任务 subtask2.fork(); } } }
3. recursivetask 实现(有返回值)
static class addtaskret extends recursivetask<long> { int start, end; addtaskret(int s, int e) { start = s; end = e; } @override protected long compute() { if(end-start <= max_num) { // 如果任务足够小,直接计算并返回结果 long sum = 0l; for(int i=start; i<end; i++) sum += nums[i]; return sum; } // 分割任务 int middle = start + (end-start)/2; addtaskret subtask1 = new addtaskret(start, middle); addtaskret subtask2 = new addtaskret(middle, end); subtask1.fork(); // 异步执行子任务 subtask2.fork(); return subtask1.join() + subtask2.join(); // 合并子任务结果 } }
4. 主方法
public static void main(string[] args) throws ioexception { // 创建forkjoinpool实例 forkjoinpool fjp = new forkjoinpool(); // 创建有返回值的任务 addtaskret task = new addtaskret(0, nums.length); // 执行任务 fjp.execute(task); // 获取并打印结果 long result = task.join(); system.out.println(result); }
关键概念解释
forkjoinpool:
- java 7引入的线程池实现
- 使用工作窃取(work-stealing)算法提高并行效率
- 特别适合分治(divide-and-conquer)算法
recursiveaction:
- 用于不返回结果的任务
- 需要实现
compute()
方法 - 示例中的
addtask
只打印结果不返回
recursivetask:
- 用于需要返回结果的任务
- 需要实现
compute()
方法并返回指定类型 - 示例中的
addtaskret
返回子数组的和
fork()和join():
fork()
: 异步安排任务执行join()
: 等待任务完成并获取结果
执行流程
- 初始化一个包含100万个随机数的数组
- 使用stream api计算总和作为基准
- 创建forkjoinpool
- 创建addtaskret任务,范围是整个数组
- 任务会根据max_num阈值(50000)不断分割,直到足够小
- 小任务直接计算子数组和
- 合并所有子任务的结果得到最终总和
- 打印结果(应与stream api计算的结果一致)
使用建议
- 对于计算密集型任务,forkjoinpool通常比传统线程池更高效
- 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
- 有返回结果需求时使用recursivetask,否则使用recursiveaction
- 注意join()是阻塞调用,会等待任务完成
这段代码很好地展示了forkjoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论