Skip to content

Commit 1121a6c

Browse files
forketyforkpivovarit
authored andcommitted
Core threadpool implementation examples (eugenp#588)
1 parent 009e004 commit 1121a6c

3 files changed

Lines changed: 187 additions & 0 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.baeldung.threadpool;
2+
3+
import java.util.concurrent.ForkJoinTask;
4+
import java.util.concurrent.RecursiveTask;
5+
import java.util.stream.Collectors;
6+
7+
public class CountingTask extends RecursiveTask<Integer> {
8+
9+
private final TreeNode node;
10+
11+
public CountingTask(TreeNode node) {
12+
this.node = node;
13+
}
14+
15+
@Override
16+
protected Integer compute() {
17+
return node.value + node.children.stream()
18+
.map(childNode -> new CountingTask(childNode).fork())
19+
.collect(Collectors.summingInt(ForkJoinTask::join));
20+
}
21+
22+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.baeldung.threadpool;
2+
3+
import java.util.Arrays;
4+
import java.util.HashSet;
5+
import java.util.Set;
6+
7+
public class TreeNode {
8+
9+
int value;
10+
11+
Set<TreeNode> children;
12+
13+
public TreeNode(int value, TreeNode... children) {
14+
this.value = value;
15+
this.children = new HashSet<>();
16+
this.children.addAll(Arrays.asList(children));
17+
}
18+
19+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package com.baeldung.threadpool;
2+
3+
import java.util.concurrent.*;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
import org.junit.Test;
7+
8+
import static org.junit.Assert.assertEquals;
9+
10+
public class CoreThreadPoolTest {
11+
12+
@Test(timeout = 1000)
13+
public void whenCallingExecuteWithRunnable_thenRunnableIsExecuted() throws InterruptedException {
14+
15+
CountDownLatch lock = new CountDownLatch(1);
16+
17+
Executor executor = Executors.newSingleThreadExecutor();
18+
executor.execute(() -> {
19+
System.out.println("Hello World");
20+
lock.countDown();
21+
});
22+
23+
lock.await(1000, TimeUnit.MILLISECONDS);
24+
}
25+
26+
@Test
27+
public void whenUsingExecutorServiceAndFuture_thenCanWaitOnFutureResult() throws InterruptedException, ExecutionException {
28+
29+
ExecutorService executorService = Executors.newFixedThreadPool(10);
30+
Future<String> future = executorService.submit(() -> "Hello World");
31+
String result = future.get();
32+
33+
assertEquals("Hello World", result);
34+
35+
}
36+
37+
@Test
38+
public void whenUsingFixedThreadPool_thenCoreAndMaximumThreadSizeAreTheSame() {
39+
40+
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
41+
executor.submit(() -> {
42+
Thread.sleep(1000);
43+
return null;
44+
});
45+
executor.submit(() -> {
46+
Thread.sleep(1000);
47+
return null;
48+
});
49+
executor.submit(() -> {
50+
Thread.sleep(1000);
51+
return null;
52+
});
53+
54+
assertEquals(2, executor.getPoolSize());
55+
assertEquals(1, executor.getQueue().size());
56+
57+
}
58+
59+
@Test
60+
public void whenUsingCachedThreadPool_thenPoolSizeGrowsUnbounded() {
61+
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
62+
executor.submit(() -> {
63+
Thread.sleep(1000);
64+
return null;
65+
});
66+
executor.submit(() -> {
67+
Thread.sleep(1000);
68+
return null;
69+
});
70+
executor.submit(() -> {
71+
Thread.sleep(1000);
72+
return null;
73+
});
74+
75+
assertEquals(3, executor.getPoolSize());
76+
assertEquals(0, executor.getQueue().size());
77+
78+
}
79+
80+
@Test(timeout = 1000)
81+
public void whenUsingSingleThreadPool_thenTasksExecuteSequentially() throws InterruptedException {
82+
83+
CountDownLatch lock = new CountDownLatch(2);
84+
AtomicInteger counter = new AtomicInteger();
85+
86+
ExecutorService executor = Executors.newSingleThreadExecutor();
87+
executor.submit(() -> {
88+
counter.set(1);
89+
lock.countDown();
90+
});
91+
executor.submit(() -> {
92+
counter.compareAndSet(1, 2);
93+
lock.countDown();
94+
});
95+
96+
lock.await(1000, TimeUnit.MILLISECONDS);
97+
assertEquals(2, counter.get());
98+
99+
}
100+
101+
@Test(timeout = 1000)
102+
public void whenSchedulingTask_thenTaskExecutesWithinGivenPeriod() throws InterruptedException {
103+
104+
CountDownLatch lock = new CountDownLatch(1);
105+
106+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
107+
executor.schedule(() -> {
108+
System.out.println("Hello World");
109+
lock.countDown();
110+
}, 500, TimeUnit.MILLISECONDS);
111+
112+
lock.await(1000, TimeUnit.MILLISECONDS);
113+
114+
}
115+
116+
@Test(timeout = 1000)
117+
public void whenSchedulingTaskWithFixedPeriod_thenTaskExecutesMultipleTimes() throws InterruptedException {
118+
119+
CountDownLatch lock = new CountDownLatch(3);
120+
121+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
122+
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
123+
System.out.println("Hello World");
124+
lock.countDown();
125+
}, 500, 100, TimeUnit.MILLISECONDS);
126+
127+
lock.await();
128+
future.cancel(true);
129+
130+
}
131+
132+
@Test
133+
public void whenUsingForkJoinPool_thenSumOfTreeElementsIsCalculatedCorrectly() {
134+
135+
TreeNode tree = new TreeNode(5,
136+
new TreeNode(3), new TreeNode(2,
137+
new TreeNode(2), new TreeNode(8)));
138+
139+
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
140+
int sum = forkJoinPool.invoke(new CountingTask(tree));
141+
142+
assertEquals(20, sum);
143+
}
144+
145+
146+
}

0 commit comments

Comments
 (0)