1、概述

Java 8引入了 流(Stream) 处理的概念,这是对数据执行批量操作的有效方法。在支持并发的环境中可以获得并行流。

这些流可以提高性能,但要付出多线程开销。

在本快速教程中,我们将研究 Stream API 的最大局限之一,并了解如何使并行流与自定义ThreadPool实例一起使用–或者,有一个库可以处理此问题。

2、并行流

我们从一个简单的示例开始,对任何 Collection 类型调用parallelStream方法,将返回可能并行的 Stream

@Test
public void givenListWhenCallingParallelStreamShouldBeParallelStream(){
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();
         
    assertTrue(parallelStream.isParallel());
}

在此类 Stream 中发生的默认处理使用ForkJoinPool.commonPool(), 这是整个应用程序共享的线程池。

3、自定义线程池

实际上,在处理流时,我们可以使用自定义ThreadPool

下面的示例让并行 Stream 使用自定义线程池来计算 1100,0000(含)之间的长整型值之和:

@Test
public void giveRangeOfLongsWhenSummedInParallelShouldBeEqualToExpectedTotal() 
  throws InterruptedException, ExecutionException {
     
    long firstNum = 1;
    long lastNum = 100_0000;
 
    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());
 
    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
  
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

我们使用了并行度为 4ForkJoinPool构造函数。需要进行一些实验才能确定不同环境的最佳值,但是一个很好的经验法则就是根据CPU的核心数量来选择数字。

接下来,我们处理了并行 Stream 的内容,并在reduce调用中将它们汇总。

这个简单的示例可能无法展示使用自定义线程池的全部用处,但是当不希望将通用线程池与耗时的任务捆绑在一起(例如,处理来自网络源的数据),或者应用程序中的其他组件正在使用公共线程池的情况下,使用自定义线程池的好处是显而易见的。

另外,除了在代码中自定义ForkJoinPool外,还可以通过Java系统参数java.util.concurrent.ForkJoinPool.common.parallelism来修改公共线程池的并行度。例如-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

4、结论

我们简要介绍了如何使用自定义线程池运行并行 Stream。在正确的环境中,并通过适当使用并行度级别,可以在某些情况下提高性能。