Skip to content

Commit 9b3a541

Browse files
authored
Fix timeout when auto read is disabled late in io_uring (#14989)
Motivation: If auto read is disabled late (during a channelRead), but there is a read call during the next channelReadComplete, that read call would be ignored, leading to the channel freezing up and the connection timing out. Modification: When clearRead cancels the scheduled POLLIN, clear the iostate immediately, so that the next `read()` will correctly schedule a new POLLIN. Not sure if this is the proper way to do it. Maybe this should go into cancelComplete0 instead? But then we'd have to add some mechanism to hold back the read call. I think the risk of duplicate reads that this change adds is better than no reads. Result: Echo server does not freeze up anymore after turning off auto read during a read operation.
1 parent 55c0d56 commit 9b3a541

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

transport-classes-io_uring/src/main/java/io/netty/channel/uring/AbstractIoUringChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ private void clearRead() {
184184
long id = registration.submit(
185185
IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
186186
assert id != 0;
187+
ioState &= ~POLL_IN_SCHEDULED;
187188
}
188189
// Also cancel all outstanding reads as the user did signal there is no more desire to read.
189190
cancelOutstandingReads(registration(), numOutstandingReads);
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2025 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.channel.uring;
17+
18+
import io.netty.bootstrap.ServerBootstrap;
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelInboundHandlerAdapter;
21+
import io.netty.channel.IoEventLoopGroup;
22+
import io.netty.channel.MultiThreadIoEventLoopGroup;
23+
import io.netty.channel.socket.ServerSocketChannel;
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.BeforeAll;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.Timeout;
28+
29+
import java.io.InputStream;
30+
import java.io.OutputStream;
31+
import java.net.Socket;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
35+
36+
public class IoUringAutoReadTest {
37+
38+
@BeforeAll
39+
public static void loadJNI() {
40+
assumeTrue(IoUring.isAvailable());
41+
}
42+
43+
@Test
44+
@Timeout(value = 1, unit = TimeUnit.MINUTES)
45+
public void testLateAutoRead() throws Exception {
46+
IoEventLoopGroup group = new MultiThreadIoEventLoopGroup(1, IoUringIoHandler.newFactory());
47+
try {
48+
ServerSocketChannel server = (ServerSocketChannel) new ServerBootstrap()
49+
.group(group)
50+
.channel(IoUringServerSocketChannel.class)
51+
.childHandler(new ChannelInboundHandlerAdapter() {
52+
@Override
53+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
54+
ctx.channel().config().setAutoRead(false);
55+
ctx.writeAndFlush(msg, ctx.voidPromise());
56+
}
57+
58+
@Override
59+
public void channelReadComplete(ChannelHandlerContext ctx) {
60+
ctx.read();
61+
}
62+
})
63+
.bind(0).sync().channel();
64+
65+
try (Socket sock = new Socket(server.localAddress().getAddress(), server.localAddress().getPort())) {
66+
OutputStream out = sock.getOutputStream();
67+
InputStream in = sock.getInputStream();
68+
69+
out.write(1);
70+
out.flush();
71+
Assertions.assertEquals(1, in.read());
72+
73+
out.write(2);
74+
out.flush();
75+
Assertions.assertEquals(2, in.read());
76+
}
77+
} finally {
78+
group.shutdownGracefully();
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)