Skip to content

Commit 0b35307

Browse files
committed
Issue menacher#40 WebsocketLoginHandler is updated to handle reconnect
1 parent 9c5501b commit 0b35307

3 files changed

Lines changed: 154 additions & 86 deletions

File tree

Lines changed: 78 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,78 @@
1-
package org.menacheri.jetserver.handlers.netty;
2-
3-
import java.io.ByteArrayInputStream;
4-
import java.io.IOException;
5-
6-
import org.jboss.netty.buffer.ChannelBuffer;
7-
import org.jboss.netty.channel.Channel;
8-
import org.jboss.netty.channel.ChannelHandler.Sharable;
9-
import org.jboss.netty.channel.ChannelHandlerContext;
10-
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
11-
import org.menacheri.jetserver.convert.Transform;
12-
import org.menacheri.jetserver.convert.flex.AMFDeSerializer;
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
16-
import flex.messaging.io.SerializationContext;
17-
18-
/**
19-
* This class takes a {@link ChannelBuffer} containing AMF3 object as input and
20-
* creates a java object from it using the {@link AMFDeSerializer} class.
21-
*
22-
* @author Abraham Menacherry.
23-
*
24-
*/
25-
@Sharable
26-
public class AMF3ToJavaObjectDecoder extends OneToOneDecoder implements Transform<ChannelBuffer, Object>
27-
{
28-
private static final Logger LOG = LoggerFactory.getLogger(AMF3ToJavaObjectDecoder.class);
29-
30-
@Override
31-
protected Object decode(ChannelHandlerContext ctx, Channel channel,
32-
Object msg) throws Exception
33-
{
34-
if(null == msg)
35-
{
36-
LOG.warn("Incoming message is null");
37-
return msg;
38-
}
39-
ChannelBuffer buffer = (ChannelBuffer)msg;
40-
// buffer.array() will ignore the readerIndex. Hence readBytes is used
41-
// and then .array is called
42-
ByteArrayInputStream bis = new ByteArrayInputStream(buffer.readBytes(
43-
buffer.readableBytes()).array());
44-
return deSerializeObjectFromStream(bis);
45-
}
46-
47-
@Override
48-
public Object convert(ChannelBuffer buffer) throws Exception {
49-
ByteArrayInputStream bis = new ByteArrayInputStream(buffer.array());
50-
return deSerializeObjectFromStream(bis);
51-
}
52-
53-
protected Object deSerializeObjectFromStream(ByteArrayInputStream bis) throws Exception
54-
{
55-
AMFDeSerializer serializer = new AMFDeSerializer(SerializationContext
56-
.getSerializationContext());
57-
Object o = null;
58-
try
59-
{
60-
// do the deserialization.
61-
o = serializer.fromAmf(bis);
62-
LOG.trace("Serialized object: {}",o);
63-
}
64-
catch (IOException e)
65-
{
66-
LOG.error("IO error in AMF3ToJavaObjectDecoder: {}",e);
67-
throw e;
68-
}
69-
catch (ClassNotFoundException e)
70-
{
71-
LOG.error("Error in AMF3ToJavaObjectDecoder: {}.\n " +
72-
"Check if flash class has corresponding java class",e);
73-
throw e;
74-
}
75-
return o;
76-
}
77-
}
1+
package org.menacheri.jetserver.handlers.netty;
2+
3+
import java.io.ByteArrayInputStream;
4+
import java.io.IOException;
5+
6+
import org.jboss.netty.buffer.ChannelBuffer;
7+
import org.jboss.netty.channel.Channel;
8+
import org.jboss.netty.channel.ChannelHandler.Sharable;
9+
import org.jboss.netty.channel.ChannelHandlerContext;
10+
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
11+
import org.menacheri.jetserver.convert.Transform;
12+
import org.menacheri.jetserver.convert.flex.AMFDeSerializer;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import flex.messaging.io.SerializationContext;
17+
18+
/**
19+
* This class takes a {@link ChannelBuffer} containing AMF3 object as input and
20+
* creates a java object from it using the {@link AMFDeSerializer} class.
21+
*
22+
* @author Abraham Menacherry.
23+
*
24+
*/
25+
@Sharable
26+
public class AMF3ToJavaObjectDecoder extends OneToOneDecoder implements Transform<ChannelBuffer, Object>
27+
{
28+
private static final Logger LOG = LoggerFactory.getLogger(AMF3ToJavaObjectDecoder.class);
29+
30+
@Override
31+
protected Object decode(ChannelHandlerContext ctx, Channel channel,
32+
Object msg) throws Exception
33+
{
34+
if(null == msg)
35+
{
36+
LOG.warn("Incoming message is null");
37+
return msg;
38+
}
39+
ChannelBuffer buffer = (ChannelBuffer)msg;
40+
// buffer.array() will ignore the readerIndex. Hence readBytes is used
41+
// and then .array is called
42+
ByteArrayInputStream bis = new ByteArrayInputStream(buffer.readBytes(
43+
buffer.readableBytes()).array());
44+
return deSerializeObjectFromStream(bis);
45+
}
46+
47+
@Override
48+
public Object convert(ChannelBuffer buffer) throws Exception {
49+
ByteArrayInputStream bis = new ByteArrayInputStream(buffer.array());
50+
return deSerializeObjectFromStream(bis);
51+
}
52+
53+
protected Object deSerializeObjectFromStream(ByteArrayInputStream bis) throws Exception
54+
{
55+
AMFDeSerializer serializer = new AMFDeSerializer(SerializationContext
56+
.getSerializationContext());
57+
Object o = null;
58+
try
59+
{
60+
// do the deserialization.
61+
o = serializer.fromAmf(bis);
62+
LOG.trace("Serialized object: {}",o);
63+
bis.close();
64+
}
65+
catch (IOException e)
66+
{
67+
LOG.error("IO error in AMF3ToJavaObjectDecoder: {}",e);
68+
throw e;
69+
}
70+
catch (ClassNotFoundException e)
71+
{
72+
LOG.error("Error in AMF3ToJavaObjectDecoder: {}.\n " +
73+
"Check if flash class has corresponding java class",e);
74+
throw e;
75+
}
76+
return o;
77+
}
78+
}

