当前位置: 代码网 > it编程>编程语言>Java > Java ForkJoinPool线程池的使用之并行计算数组求和实例

Java ForkJoinPool线程池的使用之并行计算数组求和实例

2025年05月29日 Java 我要评论
java forkjoinpool线程池的使用之并行计算数组求和package com.zhangxueliang.juc;import java.io.ioexception;import java

java forkjoinpool线程池的使用之并行计算数组求和

package com.zhangxueliang.juc;

import java.io.ioexception;
import java.util.arrays;
import java.util.random;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.recursiveaction;
import java.util.concurrent.recursivetask;

public class forkjoinpooldemo {
	static int[] nums = new int[1000000];
	static final int max_num = 50000;
	static random r = new random();
	
	static {
		for(int i=0; i<nums.length; i++) {
			nums[i] = r.nextint(100);
		}
		
		system.out.println("---" + arrays.stream(nums).sum()); //stream api
	}
	

	static class addtask extends recursiveaction {

		int start, end;

		addtask(int s, int e) {
			start = s;
			end = e;
		}

		@override
		protected void compute() {

			if(end-start <= max_num) {
				long sum = 0l;
				for(int i=start; i<end; i++) sum += nums[i];
				system.out.println("from:" + start + " to:" + end + " = " + sum);
			} else {

				int middle = start + (end-start)/2;

				addtask subtask1 = new addtask(start, middle);
				addtask subtask2 = new addtask(middle, end);
				subtask1.fork();
				subtask2.fork();
			}


		}

	}

	
	static class addtaskret extends recursivetask<long> {
		
		private static final long serialversionuid = 1l;
		int start, end;
		
		addtaskret(int s, int e) {
			start = s;
			end = e;
		}

		@override
		protected long compute() {
			
			if(end-start <= max_num) {
				long sum = 0l;
				for(int i=start; i<end; i++) sum += nums[i];
				return sum;
			} 
			
			int middle = start + (end-start)/2;
			
			addtaskret subtask1 = new addtaskret(start, middle);
			addtaskret subtask2 = new addtaskret(middle, end);
			subtask1.fork();
			subtask2.fork();
			
			return subtask1.join() + subtask2.join();
		}
		
	}
	
	public static void main(string[] args) throws ioexception {
		/*forkjoinpool fjp = new forkjoinpool();
		addtask task = new addtask(0, nums.length);
		fjp.execute(task);*/

		forkjoinpooldemo temp = new forkjoinpooldemo();

		forkjoinpool fjp = new forkjoinpool();
		addtaskret task = new addtaskret(0, nums.length);
		fjp.execute(task);
		long result = task.join();
		system.out.println(result);
		
		//system.in.read();
		
	}
}

forkjoinpool 示例代码解析

这段代码演示了 java 中 forkjoinpool 框架的使用,展示了两种不同的任务分割方式:

  • recursiveaction(无返回值)
  • recursivetask(有返回值)

代码结构分析

1. 初始化部分

static int[] nums = new int[1000000];  // 创建包含100万个元素的数组
static final int max_num = 50000;     // 任务分割的阈值
static random r = new random();        // 随机数生成器

// 静态初始化块:填充数组并计算总和
static {
    for(int i=0; i<nums.length; i++) {
        nums[i] = r.nextint(100);  // 每个元素赋值为0-99的随机数
    }
    system.out.println("---" + arrays.stream(nums).sum()); // 使用stream api计算总和作为验证基准
}

2. recursiveaction 实现(无返回值)

static class addtask extends recursiveaction {
    int start, end;
    
    addtask(int s, int e) {
        start = s;
        end = e;
    }

    @override
    protected void compute() {
        if(end-start <= max_num) {  // 如果任务足够小,直接计算
            long sum = 0l;
            for(int i=start; i<end; i++) sum += nums[i];
            system.out.println("from:" + start + " to:" + end + " = " + sum);
        } else {  // 否则分割任务
            int middle = start + (end-start)/2;
            addtask subtask1 = new addtask(start, middle);
            addtask subtask2 = new addtask(middle, end);
            subtask1.fork();  // 异步执行子任务
            subtask2.fork();
        }
    }
}

3. recursivetask 实现(有返回值)

static class addtaskret extends recursivetask<long> {
    int start, end;
    
    addtaskret(int s, int e) {
        start = s;
        end = e;
    }

    @override
    protected long compute() {
        if(end-start <= max_num) {  // 如果任务足够小,直接计算并返回结果
            long sum = 0l;
            for(int i=start; i<end; i++) sum += nums[i];
            return sum;
        } 
        
        // 分割任务
        int middle = start + (end-start)/2;
        addtaskret subtask1 = new addtaskret(start, middle);
        addtaskret subtask2 = new addtaskret(middle, end);
        subtask1.fork();  // 异步执行子任务
        subtask2.fork();
        
        return subtask1.join() + subtask2.join();  // 合并子任务结果
    }
}

4. 主方法

public static void main(string[] args) throws ioexception {
    // 创建forkjoinpool实例
    forkjoinpool fjp = new forkjoinpool();
    
    // 创建有返回值的任务
    addtaskret task = new addtaskret(0, nums.length);
    
    // 执行任务
    fjp.execute(task);
    
    // 获取并打印结果
    long result = task.join();
    system.out.println(result);
}

关键概念解释

forkjoinpool:

  • java 7引入的线程池实现
  • 使用工作窃取(work-stealing)算法提高并行效率
  • 特别适合分治(divide-and-conquer)算法

recursiveaction:

  • 用于不返回结果的任务
  • 需要实现compute()方法
  • 示例中的addtask只打印结果不返回

recursivetask:

  • 用于需要返回结果的任务
  • 需要实现compute()方法并返回指定类型
  • 示例中的addtaskret返回子数组的和

fork()和join():

  • fork(): 异步安排任务执行
  • join(): 等待任务完成并获取结果

执行流程

  1. 初始化一个包含100万个随机数的数组
  2. 使用stream api计算总和作为基准
  3. 创建forkjoinpool
  4. 创建addtaskret任务,范围是整个数组
  5. 任务会根据max_num阈值(50000)不断分割,直到足够小
  6. 小任务直接计算子数组和
  7. 合并所有子任务的结果得到最终总和
  8. 打印结果(应与stream api计算的结果一致)

使用建议

  1. 对于计算密集型任务,forkjoinpool通常比传统线程池更高效
  2. 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
  3. 有返回结果需求时使用recursivetask,否则使用recursiveaction
  4. 注意join()是阻塞调用,会等待任务完成

这段代码很好地展示了forkjoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com