Skip to content

Commit 79664c1

Browse files
committed
Use a lighter weight processor for upgrades.
Note that extending the Http11 processors is a hack that I think can be removed with some further refactoring of the connectors. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1241410 13f79535-47bb-0310-9956-ffa450edef68
1 parent 8b36a9a commit 79664c1

26 files changed

Lines changed: 715 additions & 181 deletions

java/org/apache/catalina/connector/Request.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
import org.apache.catalina.util.ParameterMap;
7575
import org.apache.catalina.util.StringParser;
7676
import org.apache.coyote.ActionCode;
77-
import org.apache.coyote.http11.UpgradeInbound;
77+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
7878
import org.apache.juli.logging.Log;
7979
import org.apache.juli.logging.LogFactory;
8080
import org.apache.tomcat.util.ExceptionUtils;

java/org/apache/catalina/connector/RequestFacade.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
import org.apache.catalina.Globals;
4343
import org.apache.catalina.security.SecurityUtil;
44-
import org.apache.coyote.http11.UpgradeInbound;
44+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
4545
import org.apache.tomcat.util.res.StringManager;
4646

4747
/**

java/org/apache/catalina/websocket/StreamInbound.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@
2222
import java.io.Reader;
2323

2424
import org.apache.catalina.util.Conversions;
25-
import org.apache.coyote.http11.UpgradeInbound;
26-
import org.apache.coyote.http11.UpgradeOutbound;
25+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
26+
import org.apache.coyote.http11.upgrade.UpgradeOutbound;
27+
import org.apache.coyote.http11.upgrade.UpgradeProcessor;
2728
import org.apache.tomcat.util.buf.B2CConverter;
2829
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
2930

@@ -41,7 +42,7 @@ public abstract class StreamInbound implements UpgradeInbound {
4142
// frames
4243
// TODO
4344

44-
private InputStream is = null;
45+
private UpgradeProcessor processor = null;
4546
private WsOutbound outbound;
4647

4748
@Override
@@ -51,8 +52,8 @@ public void setUpgradeOutbound(UpgradeOutbound upgradeOutbound) {
5152

5253

5354
@Override
54-
public void setInputStream(InputStream is) {
55-
this.is = is;
55+
public void setUpgradeProcessor(UpgradeProcessor processor) {
56+
this.processor = processor;
5657
}
5758

5859
public WsOutbound getStreamOutbound() {
@@ -64,7 +65,7 @@ public SocketState onData() throws IOException {
6465
// Must be start the start of a frame
6566

6667
// Read the first byte
67-
int i = is.read();
68+
int i = processor.read();
6869

6970
fin = (i & 0x80) > 0;
7071

@@ -80,7 +81,7 @@ public SocketState onData() throws IOException {
8081
validateOpCode(opCode);
8182

8283
// Read the next byte
83-
i = is.read();
84+
i = processor.read();
8485

8586
// Client data must be masked and this isn't
8687
if ((i & 0x80) == 0) {
@@ -91,19 +92,20 @@ public SocketState onData() throws IOException {
9192
payloadLength = i & 0x7F;
9293
if (payloadLength == 126) {
9394
byte[] extended = new byte[2];
94-
is.read(extended);
95+
processor.read(extended);
9596
payloadLength = Conversions.byteArrayToLong(extended);
9697
} else if (payloadLength == 127) {
9798
byte[] extended = new byte[8];
98-
is.read(extended);
99+
processor.read(extended);
99100
payloadLength = Conversions.byteArrayToLong(extended);
100101
}
101102

102103
byte[] mask = new byte[4];
103-
is.read(mask);
104+
processor.read(mask);
104105

105106
if (opCode == 1 || opCode == 2) {
106-
WsInputStream wsIs = new WsInputStream(is, mask, payloadLength);
107+
WsInputStream wsIs = new WsInputStream(processor, mask,
108+
payloadLength);
107109
if (opCode == 2) {
108110
onBinaryData(wsIs);
109111
} else {
@@ -123,7 +125,7 @@ public SocketState onData() throws IOException {
123125
// TODO: Handle control frames appearing in the middle of a multi-frame
124126
// message
125127

126-
return SocketState.UPGRADE;
128+
return SocketState.UPGRADED;
127129
}
128130

129131
protected abstract void onBinaryData(InputStream is) throws IOException;

java/org/apache/catalina/websocket/WsInputStream.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,19 @@
1717
package org.apache.catalina.websocket;
1818

1919
import java.io.IOException;
20-
import java.io.InputStream;
20+
21+
import org.apache.coyote.http11.upgrade.UpgradeProcessor;
2122

2223
public class WsInputStream extends java.io.InputStream {
2324

24-
private InputStream wrapped;
25+
private UpgradeProcessor processor;
2526
private byte[] mask;
2627
private long remaining;
2728
private long read;
2829

29-
public WsInputStream(InputStream wrapped, byte[] mask, long remaining) {
30-
this.wrapped = wrapped;
30+
public WsInputStream(UpgradeProcessor processor, byte[] mask,
31+
long remaining) {
32+
this.processor = processor;
3133
this.mask = mask;
3234
this.remaining = remaining;
3335
this.read = 0;
@@ -42,7 +44,7 @@ public int read() throws IOException {
4244
remaining--;
4345
read++;
4446

45-
int masked = wrapped.read();
47+
int masked = processor.read();
4648
return masked ^ mask[(int) ((read - 1) % 4)];
4749
}
4850

java/org/apache/catalina/websocket/WsOutbound.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.nio.ByteBuffer;
2121
import java.nio.CharBuffer;
2222

23-
import org.apache.coyote.http11.UpgradeOutbound;
23+
import org.apache.coyote.http11.upgrade.UpgradeOutbound;
2424
import org.apache.tomcat.util.buf.B2CConverter;
2525

2626
public class WsOutbound {

java/org/apache/coyote/AbstractProcessor.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.util.concurrent.Executor;
2121

22+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
2223
import org.apache.tomcat.util.net.AbstractEndpoint;
2324
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
2425
import org.apache.tomcat.util.net.SocketStatus;
@@ -31,12 +32,20 @@
3132
public abstract class AbstractProcessor<S> implements ActionHook, Processor {
3233

3334
protected Adapter adapter;
34-
protected final AsyncStateMachine asyncStateMachine;
35-
protected final AbstractEndpoint endpoint;
36-
protected final Request request;
37-
protected final Response response;
35+
protected AsyncStateMachine asyncStateMachine;
36+
protected AbstractEndpoint endpoint;
37+
protected Request request;
38+
protected Response response;
3839

3940

41+
/**
42+
* Intended for use by the Upgrade sub-classes that have no need to
43+
* initialise the request, response, etc.
44+
*/
45+
protected AbstractProcessor() {
46+
// NOOP
47+
}
48+
4049
public AbstractProcessor(AbstractEndpoint endpoint) {
4150
this.endpoint = endpoint;
4251
asyncStateMachine = new AsyncStateMachine(this);
@@ -96,7 +105,7 @@ public Executor getExecutor() {
96105

97106

98107
public boolean isAsync() {
99-
return asyncStateMachine.isAsync();
108+
return (asyncStateMachine != null && asyncStateMachine.isAsync());
100109
}
101110

102111

@@ -131,4 +140,6 @@ public abstract SocketState process(SocketWrapper<S> socket)
131140
* upgrade.
132141
*/
133142
public abstract SocketState upgradeDispatch() throws IOException;
143+
144+
public abstract UpgradeInbound getUpgradeInbound();
134145
}

java/org/apache/coyote/AbstractProtocol.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.coyote;
1818

19+
import java.io.IOException;
1920
import java.net.InetAddress;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -28,6 +29,7 @@
2829
import javax.management.MalformedObjectNameException;
2930
import javax.management.ObjectName;
3031

32+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
3133
import org.apache.juli.logging.Log;
3234
import org.apache.tomcat.util.ExceptionUtils;
3335
import org.apache.tomcat.util.modeler.Registry;
@@ -576,9 +578,18 @@ public SocketState process(SocketWrapper<S> socket,
576578
// closed. If it works, the socket will be re-added to the
577579
// poller
578580
release(socket, processor, false, false);
579-
} else if (state == SocketState.UPGRADE) {
581+
} else if (state == SocketState.UPGRADED) {
580582
// Need to keep the connection associated with the processor
581-
longPoll(socket, processor);
583+
upgradePoll(socket, processor);
584+
} else if (state == SocketState.UPGRADING) {
585+
// Get the UpgradeInbound handler
586+
UpgradeInbound inbound = processor.getUpgradeInbound();
587+
// Release the Http11 processor to be re-used
588+
release(socket, processor, false, false);
589+
// Create the light-weight upgrade processor
590+
processor = createUpgradeProcessor(socket, inbound);
591+
// Need to keep the connection associated with the processor
592+
upgradePoll(socket, processor);
582593
} else {
583594
// Connection closed. OK to recycle the processor.
584595
release(socket, processor, true, false);
@@ -610,9 +621,12 @@ public SocketState process(SocketWrapper<S> socket,
610621
protected abstract P createProcessor();
611622
protected abstract void initSsl(SocketWrapper<S> socket, P processor);
612623
protected abstract void longPoll(SocketWrapper<S> socket, P processor);
624+
protected abstract void upgradePoll(SocketWrapper<S> socket,
625+
P processor);
613626
protected abstract void release(SocketWrapper<S> socket, P processor,
614627
boolean socketClosing, boolean addToPoller);
615-
628+
protected abstract P createUpgradeProcessor(SocketWrapper<S> socket,
629+
UpgradeInbound inbound) throws IOException;
616630

617631
protected void register(AbstractProcessor<S> processor) {
618632
if (getProtocol().getDomain() != null) {
@@ -645,8 +659,12 @@ protected void unregister(AbstractProcessor<S> processor) {
645659
if (getProtocol().getDomain() != null) {
646660
synchronized (this) {
647661
try {
648-
RequestInfo rp =
649-
processor.getRequest().getRequestProcessor();
662+
Request r = processor.getRequest();
663+
if (r == null) {
664+
// Probably an UpgradeProcessor
665+
return;
666+
}
667+
RequestInfo rp = r.getRequestProcessor();
650668
rp.setGlobalProcessor(null);
651669
ObjectName rpName = rp.getRpName();
652670
if (getLog().isDebugEnabled()) {

java/org/apache/coyote/ajp/AbstractAjpProcessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.coyote.Request;
3434
import org.apache.coyote.RequestInfo;
3535
import org.apache.coyote.Response;
36+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
3637
import org.apache.juli.logging.Log;
3738
import org.apache.tomcat.util.ExceptionUtils;
3839
import org.apache.tomcat.util.buf.ByteChunk;
@@ -521,6 +522,14 @@ public SocketState upgradeDispatch() throws IOException {
521522
}
522523

523524

525+
@Override
526+
public UpgradeInbound getUpgradeInbound() {
527+
// Should never reach this code but in case we do...
528+
throw new IllegalStateException(
529+
sm.getString("ajpprocessor.httpupgrade.notsupported"));
530+
}
531+
532+
524533
/**
525534
* Recycle the processor, ready for the next request which may be on the
526535
* same connection or a different connection.

java/org/apache/coyote/ajp/AbstractAjpProtocol.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.coyote.ajp;
1818

1919
import org.apache.coyote.AbstractProtocol;
20+
import org.apache.coyote.http11.upgrade.UpgradeInbound;
2021
import org.apache.tomcat.util.net.SocketWrapper;
2122
import org.apache.tomcat.util.res.StringManager;
2223

@@ -86,5 +87,17 @@ protected void longPoll(SocketWrapper<S> socket, P processor) {
8687
connections.put(socket.getSocket(), processor);
8788
socket.setAsync(true);
8889
}
90+
91+
@Override
92+
protected void upgradePoll(SocketWrapper<S> socket, P processor) {
93+
// TODO Should never happen. ISE?
94+
}
95+
96+
@Override
97+
protected P createUpgradeProcessor(SocketWrapper<S> socket,
98+
UpgradeInbound inbound) {
99+
// TODO should fail - throw IOE
100+
return null;
101+
}
89102
}
90103
}

0 commit comments

Comments
 (0)