10000 reworked backpressure control · FTTF-git/rsocket-java@943047d · GitHub
[go: up one dir, main page]

Skip to content

Commit 943047d

Browse files
committed
reworked backpressure control
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent 30d11d1 commit 943047d

File tree

5 files changed

+813
-21
lines changed

5 files changed

+813
-21
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818

1919
import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport;
2020
import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
21+
import static io.rsocket.util.BackpressureUtils.shareRequest;
2122

2223
import io.netty.buffer.ByteBuf;
2324
import io.netty.buffer.ByteBufAllocator;
2425
import io.netty.util.ReferenceCountUtil;
25-
import io.netty.util.collection.IntObjectHashMap;
2626
import io.rsocket.exceptions.ConnectionErrorException;
2727
import io.rsocket.exceptions.Exceptions;
2828
import io.rsocket.frame.*;
2929
import io.rsocket.frame.decoder.PayloadDecoder;
3030
import io.rsocket.internal.LimitableRequestPublisher;
31+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
3132
import io.rsocket.internal.UnboundedProcessor;
3233
import io.rsocket.internal.UnicastMonoProcessor;
3334
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
@@ -52,8 +53,8 @@ class RSocketClient implements RSocket {
5253
private final PayloadDecoder payloadDecoder;
5354
private final Consumer<Throwable> errorConsumer;
5455
private final StreamIdSupplier streamIdSupplier;
55-
private final Map<Integer, LimitableRequestPublisher> senders;
56-
private final Map<Integer, Processor<Payload, Payload>> receivers;
56+
private final SynchronizedIntObjectHashMap<Processor<Payload, Payload>> receivers;
57+
private final SynchronizedIntObjectHashMap<LimitableRequestPublisher> senders;
5758
private final UnboundedProcessor<ByteBuf> sendProcessor;
5859
private final Lifecycle lifecycle = new Lifecycle();
5960
private final ByteBufAllocator allocator;
@@ -74,15 +75,17 @@ class RSocketClient implements RSocket {
7475
this.payloadDecoder = payloadDecoder;
7576
this.errorConsumer = errorConsumer;
7677
this.streamIdSupplier = streamIdSupplier;
77-
this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
78-
this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());
78+
this.senders = new SynchronizedIntObjectHashMap<>();
79+
this.receivers = new SynchronizedIntObjectHashMap<>();
7980

8081
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
8182
this.sendProcessor = new UnboundedProcessor<>();
8283

8384
connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer);
84-
connection
85-
.send(sendProcessor)
85+
86+
sendProcessor
87+
.doOnRequest(r -> shareRequest(r, senders))
88+
.transform(connection::send)
8689
.doFinally(this::handleSendProcessorCancel)
8790
.subscribe(null, this::handleSendProcessorError);
8891

