并行流

通过调用parallelStream方法,可以将集合转换为并行流。并行流是将元素分割成多个块的流,使用不同的线程处理每个块。因此,你可以自动划分指定操作的工作负载在多核处理器的所有核心上,并使它们一样忙。

1
2
3
public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
}

将顺序流转为并行流

调用parallel方法可以将顺序流转为并行流:

1
2
3
public long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
}

实际上,在顺序流上并行parallel方法并没有对流进行任何转换。在内部设置一个布尔值,标志parallel后面所有的操作需要并行处理。你也许会认为通过sequential方法和parallel方法可以更精细的控制流操作,比如:

1
stream.parallel().filter(...).sequential().map(...).parallel().reduce();

但是parallelsequential 的最后一次调用获胜并影响整个流管道。

配置并行流使用的线程池

并行流在内部使用默认的ForkJoinPool。默认情况下,它的线程数与处理器的数量相同,由Runtime.getRuntime().availableprocessors()返回。

但是你可以修改系统属性java.util.concurrent.ForkJoinPool.common.parallelism修改线程池大小,比如:System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”);

这是一个全局设置,因此它将影响代码中的所有并行流。不推荐修改这个值。

测量流处理性能

如果使用Maven作为构建工具,那么要在项目中使用JMH,需要向pom.xml文件添加几个依赖项:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<dependency>
  <groupId>org.openjdk.jmh</groupId>
  <artifactId>jmh-core</artifactId>
  <version>1.17.4</version>
</dependency>
<dependency>
  <groupId>org.openjdk.jmh</groupId>
  <artifactId>jmh-generator-annprocess</artifactId>
  <version>1.17.4</version>
</dependency>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <finalName>benchmarks</finalName>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.
resource.ManifestResourceTransformer">
                <mainClass>org.openjdk.jmh.Main</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(2, jvmArgs = {"-Xms4G", "-Xmx4G"})
public class ParallelStreamBenchmark {
    private static final long N = 10_000_000L;

    @Benchmark
    public long sequentialSum() {
        return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum);
    }

    @Benchmark
    public long iterativeSum() {
        long result = 0;
        for (long i = 1L; i <= N; i++) {
            result += i;
        }
        return result;
    }

    @TearDown(Level.Invocation)
    public void tearDown() {
        System.gc();
    }
}

这说明了并行编程是多么棘手,有时甚至违反直觉。当被误用时,可能会降低程序的整体性能,因此必须了解调用并行方法的背后发生了什么。

使用更专业的方法

LongStream.rangeClosed相比iterate有2个好处:

  • LongStream.rangeClosed直接处理原始类型long,因此没有装箱和拆箱开销。
  • LongStream.rangeClosed产生一个范围的数字,可以很容易地划分为独立的块。
1
2
3
4
5
6
7
8
9
@Benchmark
public long rangedSum() {
    return LongStream.rangeClosed(1, N).reduce(0L, Long::sum);
}

@Benchmark
public long parallelRangedSum() {
    return LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum);
}

正确使用并行流

误用并行流产生错误的主要原因是使用了改变某些共享状态的算法。下面是一种方法通过修改共享累加器来求前n个自然数之和:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}

请记住,避免共享可变状态可以确保并行流产生正确的结果。

有效地使用并行流

以下是使用并行流的一些建议:

  • 并行流并不总是比相应的顺序流快,因此在选择顺序流和并行流时,最重要的建议是使用适当的基准测试它们的性能。
  • 自动装箱和拆箱操作会极大地影响性能。可能使用IntStream、LongStream和DoubleStream。
  • 有些操作在并行流上的性能比在顺序流上差。特别是依赖于元素顺序的limit和findFirst之类的操作在并行流中非常昂贵。
  • 考虑流管道操作的总成本,成本越高,使用并行流性能越好。
  • 对于少量数据,选择并行流不是个好决定。并行处理只有几个元素的优势不足以补偿并行化过程带来的额外成本。
  • 考虑流下面的数据结构是否容易分解。比如ArrayList比LinkedList更容易分割。原始类型流的range方法能快速分解。你也可以实现自己的Spliterator来完全控制分解过程。
  • 流的特征以及管道的中间操作如何修改它们,会改变分解过程的性能。比如一个固定大小的流可以分成多个相等的部分,然后可以更有效地并行处理每个部分,但是filter操作会丢弃不可预知的元素数量,从而使流本身的大小未知。
  • 考虑终端操作的合并步骤是否昂贵。

下表从可分解性的角度总结了某些集合的并行友好性。

数据源 可分解性
ArrayList 极好
LinkedList
IntStream.range 极好
Stream.iterate
HashSet
TreeSet

fork/join框架

fork/join框架的设计目的是递归地将一个可并行化的任务分解为更小的任务,然后将每个子任务的结果组合起来,以生成总体结果。它是ExecutorService接口的实现,该接口将这些子任务分发给线程池(称为ForkJoinPool)中的工作线程。

使用RecursiveTask

想要提交任务到线程池,你需要创建一个RecursiveTask<R>子类,其中R是并行任务产生的结果类型。如果任务不返回任何结果,则需要创建RecursiveAction的子类。要定义RecursiveTask,只需要实现它的一个抽象方法:

1
protected abstract R compute();

