Wednesday, April 27, 2022

Parallel streams in Java: Benchmarking and performance considerations

The Stream API makes it possible to execute a sequential stream in parallel without rewriting the code.

The Stream API brought a new programming paradigm to Java: a declarative way of processing data using streams—expressing what should be done to the values and not how it should be done. More importantly, the API allows you to harness the power of multicore architectures for the parallel processing of data. There are two kinds of streams.

◉ A sequential stream is one whose elements are processed sequentially (as in a for loop) when the stream pipeline is executed by a single thread.

◉ A parallel stream is split into multiple substreams that are processed in parallel by multiple instances of the stream pipeline being executed by multiple threads, and their intermediate results are combined to create the final result.

A parallel stream can be created directly on a collection by invoking the Collection.parallelStream() method.

The sequential or parallel mode of an existing stream can be modified by calling the BaseStream.sequential() and BaseStream.parallel() intermediate operations, respectively. A stream is executed sequentially or in parallel depending on the execution mode of the stream on which the terminal operation is initiated.

The Stream API makes it possible to execute a sequential stream in parallel without rewriting the code. The primary reason for using parallel streams is to improve performance while at the same time ensuring that the results obtained are the same, or at least compatible, regardless of the mode of execution. Although the API goes a long way toward achieving its aim, it is important to understand the pitfalls to avoid when executing stream pipelines in parallel.

Using parallel streams

Building parallel streams. The execution mode of an existing stream can be set to parallel by calling the parallel() method on the stream. The parallelStream() method of the Collection interface can be used to create a parallel stream with a collection as the datasource. No other code is necessary for parallel execution, as the data partitioning and thread management for a parallel stream are handled by the API and the JVM. As with any stream, the stream is not executed until a terminal operation is invoked on it.

The isParallel() method of the stream interfaces can be used to determine whether the execution mode of a stream is parallel.
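A minimal sketch of the two ways of obtaining a parallel stream described above, and of how the most recent call to sequential() or parallel() decides the mode of the whole pipeline. The class name ParallelModeDemo and the sample data are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ParallelModeDemo {

  // Mode of a stream created directly on a collection with parallelStream().
  static boolean fromCollection(List<Integer> data) {
    return data.parallelStream().isParallel();
  }

  // Mode of a sequential stream that is switched with parallel().
  static boolean switchedToParallel(List<Integer> data) {
    return data.stream().parallel().isParallel();
  }

  // The mode applies to the whole pipeline: the last call made wins.
  static boolean lastCallWins(List<Integer> data) {
    Stream<Integer> s = data.parallelStream().map(x -> x * 2).sequential();
    return s.isParallel();
  }

  public static void main(String[] args) {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    System.out.println(fromCollection(data));      // true
    System.out.println(switchedToParallel(data));  // true
    System.out.println(lastCallWins(data));        // false
  }
}
```

None of these calls executes the pipeline; isParallel() only reports the mode that a later terminal operation would use.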

Executing parallel streams. The Stream API allows a stream to be executed either sequentially or in parallel—meaning that all stream operations can execute either sequentially or in parallel. A sequential stream is executed in a single thread running on one CPU core. The elements in the stream are processed sequentially in a single pass by the stream operations that are executed in the same thread.

A parallel stream is executed by different threads, running on multiple CPU cores in a computer. The stream elements are split into substreams that are processed by multiple instances of the stream pipeline being executed in multiple threads. The partial results from the processing of each substream are merged (or combined) into a final result.

Parallel streams utilize the fork/join framework for executing parallel tasks. This framework provides support for the thread management necessary to execute the substreams in parallel. The number of threads employed during parallel stream execution is dependent on the CPU cores in the computer.
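By default, a parallel stream runs its tasks in the common fork/join pool. The sketch below simply prints the core count reported by the JVM next to the common pool's target parallelism; the class name CommonPoolInfo is an illustrative assumption. On many setups the second number is one less than the first, because the thread that invokes the terminal operation also takes part in the work, but the actual values depend on the machine and the JVM settings.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

  // Number of processors the JVM reports as available.
  static int availableCores() {
    return Runtime.getRuntime().availableProcessors();
  }

  // Target parallelism of the common pool used by parallel streams by default.
  static int commonPoolParallelism() {
    return ForkJoinPool.getCommonPoolParallelism();
  }

  public static void main(String[] args) {
    System.out.println("Available cores:         " + availableCores());
    System.out.println("Common pool parallelism: " + commonPoolParallelism());
  }
}
```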

Factors affecting performance

There are no guarantees that executing a stream in parallel will improve performance. This section looks at some factors that can affect performance.

In general, increasing the number of CPU cores and, thereby, the number of threads that can execute in parallel scales performance only up to a threshold for a given size of data, as some threads might become idle if there is no data left for them to process. The number of CPU cores boosts performance to a certain extent, but it is not the only factor that should be considered when deciding whether to execute a stream in parallel.

Inherent in the total cost of parallel processing is the startup cost of setting up the parallel execution. At the onset, if this cost is already comparable to the cost of sequential execution, not much can be gained by resorting to parallel execution.

A combination of the following three factors can be crucial in deciding whether a stream should be executed in parallel:

◉ Sufficiently large data size. The stream must be large enough to warrant parallel processing; otherwise, sequential processing is preferable. If the stream is too small, the startup cost of parallel execution can be prohibitive.

◉ Computation-intensive stream operations. If the stream operations are small computations, the stream size should be proportionately large to warrant parallel execution. If the stream operations are computation-intensive, the stream size is less significant, and parallel execution can boost performance.

◉ Easily splittable stream. If the cost of splitting the stream into substreams is higher than the cost of processing the substreams, employing parallel execution can be futile. Collections such as an ArrayList or a HashMap, as well as simple arrays, can be split efficiently, whereas a LinkedList or an I/O-based datasource is the least efficient in this regard.
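Splitting is done through the source's Spliterator, whose trySplit() method hands off part of the elements to a new spliterator. The sketch below splits a source once and reports the sizes of the two parts, which gives a rough feel for how a given collection divides. The class name SplitDemo and the helper splitOnce() are illustrative assumptions, and how evenly a particular source splits is an implementation detail of the JDK in use.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

class SplitDemo {

  // Splits the list's spliterator once and returns
  // { estimated size of the split-off part, estimated size of the remainder }.
  static long[] splitOnce(List<Integer> list) {
    Spliterator<Integer> rest = list.spliterator();
    Spliterator<Integer> prefix = rest.trySplit();  // null if it cannot split
    long prefixSize = (prefix == null) ? 0L : prefix.estimateSize();
    return new long[] { prefixSize, rest.estimateSize() };
  }

  public static void main(String[] args) {
    List<Integer> arrayList = new ArrayList<>();
    for (int i = 0; i < 1_000; i++) {
      arrayList.add(i);
    }
    List<Integer> linkedList = new LinkedList<>(arrayList);

    long[] a = splitOnce(arrayList);
    long[] l = splitOnce(linkedList);
    System.out.println("ArrayList : " + a[0] + " + " + a[1]);
    System.out.println("LinkedList: " + l[0] + " + " + l[1]);
  }
}
```

An index-based source can compute a split point directly, whereas a linked structure has to walk its nodes to hand off elements, which is where the extra splitting cost comes from.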

Benchmarking is recommended

Benchmarking—that is, measuring performance—is strongly recommended before deciding whether parallel execution will be beneficial. Listing 1 illustrates a simple program that reads the system clock before and after a stream is executed; it can be used to get a sense of how well a stream performs.

Listing 1. Benchmark the performance of sequential and parallel streams.

import java.util.function.LongFunction;
import java.util.stream.LongStream;

/*
 * Benchmark the execution time to sum numbers from 1 to n values
 * using streams.
 */
public final class StreamBenchmarks {

  public static long seqSumRangeClosed(long n) {                           // (1)
    return LongStream.rangeClosed(1L, n).sum();
  }

  public static long paraSumRangeClosed(long n) {                          // (2)
    return LongStream.rangeClosed(1L, n).parallel().sum();
  }

  public static long seqSumIterate(long n) {                               // (3)
    return LongStream.iterate(1L, i -> i + 1).limit(n).sum();
  }

  public static long paraSumIterate(long n) {                              // (4)
    return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum();
  }

  public static long iterSumLoop(long n) {                                 // (5)
    long result = 0;
    for (long i = 1L; i <= n; i++) {
      result += i;
    }
    return result;
  }

  /*
   * Applies the function parameter func, passing n as parameter.
   * Returns the average time (ms.) to execute the function 100 times.
   */
  public static <R> double measurePerf(LongFunction<R> func, long n) {     // (6)
    int numOfExecutions = 100;
    double totTime = 0.0;
    R result = null;
    for (int i = 0; i < numOfExecutions; i++) {                            // (7)
      double start = System.nanoTime();                                    // (8)
      result = func.apply(n);                                              // (9)
      double duration = (System.nanoTime() - start)/1_000_000;             // (10)
      totTime += duration;                                                 // (11)
    }
    double avgTime = totTime/numOfExecutions;                              // (12)
    return avgTime;
  }

  /*
   * Executes the functions in the vararg funcs for different stream sizes.
   */
  public static <R> void xqtFunctions(LongFunction<R>... funcs) {          // (13)
    long[] sizes = {1_000L, 10_000L, 100_000L, 1_000_000L};                // (14)
    // For each stream size ...
    for (int i = 0; i < sizes.length; ++i) {
      System.out.printf("%7d", sizes[i]);                                  // (15)
      // ... execute the functions passed in the var-arg funcs.
      for (int j = 0; j < funcs.length; ++j) {                             // (16)
        System.out.printf("%10.5f", measurePerf(funcs[j], sizes[i]));
      }
      System.out.println();
    }
  }

  public static void main(String[] args) {                                 // (17)
    System.out.println("Streams created with the rangeClosed() method:");  // (18)
    System.out.println(" Size Sequential Parallel");
    xqtFunctions(StreamBenchmarks::seqSumRangeClosed,
      StreamBenchmarks::paraSumRangeClosed);

    System.out.println("Streams created with the iterate() method:");      // (19)
    System.out.println(" Size Sequential Parallel");
    xqtFunctions(StreamBenchmarks::seqSumIterate,
      StreamBenchmarks::paraSumIterate);

    System.out.println("Iterative solution with an explicit loop:");       // (20)
    System.out.println(" Size Iterative");
    xqtFunctions(StreamBenchmarks::iterSumLoop);
  }
}

Possible output from the program would look like Figure 1; these results will be referred to throughout the rest of the article.

Figure 1. Output from the benchmark program

The class StreamBenchmarks in Listing 1 defines five methods, at lines (1) through (5), that compute the sum of values from 1 to n. These methods compute the sum in various ways, and each method is executed with four different values of n—that is, the stream size is the number of values for summation.

The program prints the benchmarks for each method for the different values of n. The timings will of course vary from run to run and from machine to machine, because many factors influence them, the most significant being the number of CPU cores on the computer.

Here’s a more detailed look.

Methods at lines (1) and (2). The methods seqSumRangeClosed() at line (1) and paraSumRangeClosed() at line (2) perform the computation on a sequential and a parallel stream, respectively, that are created with the rangeClosed() method.

return LongStream.rangeClosed(1L, n).sum();             // sequential stream
...
return LongStream.rangeClosed(1L, n).parallel().sum();  // parallel stream

Note that the terminal operation sum() is not computation-intensive.

The parallel stream starts to show better performance when the number of values approaches 100,000. At that point the stream is large enough for parallel execution to pay off. Note that the range of values defined by the arguments of the rangeClosed() method can be efficiently split into substreams, because its start and end values are provided.

Methods at lines (3) and (4). The methods seqSumIterate() at line (3) and paraSumIterate() at line (4) perform the computation on a sequential and a parallel stream, respectively, that are created with the iterate() method.

return LongStream.iterate(1L, i -> i + 1).limit(n).sum(); // Sequential ...
return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum(); // Parallel

Here, the method iterate() creates an infinite stream, and the limit() intermediate operation truncates the stream according to the value of n. The performance of both streams degrades quickly as the number of values increases.

However, the parallel stream performs worse than the sequential stream in all cases. The values generated by the iterate() method are not known before the stream is executed, and the limit() operation is also stateful, making the process of splitting the values into substreams inefficient in the case of the parallel stream.
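One observable symptom of this difference is whether the stream's spliterator reports the SIZED characteristic, meaning that the number of elements is known up front. The sketch below queries it for both kinds of streams; the class name SizedDemo is an illustrative assumption, and the results in the comments are what I would expect from current JDK implementations rather than a documented guarantee.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;

class SizedDemo {

  // A range knows its bounds, so its size is known before execution.
  static boolean rangeIsSized(long n) {
    return LongStream.rangeClosed(1L, n)
        .spliterator()
        .hasCharacteristics(Spliterator.SIZED);
  }

  // iterate() generates each value from the previous one; even with
  // limit(n), the pipeline does not report a known size.
  static boolean iterateIsSized(long n) {
    return LongStream.iterate(1L, i -> i + 1)
        .limit(n)
        .spliterator()
        .hasCharacteristics(Spliterator.SIZED);
  }

  public static void main(String[] args) {
    System.out.println(rangeIsSized(1_000L));    // true
    System.out.println(iterateIsSized(1_000L));  // false
  }
}
```

A sized source can be divided into balanced halves by arithmetic alone, while an iterated source must produce its values one after another before any of them can be handed to another thread.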

Method at line (5). Method iterSumLoop() at line (5) uses a for loop to compute the sum. Using a for loop to calculate the sum performs best for all values of n compared to the streams, showing that significant overhead is involved in using streams for summing a sequence of numerical values.

The rest of the listing. Here is a description of the other main lines in Listing 1.

The methods measurePerf() at line (6) and xqtFunctions() at line (13) create the benchmarks for functions passed as parameters.

In the measurePerf() method, the system clock is read at line (8) and the function parameter func is applied at line (9). The system clock is read again at line (10) after the function application at line (9) has been completed. The execution time calculated at line (10) reflects the time for executing the function.

Applying the function func evaluates the lambda expression or the method reference implementing the LongFunction interface. In Listing 1, the function parameter func is implemented by method references that call methods at lines (1) through (5) in the StreamBenchmarks class whose execution time is to be measured.
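Timings taken this way include the effect of JIT compilation during the first executions, so the early runs can distort the average. A minimal sketch of one common mitigation, running the function a few times untimed before measuring, is shown below. The class WarmupBenchmark, the method measureWithWarmup(), and its warmups and runs parameters are hypothetical names that are not part of Listing 1; for serious measurements, a dedicated harness such as JMH is generally a better choice.

```java
import java.util.function.LongFunction;
import java.util.stream.LongStream;

class WarmupBenchmark {

  // Runs func untimed 'warmups' times, then returns the average time (ms)
  // over 'runs' timed executions.
  static <R> double measureWithWarmup(LongFunction<R> func, long n,
                                      int warmups, int runs) {
    if (runs <= 0) {
      throw new IllegalArgumentException("runs must be positive");
    }
    for (int i = 0; i < warmups; i++) {
      func.apply(n);                           // untimed warm-up executions
    }
    long totalNanos = 0L;
    for (int i = 0; i < runs; i++) {
      long start = System.nanoTime();
      func.apply(n);
      totalNanos += System.nanoTime() - start;
    }
    return totalNanos / 1_000_000.0 / runs;    // nanoseconds to milliseconds
  }

  public static void main(String[] args) {
    LongFunction<Long> seq = n -> LongStream.rangeClosed(1L, n).sum();
    LongFunction<Long> par = n -> LongStream.rangeClosed(1L, n).parallel().sum();
    System.out.printf("sequential: %.5f ms%n",
        measureWithWarmup(seq, 1_000_000L, 20, 100));
    System.out.printf("parallel:   %.5f ms%n",
        measureWithWarmup(par, 1_000_000L, 20, 100));
  }
}
```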

Side effects and other factors


Efficient execution of parallel streams that produces the desired results requires the stream operations (and their behavioral parameters) to avoid certain side effects.

Noninterfering behaviors. The behavioral parameters of stream operations should be noninterfering, both for sequential and parallel streams. Unless the stream datasource is concurrent, the stream operations should not modify it during the execution of the stream pipeline.

Stateless behaviors. The behavioral parameters of stream operations should be stateless, both for sequential and parallel streams. A behavioral parameter implemented as a lambda expression should not depend on any state that might change during the execution of the stream pipeline. The results from a stateful behavioral parameter can be nondeterministic or even incorrect. For a stateless behavioral parameter, the results are always the same.

Having a shared state that is accessed by the behavioral parameters of the stream operations in a pipeline is not a good idea. Why? Executing the pipeline in parallel can lead to race conditions when the shared state is accessed; using synchronization code to provide thread safety may defeat the purpose of parallelization. Using the three-argument reduce() or collect() method can be a better solution to encapsulate shared state.
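As a minimal sketch of that advice, the method below gathers results with the three-argument collect() instead of adding to a list shared between threads. The class name SafeAccumulation and the even-number filter are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class SafeAccumulation {

  // Collects the even numbers in [1, n]. Each thread fills its own
  // ArrayList (supplier + accumulator); the partial lists are then
  // merged by the combiner, so no list is mutated concurrently.
  static List<Integer> evens(int n) {
    return IntStream.rangeClosed(1, n)
        .parallel()
        .filter(i -> i % 2 == 0)
        .boxed()
        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
  }

  public static void main(String[] args) {
    System.out.println(evens(10));  // [2, 4, 6, 8, 10]
  }
}
```

By contrast, calling forEach(sharedList::add) on a parallel stream with a plain ArrayList would let several threads mutate the same list at once, which can lose elements or throw an exception.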

The intermediate operations distinct(), skip(), limit(), and sorted() are stateful. They can carry extra performance overhead when executed in a parallel stream, because such an operation can entail multiple passes over the data and may require significant data buffering.

Ordering and terminal operations. An ordered stream processed by operations that preserve the encounter order will produce the same results, regardless of whether it is executed sequentially or in parallel. However, repeated execution of an unordered stream—sequential or parallel—can produce different results.

Preserving the encounter order of elements in an ordered parallel stream can incur a performance penalty. The performance of an ordered parallel stream can be improved if the ordering constraint is removed by calling the unordered() intermediate operation on the stream.

The stateful intermediate operations distinct(), skip(), and limit() can perform better in a parallel stream that is unordered than in one that is ordered:

◉ Rather than needing to buffer the first occurrence of a duplicate value, the distinct() operation need only buffer any occurrence.

◉ The skip() operation can skip any n elements, rather than skipping the first n elements.

◉ The limit() operation can truncate the stream after any n elements, rather than just after the first n elements.

The terminal operation findAny() is intentionally nondeterministic, and it can return any element in the stream. It is especially suited for parallel streams.

The forEach() terminal operation ignores the encounter order, but the forEachOrdered() terminal operation preserves the order. The sorted() stateful intermediate operation, on the other hand, enforces a specific encounter order, regardless of whether it is executed in a parallel pipeline.
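The ordering rules above can be sketched as follows; the class name OrderingDemo and both helper methods are illustrative assumptions. The first method relies on forEachOrdered() to process elements in encounter order even though the stream is parallel, and the second drops the ordering constraint so that limit() is free to keep any k elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderingDemo {

  // forEachOrdered() handles one element at a time in encounter order,
  // so appending to a plain list here yields 0, 1, ..., n-1.
  static List<Integer> inEncounterOrder(int n) {
    List<Integer> out = new ArrayList<>();
    IntStream.range(0, n).parallel().boxed().forEachOrdered(out::add);
    return out;
  }

  // With unordered(), limit(k) may keep any k elements,
  // not necessarily the first k.
  static List<Integer> anyK(int n, int k) {
    return IntStream.range(0, n)
        .parallel()
        .boxed()
        .unordered()
        .limit(k)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(inEncounterOrder(10));
    System.out.println(anyK(1_000, 5));  // some 5 elements; may vary per run
  }
}
```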

Autoboxing and unboxing of numeric values. Because the Stream API allows both object and numeric streams, and it provides support for conversion between them, choosing a numeric stream, when possible, can offset the overhead of autoboxing and unboxing in object streams.
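A small sketch of the difference, using the same summation as the benchmark: the first method works on a Stream<Long>, so every value is boxed and unboxed, while the second stays on primitive long values throughout. The class name BoxingDemo is an illustrative assumption.

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

class BoxingDemo {

  // Object stream: each element is a Long, and Long::sum unboxes
  // its arguments and boxes its result at every step.
  static long boxedSum(long n) {
    return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
  }

  // Numeric stream: works on primitive long values throughout.
  static long primitiveSum(long n) {
    return LongStream.rangeClosed(1L, n).sum();
  }

  public static void main(String[] args) {
    System.out.println(boxedSum(100L));      // 5050
    System.out.println(primitiveSum(100L));  // 5050
  }
}
```

Both methods return the same result; only the cost of producing it differs. An existing object stream can be converted with mapToLong(), mapToInt(), or mapToDouble() when a numeric stream is preferable.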

Source: oracle.com
