Skip to content

Commit 7880714

Browse files
yawkatnormanmaurer
andauthored
Allow delayed set of owningThread in ManualIoEventLoop (#14976)
Motivation: In some scenarios, it is desirable to immediately create a ManualIoEventLoop, without knowing which thread it is assigned to. In particular, to implement `MultiThreadIoEventLoopGroup.newChild` properly, the owning thread is assigned through an `Executor` call, so we don't see it until the `Executor` actually invokes the `Runnable` it receives, but we need to return the `ManualIoEventLoop` from `newChild` immediately. Modification: Allow passing `null` for the `owningThread` constructor parameter, and add a `setOwningThread` setter that can be called at most once. Result: `newChild` can be implemented without hacks. The only use of the `owningThread` field is in `inEventLoop`. That means that the only difference compared to the old constructor approach is that there is a period of time when the future owning thread is running, but inEventLoop still returns `false`. I'm struggling to imagine scenarios where this matters, but @franz1981 had some thoughts, so I'm open to concerns and alternative solutions. --------- Co-authored-by: Norman Maurer <norman_maurer@apple.com>
1 parent 1fc0885 commit 7880714

2 files changed

Lines changed: 77 additions & 4 deletions

File tree

transport/src/main/java/io/netty/channel/ManualIoEventLoop.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.TimeoutException;
3737
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.concurrent.atomic.AtomicReference;
3839

3940
/**
4041
* {@link IoEventLoop} implementation that is owned by the user and so needs to be driven by the user manually with the
@@ -75,7 +76,7 @@ public long deadlineNanos() {
7576
};
7677
private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
7778
private final IoEventLoopGroup parent;
78-
private final Thread owningThread;
79+
private final AtomicReference<Thread> owningThread;
7980
private final IoHandler handler;
8081

8182
private volatile long gracefulShutdownQuietPeriod;
@@ -107,13 +108,14 @@ public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
107108
* @param parent the parent {@link IoEventLoopGroup} or {@code null} if no parent.
108109
* @param owningThread the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
109110
* user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
110-
* make progress.
111+
* make progress. If {@code null}, must be set later using
112+
* {@link #setOwningThread(Thread)}.
111113
* @param factory the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
112114
* used by this {@link IoEventLoop}.
113115
*/
114116
public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
115117
this.parent = parent;
116-
this.owningThread = Objects.requireNonNull(owningThread, "owningThread");
118+
this.owningThread = new AtomicReference<>(owningThread);
117119
this.handler = factory.newHandler(this);
118120
state = new AtomicInteger(ST_STARTED);
119121
}
@@ -141,6 +143,9 @@ private int runAllTasks() {
141143

142144
private int run(IoHandlerContext context) {
143145
if (!initialized) {
146+
if (owningThread.get() == null) {
147+
throw new IllegalStateException("Owning thread not set");
148+
}
144149
initialized = true;
145150
handler.initialize();
146151
}
@@ -295,7 +300,20 @@ public boolean isIoType(Class<? extends IoHandler> handlerType) {
295300

296301
@Override
297302
public boolean inEventLoop(Thread thread) {
298-
return this.owningThread == thread;
303+
return this.owningThread.get() == thread;
304+
}
305+
306+
/**
307+
* Set the owning thread that will call {@link #run}. May only be called once, and only if the owning thread was
308+
* not set in the constructor already.
309+
*
310+
* @param owningThread The owning thread
311+
*/
312+
public void setOwningThread(Thread owningThread) {
313+
Objects.requireNonNull(owningThread, "owningThread");
314+
if (!this.owningThread.compareAndSet(null, owningThread)) {
315+
throw new IllegalStateException("Owning thread already set");
316+
}
299317
}
300318

301319
private void shutdown0(long quietPeriod, long timeout, int shutdownState) {

transport/src/test/java/io/netty/channel/ManualIoEventLoopTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.Set;
2727
import java.util.concurrent.BlockingQueue;
2828
import java.util.concurrent.Callable;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutionException;
2931
import java.util.concurrent.LinkedBlockingQueue;
3032
import java.util.concurrent.RejectedExecutionException;
3133
import java.util.concurrent.Semaphore;
@@ -216,6 +218,59 @@ public Boolean call() {
216218
}
217219
}
218220

221+
@Test
222+
public void testDelayOwningThread() throws ExecutionException, InterruptedException {
223+
Semaphore semaphore = new Semaphore(0);
224+
ManualIoEventLoop eventLoop = new ManualIoEventLoop(null, executor ->
225+
new TestIoHandler(semaphore));
226+
Thread thread = new Thread(() -> {
227+
eventLoop.setOwningThread(Thread.currentThread());
228+
assertTrue(eventLoop.inEventLoop());
229+
while (!eventLoop.isTerminated()) {
230+
eventLoop.runNow();
231+
}
232+
});
233+
234+
assertFalse(eventLoop.inEventLoop());
235+
236+
CompletableFuture<Void> cf = new CompletableFuture<>();
237+
eventLoop.execute(() -> {
238+
assertTrue(eventLoop.inEventLoop());
239+
cf.complete(null);
240+
});
241+
242+
thread.start();
243+
cf.get();
244+
245+
eventLoop.shutdownGracefully();
246+
thread.join();
247+
}
248+
249+
@Test
250+
public void testRunWithoutOwner() throws ExecutionException, InterruptedException {
251+
ManualIoEventLoop eventLoop = new ManualIoEventLoop(null, executor ->
252+
new TestIoHandler(new Semaphore(0)));
253+
254+
// prior to setOwningThread, runNow is forbidden
255+
assertThrows(IllegalStateException.class, eventLoop::runNow);
256+
257+
eventLoop.setOwningThread(Thread.currentThread());
258+
259+
eventLoop.runNow(); // runs fine
260+
261+
eventLoop.shutdownGracefully();
262+
}
263+
264+
@Test
265+
public void testSetOwnerMultipleTimes() {
266+
ManualIoEventLoop eventLoop = new ManualIoEventLoop(null, executor ->
267+
new TestIoHandler(new Semaphore(0)));
268+
eventLoop.setOwningThread(Thread.currentThread());
269+
assertThrows(IllegalStateException.class, () -> eventLoop.setOwningThread(Thread.currentThread()));
270+
271+
eventLoop.shutdownGracefully();
272+
}
273+
219274
private static final class TestRunnable implements Runnable {
220275
private boolean done;
221276
@Override

0 commit comments

Comments
 (0)