8000 stream id wrap fix (#667) · FTTF-git/rsocket-java@44f911b · GitHub
[go: up one dir, main page]

Skip to content

Commit 44f911b

Browse files
authored
stream id wrap fix (rsocket#667)
* wraps the stream id when it reaches 2^31 - it also checks for active streams and increams the id if it finds a collision Signed-off-by: Robert Roeser <rroeserr@gmail.com> * fix wrapping when stream id is 0 Signed-off-by: Robert Roeser <rroeserr@gmail.com>
1 parent 72cf960 commit 44f911b

File tree

4 files changed

+97
-22
lines changed

4 files changed

+97
-22
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.rsocket;
2+
3+
import io.netty.util.collection.IntObjectMap;
4+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
5+
import org.openjdk.jmh.annotations.*;
6+
import org.openjdk.jmh.infra.Blackhole;
7+
8+
@BenchmarkMode(Mode.Throughput)
9+
@Fork(
10+
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
11+
)
12+
@Warmup(iterations = 10)
13+
@Measurement(iterations = 10)
14+
@State(Scope.Thread)
15+
public class StreamIdSupplierPerf {
16+
@Benchmark
17+
public void benchmarkStreamId(Input input) {
18+
int i = input.supplier.nextStreamId(input.map);
19+
input.bh.consume(i);
20+
}
21+
22+
@State(Scope.Benchmark)
23+
public static class Input {
24+
Blackhole bh;
25+
IntObjectMap map;
26+
StreamIdSupplier supplier;
27+
28+
@Setup
29+
public void setup(Blackhole bh) {
30+
this.supplier = StreamIdSupplier.clientSupplier();
31+
this.bh = bh;
32+
this.map = new SynchronizedIntObjectHashMap();
33+
}
34+
}
35+
}

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
205205
return Mono.error(err);
206206
}
207207

208-
final int streamId = streamIdSupplier.nextStreamId();
208+
final int streamId = streamIdSupplier.nextStreamId(receivers);
209209

210210
return emptyUnicastMono()
211211
.doOnSubscribe(
@@ -233,7 +233,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
233233
return Mono.error(err);
234234
}
235235

236-
int streamId = streamIdSupplier.nextStreamId();
236+
int streamId = streamIdSupplier.nextStreamId(receivers);
237237
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
238238

239239
UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
@@ -274,7 +274,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
274274
return Flux.error(err);
275275
}
276276

277-
int streamId = streamIdSupplier.nextStreamId();
277+
int streamId = streamIdSupplier.nextStreamId(receivers);
278278

279279
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
280280
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
@@ -328,7 +328,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
328328

329329
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
330330
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
331-
final int streamId = streamIdSupplier.nextStreamId();
331+
final int streamId = streamIdSupplier.nextStreamId(receivers);
332332

