|
16 | 16 |
|
17 | 17 | package org.cometd.server; |
18 | 18 |
|
| 19 | +import java.lang.reflect.Constructor; |
19 | 20 | import java.security.SecureRandom; |
20 | 21 | import java.util.ArrayList; |
21 | 22 | import java.util.Arrays; |
22 | 23 | import java.util.Collections; |
23 | 24 | import java.util.HashSet; |
| 25 | +import java.util.LinkedHashMap; |
24 | 26 | import java.util.List; |
25 | 27 | import java.util.ListIterator; |
26 | 28 | import java.util.Map; |
|
30 | 32 | import java.util.concurrent.ConcurrentMap; |
31 | 33 | import java.util.concurrent.CopyOnWriteArrayList; |
32 | 34 | import java.util.concurrent.TimeUnit; |
| 35 | +import javax.servlet.http.HttpServletRequest; |
33 | 36 |
|
34 | 37 | import org.cometd.bayeux.Channel; |
35 | 38 | import org.cometd.bayeux.ChannelId; |
|
49 | 52 | import org.cometd.bayeux.server.ServerSession; |
50 | 53 | import org.cometd.bayeux.server.ServerTransport; |
51 | 54 | import org.cometd.common.JSONContext; |
52 | | -import org.cometd.server.transport.AsyncJSONTransport; |
| 55 | +import org.cometd.server.transport.HttpTransport; |
53 | 56 | import org.cometd.server.transport.JSONPTransport; |
| 57 | +import org.cometd.server.transport.JSONTransport; |
54 | 58 | import org.eclipse.jetty.util.annotation.ManagedAttribute; |
55 | 59 | import org.eclipse.jetty.util.annotation.ManagedObject; |
56 | 60 | import org.eclipse.jetty.util.annotation.ManagedOperation; |
|
64 | 68 | @ManagedObject("The CometD server") |
65 | 69 | public class BayeuxServerImpl extends AbstractLifeCycle implements BayeuxServer |
66 | 70 | { |
| 71 | + public static final String ALLOWED_TRANSPORTS_OPTION = "allowedTransports"; |
67 | 72 | public static final String SWEEP_PERIOD_OPTION = "sweepPeriod"; |
| 73 | + public static final String TRANSPORTS_OPTION = "transports"; |
68 | 74 |
|
69 | 75 | private final Logger _logger = LoggerFactory.getLogger(getClass().getName() + "." + Integer.toHexString(System.identityHashCode(this))); |
70 | 76 | private final SecureRandom _random = new SecureRandom(); |
71 | 77 | private final List<BayeuxServerListener> _listeners = new CopyOnWriteArrayList<>(); |
72 | 78 | private final List<Extension> _extensions = new CopyOnWriteArrayList<>(); |
73 | 79 | private final ConcurrentMap<String, ServerSessionImpl> _sessions = new ConcurrentHashMap<>(); |
74 | 80 | private final ConcurrentMap<String, ServerChannelImpl> _channels = new ConcurrentHashMap<>(); |
75 | | - private final ConcurrentMap<String, ServerTransport> _transports = new ConcurrentHashMap<>(); |
76 | | - private final List<String> _allowedTransports = new CopyOnWriteArrayList<>(); |
| 81 | + private final Map<String, ServerTransport> _transports = new LinkedHashMap<>(); // Order is important |
| 82 | + private final List<String> _allowedTransports = new ArrayList<>(); |
77 | 83 | private final ThreadLocal<AbstractServerTransport> _currentTransport = new ThreadLocal<>(); |
78 | 84 | private final Map<String, Object> _options = new TreeMap<>(); |
79 | 85 | private final Scheduler _scheduler = new ScheduledExecutorScheduler("BayeuxServer" + hashCode() + " Scheduler", false); |
80 | 86 | private SecurityPolicy _policy = new DefaultSecurityPolicy(); |
81 | 87 | private JSONContext.Server _jsonContext; |
82 | 88 |
|
83 | | - public BayeuxServerImpl() |
84 | | - { |
85 | | - // TODO: consider making this module dependent on WebSocket and add the WS transport too ? |
86 | | - addTransport(new AsyncJSONTransport(this)); |
87 | | -// addTransport(new JSONTransport(this)); |
88 | | - addTransport(new JSONPTransport(this)); |
89 | | - } |
90 | | - |
91 | 89 | @Override |
92 | 90 | protected void doStart() throws Exception |
93 | 91 | { |
94 | 92 | super.doStart(); |
95 | 93 |
|
96 | 94 | initializeMetaChannels(); |
97 | 95 | initializeJSONContext(); |
98 | | - initializeAllowedTransports(); |
99 | | - |
100 | | - List<String> allowedTransportNames = getAllowedTransports(); |
101 | | - if (allowedTransportNames.isEmpty()) |
102 | | - throw new IllegalStateException("No allowed transport names are configured, there must be at least one"); |
103 | | - |
104 | | - for (String allowedTransportName : allowedTransportNames) |
105 | | - { |
106 | | - ServerTransport allowedTransport = getTransport(allowedTransportName); |
107 | | - if (allowedTransport instanceof AbstractServerTransport) |
108 | | - ((AbstractServerTransport)allowedTransport).init(); |
109 | | - } |
| 96 | + initializeServerTransports(); |
110 | 97 |
|
111 | 98 | _scheduler.start(); |
112 | 99 |
|
@@ -186,14 +173,92 @@ else if (option instanceof JSONContext.Server) |
186 | 173 | _options.put(AbstractServerTransport.JSON_CONTEXT_OPTION, _jsonContext); |
187 | 174 | } |
188 | 175 |
|
189 | | - protected void initializeAllowedTransports() |
| 176 | + protected void initializeServerTransports() |
| 177 | + { |
| 178 | + if (_transports.isEmpty()) |
| 179 | + { |
| 180 | + String option = (String)getOption(TRANSPORTS_OPTION); |
| 181 | + if (option == null) |
| 182 | + { |
| 183 | + // Order is important, see #findHttpTransport() |
| 184 | + ServerTransport transport = newWebSocketTransport(); |
| 185 | + if (transport != null) |
| 186 | + addTransport(transport); |
| 187 | + addTransport(new JSONTransport(this)); |
| 188 | + addTransport(new JSONPTransport(this)); |
| 189 | + } |
| 190 | + else |
| 191 | + { |
| 192 | + for (String className : option.split(",")) |
| 193 | + { |
| 194 | + ServerTransport transport = newServerTransport(className.trim()); |
| 195 | + if (transport != null) |
| 196 | + addTransport(transport); |
| 197 | + } |
| 198 | + |
| 199 | + if (_transports.isEmpty()) |
| 200 | + throw new IllegalArgumentException("Option '" + TRANSPORTS_OPTION + |
| 201 | + "' does not contain a valid list of server transport class names"); |
| 202 | + } |
| 203 | + } |
| 204 | + |
| 205 | + if (_allowedTransports.isEmpty()) |
| 206 | + { |
| 207 | + String option = (String)getOption(ALLOWED_TRANSPORTS_OPTION); |
| 208 | + if (option == null) |
| 209 | + { |
| 210 | + _allowedTransports.addAll(_transports.keySet()); |
| 211 | + } |
| 212 | + else |
| 213 | + { |
| 214 | + for (String transportName : option.split(",")) |
| 215 | + { |
| 216 | + if (_transports.containsKey(transportName)) |
| 217 | + _allowedTransports.add(transportName); |
| 218 | + } |
| 219 | + |
| 220 | + if (_allowedTransports.isEmpty()) |
| 221 | + throw new IllegalArgumentException("Option '" + ALLOWED_TRANSPORTS_OPTION + |
| 222 | + "' does not contain at least one configured server transport name"); |
| 223 | + } |
| 224 | + } |
| 225 | + |
| 226 | + for (String transportName : _allowedTransports) |
| 227 | + { |
| 228 | + ServerTransport serverTransport = getTransport(transportName); |
| 229 | + if (serverTransport instanceof AbstractServerTransport) |
| 230 | + ((AbstractServerTransport)serverTransport).init(); |
| 231 | + } |
| 232 | + } |
| 233 | + |
| 234 | + private ServerTransport newWebSocketTransport() |
| 235 | + { |
| 236 | + try |
| 237 | + { |
| 238 | + ClassLoader loader = Thread.currentThread().getContextClassLoader(); |
| 239 | + loader.loadClass("javax.websocket.server.ServerContainer"); |
| 240 | + return newServerTransport("org.cometd.websocket.server.WebSocketTransport"); |
| 241 | + } |
| 242 | + catch (Exception x) |
| 243 | + { |
| 244 | + return null; |
| 245 | + } |
| 246 | + } |
| 247 | + |
| 248 | + private ServerTransport newServerTransport(String className) |
190 | 249 | { |
191 | | - if (_allowedTransports.size() == 0) |
| 250 | + try |
192 | 251 | { |
193 | | - for (ServerTransport t : _transports.values()) |
194 | | - _allowedTransports.add(t.getName()); |
| 252 | + ClassLoader loader = Thread.currentThread().getContextClassLoader(); |
| 253 | + @SuppressWarnings("unchecked") |
| 254 | + Class<? extends ServerTransport> klass = (Class<? extends ServerTransport>)loader.loadClass(className); |
| 255 | + Constructor<? extends ServerTransport> constructor = klass.getConstructor(BayeuxServerImpl.class); |
| 256 | + return constructor.newInstance(this); |
| 257 | + } |
| 258 | + catch (Exception x) |
| 259 | + { |
| 260 | + return null; |
195 | 261 | } |
196 | | - _logger.debug("Allowed Transports: {}", _allowedTransports); |
197 | 262 | } |
198 | 263 |
|
199 | 264 | public Scheduler.Task schedule(Runnable task, long delay) |
@@ -995,6 +1060,24 @@ public List<ServerTransport> getTransports() |
995 | 1060 | return new ArrayList<>(_transports.values()); |
996 | 1061 | } |
997 | 1062 |
|
| 1063 | + @SuppressWarnings("ForLoopReplaceableByForEach") |
| 1064 | + protected HttpTransport findHttpTransport(HttpServletRequest request) |
| 1065 | + { |
| 1066 | + // Avoid allocation of the Iterator |
| 1067 | + for (int i = 0; i < _allowedTransports.size(); ++i) |
| 1068 | + { |
| 1069 | + String transportName = _allowedTransports.get(i); |
| 1070 | + ServerTransport serverTransport = getTransport(transportName); |
| 1071 | + if (serverTransport instanceof HttpTransport) |
| 1072 | + { |
| 1073 | + HttpTransport transport = (HttpTransport)serverTransport; |
| 1074 | + if (transport.accept(request)) |
| 1075 | + return transport; |
| 1076 | + } |
| 1077 | + } |
| 1078 | + return null; |
| 1079 | + } |
| 1080 | + |
998 | 1081 | @ManagedAttribute(value = "The transports allowed by this server", readonly = true) |
999 | 1082 | public List<String> getAllowedTransports() |
1000 | 1083 | { |
|
0 commit comments