该方法定义了将任务分解为子任务的逻辑,以及生成单个子任务结果的算法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask =
            new ForkJoinSumCalculator(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinSumCalculator rightTask =
            new ForkJoinSumCalculator(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

public static long forkJoinSum(long n) {
    long[] numbers = LongStream.rangeClosed(1, n).toArray();
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    return new ForkJoinPool().invoke(task);
}

使用fork/join框架的最佳实践

尽管fork/join框架相对容易使用,但它也很容易被误用。下面是一些有效使用它的最佳实践:

  • 在任务上调用join方法会阻塞调用者,直到该任务生成的结果就绪为止。因此必须在启动两个子任务的计算之后调用它。
  • 不应该在RecursiveTask中调用ForkJoinPoolinvoke方法。只有顺序代码才应该使用invoke开始并行计算。
  • 在子任务上调用fork方法实际上是将任务放入ForkJoinPool。在其中一个子任务上直接调用compute可以重用一个的线程,避免在池中不必要地分配另一个任务而造成的开销。
  • 调试使用fort/join框架的并行计算比较棘手,因为在不同的线程执行计算。
  • 和使用并行流一样,不能想当然地认为在多核处理器上使用fork/join框架进行计算比顺序处理会更快。一个任务应该可以分解为几个独立的子任务,以便能够获得相应的性能提升。这些子任务的执行时间应该比创建一个新任务时间要长。

工作窃取

ForkJoinPool中每个线程都持有一个双链表,包含分配给它的任务。每完成一个任务,线程从队列头部取出另一个任务并开始执行。但是由于一些原因,一个线程可能比其他线程更快地完成分配给它的所有任务,这意味着当其他线程仍然非常忙时,它的队列将变为空。在这种情况下,该线程会随机选择另一个线程的工作队列,并从队列尾部“窃取”任务。这个过程一直进行直到所有任务都完成。这就是为什么有许多较小的任务,而不是只有几个较大的任务,可以帮助更好地平衡工作线程之间的工作负载。

Spliterator

Spliterator是Java 8新增的一个接口。Collection接口提供了一个默认方法spliterator(),返回一个spliterator对象。Spliterator接口定义了几个方法,如下所示:

1
2
3
4
5
6
public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characteristics();
}

tryAdvance方法与普通迭代器类似,用于顺序遍历Spliterator的元素,如果还有其他元素要遍历,则返回true。

trySplit方法用于将其部分元素分割到第二个Spliterator,从而允许并行处理这两个Spliterator。

estimateSize方法估计剩余要遍历的元素的数量。

分裂过程

将一个流分成多个部分是一个递归过程。首先,在第一个Spliterator上调用trySplit并生成第二个Spliterator。然后在这两个spliterator上再次调用trySplit,得到四个Spliterator。不断调用方法trySplit,直到返回null,说明正在处理的数据是不可再分割的。当所有Spliterators调用trySplit返回null时,分裂过程终止。分裂过程会受到Spliterator本身的特性的影响,它们由characteristics方法声明的。

Spliterator特征

characteristics方法返回一个int,包含一组Spliterator的特征。可以使用这些特性更好地控制和优化Spliterator使用。

特征 意义
ORDERED 元素有顺序,因此Spliterator在遍历和分区时保持这个顺序
DISTINCT 所有元素唯一
SORTED 遍历的元素遵循预定义的排序顺序
SIZED 遍历的元素大小已知
NON_NULL 元素非空
IMMUTABLE 数据源不能修改
CONCURRENT 可并行
SUBSIZED 所有的Spliterator大小已知

实现自己的Spliterator

我们将开发一个简单的方法来计算字符串中的单词数量。其迭代版如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public int countWordsIteratively(String s) {
    int counter = 0;
    boolean lastSpace = true;
    for (char c : s.toCharArray()) {
        if (Character.isWhitespace(c)) {
            lastSpace = true;
        } else {
            if (lastSpace) counter++;
            lastSpace = false;
        }
    }
    return counter;
}

函数风格重写单词计数器

首先,需要将字符串转换为流

1
Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);

定义一个类WordCounter用于携带2个状态,一个是目前为止找到的单词数量,一个是最后字符是否为空格。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class WordCounter {
    private final int counter;
    private final boolean lastSpace;

    public WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }

    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(counter, true);
        } else {
            return lastSpace ? new WordCounter(counter + 1, false) : this;
        }
    }

    public WordCounter combine(WordCounter wordCounter) {
        return new WordCounter(counter + wordCounter.counter,
            wordCounter.lastSpace);
    }

    public int getCounter() {
        return counter;
    }
}

如下所示计算流中的单词数:

1
2
3
4
5
6
private int countWords(Stream<Character> stream) {
    WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
        WordCounter::accumulate,
        WordCounter::combine);
    return wordCounter.getCounter();
}

使单词计数器并行工作

解决方案需确保字符串不是在随机位置拆分,而只在单词末尾拆分。所以需要实现一个字符Spliterator,只在两个单词之间分割字符串,然后从中创建并行流。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        action.accept(string.charAt(currentChar++));
        return currentChar < string.length();
    }

    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;
        if (currentSize < 10) {
            return null;
        }
        for (int splitPos = currentSize / 2 + currentChar;
             splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator<Character> spliterator =
                    new WordCounterSpliterator(string.substring(currentChar,
                        splitPos));
                currentChar = splitPos;
                return spliterator;
            }
        }
        return null;
    }

    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NON_NULL + IMMUTABLE;
    }
}

使单词计数器Spliterator工作

现在可以使用这个并行流,如下所示:

1
2
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);

Spliterator的最后一个显著特性是,可以绑定要在第一次遍历、第一次拆分或第一次查询时(而不是在创建时)遍历的元素的数据源,称为后期绑定Spliterator。