1. 并行流简介
java 8 引入了 stream
api,提供了一种高效的数据处理方式。而 并行流(parallel stream) 则是 stream
的并行版本,能够将流操作分配到多个线程中执行,充分利用多核 cpu 的性能。
特点:
- 默认使用
forkjoinpool.commonpool()
执行任务。 - 适合处理 计算密集型 任务。
- 任务执行顺序不确定。
2. 并行流的简单使用
将普通流转换为并行流非常简单,只需调用 parallel()
方法即可。
示例:并行流的基本使用
import java.util.arrays; import java.util.list; public class parallelstreamexample { public static void main(string[] args) { list<integer> numbers = arrays.aslist(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // 将流转换为并行流 numbers.parallelstream() .foreach(num -> system.out.println("线程: " + thread.currentthread().getname() + ", 处理: " + num)); } }
输出示例:
线程: main, 处理: 6
线程: forkjoinpool.commonpool-worker-1, 处理: 3
线程: forkjoinpool.commonpool-worker-2, 处理: 8
...
3. 配合自定义线程池
默认情况下,并行流使用 forkjoinpool.commonpool()
执行任务。你可以通过自定义线程池来控制并行流的执行环境。
示例:自定义线程池
import java.util.list; import java.util.concurrent.forkjoinpool; import java.util.stream.collectors; import java.util.stream.intstream; public class parallelstreamcustompool { public static void main(string[] args) { // 创建自定义线程池 forkjoinpool custompool = new forkjoinpool(4); // 在自定义线程池中执行并行流任务 custompool.submit(() -> { list<integer> result = intstream.rangeclosed(1, 10) .parallel() .map(i -> { system.out.println("线程: " + thread.currentthread().getname() + ", 处理: " + i); return i * 2; }) .boxed() .collect(collectors.tolist()); system.out.println("结果: " + result); }).join(); // 等待任务完成 custompool.shutdown(); // 关闭线程池 } }
示例:配合completablefuture实现异步
import java.util.list; import java.util.concurrent.completablefuture; import java.util.stream.collectors; import java.util.stream.intstream; public class parallelstreamwithcompletablefuture { public static void main(string[] args) { // 创建一个并行流 list<completablefuture<integer>> futures = intstream.rangeclosed(1, 10) .parallel() .maptoobj(i -> completablefuture.supplyasync(() -> { system.out.println("线程: " + thread.currentthread().getname() + ", 处理: " + i); return i * 2; // 模拟计算任务 })) .collect(collectors.tolist()); // 等待所有任务完成并获取结果 list<integer> results = futures.stream() .map(completablefuture::join) .collect(collectors.tolist()); system.out.println("结果: " + results); } }
好处:
- 并行流:适合处理数据流中的计算密集型任务,能够自动将任务分配到多个线程中执行。
- completablefuture:提供强大的异步编程能力,可以处理任务的依赖关系、异常处理、结果合并等。
结合两者的优势,可以实现:
- 异步并行处理:将并行流的任务异步化,进一步提升性能。
- 任务依赖管理:通过
completablefuture
管理任务之间的依赖关系。 - 结果合并:将多个任务的结果合并处理。
4. 控制有序性
并行流的任务执行顺序是不确定的。如果需要保持顺序,可以使用 foreachordered()
方法。
示例:保持顺序
import java.util.arrays; import java.util.list; public class parallelstreamorder { public static void main(string[] args) { list<integer> numbers = arrays.aslist(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); numbers.parallelstream() .foreachordered(system.out::println); // 输出顺序与流中元素顺序一致 } }
5. 共享资源的安全性
并行流在多个线程中执行操作,如果操作共享可变状态,可能会导致线程安全问题。
示例:线程安全问题
import java.util.arraylist; import java.util.list; public class parallelstreamthreadsafety { public static void main(string[] args) { list<integer> result = new arraylist<>(); intstream.rangeclosed(1, 1000) .parallel() .foreach(result::add); // 这里会出现线程安全问题 system.out.println("结果大小: " + result.size()); // 结果可能小于 1000 } }
解决方法:
- 使用线程安全的集合,如
collections.synchronizedlist()
。 - 使用
collect()
方法将结果收集到线程安全的容器中。
示例:线程安全的解决方案
import java.util.list; import java.util.stream.collectors; import java.util.stream.intstream; public class parallelstreamthreadsafety { public static void main(string[] args) { list<integer> result = intstream.rangeclosed(1, 1000) .parallel() .boxed() .collect(collectors.tolist()); // 使用 collect() 方法 system.out.println("结果大小: " + result.size()); // 输出: 1000 } }
6. 注意事项
- 任务类型:
- 适合 计算密集型 任务,不适合 i/o 密集型 任务。
- 线程安全:
- 避免在并行流中操作共享可变状态。
- 任务顺序:
- 并行流的任务执行顺序不确定,使用
foreachordered()
保持顺序。
- 并行流的任务执行顺序不确定,使用
- 线程池管理:
- 使用自定义线程池时,记得关闭线程池,避免资源泄漏。
7. 总结
并行流是 java 8 提供的一个强大工具,能够显著提升数据处理性能。但在使用时需要注意线程安全、任务顺序和线程池管理等问题。通过合理使用并行流,可以编写高效、灵活的代码。
附录:完整代码
import java.util.arrays; import java.util.list; import java.util.concurrent.forkjoinpool; import java.util.stream.collectors; import java.util.stream.intstream; public class parallelstreamdemo { public static void main(string[] args) { // 基本使用 list<integer> numbers = arrays.aslist(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); numbers.parallelstream() .foreach(num -> system.out.println("线程: " + thread.currentthread().getname() + ", 处理: " + num)); // 自定义线程池 forkjoinpool custompool = new forkjoinpool(4); custompool.submit(() -> { list<integer> result = intstream.rangeclosed(1, 10) .parallel() .map(i -> { system.out.println("线程: " + thread.currentthread().getname() + ", 处理: " + i); return i * 2; }) .boxed() .collect(collectors.tolist()); system.out.println("结果: " + result); }).join(); custompool.shutdown(); // 保持顺序 numbers.parallelstream() .foreachordered(system.out::println); // 线程安全 list<integer> saferesult = intstream.rangeclosed(1, 1000) .parallel() .boxed() .collect(collectors.tolist()); system.out.println("结果大小: " + saferesult.size()); } }
希望这篇文章能帮助你更好地理解和使用 java 的并行流!如果有任何问题,欢迎在评论区讨论!
到此这篇关于java stream 并行流简介、使用与注意事项小结的文章就介绍到这了,更多相关java stream 并行流内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论