Skip to content

Commit 9a183ec

Browse files
committed
Add methods to easily release messages from inbound / outbound buffer of EmbeddedChannel
Motivation: Often the user uses EmbeddedChannel within unit tests where the only "important" thing is to know if any pending messages were in the buffer and then release these. We should provide methods for this so the user not need to manually loop through these and release. Modifications: Add methods to easily handle releasing of messages. Result: Less boiler-plate code for the user to write.
1 parent 2696778 commit 9a183ec

2 files changed

Lines changed: 150 additions & 2 deletions

File tree

transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,66 @@ public boolean writeOutbound(Object... msgs) {
290290
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
291291
*/
292292
public boolean finish() {
293+
return finish(false);
294+
}
295+
296+
/**
297+
* Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
298+
* Any futher try to write data to it will fail.
299+
*
300+
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
301+
*/
302+
public boolean finishAndReleaseAll() {
303+
return finish(true);
304+
}
305+
306+
/**
307+
* Mark this {@link Channel} as finished. Any futher try to write data to it will fail.
308+
*
309+
* @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
310+
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
311+
*/
312+
private boolean finish(boolean releaseAll) {
293313
close();
294-
checkException();
295-
return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
314+
try {
315+
checkException();
316+
return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
317+
} finally {
318+
if (releaseAll) {
319+
releaseAll(inboundMessages);
320+
releaseAll(outboundMessages);
321+
}
322+
}
323+
}
324+
325+
/**
326+
* Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
327+
* otherwise.
328+
*/
329+
public boolean releaseInbound() {
330+
return releaseAll(inboundMessages);
331+
}
332+
333+
/**
334+
* Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
335+
* otherwise.
336+
*/
337+
public boolean releaseOutbound() {
338+
return releaseAll(outboundMessages);
339+
}
340+
341+
private static boolean releaseAll(Queue<Object> queue) {
342+
if (isNotEmpty(queue)) {
343+
for (;;) {
344+
Object msg = queue.poll();
345+
if (msg == null) {
346+
break;
347+
}
348+
ReferenceCountUtil.release(msg);
349+
}
350+
return true;
351+
}
352+
return false;
296353
}
297354

298355
private void finishPendingTasks(boolean cancel) {

transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.netty.channel.embedded;
1717

18+
import io.netty.buffer.ByteBuf;
19+
import io.netty.buffer.Unpooled;
1820
import io.netty.channel.Channel;
1921
import io.netty.channel.ChannelFuture;
2022
import io.netty.channel.ChannelHandler;
@@ -233,6 +235,95 @@ public void testHasNoDisconnect() {
233235
assertNull(handler.pollEvent());
234236
}
235237

238+
@Test
239+
public void testFinishAndReleaseAll() {
240+
ByteBuf in = Unpooled.buffer();
241+
ByteBuf out = Unpooled.buffer();
242+
try {
243+
EmbeddedChannel channel = new EmbeddedChannel();
244+
assertTrue(channel.writeInbound(in));
245+
assertEquals(1, in.refCnt());
246+
247+
assertTrue(channel.writeOutbound(out));
248+
assertEquals(1, out.refCnt());
249+
250+
assertTrue(channel.finishAndReleaseAll());
251+
assertEquals(0, in.refCnt());
252+
assertEquals(0, out.refCnt());
253+
254+
assertNull(channel.readInbound());
255+
assertNull(channel.readOutbound());
256+
} finally {
257+
release(in, out);
258+
}
259+
}
260+
261+
@Test
262+
public void testReleaseInbound() {
263+
ByteBuf in = Unpooled.buffer();
264+
ByteBuf out = Unpooled.buffer();
265+
try {
266+
EmbeddedChannel channel = new EmbeddedChannel();
267+
assertTrue(channel.writeInbound(in));
268+
assertEquals(1, in.refCnt());
269+
270+
assertTrue(channel.writeOutbound(out));
271+
assertEquals(1, out.refCnt());
272+
273+
assertTrue(channel.releaseInbound());
274+
assertEquals(0, in.refCnt());
275+
assertEquals(1, out.refCnt());
276+
277+
assertTrue(channel.finish());
278+
assertNull(channel.readInbound());
279+
280+
ByteBuf buffer = channel.readOutbound();
281+
assertSame(out, buffer);
282+
buffer.release();
283+
284+
assertNull(channel.readOutbound());
285+
} finally {
286+
release(in, out);
287+
}
288+
}
289+
290+
@Test
291+
public void testReleaseOutbound() {
292+
ByteBuf in = Unpooled.buffer();
293+
ByteBuf out = Unpooled.buffer();
294+
try {
295+
EmbeddedChannel channel = new EmbeddedChannel();
296+
assertTrue(channel.writeInbound(in));
297+
assertEquals(1, in.refCnt());
298+
299+
assertTrue(channel.writeOutbound(out));
300+
assertEquals(1, out.refCnt());
301+
302+
assertTrue(channel.releaseOutbound());
303+
assertEquals(1, in.refCnt());
304+
assertEquals(0, out.refCnt());
305+
306+
assertTrue(channel.finish());
307+
assertNull(channel.readOutbound());
308+
309+
ByteBuf buffer = channel.readInbound();
310+
assertSame(in, buffer);
311+
buffer.release();
312+
313+
assertNull(channel.readInbound());
314+
} finally {
315+
release(in, out);
316+
}
317+
}
318+
319+
private static void release(ByteBuf... buffers) {
320+
for (ByteBuf buffer : buffers) {
321+
if (buffer.refCnt() > 0) {
322+
buffer.release();
323+
}
324+
}
325+
}
326+
236327
private static final class EventOutboundHandler extends ChannelOutboundHandlerAdapter {
237328
static final Integer DISCONNECT = 0;
238329
static final Integer CLOSE = 1;

0 commit comments

Comments
 (0)