8000 Fix leaks and issues related to fragmentation (#645) · FTTF-git/rsocket-java@c0c3e90 · GitHub
[go: up one dir, main page]

Skip to content

Commit c0c3e90

Browse files
mostroverkhovrobertroeser
authored andcommitted
Fix leaks and issues related to fragmentation (rsocket#645)
* fragmentation: server closeable Mono signals error if mtu is not sufficient client closeable Mono signals error if mtu is not sufficient, before opening a connection Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * fix memory leaks on frame reassembly Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * fix memory leak on frame logging in ClientServerInputMultiplexer Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * rename RSocketFactory netty transport integration test Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * fix stream and channel response having empty data on some (payload size/fragmentation mtu) combinations Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * remove debug code Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * workaround javac 11.0.3 issue Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com>
1 parent a8f5a75 commit c0c3e90

File tree

11 files changed

+270
-106
lines changed

11 files changed

+270
-106
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.rsocket.frame.FrameLengthFlyweight;
2727
import io.rsocket.frame.FrameType;
2828
import java.util.Objects;
29+
import javax.annotation.Nullable;
2930
import org.reactivestreams.Publisher;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -57,13 +58,10 @@ public FragmentationDuplexConnection(
5758
String type) {
5859
Objects.requireNonNull(delegate, "delegate must not be null");
5960
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
60-
if (mtu < MIN_MTU_SIZE) {
61-
throw new IllegalArgumentException("smallest allowed mtu size is " + MIN_MTU_SIZE + " bytes");
62-
}
6361
this.encodeLength = encodeLength;
6462
this.allocator = allocator;
6563
this.delegate = delegate;
66-
this.mtu = mtu;
64+
this.mtu = assertMtu(mtu);
6765
this.frameReassembler = new FrameReassembler(allocator);
6866
this.type = type;
6967

@@ -74,6 +72,32 @@ private boolean shouldFragment(FrameType frameType, int readableBytes) {
7472
return frameType.isFragmentable() && readableBytes > mtu;
7573
}
7674

75+
/*TODO this is nullable and not returning empty to workaround javac 11.0.3 compiler issue on ubuntu (at least) */
76+
@Nullable
77+
public static <T> Mono<T> checkMtu(int mtu) {
78+
if (isInsufficientMtu(mtu)) {
79+
String msg =
80+
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
81+
return Mono.error(new IllegalArgumentException(msg));
82+
} else {
83+
return null;
84+
}
85+
}
86+
87+
private static int assertMtu(int mtu) {
88+
if (isInsufficientMtu(mtu)) {
89+
String msg =
90+
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
91+
throw new IllegalArgumentException(msg);
92+
} else {
93+
return mtu;
94+
}
95+
}
96+
97+
private static boolean isInsufficientMtu(int mtu) {
98+
return mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0;
99+
}
100+
77101
@Override
78102
public Mono<Void> send(Publisher<ByteBuf> frames) {
79103
return Flux.from(frames).concatMap(this::sendOne).then();
@@ -89,13 +113,13 @@ public Mono<Void> sendOne(ByteBuf frame) {
89113
Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength))
90114
.doOnNext(
91115
byteBuf -> {
92-
ByteBuf frame1 = FrameLengthFlyweight.frame(byteBuf);
116+
ByteBuf f = encodeLength ? FrameLengthFlyweight.frame(byteBuf) : byteBuf;
93117
logger.debug(
94118
"{} - stream id {} - frame type {} - \n {}",
95119
type,
96-
FrameHeaderFlyweight.streamId(frame1),
97-
FrameHeaderFlyweight.frameType(frame1),
98-
ByteBufUtil.prettyHexDump(frame1));
120+
FrameHeaderFlyweight.streamId(f),
121+
FrameHeaderFlyweight.frameType(f),
122+
ByteBufUtil.prettyHexDump(f));
99123
}));
100124
} else {
101125
return delegate.send(
@@ -108,7 +132,7 @@ public Mono<Void> sendOne(ByteBuf frame) {
108132

109133
private ByteBuf encode(ByteBuf frame) {
110134
if (encodeLength) {
111-
return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame).retain();
135+
return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame);
112136
} else {
113137
return frame;
114138
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ void handleNoFollowsFlag(ByteBuf frame, SynchronousSink<ByteBuf> sink, int strea
151151
ByteBuf assembledFrame = FragmentationFlyweight.encode(allocator, header, data);
152152
sink.next(assembledFrame);
153153
}
154+
frame.release();
154155
} else {
155156
sink.next(frame);
156157
}
@@ -220,9 +221,8 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
220221
throw new IllegalStateException("unsupported fragment type");
221222
}
222223

223-
if (data != Unpooled.EMPTY_BUFFER) {
224-
getData(streamId).addComponents(true, data);
225-
}
224+
getData(streamId).addComponents(true, data);
225+
frame.release();
226226
}
227227

