# Multithreading, Parallel & Asynchronous Coding in Modern Java

This repo has the code for parallel and asynchronous programming in Java.
```java
import java.util.List;
import java.util.stream.Collectors;

public class ParallelismExample {
    public static void main(String[] args) {
        List<String> namesList = List.of("Bob", "Jamie", "Jill", "Rick");
        System.out.println("namesList : " + namesList);
        // parallelStream() processes the elements on the common ForkJoinPool
        List<String> namesListUpperCase = namesList
                .parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println("namesListUpperCase : " + namesListUpperCase);
    }
}
```

With plain threads, we need a lot of boilerplate code to achieve parallelism. We always have to implement the `Runnable` interface and pass each instance into a separate `Thread`:
```java
...
public Product retrieveProductDetails(String productId) throws InterruptedException {
    stopWatch.start();
    ProductInfoRunnable productInfoRunnable = new ProductInfoRunnable(productId);
    Thread productInfoThread = new Thread(productInfoRunnable);
    ReviewRunnable reviewRunnable = new ReviewRunnable(productId);
    Thread reviewThread = new Thread(reviewRunnable);
    // run both service calls concurrently
    productInfoThread.start();
    reviewThread.start();
    // wait for both threads; join() also makes their results visible to this thread
    productInfoThread.join();
    reviewThread.join();
    ProductInfo productInfo = productInfoRunnable.getProductInfo();
    Review review = reviewRunnable.getReview();
    stopWatch.stop();
    log("Total Time Taken : " + stopWatch.getTime());
    return new Product(productId, productInfo, review);
}
...
private class ProductInfoRunnable implements Runnable {
    private ProductInfo productInfo;
    private String productId;

    public ProductInfoRunnable(String productId) {
        this.productId = productId;
    }

    @Override
    public void run() {
        productInfo = productInfoService.retrieveProductInfo(productId);
    }

    public ProductInfo getProductInfo() {
        return productInfo;
    }
}

private class ReviewRunnable implements Runnable {
    private String productId;
    private Review review;

    public ReviewRunnable(String productId) {
        this.productId = productId;
    }

    @Override
    public void run() {
        review = reviewService.retrieveReviews(productId);
    }

    public Review getReview() {
        return review;
    }
}
...
```

Then we have to manually:
- create the threads
- start the threads
- join the threads
Threads are also expensive:
- each thread has its own runtime stack, memory, registers, and so on
The answer: thread pools.

- A thread pool is a group of threads that are created up front and kept readily available
- CPU-intensive tasks
  - ThreadPool size = number of cores
- I/O tasks
  - ThreadPool size > number of cores
- What are the benefits of a thread pool?
  - No need to manually create, start and join the threads
  - Achieving concurrency in your application
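Here is a minimal sketch of the sizing rule above. It assumes a hypothetical `ioMultiplier` factor for I/O-bound work. The value 2 is only an illustration, because the right factor depends on how long the tasks wait.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolSizing {

    // CPU-bound work: one thread per core keeps every core busy without extra context switching.
    static int cpuBoundPoolSize() {
        return Runtime.getRuntime().availableProcessors();
    }

    // I/O-bound work: threads spend most of their time waiting, so more threads than cores pay off.
    // ioMultiplier is a hypothetical tuning factor, not a recommended value.
    static int ioBoundPoolSize(int ioMultiplier) {
        return Runtime.getRuntime().availableProcessors() * ioMultiplier;
    }

    public static void main(String[] args) {
        ExecutorService cpuPool = Executors.newFixedThreadPool(cpuBoundPoolSize());
        ExecutorService ioPool = Executors.newFixedThreadPool(ioBoundPoolSize(2));
        System.out.println("CPU pool size: " + cpuBoundPoolSize());
        System.out.println("I/O pool size: " + ioBoundPoolSize(2));
        // pool threads stay alive until the pools are shut down
        cpuPool.shutdown();
        ioPool.shutdown();
    }
}
```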
ExecutorService:

- Released as part of Java 5
- `ExecutorService` is an asynchronous task execution engine
- It provides a way to execute tasks asynchronously and returns their results in a much simpler way than working with raw threads
- This enabled coarse-grained, task-based parallelism in Java
```java
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
```

With `ExecutorService` and `Future`s we can reduce the boilerplate dramatically:

```java
Future<ProductInfo> productInfoFuture = executorService.submit(() -> productInfoService.retrieveProductInfo(productId));
Future<Review> reviewFuture = executorService.submit(() -> reviewService.retrieveReviews(productId));
// get() blocks until the result is available
ProductInfo productInfo = productInfoFuture.get();
Review review = reviewFuture.get();
```

Execution can even be time-boxed:

```java
// throws a TimeoutException if the result is not available within 1 second
ProductInfo productInfo = productInfoFuture.get(1, TimeUnit.SECONDS);
```

The `ExecutorService` must be shut down!
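The following self-contained sketch submits two tasks and then shuts the pool down in a `finally` block. `fetchProductInfo` and `fetchReview` are hypothetical stand-ins for the services above, and the pool size of 2 and the 1-second timeouts are arbitrary choices.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorShutdownExample {

    // hypothetical stand-ins for productInfoService / reviewService
    static String fetchProductInfo(String id) { return "info-" + id; }
    static String fetchReview(String id) { return "review-" + id; }

    static String retrieve(String productId)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Future<String> infoFuture = executorService.submit(() -> fetchProductInfo(productId));
            Future<String> reviewFuture = executorService.submit(() -> fetchReview(productId));
            // get() blocks; the timeout bounds how long we wait for each result
            return infoFuture.get(1, TimeUnit.SECONDS) + " / " + reviewFuture.get(1, TimeUnit.SECONDS);
        } finally {
            // without shutdown() the non-daemon pool threads would keep the JVM alive
            executorService.shutdown();
            if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(retrieve("ABC123")); // info-ABC123 / review-ABC123
    }
}
```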
Drawbacks:
- `Future` leads to blocking code: the result can only be retrieved with the blocking `get()` call
Fork/Join framework:

- Introduced as part of Java 7
- An extension of `ExecutorService` (`ForkJoinPool` implements `ExecutorService`)
- The Fork/Join framework is designed to achieve data parallelism
- `ExecutorService` is designed to achieve task-based parallelism
At first, T1 picks up the task and splits it into subtasks.
With work stealing, every idle thread looks for tasks to "steal" from the work queues of other threads, so the amount of work is spread almost evenly.
- A `ForkJoinTask` represents part of the data and its computation
- Types of tasks to submit to a `ForkJoinPool`:
  - `ForkJoinTask`
    - `RecursiveTask` -> task that returns a value
    - `RecursiveAction` -> task that does not return a value
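Here is a minimal `RecursiveTask` sketch that sums an array by splitting it in half until the chunks are small. The class name `SumTask` and the threshold of 1,000 elements are illustrative assumptions, not values from this repo.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {

    // below this many elements the task computes directly instead of splitting further
    private static final int THRESHOLD = 1_000;

    private final long[] numbers;
    private final int start;
    private final int end; // exclusive

    public SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = start + (end - start) / 2;
        SumTask left = new SumTask(numbers, start, mid);
        SumTask right = new SumTask(numbers, mid, end);
        left.fork();                        // queue the left half; an idle worker may steal it
        long rightResult = right.compute(); // work on the right half in the current thread
        return left.join() + rightResult;   // wait for the (possibly stolen) left half
    }

    static long parallelSum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[100_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(parallelSum(numbers)); // 5000050000
    }
}
```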
Drawbacks: the code can become relatively complex.
To work with parallel streams we just replace the `stream()` call with `parallelStream()` and that's it!
But wait... let's take a closer look.
- Streams are sequential by default
- `sequential()` -> executes the stream sequentially
- `parallel()` -> executes the stream in parallel
- Both functions change the behavior of the whole pipeline (the last call wins)
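The following small sketch checks the last-call-wins rule with `BaseStream.isParallel()`. The class and method names are made up for this illustration.

```java
import java.util.stream.Stream;

public class StreamModeExample {

    // parallel() early, sequential() late: the whole pipeline runs sequentially
    static boolean isParallelAfterSequential() {
        return Stream.of("Bob", "Jamie", "Jill", "Rick")
                .parallel()
                .map(String::toUpperCase)
                .sequential()
                .isParallel();
    }

    // sequential() early, parallel() late: the whole pipeline runs in parallel
    static boolean isParallelAfterParallel() {
        return Stream.of("Bob", "Jamie", "Jill", "Rick")
                .sequential()
                .map(String::toUpperCase)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(isParallelAfterSequential()); // false
        System.out.println(isParallelAfterParallel());   // true
    }
}
```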
Changing the `parallelStream()` behaviour to sequential:

```java
List<String> stringTransform(List<String> inputs) {
    return inputs.parallelStream()
            .map(ParallelStreamsExample::addNameLengthTransform)
            .sequential()
            .collect(Collectors.toList());
}
```

The other way around:

```java
List<String> stringTransform(List<String> inputs) {
    return inputs.stream()
            .map(ParallelStreamsExample::addNameLengthTransform)
            .parallel()
            .collect(Collectors.toList());
}
```

- invoke the price validator service for every cart item
- success when all items match with current pricing
Parallel streams work in 3 phases:
- split into chunks
- execute on chunks
- combine chunks

Split:
- The data source is split into small data chunks
  - Example: a `List` collection is split into chunks of elements down to size 1
- This is done using `Spliterator`s
  - For `ArrayList`, the `Spliterator` is `ArrayListSpliterator`

Execute:
- Data chunks are applied to the stream pipeline, and the intermediate operations are executed in the common ForkJoin pool
- Watch the Fork/Join framework lectures

Combine:
- Combine the executed results into a final result
- The combine phase in the Streams API maps to terminal operations
- Uses the `collect()` and `reduce()` functions, e.g. `collect(toList())`
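To make the split phase visible, this sketch calls `trySplit()` once on an `ArrayList` spliterator. The class name is hypothetical. The exact halving reflects OpenJDK's `ArrayList` spliterator and is not a guarantee of the `Spliterator` contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorSplitExample {

    // splits the list's spliterator once and returns the sizes of both parts
    static long[] splitOnce(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if the source refuses to split
        long prefixSize = prefix == null ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            numbers.add(i);
        }
        long[] sizes = splitOnce(numbers);
        // OpenJDK's ArrayList spliterator splits the index range in half: 4 + 4
        System.out.println(sizes[0] + " + " + sizes[1]);
    }
}
```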
The examples show that the `LinkedList` approach isn't as performant as expected, because a `LinkedList` cannot be split cheaply into chunks.
- Invoking `parallelStream()` does not guarantee faster performance of your code
  - there are additional steps (split, execute, combine) compared to simple sequential execution

Always measure performance improvements.
- The order of the collection depends on:
  - the type of collection
  - the `Spliterator` implementation of the collection
- Example: `ArrayList`
  - Type of collection: ordered
  - Spliterator implementation: ordered spliterator implementation
- Example: `Set` (e.g. `HashSet`)
  - Type of collection: unordered
  - Spliterator implementation: unordered spliterator implementation

Order is only maintained for ordered collections.
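Here is a minimal sketch of order being kept for an ordered source. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ParallelOrderExample {

    // List is ORDERED: collect(toList()) keeps the encounter order,
    // even though the chunks are processed on different threads
    static List<Integer> doubled(List<Integer> input) {
        return input.parallelStream()
                .map(i -> i * 2)
                .collect(Collectors.toList());
    }

    // forEachOrdered() also respects the encounter order (forEach() on a parallel stream does not);
    // each action happens-before the next one, so the non-thread-safe StringBuilder is fine here
    static String joined(List<Integer> input) {
        StringBuilder sb = new StringBuilder();
        input.parallelStream().forEachOrdered(sb::append);
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(doubled(numbers)); // [2, 4, 6, 8, 10]
        System.out.println(joined(numbers));  // 12345
        // a HashSet has no encounter order, so there is no order to preserve
    }
}
```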
| Collect | Reduce |
|---|---|
| Part of Streams API | Part of Streams API |
| Used as a terminal operation in Streams API | Used as a terminal operation in Streams API |
| Produces a single result | Produces a single result |
| Result is produced in a mutable fashion | Result is produced in an immutable fashion |
| Feature rich and used for many different use cases | Reduces the computation into a single value, e.g. sum, multiplication |
| Examples: `collect(toList())`, `collect(toSet())`, `collect(summingDouble(Double::doubleValue))` | Examples: Sum -> `reduce(0.0, (x, y) -> x + y)`, Multiply -> `reduce(1.0, (x, y) -> x * y)` |
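The following sketch runs the table's examples side by side on a small, made-up price list. The values are chosen so that the floating-point results are exact in any evaluation order.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CollectVsReduceExample {

    // hypothetical sample data
    static final List<Double> PRICES = List.of(1.5, 2.5, 3.0);

    // reduce: combines immutable values; 0.0 is the identity for addition
    static double sumWithReduce() {
        return PRICES.parallelStream().reduce(0.0, (x, y) -> x + y);
    }

    // collect: the collector accumulates into a mutable result container
    static double sumWithCollect() {
        return PRICES.parallelStream().collect(Collectors.summingDouble(Double::doubleValue));
    }

    // reduce with 1.0, the identity for multiplication
    static double productWithReduce() {
        return PRICES.parallelStream().reduce(1.0, (x, y) -> x * y);
    }

    public static void main(String[] args) {
        System.out.println(sumWithReduce());     // 7.0
        System.out.println(sumWithCollect());    // 7.0
        System.out.println(productWithReduce()); // 11.25
    }
}
```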
```java
public static int reduceParallel() {
    int sum = List.of(1, 2, 3, 4, 5, 6, 7, 8)
            .parallelStream()
            .reduce(0, (x, y) -> x + y); // 0 is the identity for addition
    return sum;
}
```

Identity values for different kinds of calculations:
- Addition: 0
  - 0 + 20 = 20
- Multiplication: 1
  - 1 * 20 = 20
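When working with parallel streams, a wrong identity changes the outcome dramatically, because the identity is applied to the partial calculation of every chunk. If the example above is changed to:

```java
...
.reduce(1, (x, y) -> x + y);
```

the sum is 44 instead of 37 (the result with a sequential stream, where the identity is applied only once). The correct result would be 36. The value 44 is 36 + 8: the list was split into 8 single-element chunks, and each chunk started from the identity 1.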
Reduce is recommended for computations that are associative.
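The following sketch reproduces the identity effect on the same list. The parallel result with the wrong identity depends on how the list is split on a given machine, so the sketch only claims a lower bound for it.

```java
import java.util.List;
import java.util.stream.Stream;

public class ReduceIdentityExample {

    static final List<Integer> NUMBERS = List.of(1, 2, 3, 4, 5, 6, 7, 8);

    // sums NUMBERS with the given identity, either sequentially or in parallel
    static int sum(boolean parallel, int identity) {
        Stream<Integer> stream = parallel ? NUMBERS.parallelStream() : NUMBERS.stream();
        return stream.reduce(identity, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sum(false, 0)); // 36
        System.out.println(sum(true, 0));  // 36: 0 is a real identity, so chunking does not matter
        System.out.println(sum(false, 1)); // 37: the wrong identity is added once
        System.out.println(sum(true, 1));  // at least 37: added once per chunk, depends on the split
    }
}
```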
Stream operations that perform poorly in parallel:
- Sorting
- Stream API operators: `iterate()`, `limit()`
- Boxing and unboxing, which have an impact on parallel streams
  - Boxing -> converting a primitive type to its wrapper class equivalent
    - `1` -> `Integer.valueOf(1)`
  - Unboxing -> converting a wrapper class to its primitive equivalent
    - `Integer.valueOf(1)` -> `1`
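The following sketch computes the same sum in two ways. The first uses a primitive `LongStream`. The second uses a boxed stream built with `iterate()` and `limit()`, two of the slow cases listed above. The element count is arbitrary, and the sketch makes no timing claims, since results vary per machine.

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class BoxingExample {

    static final int N = 1_000_000;

    // primitive stream: no Long objects, and rangeClosed() splits evenly into chunks
    static long primitiveSum() {
        return LongStream.rangeClosed(1, N).parallel().sum();
    }

    // boxed stream: every element is a Long object that is unboxed again in Long::sum,
    // and iterate() + limit() cannot be split efficiently into chunks
    static long boxedSum() {
        return Stream.iterate(1L, i -> i + 1)
                .limit(N)
                .parallel()
                .reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(primitiveSum()); // 500000500000
        System.out.println(boxedSum());     // 500000500000
    }
}
```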
The common ForkJoin pool is used by:
- parallel streams
- `CompletableFuture`
  - `CompletableFuture` has options to use user-defined thread pools
- The common ForkJoin pool is shared by the whole process
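Get the common ForkJoin pool parallelism:

```java
System.out.println("parallelism: " + ForkJoinPool.getCommonPoolParallelism());
```

On my machine with 12 virtual cores this results in 11... what is going on with the 12th? Just check the now enhanced test output when executing `CheckoutServiceTest#shouldExseed500milliesWith25elems`:

It's the thread that started the parallel stream. The calling thread (e.g. `main`) also takes part in the work, so by default the common pool gets one thread less than the number of cores. The parallelism can be changed via:

```java
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
```

or by passing it as a JVM option:

```
-Djava.util.concurrent.ForkJoinPool.common.parallelism=100
```

The property is only read when the common pool is initialized, so it has to be set before the pool is used for the first time.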
This is shown in the `com.learnjava.service.CheckoutServiceTest#shouldModifyParallelism` test method, where every item runs in its own thread. This is practical here and reduces the overall time, because the threads don't do any heavy lifting (they are basically waiting). For CPU-bound work it would be recommended not to use more threads than virtual cores.
CompletableFuture:

- Introduced in Java 8
- `CompletableFuture` is an asynchronous, reactive, functional programming API
- Asynchronous computations in a functional style
- The `CompletableFuture` API was created to solve the limitations of the `Future` API

Responsive:
- Fundamentally asynchronous
- The call returns immediately, and the response is sent when it is available

Resilient:
- An exception or error won't crash the app or code

Elastic:
- Asynchronous computations normally run in a pool of threads
- The number of threads can go up or down based on the need

Message Driven:
- Asynchronous computations interact with each other through messages in an event-driven style
- Factory methods
  - Initiate an asynchronous computation
- Completion stage methods
  - Chain asynchronous computations
- Exception methods
  - Handle exceptions in an asynchronous computation
`supplyAsync()`
- Factory method
- Initiates an asynchronous computation
- Input is a `Supplier` functional interface
- Returns `CompletableFuture<T>`

`thenAccept()`
- Completion stage method
- Chains an asynchronous computation
- Input is a `Consumer` functional interface
- Consumes the result of the previous stage
- Returns `CompletableFuture<Void>`
- Use it at the end of the asynchronous computation
```java
HelloWorldService helloWorldService = new HelloWorldService();
CompletableFuture.supplyAsync(helloWorldService::helloWorld)
        .thenAccept(x -> log("Result: " + x));
log("done!");
```

If this runs in a `main` method, nothing more than 'done!' is printed. The reason is that the `helloWorld` method takes at least 1000 ms. By then the main thread has already finished, and the common pool threads are daemon threads that don't keep the JVM alive. The first thing to do is simply wait:

```java
...
CommonUtil.delay(2000);
```

or, even better, use `join()`:

```java
...
CompletableFuture.supplyAsync(helloWorldService::helloWorld)
        .thenAccept(x -> log("Result: " + x))
        .join();
...
```

Keep in mind that `join()` is a blocking call.
`thenApply()`
- Completion stage method
- Transforms the data from one form to another
- Input is a `Function` functional interface
- Returns `CompletableFuture<T>`
Check the value via:

```java
helloFuture
        .thenAccept(s -> assertEquals("HELLO WORLD", s))
        .join();
```

Watch out for the `join()` here! Without it, the test would pass even if the results don't match. The assertion runs in another thread, possibly after the test has finished, and its failure would only be captured in the returned future.
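Here is a self-contained `thenApply()` sketch. `helloWorld()` is a hypothetical stand-in for `HelloWorldService.helloWorld()`.

```java
import java.util.concurrent.CompletableFuture;

public class ThenApplyExample {

    // hypothetical stand-in for HelloWorldService.helloWorld()
    static String helloWorld() {
        return "hello world";
    }

    static CompletableFuture<String> helloWorldUpperCase() {
        return CompletableFuture.supplyAsync(ThenApplyExample::helloWorld)
                .thenApply(String::toUpperCase); // Function<String, String> transforms the result
    }

    public static void main(String[] args) {
        // join() blocks until the whole pipeline has completed
        System.out.println(helloWorldUpperCase().join()); // HELLO WORLD
    }
}
```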
`thenCombine()`
- Completion stage method
- Used to combine independent `CompletableFuture`s
- Takes two arguments
  - `CompletionStage`, `BiFunction`
- Returns a `CompletableFuture<T>`
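Here is a minimal `thenCombine()` sketch with two independent futures. The constant suppliers are placeholders for real service calls.

```java
import java.util.concurrent.CompletableFuture;

public class ThenCombineExample {

    static CompletableFuture<String> helloWorld() {
        // two independent computations that may run concurrently in the common pool
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> " world!");
        // the BiFunction runs once both results are available
        return hello.thenCombine(world, (h, w) -> h + w);
    }

    public static void main(String[] args) {
        System.out.println(helloWorld().join()); // hello world!
    }
}
```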
`thenCompose()`
- Completion stage method
- Transforms the data from one form to another
- Input is a `Function` functional interface
- Deals with functions that return a `CompletableFuture`
  - `thenApply()` deals with functions that return a value
- Returns `CompletableFuture<T>`
- Used for dependent tasks: the completion of a task depends on the result of the previous task
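The following `thenCompose()` sketch chains two dependent asynchronous lookups. `findUserId` and `findOrders` are hypothetical async calls made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeExample {

    // hypothetical async lookups: the second call needs the result of the first
    static CompletableFuture<String> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.toLowerCase() + "-42");
    }

    static CompletableFuture<String> findOrders(String userId) {
        return CompletableFuture.supplyAsync(() -> "orders of " + userId);
    }

    static CompletableFuture<String> ordersFor(String name) {
        // thenApply(ThenComposeExample::findOrders) would yield CompletableFuture<CompletableFuture<String>>;
        // thenCompose flattens it into CompletableFuture<String>
        return findUserId(name).thenCompose(ThenComposeExample::findOrders);
    }

    public static void main(String[] args) {
        System.out.println(ordersFor("Bob").join()); // orders of bob-42
    }
}
```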
Now the product info service depends on the inventory service.
The problem with the first approach: the blocking call eats up performance.

```java
public Inventory addInventory(ProductOption productOption) {
    delay(500); // simulates a slow remote call
    return Inventory.builder()
            .count(2)
            .build();
}
```

The first approach takes more than 3000 ms. The second approach introduces blocking code only when the results are collected:
```java
private List<ProductOption> updateInventoryParallelCallsBlockingOnCollectResult(ProductInfo productInfo) {
    // start all inventory calls first, without blocking
    List<CompletableFuture<ProductOption>> options = productInfo.getProductOptions().stream()
            .map(option -> CompletableFuture.supplyAsync(() -> inventoryService.addInventory(option))
                    .thenApply(inventory -> {
                        option.setInventory(inventory);
                        return option;
                    }))
            .collect(Collectors.toList());
    // block only here, after all futures have been started
    return options.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
```

This approach takes 1500 ms+.
- CompletableFuture is a functional style API
```java
public String helloWorldAsyncCalls() {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> this.hws.hello());
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> this.hws.world());
    CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
        delay(1000);
        return "Hi CompletableFuture!";
    });
    String hw = hello
            .thenCombine(world, (h, w) -> h + w)
            .thenCombine(hiCompletableFuture, (prev, cur) -> prev + cur)
            .thenApply(String::toUpperCase)
            .join();
    return hw;
}
```

Let's wrap it in a try/catch:
```java
public String helloWorldAsyncCalls() {
    try {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> this.hws.hello());
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> this.hws.world());
        CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
            delay(1000);
            return "Hi CompletableFuture!";
        });
        String hw = hello
                .thenCombine(world, (h, w) -> h + w)
                .thenCombine(hiCompletableFuture, (prev, cur) -> prev + cur)
                .thenApply(String::toUpperCase)
                .join();
        return hw;
    } catch (Exception e) {
        // an exception thrown in any stage surfaces from join() as a CompletionException
        log("Exception is " + e);
        throw e;
    }
}
```

A `try`/`catch` block like this is the imperative way of handling errors. The `CompletableFuture` API also offers a functional style of handling exceptions, with three options available:
- Catch the exception and recover
  - `handle()`
  - `exceptionally()`
- Catch the exception and don't recover
  - `whenComplete()`
The third test method produces an exception:

```
[ForkJoinPool.commonPool-worker-5] - inside world
[ForkJoinPool.commonPool-worker-19] - inside hello
[ForkJoinPool.commonPool-worker-19] - Exception after world is: java.lang.NullPointerException
```

But how does this work? `handle()` is called in any case, even if no exception occurred. In that case the result parameter can be used.

Catches the Exception but does not recover from it.
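Here is a self-contained sketch of the three options side by side. `failingHello()` is a hypothetical stage that always throws, and the string "fallback" is an arbitrary recovery value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionHandlingExample {

    // hypothetical stage that always fails
    static CompletableFuture<String> failingHello() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("hello failed");
        });
    }

    // handle(): called on success AND on failure; recovers by returning a fallback value
    static String recoverWithHandle() {
        return failingHello()
                .handle((result, ex) -> ex != null ? "fallback" : result)
                .join();
    }

    // exceptionally(): called only on failure; recovers with a fallback value
    static String recoverWithExceptionally() {
        return failingHello()
                .exceptionally(ex -> "fallback")
                .join();
    }

    // whenComplete(): observes the outcome (e.g. for logging) but does not recover,
    // so join() still throws a CompletionException wrapping the original exception
    static boolean whenCompleteDoesNotRecover() {
        try {
            failingHello()
                    .whenComplete((result, ex) -> {
                        if (ex != null) {
                            System.out.println("Exception is: " + ex);
                        }
                    })
                    .join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(recoverWithHandle());          // fallback
        System.out.println(recoverWithExceptionally());   // fallback
        System.out.println(whenCompleteDoesNotRecover()); // true
    }
}
```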

By default, `CompletableFuture` uses the common ForkJoinPool. The number of threads in the pool is derived from the number of cores (by default `availableProcessors() - 1`).
- The common ForkJoinPool is shared by
  - parallel streams
  - `CompletableFuture`
- It's common for applications to use parallel streams and `CompletableFuture` together
- The following issues may occur:
  - a thread is blocked by a time-consuming task
  - no thread is available
Creating a user-defined thread pool:

```java
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
```

Motivation: see `com.learnjava.completablefuture.CompletableFutureHelloWorld.helloWorldCombined3AsyncTasksWithLog`.
Test Output:

The corresponding test log output shows that the completion stage functions are all executed in the same thread.
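Here is a minimal sketch of handing a user-defined pool to `supplyAsync()`. The assertion on the thread name assumes the default thread factory of `Executors.newFixedThreadPool`, which names threads `pool-N-thread-M`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UserDefinedPoolExample {

    static String threadNameOfTask() {
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            // the second argument routes the task to our pool instead of the common ForkJoinPool
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOfTask()); // e.g. pool-1-thread-1
    }
}
```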
`Async()` overloaded functions:
- Using the `Async()` functions allows you to change the thread of execution
- Use them when you have blocking operations in your `CompletableFuture` pipeline
| Async Function | Regular Function |
|---|---|
| thenCombineAsync() | thenCombine() |
| thenApplyAsync() | thenApply() |
| thenComposeAsync() | thenCompose() |
| thenAcceptAsync() | thenAccept() |
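The following sketch contrasts a regular stage with its `Async` counterpart. The pool of 2 threads is an arbitrary choice, and the expected prefix again relies on the default `pool-N-thread-M` thread names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncVariantsExample {

    static String run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture.supplyAsync(() -> "hello", executor)
                    // thenApply may run in the thread that completed the previous stage,
                    // or in the calling thread if that stage has already completed
                    .thenApply(String::toUpperCase)
                    // thenApplyAsync always hands the function to an executor:
                    // here our pool; without the second argument, the common ForkJoinPool
                    .thenApplyAsync(s -> s + " from " + Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // e.g. HELLO from pool-1-thread-2
    }
}
```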
GitHubJobsClient is used to illustrate web client behaviour using CompletableFuture.
`allOf()`
- Static method that's part of the `CompletableFuture` API
- Use `allOf()` when you are dealing with multiple `CompletableFuture`s
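Here is a minimal `allOf()` sketch. The `"job-" + id` supplier is a hypothetical placeholder for a remote call such as the ones `GitHubJobsClient` makes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfExample {

    static List<String> fetchAll(List<String> ids) {
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> "job-" + id)) // hypothetical remote call
                .collect(Collectors.toList());
        // allOf returns CompletableFuture<Void>, completed once every input future has completed
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // inside thenApply every future is done, so each join() returns immediately
        return all.thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("1", "2", "3"))); // [job-1, job-2, job-3]
    }
}
```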
`anyOf()`
- Static method that's part of the `CompletableFuture` API
- Use `anyOf()` when you are retrieving the same data from multiple data sources and only need the first result
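Here is a minimal `anyOf()` sketch. The three "data sources" are hypothetical, with artificial delays that make the cache clearly the fastest.

```java
import java.util.concurrent.CompletableFuture;

public class AnyOfExample {

    // hypothetical data source that answers after the given delay
    static CompletableFuture<String> source(String name, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name;
        });
    }

    static String fastest() {
        // anyOf completes with the result of whichever future finishes first;
        // its result type is Object, so it has to be cast
        Object first = CompletableFuture.anyOf(
                source("db", 1000),
                source("cache", 10),
                source("rest-api", 2000)).join();
        return (String) first;
    }

    public static void main(String[] args) {
        System.out.println(fastest()); // cache
    }
}
```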
















