Skip to content

Commit 33a0feb

Browse files
committed
Merged branch 2.6.x.
2 parents 1e907df + 5f2f928 commit 33a0feb

13 files changed

Lines changed: 354 additions & 68 deletions

File tree

cometd-java/bayeux-api/src/main/java/org/cometd/bayeux/server/ConfigurableServerChannel.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,20 @@ public interface Initializer
7676
*/
7777
void setLazy(boolean lazy);
7878

79+
/**
80+
* @return the lazy timeout for this channel
81+
* @see #setLazyTimeout(long)
82+
*/
83+
long getLazyTimeout();
84+
85+
/**
86+
* Sets the lazy timeout for this channel.
87+
* A positive value makes the channel lazy, a negative value makes the channel non-lazy.
88+
* @param lazyTimeout the lazy timeout for this channel
89+
* @see #setLazy(boolean)
90+
*/
91+
void setLazyTimeout(long lazyTimeout);
92+
7993
/**
8094
* @return whether the channel is persistent
8195
* @see #setPersistent(boolean)

cometd-java/cometd-java-annotations/src/main/java/org/cometd/annotation/AnnotationCometDServlet.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ protected Object processService(ServerAnnotationProcessor processor, String serv
7979
{
8080
Object service = newService(serviceClassName);
8181
processor.process(service);
82-
_logger.info("Processed annotated service {}", service);
82+
_logger.debug("Processed annotated service {}", service);
8383
return service;
8484
}
8585
catch (Exception x)
@@ -97,7 +97,7 @@ protected Object newService(String serviceClassName) throws Exception
9797
protected void registerService(Object service)
9898
{
9999
getServletContext().setAttribute(service.getClass().getName(), service);
100-
_logger.info("Registered annotated service {} in servlet context", service);
100+
_logger.debug("Registered annotated service {} in servlet context", service);
101101
}
102102

103103
@Override
@@ -114,13 +114,13 @@ public void destroy()
114114
protected void deregisterService(Object service)
115115
{
116116
getServletContext().removeAttribute(service.getClass().getName());
117-
_logger.info("Deregistered annotated service {}", service);
117+
_logger.debug("Deregistered annotated service {}", service);
118118
}
119119

120120
protected void deprocessService(ServerAnnotationProcessor processor, Object service)
121121
{
122122
processor.deprocess(service);
123-
_logger.info("Deprocessed annotated service {}", service);
123+
_logger.debug("Deprocessed annotated service {}", service);
124124
}
125125

126126
protected List<Object> getServices()
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Copyright (c) 2010 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cometd.client;
18+
19+
import java.util.HashMap;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
24+
import org.cometd.bayeux.Channel;
25+
import org.cometd.bayeux.Message;
26+
import org.cometd.bayeux.client.ClientSessionChannel;
27+
import org.cometd.bayeux.server.ConfigurableServerChannel;
28+
import org.cometd.server.AbstractServerTransport;
29+
import org.junit.Assert;
30+
import org.junit.Test;
31+
32+
public class LazyChannelAndMessageTest extends ClientServerTest
33+
{
34+
@Test
35+
public void testLazyChannelWithGlobalTimeout() throws Exception
36+
{
37+
final long globalLazyTimeout = 1000;
38+
startServer(new HashMap<String, String>(){{
39+
put(AbstractServerTransport.RANDOMIZE_LAZY_TIMEOUT_OPTION, String.valueOf(false));
40+
put(AbstractServerTransport.MAX_LAZY_TIMEOUT_OPTION, String.valueOf(globalLazyTimeout));
41+
}});
42+
43+
String channelName = "/testLazy";
44+
bayeux.createIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
45+
{
46+
public void configureChannel(ConfigurableServerChannel channel)
47+
{
48+
channel.setPersistent(true);
49+
channel.setLazy(true);
50+
}
51+
});
52+
53+
BayeuxClient client = newBayeuxClient();
54+
client.handshake();
55+
client.waitFor(5000, BayeuxClient.State.CONNECTED);
56+
57+
final CountDownLatch subscribeLatch = new CountDownLatch(1);
58+
client.getChannel(Channel.META_SUBSCRIBE).addListener(new ClientSessionChannel.MessageListener()
59+
{
60+
public void onMessage(ClientSessionChannel channel, Message message)
61+
{
62+
subscribeLatch.countDown();
63+
}
64+
});
65+
final CountDownLatch latch = new CountDownLatch(1);
66+
final AtomicLong begin = new AtomicLong();
67+
client.getChannel(channelName).subscribe(new ClientSessionChannel.MessageListener()
68+
{
69+
public void onMessage(ClientSessionChannel channel, Message message)
70+
{
71+
if (message.getDataAsMap() == null)
72+
return;
73+
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin.get());
74+
long accuracy = globalLazyTimeout / 10;
75+
Assert.assertTrue("Expected " + elapsed + " >= " + globalLazyTimeout, elapsed >= globalLazyTimeout - accuracy);
76+
latch.countDown();
77+
}
78+
});
79+
// Make sure we are subscribed so that there are no
80+
// pending responses that may return the lazy message
81+
Assert.assertTrue(subscribeLatch.await(5, TimeUnit.SECONDS));
82+
83+
// Wait for the /meta/connect to establish
84+
TimeUnit.MILLISECONDS.sleep(1000);
85+
86+
begin.set(System.nanoTime());
87+
// Cannot publish from the client, as there will always be the "meta"
88+
// publish response to send, so the lazy message will be sent with it.
89+
bayeux.getChannel(channelName).publish(null, new HashMap<String, Object>());
90+
91+
Assert.assertTrue(latch.await(2 * globalLazyTimeout, TimeUnit.MILLISECONDS));
92+
93+
disconnectBayeuxClient(client);
94+
}
95+
96+
@Test
97+
public void testLazyChannelWithChannelTimeout() throws Exception
98+
{
99+
final long channelLazyTimeout = 1000;
100+
final long globalLazyTimeout = channelLazyTimeout * 4;
101+
startServer(new HashMap<String, String>(){{
102+
put(AbstractServerTransport.RANDOMIZE_LAZY_TIMEOUT_OPTION, String.valueOf(false));
103+
put(AbstractServerTransport.MAX_LAZY_TIMEOUT_OPTION, String.valueOf(globalLazyTimeout));
104+
}});
105+
106+
String channelName = "/testLazy";
107+
bayeux.createIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
108+
{
109+
public void configureChannel(ConfigurableServerChannel channel)
110+
{
111+
channel.setLazyTimeout(channelLazyTimeout);
112+
channel.setPersistent(true);
113+
}
114+
});
115+
116+
BayeuxClient client = newBayeuxClient();
117+
client.handshake();
118+
client.waitFor(5000, BayeuxClient.State.CONNECTED);
119+
120+
final CountDownLatch subscribeLatch = new CountDownLatch(1);
121+
client.getChannel(Channel.META_SUBSCRIBE).addListener(new ClientSessionChannel.MessageListener()
122+
{
123+
public void onMessage(ClientSessionChannel channel, Message message)
124+
{
125+
subscribeLatch.countDown();
126+
}
127+
});
128+
final CountDownLatch latch = new CountDownLatch(1);
129+
final AtomicLong begin = new AtomicLong();
130+
client.getChannel(channelName).subscribe(new ClientSessionChannel.MessageListener()
131+
{
132+
public void onMessage(ClientSessionChannel channel, Message message)
133+
{
134+
if (message.getDataAsMap() == null)
135+
return;
136+
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin.get());
137+
Assert.assertTrue(elapsed < globalLazyTimeout / 2);
138+
long accuracy = channelLazyTimeout / 10;
139+
Assert.assertTrue("Expected " + elapsed + " >= " + channelLazyTimeout, elapsed >= channelLazyTimeout - accuracy);
140+
latch.countDown();
141+
}
142+
});
143+
// Make sure we are subscribed so that there are no
144+
// pending responses that may return the lazy message
145+
Assert.assertTrue(subscribeLatch.await(5, TimeUnit.SECONDS));
146+
147+
// Wait for the /meta/connect to establish
148+
TimeUnit.MILLISECONDS.sleep(1000);
149+
150+
begin.set(System.nanoTime());
151+
// Cannot publish from the client, as there will always be the "meta"
152+
// publish response to send, so the lazy message will be sent with it.
153+
bayeux.getChannel(channelName).publish(null, new HashMap<String, Object>());
154+
155+
Assert.assertTrue(latch.await(2 * globalLazyTimeout, TimeUnit.MILLISECONDS));
156+
157+
disconnectBayeuxClient(client);
158+
}
159+
160+
@Test
161+
public void testLazyChannelsWithDifferentChannelTimeouts() throws Exception
162+
{
163+
final long channelLazyTimeout = 1000;
164+
final long globalLazyTimeout = channelLazyTimeout * 4;
165+
startServer(new HashMap<String, String>(){{
166+
put(AbstractServerTransport.RANDOMIZE_LAZY_TIMEOUT_OPTION, String.valueOf(false));
167+
put(AbstractServerTransport.MAX_LAZY_TIMEOUT_OPTION, String.valueOf(globalLazyTimeout));
168+
}});
169+
170+
String shortLazyChannelName = "/shortLazy";
171+
bayeux.createIfAbsent(shortLazyChannelName, new ConfigurableServerChannel.Initializer()
172+
{
173+
public void configureChannel(ConfigurableServerChannel channel)
174+
{
175+
channel.setLazyTimeout(channelLazyTimeout);
176+
channel.setPersistent(true);
177+
}
178+
});
179+
180+
String longLazyChannelName = "/longLazy";
181+
bayeux.createIfAbsent(longLazyChannelName, new ConfigurableServerChannel.Initializer()
182+
{
183+
public void configureChannel(ConfigurableServerChannel channel)
184+
{
185+
channel.setLazyTimeout(globalLazyTimeout);
186+
channel.setPersistent(true);
187+
}
188+
});
189+
190+
BayeuxClient client = newBayeuxClient();
191+
client.handshake();
192+
client.waitFor(5000, BayeuxClient.State.CONNECTED);
193+
194+
final CountDownLatch subscribeLatch = new CountDownLatch(2);
195+
client.getChannel(Channel.META_SUBSCRIBE).addListener(new ClientSessionChannel.MessageListener()
196+
{
197+
public void onMessage(ClientSessionChannel channel, Message message)
198+
{
199+
subscribeLatch.countDown();
200+
}
201+
});
202+
final CountDownLatch latch = new CountDownLatch(1);
203+
final AtomicLong begin = new AtomicLong();
204+
ClientSessionChannel.MessageListener messageListener = new ClientSessionChannel.MessageListener()
205+
{
206+
public void onMessage(ClientSessionChannel channel, Message message)
207+
{
208+
if (message.getDataAsMap() == null)
209+
return;
210+
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin.get());
211+
Assert.assertTrue(elapsed < globalLazyTimeout / 2);
212+
long accuracy = channelLazyTimeout / 10;
213+
Assert.assertTrue("Expected " + elapsed + " >= " + channelLazyTimeout, elapsed >= channelLazyTimeout - accuracy);
214+
latch.countDown();
215+
}
216+
};
217+
client.getChannel(shortLazyChannelName).subscribe(messageListener);
218+
client.getChannel(longLazyChannelName).subscribe(messageListener);
219+
// Make sure we are subscribed so that there are no
220+
// pending responses that may return the lazy message
221+
Assert.assertTrue(subscribeLatch.await(5, TimeUnit.SECONDS));
222+
223+
// Wait for the /meta/connect to establish
224+
TimeUnit.MILLISECONDS.sleep(1000);
225+
226+
begin.set(System.nanoTime());
227+
// Cannot publish from the client, as there will always be the "meta"
228+
// publish response to send, so the lazy message will be sent with it.
229+
// Send first the long lazy and then the short lazy, to verify that
230+
// timeouts are properly respected.
231+
bayeux.getChannel(longLazyChannelName).publish(null, new HashMap<String, Object>());
232+
bayeux.getChannel(shortLazyChannelName).publish(null, new HashMap<String, Object>());
233+
234+
Assert.assertTrue(latch.await(2 * globalLazyTimeout, TimeUnit.MILLISECONDS));
235+
236+
disconnectBayeuxClient(client);
237+
}
238+
}

cometd-java/cometd-java-client/src/test/java/org/cometd/client/PublishCallbackTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.atomic.AtomicReference;
2323

24-
import junit.framework.Assert;
2524
import org.cometd.bayeux.Message;
2625
import org.cometd.bayeux.client.ClientSessionChannel;
2726
import org.cometd.bayeux.server.BayeuxServer;
2827
import org.cometd.bayeux.server.ServerChannel;
2928
import org.cometd.bayeux.server.ServerMessage;
3029
import org.cometd.bayeux.server.ServerSession;
3130
import org.cometd.server.DefaultSecurityPolicy;
31+
import org.junit.Assert;
3232
import org.junit.Before;
3333
import org.junit.Test;
3434

cometd-java/cometd-java-oort/src/main/java/org/cometd/oort/Oort.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,7 @@ public void removed(ServerSession session, boolean timeout)
841841
ServerCometInfo serverCometInfo = cometInfos.next();
842842
if (serverCometInfo.getServerSession().getId().equals(session.getId()))
843843
{
844-
_logger.info("Disconnected from comet {} with session {}", cometURL, session);
844+
_logger.debug("Disconnected from comet {} with session {}", cometURL, session);
845845
assert remoteOortId.equals(serverCometInfo.getId());
846846
cometInfos.remove();
847847

@@ -992,7 +992,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
992992
}
993993

994994
if (message.isSuccessful())
995-
getLogger().info("Connected to comet {} as {} with {}/{}", url, cometURL, message.getClientId(), oortComet.getTransport());
995+
getLogger().debug("Connected to comet {} as {} with {}/{}", url, cometURL, message.getClientId(), oortComet.getTransport());
996996
}
997997
}
998998

cometd-java/cometd-java-oort/src/main/java/org/cometd/oort/OortMulticastConfigurer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public void run()
183183
byte[] cometURLBytes = cometURL.getBytes("UTF-8");
184184
if (cometURLBytes.length > MTU)
185185
{
186-
logger.info("Oort URL {} exceeds max transmission unit and will not be advertised", cometURL);
186+
logger.warn("Oort URL {} exceeds max transmission unit and will not be advertised", cometURL);
187187
return;
188188
}
189189

cometd-java/cometd-java-server/src/main/java/org/cometd/server/AbstractServerTransport.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public abstract class AbstractServerTransport extends AbstractTransport implemen
4242
public static final String TIMEOUT_OPTION = "timeout";
4343
public static final String INTERVAL_OPTION = "interval";
4444
public static final String MAX_INTERVAL_OPTION = "maxInterval";
45-
public static final String MAX_LAZY_OPTION = "maxLazyTimeout";
45+
public static final String MAX_LAZY_TIMEOUT_OPTION = "maxLazyTimeout";
46+
public static final String RANDOMIZE_LAZY_TIMEOUT_OPTION = "randomizeLazyTimeout";
4647
public static final String META_CONNECT_DELIVERY_OPTION = "metaConnectDeliverOnly";
4748
public static final String MAX_QUEUE_OPTION = "maxQueue";
4849

@@ -136,7 +137,7 @@ protected void init()
136137
_interval = getOption(INTERVAL_OPTION, _interval);
137138
_maxInterval = getOption(MAX_INTERVAL_OPTION, _maxInterval);
138139
_timeout = getOption(TIMEOUT_OPTION, _timeout);
139-
_maxLazyTimeout = getOption(MAX_LAZY_OPTION, _maxLazyTimeout);
140+
_maxLazyTimeout = getOption(MAX_LAZY_TIMEOUT_OPTION, _maxLazyTimeout);
140141
_metaConnectDeliveryOnly = getOption(META_CONNECT_DELIVERY_OPTION, _metaConnectDeliveryOnly);
141142
jsonContext = (JSONContext.Server)getOption(BayeuxServerImpl.JSON_CONTEXT);
142143
}

cometd-java/cometd-java-server/src/main/java/org/cometd/server/ServerChannelImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class ServerChannelImpl implements ServerChannel
5252
private final Set<ServerChannelImpl> _children = new ConcurrentHashSet<>();
5353
private final ServerChannelImpl _parent;
5454
private boolean _lazy;
55+
private long _lazyTimeout = -1;
5556
private boolean _persistent;
5657

5758
protected ServerChannelImpl(BayeuxServerImpl bayeux, ChannelId id, ServerChannelImpl parent)
@@ -240,6 +241,19 @@ public boolean isWild()
240241
public void setLazy(boolean lazy)
241242
{
242243
_lazy = lazy;
244+
if (!lazy)
245+
_lazyTimeout = -1;
246+
}
247+
248+
public long getLazyTimeout()
249+
{
250+
return _lazyTimeout;
251+
}
252+
253+
public void setLazyTimeout(long lazyTimeout)
254+
{
255+
_lazyTimeout = lazyTimeout;
256+
setLazy(lazyTimeout > 0);
243257
}
244258

245259
public void setPersistent(boolean persistent)

0 commit comments

Comments
 (0)