Skip to content

Commit 5b13ecd

Browse files
authored
Only allow auto-scaling of EventLoop's when IoHandler supports it. (#15603)
Motivation: We recently added the feature of auto-scaling event-loops to be able to have a more elastic setup. While this works out in most cases some transport implementations might not allow it as it is not possible to change threads due performance reasons. One of these examples is atm io_uring. Here we require that the thread will never change during the life-time of the ring as we setup the ring with IORING_SETUP_SINGLE_ISSUER for performance reasons. Changing the thread results in have io_uring_enter fail with EEXIST. Modifications: - Add new default method to IoHandlerFactory which can be used to either signal support or no support for changing the Thread during the life-time of IoHandler instances that are created via the factory. - Override the method for Nio, Epoll and Kqueue to signal that changing is supported and so our current auto-scaling implementation can be used for these. - Don't override the method for IoUring as we can't support it with the current setup. Result: No more EEXIST errors when using io_uring with auto-scaling. This is related to #15524
1 parent 81d813d commit 5b13ecd

8 files changed

Lines changed: 75 additions & 16 deletions

File tree

testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,9 @@ public void run() {
324324
@Timeout(30)
325325
public void testAutoScalingEventLoopGroupCanScaleDownAndBeUsed() throws Exception {
326326
EventLoopGroup group = newAutoScalingEventLoopGroup();
327+
if (group == null) {
328+
return;
329+
}
327330
try {
328331
startAllExecutors(group);
329332
assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
@@ -345,6 +348,9 @@ public void testAutoScalingEventLoopGroupCanScaleDownAndBeUsed() throws Exceptio
345348
@Timeout(30)
346349
public void testSubmittingTaskWakesUpSuspendedExecutor() throws Exception {
347350
EventLoopGroup group = newAutoScalingEventLoopGroup();
351+
if (group == null) {
352+
return;
353+
}
348354
try {
349355
startAllExecutors(group);
350356
assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),

transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollIoHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,17 @@ public static IoHandlerFactory newFactory(final int maxEvents,
110110
final SelectStrategyFactory selectStrategyFactory) {
111111
ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
112112
ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
113-
return executor -> new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
113+
return new IoHandlerFactory() {
114+
@Override
115+
public IoHandler newHandler(ThreadAwareExecutor executor) {
116+
return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
117+
}
118+
119+
@Override
120+
public boolean isChangingThreadSupported() {
121+
return true;
122+
}
123+
};
114124
}
115125

116126
// Package-private for testing

transport-classes-kqueue/src/main/java/io/netty/channel/kqueue/KQueueIoHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,17 @@ public static IoHandlerFactory newFactory(final int maxEvents,
118118
final SelectStrategyFactory selectStrategyFactory) {
119119
ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
120120
ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
121-
return executor -> new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
121+
return new IoHandlerFactory() {
122+
@Override
123+
public IoHandler newHandler(ThreadAwareExecutor executor) {
124+
return new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
125+
}
126+
127+
@Override
128+
public boolean isChangingThreadSupported() {
129+
return true;
130+
}
131+
};
122132
}
123133

124134
private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {

transport-native-io_uring/src/test/java/io/netty/channel/uring/IoUringEventLoopTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.junit.jupiter.api.Test;
2828

2929
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.Executor;
3130
import java.util.concurrent.TimeUnit;
3231

3332
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -54,8 +53,7 @@ protected EventLoopGroup newEventLoopGroup() {
5453

5554
@Override
5655
protected EventLoopGroup newAutoScalingEventLoopGroup() {
57-
return new MultiThreadIoEventLoopGroup(SCALING_MAX_THREADS, (Executor) null, AUTO_SCALING_CHOOSER_FACTORY,
58-
newIoHandlerFactory());
56+
return null;
5957
}
6058

6159
@Override

transport/src/main/java/io/netty/channel/IoHandlerFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,16 @@ public interface IoHandlerFactory {
2929
* @return a new {@link IoHandler} instance.
3030
*/
3131
IoHandler newHandler(ThreadAwareExecutor ioExecutor);
32+
33+
/**
34+
* Returns {@code true} if it's supported that the {@link ThreadAwareExecutor} might change its {@link Thread}
35+
* during the life-time of the {@link IoHandler} that can be created via {@link #newHandler(ThreadAwareExecutor)}.
36+
* That said even if changing the {@link Thread} is supported it must be guaranteed that the access rules specified
37+
* by {@link IoHandler} are not violated.
38+
*
39+
* @return {@code true} if changing is supported, {@code false} otherwise.
40+
*/
41+
default boolean isChangingThreadSupported() {
42+
return false;
43+
}
3244
}

transport/src/main/java/io/netty/channel/SingleThreadIoEventLoop.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,10 @@ public boolean shouldReportActiveIoTime() {
8484
*/
8585
public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
8686
IoHandlerFactory ioHandlerFactory) {
87-
super(parent, threadFactory, false, true);
87+
super(parent, threadFactory, false,
88+
ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported());
8889
this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
89-
this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
90+
this.ioHandler = ioHandlerFactory.newHandler(this);
9091
}
9192

9293
/**
@@ -98,9 +99,10 @@ public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFact
9899
* handle IO.
99100
*/
100101
public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
101-
super(parent, executor, false, true);
102+
super(parent, executor, false,
103+
ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported());
102104
this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
103-
this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
105+
this.ioHandler = ioHandlerFactory.newHandler(this);
104106
}
105107

106108
/**
@@ -123,12 +125,14 @@ public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHan
123125
public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
124126
IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
125127
RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
126-
super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
128+
super(parent, threadFactory, false,
129+
ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
130+
maxPendingTasks, rejectedExecutionHandler);
127131
this.maxTaskProcessingQuantumNs =
128132
ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
129133
DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
130134
TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
131-
this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
135+
this.ioHandler = ioHandlerFactory.newHandler(this);
132136
}
133137

134138
/**
@@ -150,12 +154,14 @@ public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
150154
IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
151155
RejectedExecutionHandler rejectedExecutionHandler,
152156
long maxTaskProcessingQuantumMs) {
153-
super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
157+
super(parent, executor, false,
158+
ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
159+
maxPendingTasks, rejectedExecutionHandler);
154160
this.maxTaskProcessingQuantumNs =
155161
ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
156162
DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
157163
TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
158-
this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
164+
this.ioHandler = ioHandlerFactory.newHandler(this);
159165
}
160166

161167
/**
@@ -175,9 +181,11 @@ protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
175181
IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
176182
Queue<Runnable> tailTaskQueue,
177183
RejectedExecutionHandler rejectedExecutionHandler) {
178-
super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
184+
super(parent, executor, false,
185+
ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
186+
taskQueue, tailTaskQueue, rejectedExecutionHandler);
179187
this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
180-
this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
188+
this.ioHandler = ioHandlerFactory.newHandler(this);
181189
}
182190

183191
@Override

transport/src/main/java/io/netty/channel/nio/NioIoHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,16 @@ public static IoHandlerFactory newFactory(final SelectorProvider selectorProvide
766766
final SelectStrategyFactory selectStrategyFactory) {
767767
ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
768768
ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
769-
return context -> new NioIoHandler(context, selectorProvider, selectStrategyFactory.newSelectStrategy());
769+
return new IoHandlerFactory() {
770+
@Override
771+
public IoHandler newHandler(ThreadAwareExecutor executor) {
772+
return new NioIoHandler(executor, selectorProvider, selectStrategyFactory.newSelectStrategy());
773+
}
774+
775+
@Override
776+
public boolean isChangingThreadSupported() {
777+
return true;
778+
}
779+
};
770780
}
771781
}

transport/src/test/java/io/netty/channel/MultiThreadIoEventLoopGroupTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public IoRegistration register(IoHandle handle) {
8080
}
8181
};
8282
}
83+
84+
@Override
85+
public boolean isChangingThreadSupported() {
86+
return true;
87+
}
8388
}
8489

8590
private static class TestableIoEventLoop extends SingleThreadIoEventLoop {

0 commit comments

Comments
 (0)