333333
return receiver
334334
.doOnRequest(

rsocket-core/src/main/java/io/rsocket/StreamIdSupplier.java

Lines changed: 14 additions & 8 deletions
F438
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
package io.rsocket;
1817

19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
18+
import io.netty.util.collection.IntObjectMap;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2020

2121
final class StreamIdSupplier {
22+
private static final int MASK = 0x7FFFFFFF;
2223

23-
private static final AtomicIntegerFieldUpdater<StreamIdSupplier> STREAM_ID =
24-
AtomicIntegerFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
25-
private volatile int streamId;
24+
private static final AtomicLongFieldUpdater<StreamIdSupplier> STREAM_ID =
25+
AtomicLongFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
26+
private volatile long streamId;
2627

27-
private StreamIdSupplier(int streamId) {
28+
// Visible for testing
29+
StreamIdSupplier(int streamId) {
2830
this.streamId = streamId;
2931
}
3032

@@ -36,8 +38,12 @@ static StreamIdSupplier serverSupplier() {
3638
return new StreamIdSupplier(0);
3739
}
3840

39-
int nextStreamId() {
40-
return STREAM_ID.addAndGet(this, 2);
41+
int nextStreamId(IntObjectMap<?> streamIds) {
42+
int streamId;
43+
do {
44+
streamId = (int) STREAM_ID.addAndGet(this, 2) & MASK;
45+
} while (streamId == 0 || streamIds.containsKey(streamId));
46+
return streamId;
4147
}
4248

4349
boolean isBeforeOrCurrent(int streamId) {

rsocket-core/src/test/java/io/rsocket/StreamIdSupplierTest.java

Lines changed: 44 additions & 10 deletions
34
Original file line numberDiff line numberDiff line change
@@ -20,37 +20,42 @@
2020
import static org.junit.Assert.assertFalse;
2121
import static org.junit.Assert.assertTrue;
2222

23+
import io.netty.util.collection.IntObjectMap;
24+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
2325
import org.junit.Test;
2426

2527
public class StreamIdSupplierTest {
2628
@Test
2729
public void testClientSequence() {
30+
IntObjectMap<Object> map = new SynchronizedIntObjectHashMap<>();
2831
StreamIdSupplier s = StreamIdSupplier.clientSupplier();
29-
assertEquals(1, s.nextStreamId());
30-
assertEquals(3, s.nextStreamId());
31-
assertEquals(5, s.nextStreamId());
32+
assertEquals(1, s.nextStreamId(map));
33+
assertEquals(3, s.nextStreamId(map));
34+
assertEquals(5, s.nextStreamId(map));
3235
}
3336

37
@Test
3538
public void testServerSequence() {
39+
IntObjectMap<Object> map = new SynchronizedIntObjectHashMap<>();
3640
StreamIdSupplier s = StreamIdSupplier.serverSupplier();
37-
assertEquals(2, s.nextStreamId());
38-
assertEquals(4, s.nextStreamId());
39-
assertEquals(6, s.nextStreamId());
41+
assertEquals(2, s.nextStreamId(map));
42+
assertEquals(4, s.nextStreamId(map));
43+
assertEquals(6, s.nextStreamId(map));
4044
}
4145

4246
@Test
4347
public void testClientIsValid() {
48+
IntObjectMap<Object> map = new SynchronizedIntObjectHashMap<>();
4449
StreamIdSupplier s = StreamIdSupplier.clientSupplier();
4550

4651
assertFalse(s.isBeforeOrCurrent(1));
4752
assertFalse(s.isBeforeOrCurrent(3));
4853

49-
s.nextStreamId();
54+
s.nextStreamId(map);
5055
assertTrue(s.isBeforeOrCurrent(1));
5156
assertFalse(s.isBeforeOrCurrent(3));
5257

53-
s.nextStreamId();
58+
s.nextStreamId(map);
5459
assertTrue(s.isBeforeOrCurrent(3));
5560

5661
// negative
@@ -63,16 +68,17 @@ public void testClientIsValid() {
6368

6469
@Test
6570
public void testServerIsValid() {
71+
IntObjectMap<Object> map = new SynchronizedIntObjectHashMap<>();
6672
StreamIdSupplier s = StreamIdSupplier.serverSupplier();
6773

6874
assertFalse(s.isBeforeOrCurrent(2));
6975
assertFalse(s.isBeforeOrCurrent(4));
7076

71-
s.nextStreamId();
77+
s.nextStreamId(map);
7278
assertTrue(s.isBeforeOrCurrent(2));
7379
assertFalse(s.isBeforeOrCurrent(4));
7480

75-
s.nextStreamId();
81+
s.nextStreamId(map);
< FFAD /code>
7682
assertTrue(s.isBeforeOrCurrent(4));
7783

7884
// negative
@@ -82,4 +88,32 @@ public void testServerIsValid() {
8288
// client also accepted (checked externally)
8389
assertTrue(s.isBeforeOrCurrent(1));
8490
}
91+
92+
@Test
93+
public void testWrap() {
94+
IntObjectMap<Object> map = new SynchronizedIntObjectHashMap<>();
95+
StreamIdSupplier s = new StreamIdSupplier(Integer.MAX_VALUE - 3);
96+
97+
assertEquals(2147483646, s.nextStreamId(map));
98+
assertEquals(2, s.nextStreamId(map));
99+
assertEquals(4, s.nextStreamId(map));
100+
101+
s = new StreamIdSupplier(Integer.MAX_VALUE - 2);
102+
103+
assertEquals(2147483647, s.nextStreamId(map));
104+
assertEquals(1, s.nextStreamId(map));
105+
assertEquals(3, s.nextStreamId(map));
106+
}
107+
108+
@Test
109+
public void testSkipFound() {
110+
IntObjectMap<Object> map = new SynchronizedIntObjectHashMap<>();
111+
map.put(5, new Object());
112+
map.put(9, new Object());
113+
StreamIdSupplier s = StreamIdSupplier.clientSupplier();
114+
assertEquals(1, s.nextStreamId(map));
115+
assertEquals(3, s.nextStreamId(map));
116+
assertEquals(7, s.nextStreamId(map));
117+
assertEquals(11, s.nextStreamId(map));
118+
}
85119
}

0 commit comments

Comments
 (0)
0