Skip to content

Commit b94c1d3

Browse files
committed
Work on iluwatar#74, server mode works with both UDP and TCP channels
1 parent ec8203a commit b94c1d3

File tree

10 files changed

+83
-43
lines changed

10 files changed

+83
-43
lines changed

reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.iluwatar.reactor;
22

33
import java.io.IOException;
4-
import java.nio.ByteBuffer;
54
import java.nio.channels.SelectableChannel;
65
import java.nio.channels.SelectionKey;
76
import java.util.Map;
@@ -13,7 +12,7 @@ public abstract class AbstractNioChannel {
1312

1413
private SelectableChannel channel;
1514
private ChannelHandler handler;
16-
private Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites = new ConcurrentHashMap<>();
15+
private Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
1716
private NioReactor reactor;
1817

1918
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
@@ -31,7 +30,7 @@ public SelectableChannel getChannel() {
3130

3231
public abstract int getInterestedOps();
3332

34-
public abstract ByteBuffer read(SelectionKey key) throws IOException;
33+
public abstract Object read(SelectionKey key) throws IOException;
3534

3635
public void setHandler(ChannelHandler handler) {
3736
this.handler = handler;
@@ -43,9 +42,9 @@ public ChannelHandler getHandler() {
4342

4443
// Called from the context of reactor thread
4544
public void write(SelectionKey key) throws IOException {
46-
Queue<ByteBuffer> pendingWrites = channelToPendingWrites.get(key.channel());
45+
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
4746
while (true) {
48-
ByteBuffer pendingWrite = pendingWrites.poll();
47+
Object pendingWrite = pendingWrites.poll();
4948
if (pendingWrite == null) {
5049
System.out.println("No more pending writes");
5150
reactor.changeOps(key, SelectionKey.OP_READ);
@@ -56,10 +55,10 @@ public void write(SelectionKey key) throws IOException {
5655
}
5756
}
5857

59-
protected abstract void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException;
58+
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
6059

61-
public void write(ByteBuffer buffer, SelectionKey key) {
62-
Queue<ByteBuffer> pendingWrites = this.channelToPendingWrites.get(key.channel());
60+
public void write(Object data, SelectionKey key) {
61+
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
6362
if (pendingWrites == null) {
6463
synchronized (this.channelToPendingWrites) {
6564
pendingWrites = this.channelToPendingWrites.get(key.channel());
@@ -69,7 +68,7 @@ public void write(ByteBuffer buffer, SelectionKey key) {
6968
}
7069
}
7170
}
72-
pendingWrites.add(buffer);
71+
pendingWrites.add(data);
7372
reactor.changeOps(key, SelectionKey.OP_WRITE);
7473
}
7574
}

reactor/src/main/java/com/iluwatar/reactor/AppClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
public class AppClient {
1515

1616
public static void main(String[] args) {
17-
// new Thread(new LoggingClient("Client 1", 6666)).start();
18-
// new Thread(new LoggingClient("Client 2", 6667)).start();
17+
new Thread(new LoggingClient("Client 1", 6666)).start();
18+
new Thread(new LoggingClient("Client 2", 6667)).start();
1919
new Thread(new UDPLoggingClient(6668)).start();
2020
}
2121

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package com.iluwatar.reactor;
22

3-
import java.nio.ByteBuffer;
43
import java.nio.channels.SelectionKey;
54

65
public interface ChannelHandler {
76

8-
void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key);
7+
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
98
}
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.iluwatar.reactor;
22

3-
import java.nio.ByteBuffer;
43
import java.nio.channels.SelectionKey;
54

65
public interface Dispatcher {
7-
void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key);
6+
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
87
}

reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,31 @@
33
import java.nio.ByteBuffer;
44
import java.nio.channels.SelectionKey;
55

6+
import com.iluwatar.reactor.NioDatagramChannel.DatagramPacket;
7+
68
public class LoggingHandler implements ChannelHandler {
79

810
@Override
9-
public void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) {
10-
byte[] data = readBytes.array();
11-
doLogging(data);
12-
sendEchoReply(channel, data, key);
11+
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
12+
if (readObject instanceof ByteBuffer) {
13+
byte[] data = ((ByteBuffer)readObject).array();
14+
doLogging(data);
15+
sendReply(channel, data, key);
16+
} else if (readObject instanceof DatagramPacket) {
17+
DatagramPacket datagram = (DatagramPacket)readObject;
18+
byte[] data = datagram.getData().array();
19+
doLogging(data);
20+
sendReply(channel, datagram, key);
21+
}
22+
}
23+
24+
private void sendReply(AbstractNioChannel channel, DatagramPacket datagram, SelectionKey key) {
25+
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap("Data logged successfully".getBytes()));
26+
replyPacket.setReceiver(datagram.getSender());
27+
channel.write(replyPacket, key);
1328
}
1429

15-
private void sendEchoReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
30+
private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
1631
ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes());
1732
channel.write(buffer, key);
1833
}

reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.net.InetAddress;
55
import java.net.InetSocketAddress;
6+
import java.net.SocketAddress;
67
import java.nio.ByteBuffer;
78
import java.nio.channels.DatagramChannel;
89
import java.nio.channels.SelectionKey;
@@ -22,10 +23,12 @@ public int getInterestedOps() {
2223
}
2324

2425
@Override
25-
public ByteBuffer read(SelectionKey key) throws IOException {
26+
public Object read(SelectionKey key) throws IOException {
2627
ByteBuffer buffer = ByteBuffer.allocate(1024);
27-
getChannel().receive(buffer);
28-
return buffer;
28+
SocketAddress sender = getChannel().receive(buffer);
29+
DatagramPacket packet = new DatagramPacket(buffer);
30+
packet.setSender(sender);
31+
return packet;
2932
}
3033

3134
@Override
@@ -40,8 +43,38 @@ public void bind() throws IOException {
4043
}
4144

4245
@Override
43-
protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException {
44-
pendingWrite.flip();
45-
getChannel().write(pendingWrite);
46+
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
47+
DatagramPacket pendingPacket = (DatagramPacket) pendingWrite;
48+
getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
49+
}
50+
51+
static class DatagramPacket {
52+
private SocketAddress sender;
53+
private ByteBuffer data;
54+
private SocketAddress receiver;
55+
56+
public DatagramPacket(ByteBuffer data) {
57+
this.data = data;
58+
}
59+
60+
public SocketAddress getSender() {
61+
return sender;
62+
}
63+
64+
public void setSender(SocketAddress sender) {
65+
this.sender = sender;
66+
}
67+
68+
public SocketAddress getReceiver() {
69+
return receiver;
70+
}
71+
72+
public void setReceiver(SocketAddress receiver) {
73+
this.receiver = receiver;
74+
}
75+
76+
public ByteBuffer getData() {
77+
return data;
78+
}
4679
}
4780
}

reactor/src/main/java/com/iluwatar/reactor/NioReactor.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.iluwatar.reactor;
22

33
import java.io.IOException;
4-
import java.nio.ByteBuffer;
54
import java.nio.channels.SelectionKey;
65
import java.nio.channels.Selector;
76
import java.nio.channels.ServerSocketChannel;
@@ -75,7 +74,6 @@ private void processPendingChanges() {
7574
Iterator<Command> iterator = pendingChanges.iterator();
7675
while (iterator.hasNext()) {
7776
Command command = iterator.next();
78-
System.out.println("Processing pending change: " + command);
7977
command.execute();
8078
iterator.remove();
8179
}
@@ -85,10 +83,8 @@ private void processKey(SelectionKey key) throws IOException {
8583
if (key.isAcceptable()) {
8684
acceptConnection(key);
8785
} else if (key.isReadable()) {
88-
System.out.println("Key is readable");
8986
read(key);
9087
} else if (key.isWritable()) {
91-
System.out.println("Key is writable");
9288
write(key);
9389
}
9490
}
@@ -99,10 +95,10 @@ private void write(SelectionKey key) throws IOException {
9995
}
10096

10197
private void read(SelectionKey key) {
102-
ByteBuffer readBytes;
98+
Object readObject;
10399
try {
104-
readBytes = ((AbstractNioChannel)key.attachment()).read(key);
105-
dispatchReadEvent(key, readBytes);
100+
readObject = ((AbstractNioChannel)key.attachment()).read(key);
101+
dispatchReadEvent(key, readObject);
106102
} catch (IOException e) {
107103
try {
108104
key.channel().close();
@@ -112,8 +108,8 @@ private void read(SelectionKey key) {
112108
}
113109
}
114110

115-
private void dispatchReadEvent(SelectionKey key, ByteBuffer readBytes) {
116-
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readBytes, key);
111+
private void dispatchReadEvent(SelectionKey key, Object readObject) {
112+
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key);
117113
}
118114

119115
private void acceptConnection(SelectionKey key) throws IOException {

reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ public void bind() throws IOException {
4545
}
4646

4747
@Override
48-
protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException {
48+
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
49+
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;
4950
System.out.println("Writing on channel");
50-
((SocketChannel)key.channel()).write(pendingWrite);
51+
((SocketChannel)key.channel()).write(pendingBuffer);
5152
}
5253
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package com.iluwatar.reactor;
22

3-
import java.nio.ByteBuffer;
43
import java.nio.channels.SelectionKey;
54

65
public class SameThreadDispatcher implements Dispatcher {
76

87
@Override
9-
public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) {
8+
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
109
if (channel.getHandler() != null) {
11-
channel.getHandler().handleChannelRead(channel, readBytes, key);
10+
channel.getHandler().handleChannelRead(channel, readObject, key);
1211
}
1312
}
1413
}

reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.iluwatar.reactor;
22

3-
import java.nio.ByteBuffer;
43
import java.nio.channels.SelectionKey;
54
import java.util.concurrent.ExecutorService;
65
import java.util.concurrent.Executors;
@@ -14,12 +13,12 @@ public ThreadPoolDispatcher(int poolSize) {
1413
}
1514

1615
@Override
17-
public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) {
16+
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
1817
exectorService.execute(new Runnable() {
1918

2019
@Override
2120
public void run() {
22-
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readBytes, key);
21+
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key);
2322
}
2423
});
2524
}

0 commit comments

Comments
 (0)