8000 null pointer caused by race condition (#406) · chakra-coder/rsocket-java@a2b19af · GitHub
[go: up one dir, main page]

Skip to content

Commit a2b19af

Browse files
authored
null pointer caused by race condition (rsocket#406)
* null pointer caused by the connection handler receiving items before the sendProcessor was created * fixed comment * fixed formating
1 parent f7fa6da commit a2b19af

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class RSocketClient implements RSocket {
7878
this.senders = new IntObjectHashMap<>(256, 0.9f);
7979
this.receivers = new IntObjectHashMap<>(256, 0.9f);
8080
this.missedAckCounter = new AtomicInteger();
81+
82+
// DO NOT Change the order here. The Send processor must be subscribed to before receiving connections
8183
this.sendProcessor = EmitterProcessor.create();
8284

8385
if (!Duration.ZERO.equals(tickPeriod)) {
@@ -98,18 +100,18 @@ class RSocketClient implements RSocket {
98100

99101
connection.onClose().doFinally(signalType -> cleanup()).doOnError(errorConsumer).subscribe();
100102

103+
connection
104+
.send(sendProcessor)
105+
.doOnError(this::handleSendProcessorError)
106+
.doFinally(this::handleSendProcessorCancel)
107+
.subscribe();
108+
101109
connection
102110
.receive()
103111
.doOnSubscribe(subscription -> started.onComplete())
104112
.doOnNext(this::handleIncomingFrames)
105113
.doOnError(errorConsumer)
106114
.subscribe();
107-
108-
connection
109-
.send(sendProcessor)
110-
.doOnError(this::handleSendProcessorError)
111-
.doFinally(this::handleSendProcessorCancel)
112-
.subscribe();
113115
}
114116

115117
private void handleSendProcessorError(Throwable t) {

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,19 @@ class RSocketServer implements RSocket {
5555
this.errorConsumer = errorConsumer;
5656
this.sendingSubscriptions = new IntObjectHashMap<>();
5757
this.channelProcessors = new IntObjectHashMap<>();
58-
this.receiveDisposable =
59-
connection.receive().flatMap(this::handleFrame).doOnError(errorConsumer).then().subscribe();
6058

59+
// DO NOT Change the order here. The Send processor must be subscribed to before receiving connections
6160
this.sendProcessor = EmitterProcessor.create();
6261

62+
connection
63+
.send(sendProcessor)
64+
.doOnError(this::handleSendProcessorError)
65+
.doFinally(this::handleSendProcessorCancel)
66+
.subscribe();
67+
68+
this.receiveDisposable =
69+
connection.receive().flatMap(this::handleFrame).doOnError(errorConsumer).then().subscribe();
70+
6371
this.connection
6472
.onClose()
6573
.doOnError(errorConsumer)
@@ -69,12 +77,6 @@ class RSocketServer implements RSocket {
6977
receiveDisposable.dispose();
7078
})
7179
.subscribe();
72-
73-
connection
74-
.send(sendProcessor)
75-
.doOnError(this::handleSendProcessorError)
76-
.doFinally(this::handleSendProcessorCancel)
77-
.subscribe();
7880
}
7981

8082
private void handleSendProcessorError(Throwable t) {

0 commit comments

Comments
 (0)
0