jetserver/src/main/java/org/menacheri/jetserver/handlers/netty/LoginHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,17 @@ public void messageReceived(final ChannelHandlerContext ctx,
5959
final ChannelBuffer buffer = (ChannelBuffer) event.getSource();
6060
final Channel channel = e.getChannel();
6161
int type = event.getType();
62-
if (type == Events.LOG_IN)
62+
if (Events.LOG_IN == type)
6363
{
6464
LOG.debug("Login attempt from {}", channel.getRemoteAddress());
6565
Player player = lookupPlayer(buffer, channel);
6666
handleLogin(player, channel, buffer);
6767
}
68-
else if (type == Events.RECONNECT)
68+
else if (Events.RECONNECT == type)
6969
{
7070
LOG.debug("Reconnect attempt from {}", channel.getRemoteAddress());
71-
PlayerSession playerSession = lookupSession(buffer);
71+
String reconnectKey = NettyUtils.readString(buffer);
72+
PlayerSession playerSession = lookupSession(reconnectKey);
7273
handleReconnect(playerSession, channel, buffer);
7374
}
7475
else
@@ -99,9 +100,8 @@ public Player lookupPlayer(final ChannelBuffer buffer, final Channel channel)
99100
return player;
100101
}
101102

102-
public PlayerSession lookupSession(final ChannelBuffer buffer)
103+
public PlayerSession lookupSession(final String reconnectKey)
103104
{
104-
String reconnectKey = NettyUtils.readString(buffer);
105105
PlayerSession playerSession = (PlayerSession)reconnectRegistry.getSession(reconnectKey);
106106
if(null != playerSession)
107107
{
@@ -213,7 +213,7 @@ protected void handleReJoin(PlayerSession playerSession, GameRoom gameRoom, Chan
213213
playerSession.setTcpSender(sender);
214214
// Connect the pipeline to the game room.
215215
gameRoom.connectSession(playerSession);
216-
playerSession.setWriteable(true);
216+
playerSession.setWriteable(true);// TODO remove if unnecessary. It should be done in start event
217217
// Send the re-connect event so that it will in turn send the START event.
218218
playerSession.onEvent(new ReconnetEvent(sender));
219219
loginUdp(playerSession, buffer);

jetserver/src/main/java/org/menacheri/jetserver/handlers/netty/WebSocketLoginHandler.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@
1313
import org.menacheri.jetserver.app.GameRoom;
1414
import org.menacheri.jetserver.app.Player;
1515
import org.menacheri.jetserver.app.PlayerSession;
16+
import org.menacheri.jetserver.app.Session;
1617
import org.menacheri.jetserver.communication.NettyTCPMessageSender;
1718
import org.menacheri.jetserver.event.Event;
1819
import org.menacheri.jetserver.event.Events;
1920
import org.menacheri.jetserver.event.impl.DefaultEvent;
21+
import org.menacheri.jetserver.event.impl.ReconnetEvent;
2022
import org.menacheri.jetserver.service.LookupService;
2123
import org.menacheri.jetserver.service.UniqueIDGeneratorService;
2224
import org.menacheri.jetserver.service.impl.ReconnectSessionRegistry;
2325
import org.menacheri.jetserver.util.Credentials;
26+
import org.menacheri.jetserver.util.JetConfig;
2427
import org.menacheri.jetserver.util.SimpleCredentials;
2528
import org.slf4j.Logger;
2629
import org.slf4j.LoggerFactory;
@@ -60,8 +63,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
6063
String data = frame.getText();
6164
LOG.trace("From websocket: " + data);
6265
Event event = gson.fromJson(data, DefaultEvent.class);
63-
64-
if (Events.LOG_IN == event.getType())
66+
int type = event.getType();
67+
if (Events.LOG_IN == type)
6568
{
6669
LOG.trace("Login attempt from {}", channel.getRemoteAddress());
6770
List<String> credList = null;
@@ -70,6 +73,12 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
7073
handleLogin(player, channel);
7174
handleGameRoomJoin(player, channel, credList.get(2));
7275
}
76+
else if (type == Events.RECONNECT)
77+
{
78+
LOG.debug("Reconnect attempt from {}", channel.getRemoteAddress());
79+
PlayerSession playerSession = lookupSession((String)event.getSource());
80+
handleReconnect(playerSession, channel);
81+
}
7382
else
7483
{
7584
LOG.error(
@@ -86,6 +95,60 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
8695
}
8796
}
8897

98+
public PlayerSession lookupSession(final String reconnectKey)
99+
{
100+
PlayerSession playerSession = (PlayerSession)reconnectRegistry.getSession(reconnectKey);
101+
if(null != playerSession)
102+
{
103+
synchronized(playerSession){
104+
// if its an already active session then do not allow a
105+
// reconnect. So the only state in which a client is allowed to
106+
// reconnect is if it is "NOT_CONNECTED"
107+
if(playerSession.getStatus() == Session.Status.NOT_CONNECTED)
108+
{
109+
playerSession.setStatus(Session.Status.CONNECTING);
110+
}
111+
else
112+
{
113+
playerSession = null;
114+
}
115+
}
116+
}
117+
return playerSession;
118+
}
119+
120+
protected void handleReconnect(PlayerSession playerSession, Channel channel)
121+
{
122+
if (null != playerSession)
123+
{
124+
channel.write(eventToFrame(Events.LOG_IN_SUCCESS, null));
125+
GameRoom gameRoom = playerSession.getGameRoom();
126+
gameRoom.disconnectSession(playerSession);
127+
if (null != playerSession.getTcpSender())
128+
playerSession.getTcpSender().close();
129+
130+
handleReJoin(playerSession, gameRoom, channel);
131+
}
132+
else
133+
{
134+
// Write future and close channel
135+
closeChannelWithLoginFailure(channel);
136+
}
137+
}
138+
139+
protected void handleReJoin(PlayerSession playerSession, GameRoom gameRoom, Channel channel)
140+
{
141+
// Set the tcp channel on the session.
142+
NettyTCPMessageSender sender = new NettyTCPMessageSender(channel);
143+
playerSession.setTcpSender(sender);
144+
// Connect the pipeline to the game room.
145+
gameRoom.connectSession(playerSession);
146+
channel.write(Events.GAME_ROOM_JOIN_SUCCESS, null);//assumes that the protocol applied will take care of event objects.
147+
playerSession.setWriteable(true);// TODO remove if unnecessary. It should be done in start event
148+
// Send the re-connect event so that it will in turn send the START event.
149+
playerSession.onEvent(new ReconnetEvent(sender));
150+
}
151+
89152
public Player lookupPlayer(String username, String password)
90153
{
91154
Credentials credentials = new SimpleCredentials(username, password);
@@ -124,10 +187,14 @@ public void handleGameRoomJoin(Player player, Channel channel, String refKey)
124187
{
125188
PlayerSession playerSession = gameRoom.createPlayerSession(player);
126189
gameRoom.onLogin(playerSession);
190+
String reconnectKey = (String)idGeneratorService
191+
.generateFor(playerSession.getClass());
192+
playerSession.setAttribute(JetConfig.RECONNECT_KEY, reconnectKey);
193+
playerSession.setAttribute(JetConfig.RECONNECT_REGISTRY, reconnectRegistry);
127194
LOG.trace("Sending GAME_ROOM_JOIN_SUCCESS to channel {}",
128195
channel.getId());
129196
ChannelFuture future = channel.write(eventToFrame(
130-
Events.GAME_ROOM_JOIN_SUCCESS, null));
197+
Events.GAME_ROOM_JOIN_SUCCESS, reconnectKey));
131198
connectToGameRoom(gameRoom, playerSession, future);
132199
}
133200
else

0 commit comments

Comments
 (0)