8000 `StreamBufferingEncoder` doesn't work when settings auto-ack is disab… · netty/netty@abc2310 · GitHub
[go: up one dir, main page]

Skip to content

Commit abc2310

Browse files
StreamBufferingEncoder doesn't work when settings auto-ack is disabled (#15301)
Motivation: When users do ``` Http2FrameCodecBuilder.forClient() .autoAckSettingsFrame(false) .encoderEnforceMaxConcurrentStreams(true) ``` `StreamBufferingEncoder` will be out of sync with wrapped `DefaultHttp2ConnectionEncoder` on `maxConcurrentStreams` value because `StreamBufferingEncoder` updates its internal state only when auto-ack settings is enabled (via `remoteSettings`). When it's disabled, `DefaultHttp2ConnectionEncoder` will call `remoteSettings` internally on `writeSettingsAck` and none of `DecoratingHttp2ConnectionEncoder` wrappers will see that. Modifications: - Implement `StreamBufferingEncoder.writeSettingsAck` and update `maxConcurrentStreams` value after the super class returns control flow. - Parametrize some of `StreamBufferingEncoderTest` tests to verify both flows: with and without auto-ack settings. Result: `StreamBufferingEncoder` keeps its internal `maxConcurrentStreams` state in sync with underlying `DefaultHttp2ConnectionEncoder` regardless of `autoAckSettings` configuration.
1 parent 1fa1b7e commit abc2310

File tree

2 files changed

+79
-59
lines changed

2 files changed

+79
-59
lines changed

codec-http2/src/main/java/io/netty/handler/codec/http2/StreamBufferingEncoder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,15 +229,27 @@ public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf
229229
return promise;
230230
}
231231

232+
@Override
233+
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
234+
final ChannelFuture future = super.writeSettingsAck(ctx, promise);
235+
// In case autoAckSettings was set to false, decorated DefaultHttp2ConnectionEncoder will dequeue pending
236+
// settings and call remoteSettings on its own instance. Therefore, we need to consume potentially updated value
237+
// after this method returns.
238+
updateMaxConcurrentStreams();
239+
return future;
240+
}
241+
232242
@Override
233243
public void remoteSettings(Http2Settings settings) throws Http2Exception {
234244
// Need to let the delegate decoder handle the settings first, so that it sees the
235245
// new setting before we attempt to create any new streams.
236246
super.remoteSettings(settings);
247+
updateMaxConcurrentStreams();
248+
}
237249

250+
private void updateMaxConcurrentStreams() {
238251
// Get the updated value for SETTINGS_MAX_CONCURRENT_STREAMS.
239252
maxConcurrentStreams = connection().local().maxActiveStreams();
240-
241253
// Try to create new streams up to the new threshold.
242254
tryCreatePendingStreams();
243255
}

codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,6 @@
1515

1616
package io.netty.handler.codec.http2;
1717

18-
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
19-
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
20-
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
21-
import static io.netty.handler.codec.http2.Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS;
22-
import static io.netty.handler.codec.http2.Http2Error.CANCEL;
23-
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
24-
import static org.junit.jupiter.api.Assertions.assertEquals;
25-
import static org.junit.jupiter.api.Assertions.assertFalse;
26-
import static org.junit.jupiter.api.Assertions.assertNotNull;
27-
import static org.junit.jupiter.api.Assertions.assertNull;
28-
import static org.junit.jupiter.api.Assertions.assertTrue;
29-
import static org.mockito.Mockito.any;
30-
import static org.mockito.Mockito.anyBoolean;
31-
import static org.mockito.Mockito.anyInt;
32-
import static org.mockito.Mockito.anyLong;
33-
import static org.mockito.Mockito.anyShort;
34-
import static org.mockito.Mockito.eq;
35-
import static org.mockito.Mockito.doAnswer;
36-
import static org.mockito.Mockito.mock;
37-
import static org.mockito.Mockito.never;
38-
import static org.mockito.Mockito.times;
39-
import static org.mockito.Mockito.verify;
40-
import static org.mockito.Mockito.when;
41-
4218
import io.netty.buffer.ByteBuf;
4319
import io.netty.buffer.Unpooled;
4420
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -57,6 +33,8 @@
5733
import org.junit.jupiter.api.AfterEach;
5834
import org.junit.jupiter.api.BeforeEach;
5935
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.params.ParameterizedTest;
37+
import org.junit.jupiter.params.provider.ValueSource;
6038
import org.mockito.ArgumentCaptor;
6139
import org.mockito.Mock;
6240
import org.mockito.MockitoAnnotations;
@@ -67,6 +45,32 @@
6745
import java.util.ArrayList;
6846
import java.util.List;
6947

