2424import io .netty .channel .ChannelHandlerContext ;
2525import io .netty .channel .ChannelId ;
2626import io .netty .channel .ChannelMetadata ;
27+ import io .netty .channel .ChannelOption ;
2728import io .netty .channel .ChannelOutboundBuffer ;
2829import io .netty .channel .ChannelPipeline ;
2930import io .netty .channel .ChannelProgressivePromise ;
4142import io .netty .handler .ssl .SslCloseCompletionEvent ;
4243import io .netty .util .DefaultAttributeMap ;
4344import io .netty .util .ReferenceCountUtil ;
45+ import io .netty .util .internal .ObjectUtil ;
4446import io .netty .util .internal .StringUtil ;
4547import io .netty .util .internal .logging .InternalLogger ;
4648import io .netty .util .internal .logging .InternalLoggerFactory ;
4951import java .net .SocketAddress ;
5052import java .nio .channels .ClosedChannelException ;
5153import java .util .ArrayDeque ;
54+ import java .util .Map ;
5255import java .util .Queue ;
5356import java .util .concurrent .RejectedExecutionException ;
5457import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
@@ -906,24 +909,19 @@ void readEOS() {
906909 readEOS = true ;
907910 }
908911
909- private void updateLocalWindowIfNeeded () {
910- if (flowControlledBytes != 0 && !parentContext ().isRemoved ()) {
912+ private boolean updateLocalWindowIfNeeded () {
913+ if (flowControlledBytes != 0 && !parentContext ().isRemoved () && config . autoStreamFlowControl ) {
911914 int bytes = flowControlledBytes ;
912915 flowControlledBytes = 0 ;
913- ChannelFuture future = write0 (parentContext (), new DefaultHttp2WindowUpdateFrame (bytes ).stream (stream ));
914- // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
915- // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
916- // to assume there was a write done that needs to be flushed or we risk flow control starvation.
917- writeDoneAndNoFlush = true ;
918- // Add a listener which will notify and teardown the stream
919- // when a window update fails if needed or check the result of the future directly if it was completed
920- // already.
921- // See https://github.com/netty/netty/issues/9663
922- if (future .isDone ()) {
923- windowUpdateFrameWriteComplete (future , AbstractHttp2StreamChannel .this );
924- } else {
925- future .addListener (windowUpdateFrameWriteListener );
926- }
916+ writeWindowUpdateFrame (new DefaultHttp2WindowUpdateFrame (bytes ).stream (stream ));
917+ return true ;
918+ }
919+ return false ;
920+ }
921+
922+ void updateLocalWindowIfNeededAndFlush () {
923+ if (updateLocalWindowIfNeeded ()) {
924+ flush ();
927925 }
928926 }
929927
@@ -982,6 +980,24 @@ void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
982980 pipeline ().fireChannelRead (frame );
983981 }
984982
983+ private ChannelFuture writeWindowUpdateFrame (Http2WindowUpdateFrame windowUpdateFrame ) {
984+ ChannelFuture future = write0 (parentContext (), windowUpdateFrame );
985+ // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
986+ // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
987+ // to assume there was a write done that needs to be flushed or we risk flow control starvation.
988+ writeDoneAndNoFlush = true ;
989+ // Add a listener which will notify and teardown the stream
990+ // when a window update fails if needed or check the result of the future directly if it was completed
991+ // already.
992+ // See https://github.com/netty/netty/issues/9663
993+ if (future .isDone ()) {
994+ windowUpdateFrameWriteComplete (future , AbstractHttp2StreamChannel .this );
995+ } else {
996+ future .addListener (windowUpdateFrameWriteListener );
997+ }
998+ return future ;
999+ }
1000+
9851001 @ Override
9861002 public void write (Object msg , final ChannelPromise promise ) {
9871003 // After this point its not possible to cancel a write anymore.
@@ -1001,7 +1017,42 @@ public void write(Object msg, final ChannelPromise promise) {
10011017 try {
10021018 if (msg instanceof Http2StreamFrame ) {
10031019 Http2StreamFrame frame = validateStreamFrame ((Http2StreamFrame ) msg ).stream (stream ());
1004- writeHttp2StreamFrame (frame , promise );
1020+ if (msg instanceof Http2WindowUpdateFrame ) {
1021+ Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame ) msg ;
1022+ if (config .autoStreamFlowControl ) {
1023+ ReferenceCountUtil .release (msg );
1024+ promise .setFailure (new UnsupportedOperationException (
1025+ Http2StreamChannelOption .AUTO_STREAM_FLOW_CONTROL + " is set to false" ));
1026+ return ;
1027+ }
1028+ try {
1029+ ObjectUtil .checkInRange (updateFrame .windowSizeIncrement (), 0 ,
1030+ flowControlledBytes , "windowSizeIncrement" );
1031+ } catch (RuntimeException e ) {
1032+ ReferenceCountUtil .release (updateFrame );
1033+ promise .setFailure (e );
1034+ return ;
1035+ }
1036+ flowControlledBytes -= updateFrame .windowSizeIncrement ();
1037+ if (parentContext ().isRemoved ()) {
1038+ ReferenceCountUtil .release (msg );
1039+ promise .setFailure (new ClosedChannelException ());
1040+ return ;
1041+ }
1042+ ChannelFuture f = writeWindowUpdateFrame (updateFrame );
1043+ if (f .isDone ()) {
1044+ writeComplete (f , promise );
1045+ } else {
1046+ f .addListener (new ChannelFutureListener () {
1047+ @ Override
1048+ public void operationComplete (ChannelFuture future ) {
1049+ writeComplete (future , promise );
1050+ }
1051+ });
1052+ }
1053+ } else {
1054+ writeHttp2StreamFrame (frame , promise );
1055+ }
10051056 } else {
10061057 String msgStr = msg .toString ();
10071058 ReferenceCountUtil .release (msg );
@@ -1152,6 +1203,8 @@ public ChannelOutboundBuffer outboundBuffer() {
11521203 * changes.
11531204 */
11541205 private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1206+
1207+ volatile boolean autoStreamFlowControl = true ;
11551208 Http2StreamChannelConfig (Channel channel ) {
11561209 super (channel );
11571210 }
@@ -1175,6 +1228,49 @@ public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
11751228 super .setRecvByteBufAllocator (allocator );
11761229 return this ;
11771230 }
1231+
1232+ @ Override
1233+ public Map <ChannelOption <?>, Object > getOptions () {
1234+ return getOptions (
1235+ super .getOptions (),
1236+ Http2StreamChannelOption .AUTO_STREAM_FLOW_CONTROL );
1237+ }
1238+
1239+ @ SuppressWarnings ("unchecked" )
1240+ @ Override
1241+ public <T > T getOption (ChannelOption <T > option ) {
1242+ if (option == Http2StreamChannelOption .AUTO_STREAM_FLOW_CONTROL ) {
1243+ return (T ) Boolean .valueOf (autoStreamFlowControl );
1244+ }
1245+ return super .getOption (option );
1246+ }
1247+
1248+ @ Override
1249+ public <T > boolean setOption (ChannelOption <T > option , T value ) {
1250+ validate (option , value );
1251+ if (option == Http2StreamChannelOption .AUTO_STREAM_FLOW_CONTROL ) {
1252+ boolean newValue = (Boolean ) value ;
1253+ boolean changed = newValue && !autoStreamFlowControl ;
1254+ autoStreamFlowControl = (Boolean ) value ;
1255+ if (changed ) {
1256+ if (channel .isRegistered ()) {
1257+ final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe ) channel .unsafe ();
1258+ if (channel .eventLoop ().inEventLoop ()) {
1259+ unsafe .updateLocalWindowIfNeededAndFlush ();
1260+ } else {
1261+ channel .eventLoop ().execute (new Runnable () {
1262+ @ Override
1263+ public void run () {
1264+ unsafe .updateLocalWindowIfNeededAndFlush ();
1265+ }
1266+ });
1267+ }
1268+ }
1269+ }
1270+ return true ;
1271+ }
1272+ return super .setOption (option , value );
1273+ }
11781274 }
11791275
11801276 private void maybeAddChannelToReadCompletePendingQueue () {
0 commit comments