@@ -322,7 +325,7 @@ public void accept(long n) {
322325
.transform(
323326
f -> {
324327
LimitableRequestPublisher<Payload> wrapped =
325-
LimitableRequestPublisher.wrap(f);
328+
LimitableRequestPublisher.wrap(f, sendProcessor.available());
326329
// Need to set this to one for first the frame
327330
wrapped.request(1);
328331
senders.put(streamId, wrapped);

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,24 @@
1818

1919
import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
2020
import static io.rsocket.keepalive.KeepAliveSupport.ServerKeepAliveSupport;
21+
import static io.rsocket.util.BackpressureUtils.shareRequest;
2122

2223
import io.netty.buffer.ByteBuf;
2324
import io.netty.buffer.ByteBufAllocator;
2425
import io.netty.util.ReferenceCountUtil;
25-
import io.netty.util.collection.IntObjectHashMap;
2626
import io.rsocket.exceptions.ApplicationErrorException;
2727
import io.rsocket.exceptions.ConnectionErrorException;
28-
import io.rsocket.frame.*;
28+
import io.rsocket.frame.CancelFrameFlyweight;
29+
import io.rsocket.frame.ErrorFrameFlyweight;
30+
import io.rsocket.frame.FrameHeaderFlyweight;
31+
import io.rsocket.frame.FrameType;
32+
import io.rsocket.frame.PayloadFrameFlyweight;
33+
import io.rsocket.frame.RequestChannelFrameFlyweight;
34+
import io.rsocket.frame.RequestNFrameFlyweight;
35+
import io.rsocket.frame.RequestStreamFrameFlyweight;
2936
import io.rsocket.frame.decoder.PayloadDecoder;
3037
import io.rsocket.internal.LimitableRequestPublisher;
38+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
3139
import io.rsocket.internal.UnboundedProcessor;
3240
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
3341
import io.rsocket.keepalive.KeepAliveHandler;
@@ -42,7 +50,11 @@
4250
import org.reactivestreams.Subscription;
4351
import reactor.core.Disposable;
4452
import reactor.core.Exceptions;
45-
import reactor.core.publisher.*;
53+
import reactor.core.publisher.BaseSubscriber;
54+
import reactor.core.publisher.Flux;
55+
import reactor.core.publisher.Mono;
56+
import reactor.core.publisher.SignalType;
57+
import reactor.core.publisher.UnicastProcessor;
4658

4759
/** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */
4860
class RSocketServer implements ResponderRSocket {
@@ -53,9 +65,10 @@ class RSocketServer implements ResponderRSocket {
5365
private final PayloadDecoder payloadDecoder;
5466
private final Consumer<Throwable> errorConsumer;
5567

56-
private final Map<Integer, LimitableRequestPublisher> sendingLimitableSubscriptions;
57-
private final Map<Integer, Subscription> sendingSubscriptions;
58-
private final Map<Integer, Processor<Payload, Payload>> channelProcessors;
68+
private final SynchronizedIntObjectHashMap<LimitableRequestPublisher>
69+
sendingLimitableSubscriptions;
70+
private final SynchronizedIntObjectHashMap<Subscription> sendingSubscriptions;
71+
private final SynchronizedIntObjectHashMap<Processor<Payload, Payload>> channelProcessors;
5972

6073
private final UnboundedProcessor<ByteBuf> sendProcessor;
6174
private final ByteBufAllocator allocator;
@@ -90,16 +103,17 @@ class RSocketServer implements ResponderRSocket {
90103

91104
this.payloadDecoder = payloadDecoder;
92105
this.errorConsumer = errorConsumer;
93-
this.sendingLimitableSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
94-
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
95-
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
106+
this.sendingLimitableSubscriptions = new SynchronizedIntObjectHashMap<>();
107+
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
108+
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
96109

97110
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
98111
// connections
99112
this.sendProcessor = new UnboundedProcessor<>();
100113

101-
connection
102-
.send(sendProcessor)
114+
sendProcessor
115+
.doOnRequest(r -> shareRequest(r, sendingLimitableSubscriptions))
116+
.transform(connection::send)
103117
.doFinally(this::handleSendProcessorCancel)
104118
.subscribe(null, this::handleSendProcessorError);
105119

@@ -452,7 +466,7 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
452466
.transform(
453467
frameFlux -> {
454468
LimitableRequestPublisher<Payload> payloads =
455-
LimitableRequestPublisher.wrap(frameFlux);
469+
LimitableRequestPublisher.wrap(frameFlux, sendProcessor.available());
456470
sendingLimitableSubscriptions.put(streamId, payloads);
457471
payloads.request(
458472
initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
@@ -488,7 +502,8 @@ protected void hookOnError(Throwable throwable) {
488502

489503
@Override
490504
protected void hookFinally(SignalType type) {
491-
sendingLimitableSubscriptions.remove(streamId);
505+
LimitableRequestPublisher subscription =
506+
sendingLimitableSubscriptions.remove(streamId);
492507
}
493508
});
494509
}
@@ -532,6 +547,17 @@ private void handleCancelFrame(int streamId) {
532547
}
533548

534549
if (subscription != null) {
550+
LimitableRequestPublisher limitableSubscription =
551+
sendingLimitableSubscriptions.remove(streamId);
552+
553+
if (limitableSubscription != null) {
554+
limitableSubscription.cancel();
555+
long requested = limitableSubscription.getInternalRequested();
556+
if (requested > 0) {
557+
shareRequest(requested, sendingLimitableSubscriptions);
558+
}
559+
}
560+
} else {
535561
subscription.cancel();
536562
}
537563
}

rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public void increaseInternalLimit(long n) {
8888
requestN();
8989
}
9090

91+
public synchronized long getInternalRequested() {
92+
return internalRequested;
93+
}
94+
9195
@Override
9296
public void request(long n) {
9397
synchronized (this) {

0 commit comments

Comments
 (0)
0