Skip to content

Commit 940a62b

Browse files
committed
Work on iluwatar#74, added unit test cases
1 parent b94c1d3 commit 940a62b

File tree

7 files changed

+114
-18
lines changed

7 files changed

+114
-18
lines changed

reactor/src/main/java/com/iluwatar/reactor/App.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,32 @@
44

55
public class App {
66

7+
private NioReactor reactor;
8+
79
public static void main(String[] args) {
810
try {
9-
NioReactor reactor = new NioReactor(new ThreadPoolDispatcher(2));
10-
LoggingHandler loggingHandler = new LoggingHandler();
11-
reactor
12-
.registerChannel(tcpChannel(6666, loggingHandler))
13-
.registerChannel(tcpChannel(6667, loggingHandler))
14-
.registerChannel(udpChannel(6668, loggingHandler))
15-
.start();
11+
new App().start();
1612
} catch (IOException e) {
1713
e.printStackTrace();
1814
}
1915
}
16+
17+
public void start() throws IOException {
18+
reactor = new NioReactor(new ThreadPoolDispatcher(2));
19+
20+
LoggingHandler loggingHandler = new LoggingHandler();
21+
22+
reactor
23+
.registerChannel(tcpChannel(6666, loggingHandler))
24+
.registerChannel(tcpChannel(6667, loggingHandler))
25+
.registerChannel(udpChannel(6668, loggingHandler))
26+
.start();
27+
}
28+
29+
public void stop() {
30+
reactor.stop();
31+
}
32+
2033

2134
private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
2235
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);

reactor/src/main/java/com/iluwatar/reactor/AppClient.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,34 @@
1010
import java.net.InetSocketAddress;
1111
import java.net.Socket;
1212
import java.net.SocketException;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.TimeUnit;
1316

1417
public class AppClient {
15-
18+
private ExecutorService service = Executors.newFixedThreadPool(3);
19+
1620
public static void main(String[] args) {
17-
new Thread(new LoggingClient("Client 1", 6666)).start();
18-
new Thread(new LoggingClient("Client 2", 6667)).start();
19-
new Thread(new UDPLoggingClient(6668)).start();
21+
new AppClient().start();
2022
}
2123

24+
public void start() {
25+
service.execute(new LoggingClient("Client 1", 6666));
26+
service.execute(new LoggingClient("Client 2", 6667));
27+
service.execute(new UDPLoggingClient(6668));
28+
}
29+
30+
public void stop() {
31+
service.shutdown();
32+
if (!service.isTerminated()) {
33+
service.shutdownNow();
34+
try {
35+
service.awaitTermination(1000, TimeUnit.SECONDS);
36+
} catch (InterruptedException e) {
37+
e.printStackTrace();
38+
}
39+
}
40+
}
2241

2342
/*
2443
* A logging client that sends logging requests to logging server
@@ -55,7 +74,7 @@ public void run() {
5574
}
5675

5776
private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException {
58-
for (int i = 0; i < 1; i++) {
77+
for (int i = 0; i < 4; i++) {
5978
writer.println(clientName + " - Log request: " + i);
6079
try {
6180
Thread.sleep(100);
@@ -86,7 +105,7 @@ public void run() {
86105
DatagramSocket socket = null;
87106
try {
88107
socket = new DatagramSocket();
89-
for (int i = 0; i < 1; i++) {
108+
for (int i = 0; i < 4; i++) {
90109
String message = "UDP Client" + " - Log request: " + i;
91110
try {
92111
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port));

reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44

55
public interface Dispatcher {
66
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
7+
void stop();
78
}

reactor/src/main/java/com/iluwatar/reactor/NioReactor.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import java.util.Queue;
1010
import java.util.Set;
1111
import java.util.concurrent.ConcurrentLinkedQueue;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.TimeUnit;
1215

1316
/*
1417
* Abstractions
@@ -20,6 +23,7 @@ public class NioReactor {
2023
private Selector selector;
2124
private Dispatcher dispatcher;
2225
private Queue<Command> pendingChanges = new ConcurrentLinkedQueue<>();
26+
private ExecutorService reactorService = Executors.newSingleThreadExecutor();
2327

2428
public NioReactor(Dispatcher dispatcher) throws IOException {
2529
this.dispatcher = dispatcher;
@@ -34,7 +38,7 @@ public NioReactor registerChannel(AbstractNioChannel channel) throws IOException
3438
}
3539

3640
public void start() throws IOException {
37-
new Thread( new Runnable() {
41+
reactorService.execute(new Runnable() {
3842
@Override
3943
public void run() {
4044
try {
@@ -44,11 +48,27 @@ public void run() {
4448
e.printStackTrace();
4549
}
4650
}
47-
}, "Reactor Main").start();
51+
});
52+
}
53+
54+
public void stop() {
55+
reactorService.shutdownNow();
56+
selector.wakeup();
57+
try {
58+
reactorService.awaitTermination(4, TimeUnit.SECONDS);
59+
} catch (InterruptedException e) {
60+
e.printStackTrace();
61+
}
62+
dispatcher.stop();
4863
}
4964

5065
private void eventLoop() throws IOException {
5166
while (true) {
67+
68+
if (Thread.interrupted()) {
69+
break;
70+
}
71+
5272
// honor any pending requests first
5373
processPendingChanges();
5474

reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,9 @@ public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, Se
1010
channel.getHandler().handleChannelRead(channel, readObject, key);
1111
}
1212
}
13+
14+
@Override
15+
public void stop() {
16+
// no-op
17+
}
1318
}

reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,35 @@
33
import java.nio.channels.SelectionKey;
44
import java.util.concurrent.ExecutorService;
55
import java.util.concurrent.Executors;
6+
import java.util.concurrent.TimeUnit;
67

78
public class ThreadPoolDispatcher extends SameThreadDispatcher {
89

9-
private ExecutorService exectorService;
10+
private ExecutorService executorService;
1011

1112
public ThreadPoolDispatcher(int poolSize) {
12-
this.exectorService = Executors.newFixedThreadPool(poolSize);
13+
this.executorService = Executors.newFixedThreadPool(poolSize);
1314
}
1415

1516
@Override
1617
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
17-
exectorService.execute(new Runnable() {
18+
executorService.execute(new Runnable() {
1819

1920
@Override
2021
public void run() {
2122
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key);
2223
}
2324
});
2425
}
26+
27+
@Override
28+
public void stop() {
29+
executorService.shutdownNow();
30+
try {
31+
executorService.awaitTermination(1000, TimeUnit.SECONDS);
32+
} catch (InterruptedException e) {
33+
e.printStackTrace();
34+
}
35+
}
2536

2637
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.iluwatar.reactor;
2+
3+
import java.io.IOException;
4+
5+
import org.junit.Test;
6+
7+
public class AppTest {
8+
9+
@Test
10+
public void testApp() throws IOException {
11+
App app = new App();
12+
app.start();
13+
14+
AppClient client = new AppClient();
15+
client.start();
16+
17+
try {
18+
Thread.sleep(2000);
19+
} catch (InterruptedException e) {
20+
e.printStackTrace();
21+
}
22+
23+
client.stop();
24+
25+
app.stop();
26+
}
27+
}

0 commit comments

Comments
 (0)