228228
void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {
@@ -259,25 +259,21 @@ private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf h
259259
ByteBuf metadata;
260260
CompositeByteBuf cm = removeMetadata(streamId);
261261
if (cm != null) {
262-
ByteBuf m = PayloadFrameFlyweight.metadata(frame);
263-
metadata = cm.addComponents(true, m);
262+
metadata = cm.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain());
264263
} else {
265-
metadata = PayloadFrameFlyweight.metadata(frame);
264+
metadata = PayloadFrameFlyweight.metadata(frame).retain();
266265
}
267266

268267
ByteBuf data = assembleData(frame, streamId);
269268

270-
return FragmentationFlyweight.encode(allocator, header, metadata.retain(), data);
269+
return FragmentationFlyweight.encode(allocator, header, metadata, data);
271270
}
272271

273272
private ByteBuf assembleData(ByteBuf frame, int streamId) {
274273
ByteBuf data;
275274
CompositeByteBuf cd = removeData(streamId);
276275
if (cd != null) {
277-
ByteBuf d = PayloadFrameFlyweight.data(frame);
278-
if (d != null) {
279-
cd.addComponents(true, d);
280-
}
276+
cd.addComponents(true, PayloadFrameFlyweight.data(frame).retain());
281277
data = cd;
282278
} else {
283279
data = Unpooled.EMPTY_BUFFER;

rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private static ByteBuf getMetadata(ByteBuf frame, FrameType frameType) {
7070
default:
7171
return Unpooled.EMPTY_BUFFER;
7272
}
73-
return metadata.retain();
73+
return metadata;
7474
} else {
7575
return Unpooled.EMPTY_BUFFER;
7676
}

rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ void constructorInvalidMaxFragmentSize() {
6464
() ->
6565
new FragmentationDuplexConnection(
6666
delegate, allocator, Integer.MIN_VALUE, false, ""))
67-
.withMessage("smallest allowed mtu size is 64 bytes");
67+
.withMessage("smallest allowed mtu size is 64 bytes, provided: -2147483648");
6868
}
6969

7070
@DisplayName("constructor throws IllegalArgumentException with negative maxFragmentLength")
7171
@Test
7272
void constructorMtuLessThanMin() {
7373
assertThatIllegalArgumentException()
7474
.isThrownBy(() -> new FragmentationDuplexConnection(delegate, allocator, 2, false, ""))
75-
.withMessage("smallest allowed mtu size is 64 bytes");
75+
.withMessage("smallest allowed mtu size is 64 bytes, provided: 2");
7676
}
7777

