java forkjoinpool线程池的使用之并行计算数组求和
package com.zhangxueliang.juc;
import java.io.ioexception;
import java.util.arrays;
import java.util.random;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.recursiveaction;
import java.util.concurrent.recursivetask;
public class forkjoinpooldemo {
static int[] nums = new int[1000000];
static final int max_num = 50000;
static random r = new random();
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextint(100);
}
system.out.println("---" + arrays.stream(nums).sum()); //stream api
}
static class addtask extends recursiveaction {
int start, end;
addtask(int s, int e) {
start = s;
end = e;
}
@override
protected void compute() {
if(end-start <= max_num) {
long sum = 0l;
for(int i=start; i<end; i++) sum += nums[i];
system.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end-start)/2;
addtask subtask1 = new addtask(start, middle);
addtask subtask2 = new addtask(middle, end);
subtask1.fork();
subtask2.fork();
}
}
}
static class addtaskret extends recursivetask<long> {
private static final long serialversionuid = 1l;
int start, end;
addtaskret(int s, int e) {
start = s;
end = e;
}
@override
protected long compute() {
if(end-start <= max_num) {
long sum = 0l;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
int middle = start + (end-start)/2;
addtaskret subtask1 = new addtaskret(start, middle);
addtaskret subtask2 = new addtaskret(middle, end);
subtask1.fork();
subtask2.fork();
return subtask1.join() + subtask2.join();
}
}
public static void main(string[] args) throws ioexception {
/*forkjoinpool fjp = new forkjoinpool();
addtask task = new addtask(0, nums.length);
fjp.execute(task);*/
forkjoinpooldemo temp = new forkjoinpooldemo();
forkjoinpool fjp = new forkjoinpool();
addtaskret task = new addtaskret(0, nums.length);
fjp.execute(task);
long result = task.join();
system.out.println(result);
//system.in.read();
}
}
forkjoinpool 示例代码解析
这段代码演示了 java 中 forkjoinpool 框架的使用,展示了两种不同的任务分割方式:
recursiveaction(无返回值)recursivetask(有返回值)
代码结构分析
1. 初始化部分
static int[] nums = new int[1000000]; // 创建包含100万个元素的数组
static final int max_num = 50000; // 任务分割的阈值
static random r = new random(); // 随机数生成器
// 静态初始化块:填充数组并计算总和
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextint(100); // 每个元素赋值为0-99的随机数
}
system.out.println("---" + arrays.stream(nums).sum()); // 使用stream api计算总和作为验证基准
}2. recursiveaction 实现(无返回值)
static class addtask extends recursiveaction {
int start, end;
addtask(int s, int e) {
start = s;
end = e;
}
@override
protected void compute() {
if(end-start <= max_num) { // 如果任务足够小,直接计算
long sum = 0l;
for(int i=start; i<end; i++) sum += nums[i];
system.out.println("from:" + start + " to:" + end + " = " + sum);
} else { // 否则分割任务
int middle = start + (end-start)/2;
addtask subtask1 = new addtask(start, middle);
addtask subtask2 = new addtask(middle, end);
subtask1.fork(); // 异步执行子任务
subtask2.fork();
}
}
}3. recursivetask 实现(有返回值)
static class addtaskret extends recursivetask<long> {
int start, end;
addtaskret(int s, int e) {
start = s;
end = e;
}
@override
protected long compute() {
if(end-start <= max_num) { // 如果任务足够小,直接计算并返回结果
long sum = 0l;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
// 分割任务
int middle = start + (end-start)/2;
addtaskret subtask1 = new addtaskret(start, middle);
addtaskret subtask2 = new addtaskret(middle, end);
subtask1.fork(); // 异步执行子任务
subtask2.fork();
return subtask1.join() + subtask2.join(); // 合并子任务结果
}
}4. 主方法
public static void main(string[] args) throws ioexception {
// 创建forkjoinpool实例
forkjoinpool fjp = new forkjoinpool();
// 创建有返回值的任务
addtaskret task = new addtaskret(0, nums.length);
// 执行任务
fjp.execute(task);
// 获取并打印结果
long result = task.join();
system.out.println(result);
}关键概念解释
forkjoinpool:
- java 7引入的线程池实现
- 使用工作窃取(work-stealing)算法提高并行效率
- 特别适合分治(divide-and-conquer)算法
recursiveaction:
- 用于不返回结果的任务
- 需要实现
compute()方法 - 示例中的
addtask只打印结果不返回
recursivetask:
- 用于需要返回结果的任务
- 需要实现
compute()方法并返回指定类型 - 示例中的
addtaskret返回子数组的和
fork()和join():
fork(): 异步安排任务执行join(): 等待任务完成并获取结果
执行流程
- 初始化一个包含100万个随机数的数组
- 使用stream api计算总和作为基准
- 创建forkjoinpool
- 创建addtaskret任务,范围是整个数组
- 任务会根据max_num阈值(50000)不断分割,直到足够小
- 小任务直接计算子数组和
- 合并所有子任务的结果得到最终总和
- 打印结果(应与stream api计算的结果一致)
使用建议
- 对于计算密集型任务,forkjoinpool通常比传统线程池更高效
- 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
- 有返回结果需求时使用recursivetask,否则使用recursiveaction
- 注意join()是阻塞调用,会等待任务完成
这段代码很好地展示了forkjoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论