48+
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
49+
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
50+
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
51+
import static io.netty.handler.codec.http2.Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS;
52+
import static io.netty.handler.codec.http2.Http2Error.CANCEL;
53+
import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
54+
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
55+
import static org.junit.jupiter.api.Assertions.assertEquals;
56+
import static org.junit.jupiter.api.Assertions.assertFalse;
57+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
58+
import static org.junit.jupiter.api.Assertions.assertNotNull;
59+
import static org.junit.jupiter.api.Assertions.assertNull;
60+
import static org.junit.jupiter.api.Assertions.assertTrue;
61+
import static org.mockito.Mockito.any;
62+
import static org.mockito.Mockito.anyBoolean;
63+
import static org.mockito.Mockito.anyInt;
64+
import static org.mockito.Mockito.anyLong;
65+
import static org.mockito.Mockito.anyShort;
66+
import static org.mockito.Mockito.doAnswer;
67+
import static org.mockito.Mockito.eq;
68+
import static org.mockito.Mockito.mock;
69+
import static org.mockito.Mockito.never;
70+
import static org.mockito.Mockito.times;
71+
import static org.mockito.Mockito.verify;
72+
import static org.mockito.Mockito.when;
73+
7074
/**
7175
* Tests for {@link StreamBufferingEncoder}.
7276
*/
@@ -126,8 +130,8 @@ public void setup() throws Exception {
126130
DefaultHttp2ConnectionEncoder defaultEncoder =
127131
new DefaultHttp2ConnectionEncoder(connection, writer);
128132
encoder = new StreamBufferingEncoder(defaultEncoder);
129-
DefaultHttp2ConnectionDecoder decoder =
130-
new DefaultHttp2ConnectionDecoder(connection, encoder, mock(Http2FrameReader.class));
133+
DefaultHttp2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(
134+
connection, encoder, mock(Http2FrameReader.class), ALWAYS_VERIFY, false);
131135
Http2ConnectionHandler handler = new Http2ConnectionHandlerBuilder()
132136
.frameListener(mock(Http2FrameListener.class))
133137
.codec(decoder, encoder).build();
@@ -184,7 +188,6 @@ public void multipleWritesToActiveStream() {
184188

185189
@Test
186190
public void ensureCanCreateNextStreamWhenStreamCloses() {
187-
encoder.writeSettingsAck(ctx, newPromise());
188191
setMaxConcurrentStreams(1);
189192

190193
encoderWriteHeaders(3, newPromise());
@@ -209,10 +212,10 @@ public void ensureCanCreateNextStreamWhenStreamCloses() {
209212
assertEquals(1, encoder.numBufferedStreams());
210213
}
211214

212-
@Test
213-
public void alternatingWritesToActiveAndBufferedStreams() {
214-
encoder.writeSettingsAck(ctx, newPromise());
215-
setMaxConcurrentStreams(1);
215+
@ParameterizedTest(name = "{displayName} [{index}]: autoAckSettings={0}")
216+
@ValueSource(booleans = {true, false})
217+
public void alternatingWritesToActiveAndBufferedStreams(boolean autoAckSettings) {
218+
setMaxConcurrentStreams(autoAckSettings, 1);
216219

217220
encoderWriteHeaders(3, newPromise());
218221
assertEquals(0, encoder.numBufferedStreams());
@@ -230,7 +233,6 @@ public void alternatingWritesToActiveAndBufferedStreams() {
230233

231234
@Test
232235
public void bufferingNewStreamFailsAfterGoAwayReceived() throws Http2Exception {
233-
encoder.writeSettingsAck(ctx, newPromise());
234236
setMaxConcurrentStreams(0);
235237
connection.goAwayReceived(1, 8, EMPTY_BUFFER);
236238

@@ -243,7 +245,6 @@ public void bufferingNewStreamFailsAfterGoAwayReceived() throws Http2Exception {
243245

244246
@Test
245247
public void receivingGoAwayFailsBufferedStreams() throws Http2Exception {
246-
encoder.writeSettingsAck(ctx, newPromise());
247248
setMaxConcurrentStreams(5);
248249

249250
int streamId = 3;
@@ -271,19 +272,17 @@ public void receivingGoAwayFailsBufferedStreams() throws Http2Exception {
271272

272273
@Test
273274
public void receivingGoAwayFailsNewStreamIfMaxConcurrentStreamsReached() throws Http2Exception {
274-
encoder.writeSettingsAck(ctx, newPromise());
275275
setMaxConcurrentStreams(1);
276276
encoderWriteHeaders(3, newPromise());
277277
connection.goAwayReceived(11, 8, EMPTY_BUFFER);
278278
ChannelFuture f = encoderWriteHeaders(5, newPromise());
279279

280-
assertTrue(f.cause() instanceof Http2GoAwayException);
280+
assertInstanceOf(Http2GoAwayException.class, f.cause());
281281
assertEquals(0, encoder.numBufferedStreams());
282282
}
283283

284284
@Test
285285
public void sendingGoAwayShouldNotFailStreams() {
286-
encoder.writeSettingsAck(ctx, newPromise());
287286
setMaxConcurrentStreams(1);
288287

289288
when(writer.writeHeaders(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class), anyInt(),
@@ -308,10 +307,10 @@ public void sendingGoAwayShouldNotFailStreams() {
308307
assertFalse(f3.isDone());
309308
}
310309

311-
@Test
312-
public void endStreamDoesNotFailBufferedStream() {
313-
encoder.writeSettingsAck(ctx, newPromise());
314-
setMaxConcurrentStreams(0);
310+
@ParameterizedTest(name = "{displayName} [{index}]: autoAckSettings={0}")
311+
@ValueSource(booleans = {true, false})
312+
public void endStreamDoesNotFailBufferedStream(boolean autoAckSettings) {
313+
setMaxConcurrentStreams(autoAckSettings, 0);
315314

316315
encoderWriteHeaders(3, newPromise());
317316
assertEquals(1, encoder.numBufferedStreams());
@@ -323,8 +322,7 @@ public void endStreamDoesNotFailBufferedStream() {
323322

324323
// Simulate that we received a SETTINGS frame which
325324
// increased MAX_CONCURRENT_STREAMS to 1.
326-
setMaxConcurrentStreams(1);
327-
encoder.writeSettingsAck(ctx, newPromise());
325+
setMaxConcurrentStreams(autoAckSettings, 1);
328326

329327
assertEquals(1, connection.numActiveStreams());
330328
assertEquals(0, encoder.numBufferedStreams());
@@ -333,7 +331,6 @@ public void endStreamDoesNotFailBufferedStream() {
333331

334332
@Test
335333
public void rstStreamClosesBufferedStream() {
336-
encoder.writeSettingsAck(ctx, newPromise());
337334
setMaxConcurrentStreams(0);
338335

339336
encoderWriteHeaders(3, newPromise());
@@ -345,10 +342,10 @@ public void rstStreamClosesBufferedStream() {
345342
assertEquals(0, encoder.numBufferedStreams());
346343
}
347344

348-
@Test
349-
public void bufferUntilActiveStreamsAreReset() throws Exception {
350-
encoder.writeSettingsAck(ctx, newPromise());
351-
setMaxConcurrentStreams(1);
345+
@ParameterizedTest(name = "{displayName} [{index}]: autoAckSettings={0}")
346+
@ValueSource(booleans = {true, false})
347+
public void bufferUntilActiveStreamsAreReset(boolean autoAckSettings) throws Exception {
348+
setMaxConcurrentStreams(autoAckSettings, 1);
352349

353350
encoderWriteHeaders(3, newPromise());
354351
assertEquals(0, encoder.numBufferedStreams());
@@ -379,10 +376,10 @@ public void bufferUntilActiveStreamsAreReset() throws Exception {
379376
assertEquals(0, encoder.numBufferedStreams());
380377
}
381378

382-
@Test
383-
public void bufferUntilMaxStreamsIncreased() {
384-
encoder.writeSettingsAck(ctx, newPromise());
385-
setMaxConcurrentStreams(2);
379+
@ParameterizedTest(name = "{displayName} [{index}]: autoAckSettings={0}")
380+
@ValueSource(booleans = {true, false})
381+
public void bufferUntilMaxStreamsIncreased(boolean autoAckSettings) {
382+
setMaxConcurrentStreams(autoAckSettings, 2);
386383

387384
encoderWriteHeaders(3, newPromise());
388385
encoderWriteHeaders(5, newPromise());
@@ -397,8 +394,7 @@ public void bufferUntilMaxStreamsIncreased() {
397394

398395
// Simulate that we received a SETTINGS frame which
399396
// increased MAX_CONCURRENT_STREAMS to 5.
400-
setMaxConcurrentStreams(5);
401-
encoder.writeSettingsAck(ctx, newPromise());
397+
setMaxConcurrentStreams(autoAckSettings, 5);
402398

403399
assertEquals(0, encoder.numBufferedStreams());
404400
writeVerifyWriteHeaders(times(1), 7);
@@ -411,8 +407,9 @@ public void bufferUntilMaxStreamsIncreased() {
411407
assertEquals(5, connection.local().numActiveStreams());
412408
}
413409

414-
@Test
415-
public void bufferUntilSettingsReceived() throws Http2Exception {
410+
@ParameterizedTest(name = "{displayName} [{index}]: autoAckSettings={0}")
411+
@ValueSource(booleans = {true, false})
412+
public void bufferUntilSettingsReceived(boolean autoAckSettings) {
416413
int initialLimit = SMALLEST_MAX_CONCURRENT_STREAMS;
417414
int numStreams = initialLimit * 2;
418415
for (int ix = 0, nextStreamId = 3; ix < numStreams; ++ix, nextStreamId += 2) {
@@ -426,7 +423,7 @@ public void bufferUntilSettingsReceived() throws Http2Exception {
426423
assertEquals(numStreams / 2, encoder.numBufferedStreams());
427424

428425
// Simulate that we received a SETTINGS frame.
429-
setMaxConcurrentStreams(initialLimit * 2);
426+
setMaxConcurrentStreams(autoAckSettings, initialLimit * 2);
430427

431428
assertEquals(0, encoder.numBufferedStreams());
432429
assertEquals(numStreams, connection.local().numActiveStreams());
@@ -471,7 +468,6 @@ public void exhaustedStreamsDoNotBuffer() throws Http2Exception {
471468

472469
@Test
473470
public void closedBufferedStreamReleasesByteBuf() {
474-
encoder.writeSettingsAck(ctx, newPromise());
475471
setMaxConcurrentStreams(0);
476472
ByteBuf data = mock(ByteBuf.class);
477473
ChannelFuture f1 = encoderWriteHeaders(3, newPromise());
@@ -525,8 +521,20 @@ private void testStreamId(int nextStreamId) throws Http2Exception {
525521
}
526522

527523
private void setMaxConcurrentStreams(int newValue) {
524+
setMaxConcurrentStreams(true, newValue);
525+
}
526+
527+
private void setMaxConcurrentStreams(boolean autoAckSettings, int newValue) {
528528
try {
529-
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));
529+
Http2Settings settings = new Http2Settings().maxConcurrentStreams(newValue);
530+
// Mimic behavior of DefaultHttp2ConnectionDecoder.onSettingsRead:
531+
if (autoAckSettings) {
532+
encoder.writeSettingsAck(ctx, newPromise());
533+
encoder.remoteSettings(settings);
534+
} else {
535+
encoder.consumeReceivedSettings(settings);
536+
encoder.writeSettingsAck(ctx, newPromise());
537+
}
530538
// Flush the remote flow controller to write data
531539
encoder.flowController().writePendingBytes();
532540
} catch (Http2Exception e) {

0 commit comments

Comments
 (0)
0