7878
@DisplayName("constructor throws NullPointerException with null byteBufAllocator")

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ private Mono<DuplexConnection> connect() {
7676

7777
@Override
7878
public Mono<DuplexConnection> connect(int mtu) {
79-
Mono<DuplexConnection> connect = connect();
79+
Mono<DuplexConnection> isError = FragmentationDuplexConnection.checkMtu(mtu);
80+
Mono<DuplexConnection> connect = isError != null ? isError : connect();
8081
if (mtu > 0) {
8182
return connect.map(
8283
duplexConnection ->

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,20 @@ public LocalClientTransport clientTransport() {
107107
public Mono<Closeable> start(ConnectionAcceptor acceptor, int mtu) {
108108
Objects.requireNonNull(acceptor, "acceptor must not be null");
109109

110-
return Mono.create(
111-
sink -> {
112-
ServerDuplexConnectionAcceptor serverDuplexConnectionAcceptor =
113-
new ServerDuplexConnectionAcceptor(name, acceptor, mtu);
114-
115-
if (registry.putIfAbsent(name, serverDuplexConnectionAcceptor) != null) {
116-
throw new IllegalStateException("name already registered: " + name);
117-
}
118-
119-
sink.success(serverDuplexConnectionAcceptor);
120-
});
110+
Mono<Closeable> isError = FragmentationDuplexConnection.checkMtu(mtu);
111+
return isError != null
112+
? isError
113+
: Mono.create(
114+
sink -> {
115+
ServerDuplexConnectionAcceptor serverDuplexConnectionAcceptor =
116+
new ServerDuplexConnectionAcceptor(name, acceptor, mtu);
117+
118+
if (registry.putIfAbsent(name, serverDuplexConnectionAcceptor) != null) {
119+
throw new IllegalStateException("name already registered: " + name);
120+
}
121+
122+
sink.success(serverDuplexConnectionAcceptor);
123+
});
121124
}
122125

123126
/**

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,21 +94,24 @@ public static TcpClientTransport create(TcpClient client) {
9494

9595
@Override
9696
public Mono<DuplexConnection> connect(int mtu) {
97-
return client
98-
.doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
99-
.connect()
100-
.map(
101-
c -> {
102-
if (mtu > 0) {
103-
return new FragmentationDuplexConnection(
104-
new TcpDuplexConnection(c, false),
105-
ByteBufAllocator.DEFAULT,
106-
mtu,
107-
true,
108-
"client");
109-
} else {
110-
return new TcpDuplexConnection(c);
111-
}
112-
});
97+
Mono<DuplexConnection> isError = FragmentationDuplexConnection.checkMtu(mtu);
98+
return isError != null
99+
? isError
100+
: client
101+
.doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
102+
.connect()
103+
.map(
104+
c -> {
105+
if (mtu > 0) {
106+
return new FragmentationDuplexConnection(
107+
new TcpDuplexConnection(c, false),
108+
ByteBufAllocator.DEFAULT,
109+
mtu,
110+
true,
111+
"client");
112+
} else {
113+
return new TcpDuplexConnection(c);
114+
}
115+
});
113116
}
114117
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -151,21 +151,24 @@ private static TcpClient createClient(URI uri) {
151151

152152
@Override
153153
public Mono<DuplexConnection> connect(int mtu) {
154-
return client
155-
.headers(headers -> transportHeaders.get().forEach(headers::set))
156-
.websocket(FRAME_LENGTH_MASK)
157-
.uri(path)
158-
.connect()
159-
.map(
160-
c -> {
161-
DuplexConnection connection = new WebsocketDuplexConnection(c);
162-
if (mtu > 0) {
163-
connection =
164-
new FragmentationDuplexConnection(
165-
connection, ByteBufAllocator.DEFAULT, mtu, false, "client");
166-
}
167-
return connection;
168-
});
154+
Mono<DuplexConnection> isError = FragmentationDuplexConnection.checkMtu(mtu);
155+
return isError != null
156+
? isError
157+
: client
158+
.headers(headers -> transportHeaders.get().forEach(headers::set))
159+
.websocket(FRAME_LENGTH_MASK)
160+
.uri(path)
161+
.connect()
162+
.map(
163+
c -> {
164+
DuplexConnection connection = new WebsocketDuplexConnection(c);
165+
if (mtu > 0) {
166+
connection =
167+
new FragmentationDuplexConnection(
168+
connection, ByteBufAllocator.DEFAULT, mtu, false, "client");
169+
}
170+
return connection;
171+
});
169172
}
170173

171174
@Override

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,26 +94,31 @@ public static TcpServerTransport create(TcpServer server) {
9494
@Override
9595
public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
9696
Objects.requireNonNull(acceptor, "acceptor must not be null");
97-
98-
return server
99-
.doOnConnection(
100-
c -> {
101-
c.addHandlerLast(new RSocketLengthCodec());
102-
DuplexConnection connection;
103-
if (mtu > 0) {
104-
connection =
105-
new FragmentationDuplexConnection(
106-
new TcpDuplexConnection(c, false),
107-
ByteBufAllocator.DEFAULT,
108-
mtu,
109-
true,
110-
"server");
111-
} else {
112-
connection = new TcpDuplexConnection(c);
113-
}
114-
acceptor.apply(connection).then(Mono.<Void>never()).subscribe(c.disposeSubscriber());
115-
})
116-
.bind()
117-
.map(CloseableChannel::new);
97+
Mono<CloseableChannel> isError = FragmentationDuplexConnection.checkMtu(mtu);
98+
return isError != null
99+
? isError
100+
: server
101+
.doOnConnection(
102+
c -> {
103+
c.addHandlerLast(new RSocketLengthCodec());
104+
DuplexConnection connection;
105+
if (mtu > 0) {
106+
connection =
107+
new FragmentationDuplexConnection(
108+
new TcpDuplexConnection(c, false),
109+
ByteBufAllocator.DEFAULT,
110+
mtu,
111+
true,
112+
"server");
113+
} else {
114+
connection = new TcpDuplexConnection(c);
115+
}
116+
acceptor
117+
.apply(connection)
118+
.then(Mono.<Void>never())
119+
.subscribe(c.disposeSubscriber());
120+
})
121+
.bind()
122+
.map(CloseableChannel::new);
118123
}
119124
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,24 +111,28 @@ public void setTransportHeaders(Supplier<Map<String, String>> transportHeaders)
111111
public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
112112
Objects.requireNonNull(acceptor, "acceptor must not be null");
113113

114-
return server
115-
.handle(
116-
(request, response) -> {
117-
transportHeaders.get().forEach(response::addHeader);
118-
return response.sendWebsocket(
119-
null,
120-
FRAME_LENGTH_MASK,
121-
(in, out) -> {
122-
DuplexConnection connection = new WebsocketDuplexConnection((Connection) in);
123-
if (mtu > 0) {
124-
connection =
125-
new FragmentationDuplexConnection(
126-
connection, ByteBufAllocator.DEFAULT, mtu, false, "server");
127-
}
128-
return acceptor.apply(connection).then(out.neverComplete());
129-
});
130-
})
131-
.bind()
132-
.map(CloseableChannel::new);
114+
Mono<CloseableChannel> isError = FragmentationDuplexConnection.checkMtu(mtu);
115+
return isError != null
116+
? isError
117+
: server
118+
.handle(
119+
(request, response) -> {
120+
transportHeaders.get().forEach(response::addHeader);
121+
return response.sendWebsocket(
122+
null,
123+
FRAME_LENGTH_MASK,
124+
(in, out) -> {
125+
DuplexConnection connection =
126+
new WebsocketDuplexConnection((Connection) in);
127+
if (mtu > 0) {
128+
connection =
129+
new FragmentationDuplexConnection(
130+
connection, ByteBufAllocator.DEFAULT, mtu, false, "server");
131+
}
132+
return acceptor.apply(connection).then(out.neverComplete());
133+
});
134+
})
135+
.bind()
136+
.map(CloseableChannel::new);
133137
}
134138
}

0 commit comments

Comments
 (0)
0