Skip to content

Commit db8d5c5

Browse files
authored
Fix race in FlowControlHandlerTest that could lead to a NPE (#12457)
Motivation: How we accessed the internal state of the handler was not thread safe and so we could see a NPE during testing like: ``` 2022-06-09T10:27:09.1309221Z java.lang.NullPointerException: Cannot invoke "io.netty5.handler.flow.FlowControlHandler$RecyclableArrayDeque.isEmpty()" because "this.queue" is null 2022-06-09T10:27:09.1310056Z at io.netty5.handler.flow.FlowControlHandler.isQueueEmpty(FlowControlHandler.java:90) 2022-06-09T10:27:09.1310829Z at io.netty5.handler.flow.FlowControlHandlerTest.testFlowAutoReadOff(FlowControlHandlerTest.java:370) 2022-06-09T10:27:09.1311575Z at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-06-09T10:27:09.1312301Z at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) 2022-06-09T10:27:09.1313093Z at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-06-09T10:27:09.1313760Z at java.base/java.lang.reflect.Method.invoke(Method.java:568) 2022-06-09T10:27:09.1314400Z at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) 2022-06-09T10:27:09.1315137Z at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) 2022-06-09T10:27:09.1315983Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) 2022-06-09T10:27:09.1316828Z at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) 2022-06-09T10:27:09.1317641Z at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) 2022-06-09T10:27:09.1318493Z at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) 2022-06-09T10:27:09.1319348Z at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) 2022-06-09T10:27:09.1320165Z at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) 2022-06-09T10:27:09.1321012Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) 2022-06-09T10:27:09.1322136Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) 2022-06-09T10:27:09.1323045Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) 2022-06-09T10:27:09.1323943Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) 2022-06-09T10:27:09.1324743Z at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) 2022-06-09T10:27:09.1325482Z at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) 2022-06-09T10:27:09.1326315Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) 2022-06-09T10:27:09.1327204Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1328123Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) 2022-06-09T10:27:09.1329864Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) 2022-06-09T10:27:09.1330733Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) 2022-06-09T10:27:09.1331591Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) 2022-06-09T10:27:09.1332454Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1333319Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1334081Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1334851Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1335715Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1336583Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1337418Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1338067Z at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) 2022-06-09T10:27:09.1338985Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) 2022-06-09T10:27:09.1340052Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) 2022-06-09T10:27:09.1340908Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1341965Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1342739Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1343508Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1344444Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1345330Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1346176Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1346868Z at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) 2022-06-09T10:27:09.1347789Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) 2022-06-09T10:27:09.1348854Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) 2022-06-09T10:27:09.1349822Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1350695Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1351529Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1352298Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1353161Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1354049Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1354894Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1355997Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) 2022-06-09T10:27:09.1357115Z at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) 2022-06-09T10:27:09.1358070Z at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) 2022-06-09T10:27:09.1358898Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220) 2022-06-09T10:27:09.1359628Z at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188) 2022-06-09T10:27:09.1360424Z at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202) 2022-06-09T10:27:09.1361216Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181) 2022-06-09T10:27:09.1361937Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128) 2022-06-09T10:27:09.1362790Z at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150) 2022-06-09T10:27:09.1363688Z at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124) 2022-06-09T10:27:09.1364571Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) 2022-06-09T10:27:09.1365399Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) 2022-06-09T10:27:09.1366134Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) 2022-06-09T10:27:09.1366804Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) 2022-06-09T10:27:09.1375974Z ``` Modifications: Dispatch the EventLoop before accessing the state Result: No more NPE possible
1 parent bc085b0 commit db8d5c5

1 file changed

Lines changed: 58 additions & 10 deletions

File tree

handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.net.SocketAddress;
4545
import java.util.List;
4646
import java.util.Queue;
47+
import java.util.concurrent.Callable;
4748
import java.util.concurrent.CountDownLatch;
4849
import java.util.concurrent.Exchanger;
4950
import java.util.concurrent.LinkedBlockingQueue;
@@ -215,25 +216,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
215216
@Test
216217
public void testFlowAutoReadOn() throws Exception {
217218
final CountDownLatch latch = new CountDownLatch(3);
219+
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
218220

219221
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
222+
223+
@Override
224+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
225+
peerRef.exchange(ctx.channel(), 1L, SECONDS);
226+
super.channelActive(ctx);
227+
}
228+
220229
@Override
221230
public void channelRead(ChannelHandlerContext ctx, Object msg) {
231+
ReferenceCountUtil.release(msg);
222232
latch.countDown();
223233
}
224234
};
225235

