Skip to content

Commit 718e199

Browse files
authored
BAEL-4466 Smart Batching in Java (#14190)
* Tutorial code for BAEL-4466 Smart Batching in Java * Revert "Tutorial code for BAEL-4466 Smart Batching in Java" This reverts commit d8d4fa7a42b0806d1f0a64d53425a3a4337cb79d. * Tutorial code for BAEL-4466 Smart Batching in Java
1 parent 91b0d64 commit 718e199

3 files changed

Lines changed: 183 additions & 0 deletions

File tree

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.baeldung.smartbatching;
2+
3+
import java.io.IOException;
4+
import java.nio.file.Files;
5+
import java.nio.file.Path;
6+
import java.nio.file.Paths;
7+
import java.nio.file.StandardOpenOption;
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Set;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.Future;
15+
import java.util.concurrent.ScheduledExecutorService;
16+
17+
/**
18+
* @author KPentaris
19+
* @date 07/06/2023
20+
* @project design-patterns-behavioral-2
21+
*/
22+
public class BatchingApp {
23+
24+
static void simpleProcessing() throws Exception {
25+
final Path testPath = Paths.get("./testio.txt");
26+
testPath.toFile().createNewFile();
27+
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(100);
28+
Set<Future> futures = new HashSet<>();
29+
for (int i = 0; i < 50000; i++) {
30+
futures.add(executorService.submit(() -> {
31+
try {
32+
Files.write(testPath, Collections.singleton(Thread.currentThread().getName()), StandardOpenOption.APPEND);
33+
} catch (IOException e) {
34+
e.printStackTrace();
35+
}
36+
}));
37+
}
38+
long start = System.currentTimeMillis();
39+
for (Future future : futures) {
40+
future.get();
41+
}
42+
System.out.println("Time: " + (System.currentTimeMillis() - start));
43+
executorService.shutdown();
44+
}
45+
46+
static void batchedProcessing() throws Exception {
47+
final Path testPath = Paths.get("./testio.txt");
48+
testPath.toFile().createNewFile();
49+
SmartBatcher batcher = new SmartBatcher(10, strings -> {
50+
List<String> content = new ArrayList<>(strings);
51+
content.add("-----Batch Operation-----");
52+
try {
53+
Files.write(testPath, content, StandardOpenOption.APPEND);
54+
} catch (IOException e) {
55+
e.printStackTrace();
56+
}
57+
});
58+
for (int i = 0; i < 50000; i++) {
59+
batcher.submit(Thread.currentThread().getName() + "-1");
60+
}
61+
long start = System.currentTimeMillis();
62+
while (!batcher.finished()) {
63+
Thread.sleep(10);
64+
}
65+
System.out.println("Time: " + (System.currentTimeMillis() - start));
66+
}
67+
68+
public static void main(String[] args) throws Exception {
69+
// simpleProcessing();
70+
batchedProcessing();
71+
}
72+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.baeldung.smartbatching;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Queue;
6+
import java.util.concurrent.ConcurrentLinkedQueue;
7+
import java.util.function.Consumer;
8+
9+
/**
10+
* @author KPentaris
11+
* @date 07/06/2023
12+
* @project design-patterns-behavioral-2
13+
*/
14+
public class MicroBatcher {
15+
Queue<String> tasksQueue = new ConcurrentLinkedQueue<>();
16+
Thread batchThread;
17+
int executionThreshold;
18+
int timeoutThreshold;
19+
boolean working = false;
20+
21+
MicroBatcher(int executionThreshold, int timeoutThreshold, Consumer<List<String>> executionLogic) {
22+
batchThread = new Thread(batchHandling(executionLogic));
23+
batchThread.setDaemon(true);
24+
batchThread.start();
25+
this.executionThreshold = executionThreshold;
26+
this.timeoutThreshold = timeoutThreshold;
27+
}
28+
29+
void submit(String task) {
30+
tasksQueue.add(task);
31+
}
32+
33+
Runnable batchHandling(Consumer<List<String>> executionLogic) {
34+
return () -> {
35+
while (!batchThread.isInterrupted()) {
36+
long startTime = System.currentTimeMillis();
37+
while (tasksQueue.size() < executionThreshold && (System.currentTimeMillis() - startTime) < timeoutThreshold) {
38+
try {
39+
Thread.sleep(100);
40+
} catch (InterruptedException e) {
41+
return; // exit the external loop
42+
}
43+
}
44+
List<String> tasks = new ArrayList<>(executionThreshold);
45+
while (tasksQueue.size() > 0 && tasks.size() < executionThreshold) {
46+
tasks.add(tasksQueue.poll());
47+
}
48+
working = true;
49+
executionLogic.accept(tasks);
50+
working = false;
51+
}
52+
};
53+
}
54+
55+
boolean finished() {
56+
return tasksQueue.isEmpty() && !working;
57+
}
58+
59+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.baeldung.smartbatching;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.BlockingQueue;
6+
import java.util.concurrent.LinkedBlockingQueue;
7+
import java.util.function.Consumer;
8+
9+
/**
10+
* @author KPentaris
11+
* @date 07/06/2023
12+
* @project design-patterns-behavioral-2
13+
*/
14+
public class SmartBatcher {
15+
BlockingQueue<String> tasksQueue = new LinkedBlockingQueue<>();
16+
Thread batchThread;
17+
int executionThreshold;
18+
boolean working = false;
19+
20+
SmartBatcher(int executionThreshold, Consumer<List<String>> executionLogic) {
21+
batchThread = new Thread(batchHandling(executionLogic));
22+
batchThread.setDaemon(true);
23+
batchThread.start();
24+
this.executionThreshold = executionThreshold;
25+
}
26+
27+
void submit(String task) {
28+
tasksQueue.add(task);
29+
}
30+
31+
Runnable batchHandling(Consumer<List<String>> executionLogic) {
32+
return () -> {
33+
while (!batchThread.isInterrupted()) {
34+
List<String> tasks = new ArrayList<>(executionThreshold);
35+
while (tasksQueue.drainTo(tasks, executionThreshold) == 0) {
36+
try {
37+
Thread.sleep(100);
38+
} catch (InterruptedException e) {
39+
return; // exit the external loop
40+
}
41+
}
42+
working = true;
43+
executionLogic.accept(tasks);
44+
working = false;
45+
}
46+
};
47+
}
48+
49+
boolean finished() {
50+
return tasksQueue.isEmpty() && !working;
51+
}
52+
}

0 commit comments

Comments
 (0)