Wednesday, May 5, 2021

Java 8 Parallel Streams – Custom Thread Pools Examples

A brief introduction to custom thread pools and their use with Java 8 parallel streams, with examples of running parallel streams on a custom pool instead of the shared common thread pool.

1. Introduction

In this tutorial, you'll learn how to create custom thread pools in Java 8 for bulk data processing with the powerful parallel streams API.

Parallel streams can work well in concurrent environments. They can improve on the performance of sequential streams, at the cost of multi-threading overhead.

The main focus of this article is one of the biggest limitations of the Stream API, with examples of how you can use parallel streams with custom thread pools.


2. Java 8 Parallel Streams


First, let us see how to create Parallel Streams from a Collection.

To make a stream that can run on multiple cores of the processor, you just need to call the parallelStream() method.

package com.oraclejavacertified.java8.streams.parallel.streams;
 
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
 
public class ParallelStreamCreation {
 
    public static void main(String[] args) {
 
        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);
 
        Stream<Integer> parallelStream = intList.parallelStream();
 
        parallelStream.forEach(value -> System.out.println(value));
    }
}

Output:

30
40
50
20
10

You can see that the values are printed in no particular order, because the elements are processed by different threads. The order can change from run to run.

Internally, it uses the Spliterator and StreamSupport classes to run in parallel.
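
To make the role of these two classes more concrete, here is a minimal sketch that builds a parallel stream by hand from a list's Spliterator. The class name SpliteratorParallelStream and the helper parallelStreamOf are made up for this illustration; Spliterator and StreamSupport.stream(spliterator, parallel) are the standard JDK APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SpliteratorParallelStream {

    // Hypothetical helper: builds a parallel stream directly from the list's Spliterator.
    static Stream<Integer> parallelStreamOf(List<Integer> source) {
        Spliterator<Integer> spliterator = source.spliterator();
        // The second argument asks StreamSupport for a parallel (true) or sequential (false) stream.
        return StreamSupport.stream(spliterator, true);
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);

        Stream<Integer> stream = parallelStreamOf(intList);
        System.out.println("parallel: " + stream.isParallel());
        System.out.println("sum: " + stream.mapToInt(Integer::intValue).sum());
    }
}
```

For the list above this reports a parallel stream and a sum of 150.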

By default, processing is done with ForkJoinPool.commonPool(), which is shared by the entire application. If you have lots of parallel streams running at the same time, you may see degraded performance and delays in processing.
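
One way to see the shared pool at work is to record which threads handle the elements of a parallel stream. The sketch below is only an illustration: the class CommonPoolThreads and the method observedThreadNames are hypothetical names, and the exact set of threads will vary by machine and by run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolThreads {

    // Hypothetical helper: runs a small parallel stream and records the name
    // of every thread that processed at least one element.
    static Set<String> observedThreadNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.rangeClosed(1, 1000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // Typically prints the calling thread plus common pool worker threads.
        observedThreadNames().forEach(System.out::println);
    }
}
```

The calling thread usually takes part in the work as well, so its name tends to appear next to the common pool workers.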

3. Using Custom Thread Pool


As a result, the above approach uses the common ForkJoinPool for all parallel streams.

If you have many parallel streams running at the same time and some of them take longer than expected, for example due to network slowness, those tasks may block threads from the common pool. This slows down the other tasks, which then take longer to complete.

In these cases, it is good to combine parallel streams with a custom thread pool.

Look at the program below. It creates a ForkJoinPool with 5 threads and, inside a task submitted to that pool, creates a new parallel stream to find the sum of all numbers in the given range.

package com.oraclejavacertified.java8.streams.parallel.streams;
 
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
 
public class CustomPoolParallelStreams {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        parallelStreamProcess();
    }
 
    private static void parallelStreamProcess() throws ExecutionException, InterruptedException {
 
        int start = 1;
        int end = 10000;
 
        List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
                .collect(Collectors.toList());
        System.out.println(intList.size());
 
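        ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
        try {
            // Submit the reduction as a task so that it starts on a worker thread of the custom pool.
            int actualTotal = newCustomThreadPool.submit(
                    () -> {
                         int a = intList.stream().parallel().reduce(0, Integer::sum).intValue();
                         return a;
                    }).get();

            System.out.println("actualTotal " + actualTotal);
        } finally {
            // Release the pool's threads once the work is done.
            newCustomThreadPool.shutdown();
        }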
 
    }
 
}

Output:

10000

actualTotal 50005000

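Actually, the above program is not guaranteed to be efficient, even though I have seen many websites recommend this solution. It depends on an undocumented implementation detail: a parallel stream started from a ForkJoinPool worker thread forks its subtasks into that same pool instead of the common pool. The Stream API does not promise this behavior, so it should not be treated as a reliable way to isolate streams from the common pool.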

So, if you are running multiple parallel streams, do not rely on this Stream API parallel approach, as it might slow down other streams and delay their results.
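
Rather than taking any claim about which pool does the work on trust, you can check it on your own JDK by recording thread names from inside the stream. The following is a minimal sketch under that assumption; CustomPoolThreadCheck and threadNamesInside are hypothetical names, and whatever it prints reflects JDK implementation behavior that the Stream API documentation does not specify.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolThreadCheck {

    // Hypothetical helper: runs a parallel stream from inside a task submitted
    // to the given pool and records which threads processed the elements.
    static Set<String> threadNamesInside(ForkJoinPool pool) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() ->
                IntStream.rangeClosed(1, 10000)
                        .parallel()
                        .forEach(i -> names.add(Thread.currentThread().getName()))
        ).join(); // join() waits like get() but throws no checked exceptions
        return names;
    }

    public static void main(String[] args) {
        ForkJoinPool customPool = new ForkJoinPool(5);
        try {
            // Compare these names with the "ForkJoinPool.commonPool-worker-N"
            // names used by the common pool's threads.
            threadNamesInside(customPool).forEach(System.out::println);
        } finally {
            customPool.shutdown();
        }
    }
}
```

If the printed names belong to the custom pool, the stream's subtasks ran there on your JDK; treat that as an observation, not as a contract.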

Here, the pool size is 5, but you can change it to suit your CPU configuration. If you have more cores, you can fine-tune the size based on the other tasks running on the machine.

If you have only one parallel stream, you can run it with a limited pool size.
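
As a rough sketch of sizing the pool from the CPU configuration, the example below reads the core count with Runtime.getRuntime().availableProcessors() and keeps one core free for other work. The helper poolSizeFor and the choice of reserving one core are assumptions made for illustration, not a recommendation from the source.

```java
import java.util.concurrent.ForkJoinPool;

class PoolSizing {

    // Hypothetical helper: leaves `reservedCores` free for other work,
    // but never returns fewer than one thread.
    static int poolSizeFor(int availableCores, int reservedCores) {
        return Math.max(1, availableCores - reservedCores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int poolSize = poolSizeFor(cores, 1);

        ForkJoinPool pool = new ForkJoinPool(poolSize);
        try {
            System.out.println("cores: " + cores
                    + ", pool parallelism: " + pool.getParallelism());
        } finally {
            pool.shutdown();
        }
    }
}
```

The right number depends on what else runs on the machine, so measure with your own workload before settling on a value.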

For now, the Stream API has no option for passing a ForkJoinPool to a parallel stream to limit the number of parallel tasks; that would have to come in a future Java update.

Source: javacodegeeks.com