226-
FlowControlHandler flow = new FlowControlHandler();
236+
final FlowControlHandler flow = new FlowControlHandler();
227237
Channel server = newServer(true, flow, handler);
228238
Channel client = newClient(server.localAddress());
229239
try {
240+
// The client connection on the server side
241+
Channel peer = peerRef.exchange(null, 1L, SECONDS);
242+
230243
// Write the message
231244
client.writeAndFlush(newOneMessage())
232245
.syncUninterruptibly();
233246

234247
// We should receive 3 messages
235248
assertTrue(latch.await(1L, SECONDS));
236-
assertTrue(flow.isQueueEmpty());
249+
250+
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
251+
@Override
252+
public Boolean call() {
253+
return flow.isQueueEmpty();
254+
}
255+
}).get());
237256
} finally {
238257
client.close();
239258
server.close();
@@ -292,7 +311,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
292311
}
293312
};
294313

295-
FlowControlHandler flow = new FlowControlHandler();
314+
final FlowControlHandler flow = new FlowControlHandler();
296315
Channel server = newServer(true, flow, handler);
297316
Channel client = newClient(server.localAddress());
298317
try {
@@ -314,7 +333,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
314333
peer.config().setAutoRead(true);
315334
setAutoReadLatch2.countDown();
316335
assertTrue(msgRcvLatch3.await(1L, SECONDS));
317-
assertTrue(flow.isQueueEmpty());
336+
337+
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
338+
@Override
339+
public Boolean call() {
340+
return flow.isQueueEmpty();
341+
}
342+
}).get());
318343
} finally {
319344
client.close();
320345
server.close();
@@ -348,7 +373,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
348373
}
349374
};
350375

351-
FlowControlHandler flow = new FlowControlHandler();
376+
final FlowControlHandler flow = new FlowControlHandler();
352377
Channel server = newServer(false, flow, handler);
353378
Channel client = newClient(server.localAddress());
354379
try {
@@ -370,7 +395,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
370395
// channelRead(3)
371396
peer.read();
372397
assertTrue(msgRcvLatch3.await(1L, SECONDS));
373-
assertTrue(flow.isQueueEmpty());
398+
399+
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
400+
@Override
401+
public Boolean call() {
402+
return flow.isQueueEmpty();
403+
}
404+
}).get());
374405
} finally {
375406
client.close();
376407
server.close();
@@ -401,7 +432,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
401432
}
402433
};
403434

404-
FlowControlHandler flow = new FlowControlHandler();
435+
final FlowControlHandler flow = new FlowControlHandler();
405436
Channel server = newServer(false, flow, handler);
406437
Channel client = newClient(server.localAddress());
407438
try {
@@ -415,7 +446,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
415446
// channelRead(1)
416447
peer.read();
417448
assertTrue(latch.await(1L, SECONDS));
418-
assertTrue(flow.isQueueEmpty());
449+
450+
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
451+
@Override
452+
public Boolean call() {
453+
return flow.isQueueEmpty();
454+
}
455+
}).get());
419456

420457
Throwable cause = causeRef.get();
421458
if (cause != null) {
@@ -485,11 +522,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
485522

486523
@Test
487524
public void testRemoveFlowControl() throws Exception {
525+
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
526+
488527
final CountDownLatch latch = new CountDownLatch(3);
489528

490529
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
491530
@Override
492531
public void channelActive(ChannelHandlerContext ctx) throws Exception {
532+
peerRef.exchange(ctx.channel(), 1L, SECONDS);
493533
//do the first read
494534
ctx.read();
495535
super.channelActive(ctx);
@@ -501,7 +541,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
501541
}
502542
};
503543

504-
FlowControlHandler flow = new FlowControlHandler() {
544+
final FlowControlHandler flow = new FlowControlHandler() {
505545
private int num;
506546
@Override
507547
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -530,12 +570,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
530570
Channel server = newServer(false /* no auto read */, flow, handler, tail);
531571
Channel client = newClient(server.localAddress());
532572
try {
573+
// The client connection on the server side
574+
Channel peer = peerRef.exchange(null, 1L, SECONDS);
575+
533576
// Write one message
534577
client.writeAndFlush(newOneMessage()).sync();
535578

536579
// We should receive 3 messages
537580
assertTrue(latch.await(1L, SECONDS));
538-
assertTrue(flow.isQueueEmpty());
581+
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
582+
@Override
583+
public Boolean call() {
584+
return flow.isQueueEmpty();
585+
}
586+
}).get());
539587
} finally {
540588
client.close();
541589
server.close();

0 commit comments

Comments
 (0)