18
18
19
19
import static io .rsocket .keepalive .KeepAliveSupport .KeepAlive ;
20
20
import static io .rsocket .keepalive .KeepAliveSupport .ServerKeepAliveSupport ;
21
+ import static io .rsocket .util .BackpressureUtils .shareRequest ;
21
22
22
23
import io .netty .buffer .ByteBuf ;
23
24
import io .netty .buffer .ByteBufAllocator ;
24
25
import io .netty .util .ReferenceCountUtil ;
25
- import io .netty .util .collection .IntObjectHashMap ;
26
26
import io .rsocket .exceptions .ApplicationErrorException ;
27
27
import io .rsocket .exceptions .ConnectionErrorException ;
28
- import io .rsocket .frame .*;
28
+ import io .rsocket .frame .CancelFrameFlyweight ;
29
+ import io .rsocket .frame .ErrorFrameFlyweight ;
30
+ import io .rsocket .frame .FrameHeaderFlyweight ;
31
+ import io .rsocket .frame .FrameType ;
32
+ import io .rsocket .frame .PayloadFrameFlyweight ;
33
+ import io .rsocket .frame .RequestChannelFrameFlyweight ;
34
+ import io .rsocket .frame .RequestNFrameFlyweight ;
35
+ import io .rsocket .frame .RequestStreamFrameFlyweight ;
29
36
import io .rsocket .frame .decoder .PayloadDecoder ;
30
37
import io .rsocket .internal .LimitableRequestPublisher ;
38
+ import io .rsocket .internal .SynchronizedIntObjectHashMap ;
31
39
import io .rsocket .internal .UnboundedProcessor ;
32
40
import io .rsocket .keepalive .KeepAliveFramesAcceptor ;
33
41
import io .rsocket .keepalive .KeepAliveHandler ;
42
50
import org .reactivestreams .Subscription ;
43
51
import reactor .core .Disposable ;
44
52
import reactor .core .Exceptions ;
45
- import reactor .core .publisher .*;
53
+ import reactor .core .publisher .BaseSubscriber ;
54
+ import reactor .core .publisher .Flux ;
55
+ import reactor .core .publisher .Mono ;
56
+ import reactor .core .publisher .SignalType ;
57
+ import reactor .core .publisher .UnicastProcessor ;
46
58
47
59
/** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */
48
60
class RSocketServer implements ResponderRSocket {
@@ -53,9 +65,10 @@ class RSocketServer implements ResponderRSocket {
53
65
private final PayloadDecoder payloadDecoder ;
54
66
private final Consumer <Throwable > errorConsumer ;
55
67
56
- private final Map <Integer , LimitableRequestPublisher > sendingLimitableSubscriptions ;
57
- private final Map <Integer , Subscription > sendingSubscriptions ;
58
- private final Map <Integer , Processor <Payload , Payload >> channelProcessors ;
68
+ private final SynchronizedIntObjectHashMap <LimitableRequestPublisher >
69
+ sendingLimitableSubscriptions ;
70
+ private final SynchronizedIntObjectHashMap <Subscription > sendingSubscriptions ;
71
+ private final SynchronizedIntObjectHashMap <Processor <Payload , Payload >> channelProcessors ;
59
72
60
73
private final UnboundedProcessor <ByteBuf > sendProcessor ;
61
74
private final ByteBufAllocator allocator ;
@@ -90,16 +103,17 @@ class RSocketServer implements ResponderRSocket {
90
103
91
104
this .payloadDecoder = payloadDecoder ;
92
105
this .errorConsumer = errorConsumer ;
93
- this .sendingLimitableSubscriptions = Collections . synchronizedMap ( new IntObjectHashMap <>() );
94
- this .sendingSubscriptions = Collections . synchronizedMap ( new IntObjectHashMap <>() );
95
- this .channelProcessors = Collections . synchronizedMap ( new IntObjectHashMap <>() );
106
+ this .sendingLimitableSubscriptions = new SynchronizedIntObjectHashMap <>();
107
+ this .sendingSubscriptions = new SynchronizedIntObjectHashMap <>();
108
+ this .channelProcessors = new SynchronizedIntObjectHashMap <>();
96
109
97
110
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
98
111
// connections
99
112
this .sendProcessor = new UnboundedProcessor <>();
100
113
101
- connection
102
- .send (sendProcessor )
114
+ sendProcessor
115
+ .doOnRequest (r -> shareRequest (r , sendingLimitableSubscriptions ))
116
+ .transform (connection ::send )
103
117
.doFinally (this ::handleSendProcessorCancel )
104
118
.subscribe (null , this ::handleSendProcessorError );
105
119
@@ -452,7 +466,7 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
452
466
.transform (
453
467
frameFlux -> {
454
468
LimitableRequestPublisher <Payload > payloads =
455
- LimitableRequestPublisher .wrap (frameFlux );
469
+ LimitableRequestPublisher .wrap (frameFlux , sendProcessor . available () );
456
470
sendingLimitableSubscriptions .put (streamId , payloads );
457
471
payloads .request (
458
472
initialRequestN >= Integer .MAX_VALUE ? Long .MAX_VALUE : initialRequestN );
@@ -488,7 +502,8 @@ protected void hookOnError(Throwable throwable) {
488
502
489
503
@ Override
490
504
protected void hookFinally (SignalType type ) {
491
- sendingLimitableSubscriptions .remove (streamId );
505
+ LimitableRequestPublisher subscription =
506
+ sendingLimitableSubscriptions .remove (streamId );
492
507
}
493
508
});
494
509
}
@@ -532,6 +547,17 @@ private void handleCancelFrame(int streamId) {
532
547
}
533
548
534
549
if (subscription != null ) {
550
+ LimitableRequestPublisher limitableSubscription =
551
+ sendingLimitableSubscriptions .remove (streamId );
552
+
553
+ if (limitableSubscription != null ) {
554
+ limitableSubscription .cancel ();
555
+ long requested = limitableSubscription .getInternalRequested ();
556
+ if (requested > 0 ) {
557
+ shareRequest (requested , sendingLimitableSubscriptions );
558
+ }
559
+ }
560
+ } else {
535
561
subscription .cancel ();
536
562
}
537
563
}
0 commit comments