1、概述
Java 8引入了 流(Stream) 处理的概念,这是对数据执行批量操作的有效方法。在支持并发的环境中可以获得并行流。
这些流可以提高性能,但要付出多线程开销。
在本快速教程中,我们将研究 Stream API 的最大局限之一,并了解如何使并行流与自定义ThreadPool
实例一起使用–或者,有一个库可以处理此问题。
2、并行流
我们从一个简单的示例开始,对任何 Collection 类型调用parallelStream
方法,将返回可能并行的 Stream:
@Test
public void givenListWhenCallingParallelStreamShouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
在此类 Stream 中发生的默认处理使用ForkJoinPool.commonPool()
, 这是整个应用程序共享的线程池。
3、自定义线程池
实际上,在处理流时,我们可以使用自定义ThreadPool
。
下面的示例让并行 Stream 使用自定义线程池来计算 1 到 100,0000(含)之间的长整型值之和:
@Test
public void giveRangeOfLongsWhenSummedInParallelShouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 100_0000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我们使用了并行度为 4 的ForkJoinPool
构造函数。需要进行一些实验才能确定不同环境的最佳值,但是一个很好的经验法则就是根据CPU的核心数量来选择数字。
接下来,我们处理了并行 Stream 的内容,并在reduce
调用中将它们汇总。
这个简单的示例可能无法展示使用自定义线程池的全部用处,但是当不希望将通用线程池与耗时的任务捆绑在一起(例如,处理来自网络源的数据),或者应用程序中的其他组件正在使用公共线程池的情况下,使用自定义线程池的好处是显而易见的。
另外,除了在代码中自定义ForkJoinPool
外,还可以通过Java系统参数java.util.concurrent.ForkJoinPool.common.parallelism
来修改公共线程池的并行度。例如-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
4、结论
我们简要介绍了如何使用自定义线程池运行并行 Stream。在正确的环境中,并通过适当使用并行度级别,可以在某些情况下提高性能。