File tree Expand file tree Collapse file tree 2 files changed +18
-14
lines changed
rsocket-core/src/main/java/io/rsocket Expand file tree Collapse file tree 2 files changed +18
-14
lines changed Original file line number Diff line number Diff line change @@ -78,6 +78,8 @@ class RSocketClient implements RSocket {
78
78
this .senders = new IntObjectHashMap <>(256 , 0.9f );
79
79
this .receivers = new IntObjectHashMap <>(256 , 0.9f );
80
80
this .missedAckCounter = new AtomicInteger ();
81
+
82
+ // DO NOT Change the order here. The Send processor must be subscribed to before receiving connections
81
83
this .sendProcessor = EmitterProcessor .create ();
82
84
83
85
if (!Duration .ZERO .equals (tickPeriod )) {
@@ -98,18 +100,18 @@ class RSocketClient implements RSocket {
98
100
99
101
connection .onClose ().doFinally (signalType -> cleanup ()).doOnError (errorConsumer ).subscribe ();
100
102
103
+ connection
104
+ .send (sendProcessor )
105
+ .doOnError (this ::handleSendProcessorError )
106
+ .doFinally (this ::handleSendProcessorCancel )
107
+ .subscribe ();
108
+
101
109
connection
102
110
.receive ()
103
111
.doOnSubscribe (subscription -> started .onComplete ())
104
112
.doOnNext (this ::handleIncomingFrames )
105
113
.doOnError (errorConsumer )
106
114
.subscribe ();
107
-
108
- connection
109
- .send (sendProcessor )
110
- .doOnError (this ::handleSendProcessorError )
111
- .doFinally (this ::handleSendProcessorCancel )
112
- .subscribe ();
113
115
}
114
116
115
117
private void handleSendProcessorError (Throwable t ) {
Original file line number Diff line number Diff line change @@ -55,11 +55,19 @@ class RSocketServer implements RSocket {
55
55
this .errorConsumer = errorConsumer ;
56
56
this .sendingSubscriptions = new IntObjectHashMap <>();
57
57
this .channelProcessors = new IntObjectHashMap <>();
58
- this .receiveDisposable =
59
- connection .receive ().flatMap (this ::handleFrame ).doOnError (errorConsumer ).then ().subscribe ();
60
58
59
+ // DO NOT Change the order here. The Send processor must be subscribed to before receiving connections
61
60
this .sendProcessor = EmitterProcessor .create ();
62
61
62
+ connection
63
+ .send (sendProcessor )
64
+ .doOnError (this ::handleSendProcessorError )
65
+ .doFinally (this ::handleSendProcessorCancel )
66
+ .subscribe ();
67
+
68
+ this .receiveDisposable =
69
+ connection .receive ().flatMap (this ::handleFrame ).doOnError (errorConsumer ).then ().subscribe ();
70
+
63
71
this .connection
64
72
.onClose ()
65
73
.doOnError (errorConsumer )
@@ -69,12 +77,6 @@ class RSocketServer implements RSocket {
69
77
receiveDisposable .dispose ();
70
78
})
71
79
.subscribe ();
72
-
73
- connection
74
- .send (sendProcessor )
75
- .doOnError (this ::handleSendProcessorError )
76
- .doFinally (this ::handleSendProcessorCancel )
77
- .subscribe ();
78
80
}
79
81
80
82
private void handleSendProcessorError (Throwable t ) {
You can’t perform that action at this time.
0 commit comments