The Stream API brought a new programming paradigm to Java: a declarative way of processing data using streams—expressing what should be done to the values and not how it should be done. More importantly, the API allows you to harness the power of multicore architectures for the parallel processing of data. There are two kinds of streams.
◉ A sequential stream is one whose elements are processed sequentially (as in a for loop) when the stream pipeline is executed by a single thread.
◉ A parallel stream is split into multiple substreams that are processed in parallel by multiple instances of the stream pipeline being executed by multiple threads, and their intermediate results are combined to create the final result.
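A parallel stream can be created directly from a collection by invoking the Collection.parallelStream() method. Streams obtained from most other sources, such as arrays or LongStream.rangeClosed(), are sequential by default.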
The sequential or parallel mode of an existing stream can be changed by calling the BaseStream.sequential() or BaseStream.parallel() intermediate operation, respectively. These calls set the mode for the whole pipeline. Whether a stream is executed sequentially or in parallel therefore depends on the mode in effect when the terminal operation is initiated, which is the mode set by the last such call.
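The mode rules above can be checked with isParallel(). What follows is a minimal sketch that assumes Java 9 or later (for List.of). The class and method names are illustrative only. The last method shows that a later sequential() call overrides an earlier parallel() call for the entire pipeline, including the map() stage placed between them.

```java
import java.util.List;

public final class ExecutionModeDemo {
    static final List<String> WORDS = List.of("alpha", "beta", "gamma", "delta");

    // Collection.parallelStream() creates a parallel stream directly.
    public static boolean fromParallelStream() {
        return WORDS.parallelStream().isParallel();
    }

    // Collection.stream() starts out sequential; parallel() switches the mode.
    public static boolean afterParallel() {
        return WORDS.stream().parallel().isParallel();
    }

    // The last parallel()/sequential() call decides the mode of the whole
    // pipeline, so this stream (map() stage included) would run sequentially.
    public static boolean afterParallelThenSequential() {
        return WORDS.stream()
                .parallel()
                .map(String::toUpperCase)
                .sequential()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(fromParallelStream());          // true
        System.out.println(afterParallel());               // true
        System.out.println(afterParallelThenSequential()); // false
    }
}
```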
The Stream API makes it possible to execute a sequential stream in parallel without rewriting the code. The primary reason for using parallel streams is to improve performance while at the same time ensuring that the results obtained are the same, or at least compatible, regardless of the mode of execution. Although the API goes a long way toward achieving its aim, it is important to understand the pitfalls to avoid when executing stream pipelines in parallel.
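The difference between "the same" and "compatible" results can be illustrated with a few standard terminal operations. Consider this minimal sketch, in which the class and method names are illustrative. Order-respecting operations such as findFirst(), collect(Collectors.toList()), and forEachOrdered() give the same result in both modes. findAny(), by contrast, may return any element when run in parallel.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public final class CompatibleResultsDemo {
    // findFirst() respects encounter order: always 1, even in parallel.
    public static int firstParallel() {
        return IntStream.rangeClosed(1, 1_000).parallel().findFirst().getAsInt();
    }

    // findAny() may return any element in parallel mode: the result is
    // compatible (some element of the stream) but not necessarily 1.
    public static int anyParallel() {
        return IntStream.rangeClosed(1, 1_000).parallel().findAny().getAsInt();
    }

    // collect(toList()) preserves encounter order even in parallel.
    public static List<Integer> squaresParallel() {
        return IntStream.rangeClosed(1, 5).parallel()
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList());
    }

    // forEachOrdered() runs the action one element at a time in encounter
    // order, so appending to a StringBuilder yields a deterministic string.
    public static String orderedParallel() {
        StringBuilder sb = new StringBuilder();
        IntStream.rangeClosed(1, 5).parallel().forEachOrdered(sb::append);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(firstParallel());     // 1
        System.out.println(anyParallel());       // some value in 1..1000
        System.out.println(squaresParallel());   // [1, 4, 9, 16, 25]
        System.out.println(orderedParallel());   // 12345
    }
}
```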
Using parallel streams
Building parallel streams. The execution mode of an existing stream can be set to parallel by calling the parallel() method on the stream. The parallelStream() method of the Collection interface can be used to create a parallel stream with a collection as the datasource. No other code is necessary for parallel execution, as the data partitioning and thread management for a parallel stream are handled by the API and the JVM. As with any stream, the stream is not executed until a terminal operation is invoked on it.
The isParallel() method of the stream interfaces can be used to determine whether the execution mode of a stream is parallel.
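Both ways of building a parallel stream are shown in the following minimal sketch. The class name and the peek()-based counter are illustrative assumptions. The counter shows that no element is processed until the terminal operation sum() is invoked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public final class BuildParallelDemo {

    // Returns {elements visited before the terminal operation, sum, elements visited after}.
    public static int[] lazyParallelSum() {
        List<Integer> numbers = new ArrayList<>(List.of(1, 2, 3, 4, 5, 6, 7, 8));
        AtomicInteger visited = new AtomicInteger();   // thread-safe counter

        // Collection as the data source: parallelStream() builds a parallel stream.
        IntStream pipeline = numbers.parallelStream()
                .peek(n -> visited.incrementAndGet())
                .mapToInt(Integer::intValue);

        int before = visited.get();   // 0: nothing has executed yet
        int sum = pipeline.sum();     // the terminal operation runs the pipeline
        int after = visited.get();    // 8: every element was visited
        return new int[] {before, sum, after};
    }

    // A non-collection source (an array) starts out sequential; parallel() switches it.
    public static boolean arrayStreamIsParallel() {
        return Arrays.stream(new int[] {1, 2, 3}).parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(lazyParallelSum())); // [0, 36, 8]
        System.out.println(arrayStreamIsParallel());            // true
    }
}
```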
Executing parallel streams. The Stream API allows a stream to be executed either sequentially or in parallel—meaning that all stream operations can execute either sequentially or in parallel. A sequential stream is executed in a single thread running on one CPU core. The elements in the stream are processed sequentially in a single pass by the stream operations that are executed in the same thread.
A parallel stream is executed by different threads, running on multiple CPU cores in a computer. The stream elements are split into substreams that are processed by multiple instances of the stream pipeline being executed in multiple threads. The partial results from the processing of each substream are merged (or combined) into a final result.
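Merging partial results is explicit in the three-argument form of reduce(). This minimal sketch uses illustrative names. Its accumulator folds one word into a partial result for a substream, and its combiner merges the partial results of different substreams. The combiner is used only in parallel mode, but both modes must produce the same total.

```java
import java.util.List;
import java.util.stream.Stream;

public final class CombineDemo {
    static final List<String> WORDS =
            List.of("fork", "join", "stream", "parallel", "pipeline");

    // Sums the word lengths with the three-argument reduce():
    //  - 0 is the identity, the starting value for every substream,
    //  - the accumulator folds one word into a partial result,
    //  - the combiner merges partial results from different substreams.
    public static int totalLength(boolean parallel) {
        Stream<String> words = parallel ? WORDS.parallelStream() : WORDS.stream();
        return words.reduce(0,
                (partial, word) -> partial + word.length(),  // accumulator
                Integer::sum);                                // combiner
    }

    public static void main(String[] args) {
        System.out.println(totalLength(false)); // 30
        System.out.println(totalLength(true));  // 30
    }
}
```

The identity must be a true identity for the combiner. If 10 were used as the seed instead of 0, the parallel run would add it once per substream, and the parallel total could then differ from the sequential one.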
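Parallel streams use the fork/join framework for executing parallel tasks; by default, these tasks run in the common ForkJoinPool. This framework provides the thread management necessary to execute the substreams in parallel. The number of threads employed depends on the number of CPU cores in the computer. By default, the parallelism of the common pool is one less than the number of available processors, and the thread that invokes the terminal operation also takes part in the work.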
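The thread usage of parallel streams can be inspected from code. This minimal sketch (the class name is illustrative) prints the processor count and the common pool's parallelism level, then records the names of the threads that execute a parallel forEach(). The set of names typically contains the calling thread (main) plus some common-pool worker threads, but the exact set varies from run to run and machine to machine. The parallelism level can be changed with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public final class CommonPoolDemo {
    // Target parallelism of the pool that parallel streams use by default.
    public static int poolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    // Collects the names of all threads that process elements of a parallel stream.
    public static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();  // thread-safe set
        IntStream.range(0, 10_000).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + poolParallelism());
        System.out.println("Threads used:            " + threadsUsed());
    }
}
```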
Factors affecting performance
There are no guarantees that executing a stream in parallel will improve performance. This section looks at some factors that can affect performance.
In general, increasing the number of CPU cores and, thereby, the number of threads that can execute in parallel scales performance only up to a threshold for a given size of data, as some threads might become idle if there is no data left for them to process. The number of CPU cores boosts performance to a certain extent, but it is not the only factor that should be considered when deciding whether to execute a stream in parallel.
Inherent in the total cost of parallel processing is the startup cost of setting up the parallel execution, such as splitting the data and dispatching tasks to threads. If this cost alone is already comparable to the cost of sequential execution, little can be gained by resorting to parallel execution.
A combination of the following three factors can be crucial in deciding whether a stream should be executed in parallel:
◉ Sufficiently large data size. The stream must be large enough to warrant parallel processing; otherwise, sequential processing is preferable. If the stream is too small, the startup cost of parallel execution can outweigh any gain.
◉ Computation-intensive stream operations. If the stream operations are small computations, the stream size should be proportionately large to warrant parallel execution. If the stream operations are computation-intensive, the stream size is less significant, and parallel execution can boost performance.
◉ Easily splittable stream. If the cost of splitting the stream into substreams is higher than the cost of processing the substreams, parallel execution can be futile. Data sources such as an ArrayList, a HashMap, or a plain array can be split efficiently, whereas a LinkedList or an I/O-based data source is among the least efficient in this regard.
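Splittability can be observed through the Spliterator that a stream uses to partition its source. This minimal sketch uses a hypothetical class SplitDemo and helpers. It requests one trySplit() on each list and reports the size of the chunk that was split off. On OpenJDK, the ArrayList spliterator splits at the midpoint. The LinkedList spliterator, in contrast, has to walk the list and copy a comparatively small batch of elements into an array. Exact split sizes are implementation details and may vary between JDK versions.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

public final class SplitDemo {
    // Size of the chunk split off by the first trySplit() call (0 if no split).
    public static long firstSplitSize(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();
        return prefix == null ? 0 : prefix.estimateSize();
    }

    // Fills the given list with the values 0..n-1 and returns it.
    public static List<Integer> fill(List<Integer> list, int n) {
        for (int i = 0; i < n; i++) {
            list.add(i);
        }
        return list;
    }

    public static void main(String[] args) {
        int n = 1_000_000;
        System.out.println("ArrayList first split:  "
                + firstSplitSize(fill(new ArrayList<>(), n)));
        System.out.println("LinkedList first split: "
                + firstSplitSize(fill(new LinkedList<>(), n)));
    }
}
```

LongStream.iterate() in Listing 1 is a similar case. Each element is computed from the previous one, so the stream cannot be divided into independent ranges the way a stream from rangeClosed() can.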
Benchmarking is recommended
Benchmarking—that is, measuring performance—is strongly recommended before deciding whether parallel execution will be beneficial. Listing 1 illustrates a simple program that reads the system clock before and after a stream is executed; it can be used to get a sense of how well a stream performs.
Listing 1. Benchmark the performance of sequential and parallel streams.
import java.util.function.LongFunction;
import java.util.stream.LongStream;

/*
 * Benchmark the execution time to sum the numbers from 1 to n
 * using streams and using an explicit loop.
 */
public final class StreamBenchmarks {

    public static long seqSumRangeClosed(long n) {                       // (1)
        return LongStream.rangeClosed(1L, n).sum();
    }

    public static long paraSumRangeClosed(long n) {                      // (2)
        return LongStream.rangeClosed(1L, n).parallel().sum();
    }

    public static long seqSumIterate(long n) {                           // (3)
        return LongStream.iterate(1L, i -> i + 1).limit(n).sum();
    }

    public static long paraSumIterate(long n) {                          // (4)
        return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum();
    }

    public static long iterSumLoop(long n) {                             // (5)
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
    }

    /*
     * Applies the function parameter func, passing n as its argument.
     * Returns the average time (ms) to execute the function 100 times.
     */
    public static <R> double measurePerf(LongFunction<R> func, long n) { // (6)
        int numOfExecutions = 100;
        double totTime = 0.0;
        R result = null;
        for (int i = 0; i < numOfExecutions; i++) {                      // (7)
            long start = System.nanoTime();                              // (8)
            result = func.apply(n);                                      // (9)
            double duration = (System.nanoTime() - start) / 1_000_000.0; // (10) ns -> ms
            totTime += duration;                                         // (11)
        }
        double avgTime = totTime / numOfExecutions;                      // (12)
        return avgTime;
    }

    /*
     * Executes the functions in the varargs parameter funcs for different stream sizes.
     */
    @SafeVarargs
    public static <R> void xqtFunctions(LongFunction<R>... funcs) {      // (13)
        long[] sizes = {1_000L, 10_000L, 100_000L, 1_000_000L};          // (14)
        // For each stream size ...
        for (int i = 0; i < sizes.length; ++i) {
            System.out.printf("%7d", sizes[i]);                          // (15)
            // ... execute the functions passed in the varargs parameter funcs.
            for (int j = 0; j < funcs.length; ++j) {                     // (16)
                System.out.printf("%11.5f", measurePerf(funcs[j], sizes[i]));
            }
            System.out.println();
        }
    }

    public static void main(String[] args) {                             // (17)
        System.out.println("Streams created with the rangeClosed() method:"); // (18)
        System.out.printf("%7s%11s%11s%n", "Size", "Sequential", "Parallel");
        xqtFunctions(StreamBenchmarks::seqSumRangeClosed,
                     StreamBenchmarks::paraSumRangeClosed);
        System.out.println("Streams created with the iterate() method:");     // (19)
        System.out.printf("%7s%11s%11s%n", "Size", "Sequential", "Parallel");
        xqtFunctions(StreamBenchmarks::seqSumIterate,
                     StreamBenchmarks::paraSumIterate);
        System.out.println("Iterative solution with an explicit loop:");      // (20)
        System.out.printf("%7s%11s%n", "Size", "Iterative");
        xqtFunctions(StreamBenchmarks::iterSumLoop);
    }
}
Possible output from the program would look like Figure 1; these results will be referred to throughout the rest of the article.
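The harness in Listing 1 times every execution, including the early ones that run before the JIT compiler has optimized the code, and this may inflate the averages for the first sizes measured. A variant that performs untimed warm-up runs first is sketched below. The class WarmupBenchmark, the helper measureWithWarmup(), and the chosen warm-up and run counts are hypothetical choices, not part of Listing 1. For serious measurements, a dedicated tool such as JMH (the Java Microbenchmark Harness) handles warm-up, forking, and dead-code elimination more rigorously.

```java
import java.util.function.LongFunction;
import java.util.stream.LongStream;

public final class WarmupBenchmark {
    // Keeps the last result reachable so the measured calls have an observable effect.
    static volatile Object sink;

    // Hypothetical helper: calls func untimed `warmups` times, then returns the
    // average wall-clock time in milliseconds over `runs` timed calls.
    public static <R> double measureWithWarmup(LongFunction<R> func, long n,
                                               int warmups, int runs) {
        if (runs <= 0) {
            throw new IllegalArgumentException("runs must be positive");
        }
        for (int i = 0; i < warmups; i++) {
            sink = func.apply(n);                     // untimed warm-up calls
        }
        long totalNanos = 0L;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            sink = func.apply(n);
            totalNanos += System.nanoTime() - start;
        }
        return totalNanos / (double) runs / 1_000_000.0;  // ns -> ms
    }

    public static void main(String[] args) {
        LongFunction<Long> seq = n -> LongStream.rangeClosed(1L, n).sum();
        LongFunction<Long> par = n -> LongStream.rangeClosed(1L, n).parallel().sum();
        long size = 1_000_000L;
        System.out.printf("sequential: %.5f ms%n", measureWithWarmup(seq, size, 20, 100));
        System.out.printf("parallel:   %.5f ms%n", measureWithWarmup(par, size, 20, 100));
    }
}
```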