8000 Fix bug when process OOMs due to retained LimitableRequestPublishers … · FzNl/rsocket-java@71915d4 · GitHub
[go: up one dir, main page]

Skip to content

Commit 71915d4

Browse files
mostroverkhovrobertroeser
authored andcommitted
Fix bug when process OOMs due to retained LimitableRequestPublishers in RSocketServer (rsocket#638)
* remove stream subscription on cancel as hookFinally() callback is executed only on BaseSubscriber.cancel() Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * workaround reactor-netty bug when connections are not closed on DisposableChannel close Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * responder stream/channel: calculate initialRequestN before applying handler Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * disable LimitableRequestPublisher transport request coordination Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * update version Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * disable rsocket-transport request coordination tests Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * add LimitableRequestPublisher.wrap(Flux) remove rsocket-transport request coordination tests Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * use correct RSocket version Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * Revert "workaround reactor-netty bug when connections are not closed on DisposableChannel close" This reverts commit 67bea79 Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com>
1 parent 6b706f9 commit 71915d4

File tree

5 files changed

+17
-145
lines changed

5 files changed

+17
-145
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,8 @@ class RSocketClient implements RSocket {
8181
this.sendProcessor = new UnboundedProcessor<>();
8282

8383
connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer);
84-
85-
sendProcessor
86-
.doOnRequest(
87-
r -> {
88-
for (LimitableRequestPublisher lrp : senders.values()) {
89-
lrp.increaseInternalLimit(r);
90-
}
91-
})
92-
.transform(connection::send)
84+
connection
85+
.send(sendProcessor)
9386
.doFinally(this::handleSendProcessorCancel)
9487
.subscribe(null, this::handleSendProcessorError);
9588

@@ -329,7 +322,7 @@ public void accept(long n) {
329322
.transform(
330323
f -> {
331324
LimitableRequestPublisher<Payload> wrapped =
332-
LimitableRequestPublisher.wrap(f, sendProcessor.available());
325+
LimitableRequestPublisher.wrap(f);
333326
// Need to set this to one for first the frame
334327
wrapped.request(1);
335328
senders.put(streamId, wrapped);

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,8 @@ class RSocketServer implements ResponderRSocket {
9797
// connections
9898
this.sendProcessor = new UnboundedProcessor<>();
9999

100-
sendProcessor
101-
.doOnRequest(
102-
r -> {
103-
for (LimitableRequestPublisher lrp : sendingLimitableSubscriptions.values()) {
104-
lrp.increaseInternalLimit(r);
105-
}
106-
})
107-
.transform(connection::send)
100+
connection
101+
.send(sendProcessor)
108102
.doFinally(this::handleSendProcessorCancel)
109103
.subscribe(null, this::handleSendProcessorError);
110104

@@ -322,16 +316,14 @@ private void handleFrame(ByteBuf frame) {
322316
handleRequestN(streamId, frame);
323317
break;
324318
case REQUEST_STREAM:
325-
handleStream(
326-
streamId,
327-
requestStream(payloadDecoder.apply(frame)),
328-
RequestStreamFrameFlyweight.initialRequestN(frame));
319+
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
320+
Payload streamPayload = payloadDecoder.apply(frame);
321+
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN);
329322
break;
330323
case REQUEST_CHANNEL:
331-
handleChannel(
332-
streamId,
333-
payloadDecoder.apply(frame),
334-
RequestChannelFrameFlyweight.initialRequestN(frame));
324+
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
325+
Payload channelPayload = payloadDecoder.apply(frame);
326+
handleChannel(streamId, channelPayload, channelInitialRequestN);
335327
break;
336328
case METADATA_PUSH:
337329
metadataPush(payloadDecoder.apply(frame));
@@ -459,7 +451,7 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
459451
.transform(
460452
frameFlux -> {
461453
LimitableRequestPublisher<Payload> payloads =
462-
LimitableRequestPublisher.wrap(frameFlux, sendProcessor.available());
454+
LimitableRequestPublisher.wrap(frameFlux);
463455
sendingLimitableSubscriptions.put(streamId, payloads);
464456
payloads.request(
465457
initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
@@ -535,7 +527,7 @@ private void handleCancelFrame(int streamId) {
535527
Subscription subscription = sendingSubscriptions.remove(streamId);
536528

537529
if (subscription == null) {
538-
subscription = sendingLimitableSubscriptions.get(streamId);
530+
subscription = sendingLimitableSubscriptions.remove(streamId);
539531
}
540532

541533
if (subscription != null) {

rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public static <T> LimitableRequestPublisher<T> wrap(Publisher<T> source, long pr
5656
return new LimitableRequestPublisher<>(source, prefetch);
5757
}
5858

59+
public static <T> LimitableRequestPublisher<T> wrap(Publisher<T> source) {
60+
return wrap(source, Long.MAX_VALUE);
61+
}
62+
5963
@Override
6064
public void subscribe(CoreSubscriber<? super T> destination) {
6165
synchronized (this) {

rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,12 @@
2828
import io.rsocket.exceptions.ApplicationErrorException;
2929
import io.rsocket.exceptions.RejectedSetupException;
3030
import io.rsocket.frame.*;
31-
import io.rsocket.test.util.TestDuplexConnection;
3231
import io.rsocket.test.util.TestSubscriber;
3332
import io.rsocket.util.DefaultPayload;
3433
import io.rsocket.util.EmptyPayload;
3534
import java.time.Duration;
3635
import java.util.ArrayList;
3736
import java.util.List;
38-
import java.util.Queue;
39-
import java.util.concurrent.ConcurrentLinkedQueue;
4037
import java.util.stream.Collectors;
4138
import org.assertj.core.api.Assertions;
4239
import org.junit.Rule;
@@ -214,32 +211,6 @@ public void testChannelRequestServerSideCancellation() {
214211
Assertions.assertThat(request.isDisposed()).isTrue();
215212
}
216213

217-
@Test(timeout = 2_000)
218-
@SuppressWarnings("unchecked")
219-
public void
220-
testClientSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() {
221-
final Queue<Long> requests = new ConcurrentLinkedQueue<>();
222-
rule.connection.dispose();
223-
rule.connection = new TestDuplexConnection();
224-
rule.connection.setInitialSendRequestN(256);
225-
rule.init();
226-
227-
rule.socket
228-
.requestChannel(
229-
Flux.<Payload>generate(s -> s.next(EmptyPayload.INSTANCE)).doOnRequest(requests::add))
230-
.subscribe();
231-
232-
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
233-
234-
assertThat("Unexpected error.", rule.errors, is(empty()));
235-
236-
rule.connection.addToReceivedBuffer(
237-
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, 2));
238-
rule.connection.addToReceivedBuffer(
239-
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE));
240-
Assertions.assertThat(requests).containsOnly(1L, 2L, 253L);
241-
}
242-
243214
public int sendRequestResponse(Publisher<Payload> response) {
244215
Subscriber<Payload> sub = TestSubscriber.create();
245216
response.subscribe(sub);

rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java

Lines changed: 0 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,12 @@
2929
import io.rsocket.util.DefaultPayload;
3030
import io.rsocket.util.EmptyPayload;
3131
import java.util.Collection;
32-
import java.util.Queue;
3332
import java.util.concurrent.ConcurrentLinkedQueue;
3433
import java.util.concurrent.atomic.AtomicBoolean;
35-
import org.assertj.core.api.Assertions;
3634
import org.junit.Ignore;
3735
import org.junit.Rule;
3836
import org.junit.Test;
39-
import org.mockito.Mockito;
40-
import org.reactivestreams.Publisher;
4137
import org.reactivestreams.Subscriber;
42-
import reactor.core.publisher.Flux;
4338
import reactor.core.publisher.Mono;
4439

4540
public class RSocketServerTest {
@@ -111,89 +106,6 @@ public Mono<Payload> requestResponse(Payload payload) {
111106
assertThat("Subscription not cancelled.", cancelled.get(), is(true));
112107
}
113108

114-
@Test(timeout = 2_000)
115-
@SuppressWarnings("unchecked")
116-
public void
117-
testServerSideRequestStreamShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() {
118-
final int streamId = 5;
119-
final Queue<Object> received = new ConcurrentLinkedQueue<>();
120-
final Queue<Long> requests = new ConcurrentLinkedQueue<>();
121-
122-
rule.setAcceptingSocket(
123-
new AbstractRSocket() {
124-
@Override
125-
public Flux<Payload> requestStream(Payload payload) {
126-
return Flux.<Payload>generate(s -> s.next(payload.retain())).doOnRequest(requests::add);
127-
}
128-
},
129-
256);
130-
131-
rule.sendRequest(streamId, FrameType.REQUEST_STREAM);
132-
133-
assertThat("Unexpected error.", rule.errors, is(empty()));
134-
135-
Subscriber next = rule.connection.getSendSubscribers().iterator().next();
136-
137-
Mockito.doAnswer(
138-
invocation -> {
139-
received.add(invocation.getArgument(0));
140-
141-
if (received.size() == 256) {
142-
throw new RuntimeException();
143-
}
144-
145-
return null;
146-
})
147-
.when(next)
148-
.onNext(Mockito.any());
149-
150-
rule.connection.addToReceivedBuffer(
151-
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE));
152-
Assertions.assertThat(requests).containsOnly(1L, 2L, 253L);
153-
}
154-
155-
@Test(timeout = 2_000)
156-
@SuppressWarnings("unchecked")
157-
public void
158-
testServerSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() {
159-
final int streamId = 5;
160-
final Queue<Object> received = new ConcurrentLinkedQueue<>();
161-
final Queue<Long> requests = new ConcurrentLinkedQueue<>();
162-
163-
rule.setAcceptingSocket(
164-
new AbstractRSocket() {
165-
@Override
166-
public Flux<Payload> requestChannel(Publisher<Payload> payload) {
167-
return Flux.<Payload>generate(s -> s.next(EmptyPayload.INSTANCE))
168-
.doOnRequest(requests::add);
169-
}
170-
},
171-
256);
172-
173-
rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL);
174-
175-
assertThat("Unexpected error.", rule.errors, is(empty()));
176-
177-
Subscriber next = rule.connection.getSendSubscribers().iterator().next();
178-
179-
Mockito.doAnswer(
180-
invocation -> {
181-
received.add(invocation.getArgument(0));
182-
183-
if (received.size() == 256) {
184-
throw new RuntimeException();
185-
}
186-
187-
return null;
188-
})
189-
.when(next)
190-
.onNext(Mockito.any());
191-
192-
rule.connection.addToReceivedBuffer(
193-
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE));
194-
Assertions.assertThat(requests).containsOnly(1L, 2L, 253L);
195-
}
196-
197109
public static class ServerSocketRule extends AbstractSocketRule<RSocketServer> {
198110

199111
private RSocket acceptingSocket;

0 commit comments

Comments
 (0)
0