8000 added reactivesocket to the signature of the connection handler so th… · MStart/reactivesocket-java@f8d0ea1 · GitHub
[go: up one dir, main page]

Skip to content

Commit f8d0ea1

Browse files
committed
added reactivesocket to the signature of the connection handler so that server can initiate requests/replays to the client
1 parent b80cf2e commit f8d0ea1

File tree

9 files changed

+93
-56
lines changed

9 files changed

+93
-56
lines changed

src/main/java/io/reactivesocket/ConnectionSetupHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@
1818
import io.reactivesocket.exceptions.SetupException;
1919

2020
public interface ConnectionSetupHandler {
21-
public RequestHandler apply(ConnectionSetupPayload setupPayload) throws SetupException; // yeah, a checked exception
21+
RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket reactiveSocket) throws SetupException; // yeah, a checked exception
2222
}

src/main/java/io/reactivesocket/DefaultReactiveSocket.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ public void error(Throwable e) {
285285
two
286286
);
287287
two.success(); // now that the reference is assigned in case of synchronous setup
288-
});
288+
},
289+
this);
289290
} else {
290291
Completable both = new Completable() {
291292
// wait for 2 success, or 1 error to pass on
@@ -325,7 +326,8 @@ public void error(Throwable e) {
325326
clientRequestHandler,
326327
leaseGovernor,
327328
errorStream,
328-
both
329+
both,
330+
this
329331
);
330332
}
331333
}

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivesocket.FrameType;
2323
import io.reactivesocket.LeaseGovernor;
2424
import io.reactivesocket.Payload;
25+
import io.reactivesocket.ReactiveSocket;
2526
import io.reactivesocket.RequestHandler;
2627
import io.reactivesocket.exceptions.InvalidSetupException;
2728
import io.reactivesocket.exceptions.RejectedException;
@@ -95,11 +96,12 @@ public static <T> Responder createServerResponder(
9596
LeaseGovernor leaseGovernor,
9697
Consumer<Throwable> errorStream,
9798
Completable responderCompletable,
98-
Consumer<ConnectionSetupPayload> setupCallback
99+
Consumer<ConnectionSetupPayload> setupCallback,
100+
ReactiveSocket reactiveSocket
99101
) {
100102
Responder responder = new Responder(true, connection, connectionHandler, null,
101103
leaseGovernor, errorStream, setupCallback);
102-
responder.start(responderCompletable);
104+
responder.start(responderCompletable, reactiveSocket);
103105
return responder;
104106
}
105107

@@ -108,22 +110,24 @@ public static <T> Responder createServerResponder(
108110
ConnectionSetupHandler connectionHandler,
109111
LeaseGovernor leaseGovernor,
110112
Consumer<Throwable> errorStream,
111-
Completable responderCompletable
113+
Completable responderCompletable,
114+
ReactiveSocket reactiveSocket
112115
) {
113116
return createServerResponder(connection, connectionHandler, leaseGovernor,
114-
errorStream, responderCompletable, s -> {});
117+
errorStream, responderCompletable, s -> {}, reactiveSocket);
115118
}
116119

117120
public static <T> Responder createClientResponder(
118121
DuplexConnection connection,
119122
RequestHandler requestHandler,
120123
LeaseGovernor leaseGovernor,
121124
Consumer<Throwable> errorStream,
122-
Completable responderCompletable
125+
Completable responderCompletable,
126+
ReactiveSocket reactiveSocket
123127
) {
124128
Responder responder = new Responder(false, connection, null, requestHandler,
125129
leaseGovernor, errorStream, s -> {});
126-
responder.start(responderCompletable);
130+
responder.start(responderCompletable, reactiveSocket);
127131
return responder;
128132
}
129133

@@ -156,7 +160,7 @@ public long timeOfLastKeepalive() {
156160
return timeOfLastKeepalive;
157161
}
158162

159-
private void start(final Completable responderCompletable) {
163+
private void start(final Completable responderCompletable, ReactiveSocket reactiveSocket) {
160164
/* state of cancellation subjects during connection */
161165
final Int2ObjectHashMap<Subscription> cancellationSubscriptions = new Int2ObjectHashMap<>();
162166
/* streams in flight that can receive REQUEST_N messages */
@@ -208,7 +212,7 @@ public void onNext(Frame requestFrame) {
208212
// accept setup for ReactiveSocket/Requester usage
209213
setupCallback.accept(connectionSetupPayload);
210214
// handle setup
211-
requestHandler = connectionHandler.apply(connectionSetupPayload);
215+
requestHandler = connectionHandler.apply(connectionSetupPayload, reactiveSocket);
212216
} catch (SetupException setupException) {
213217
setupErrorAndTearDown(connection, setupException);
214218
} catch (Throwable e) {

src/perf/java/io/reactivesocket/ReactiveSocketPerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public Publisher<Void> handleMetadataPush(Payload payload)
132132
}
133133
};
134134

135-
final static ReactiveSocket serverSocket = DefaultReactiveSocket.fromServerConnection(serverConnection, setupFrame -> handler);
135+
final static ReactiveSocket serverSocket = DefaultReactiveSocket.fromServerConnection(serverConnection, (setup, rs) -> handler);
136136

137137
final static ReactiveSocket client =
138138
DefaultReactiveSocket.fromClientConnection(

src/test/java/io/reactivesocket/LeaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void setup() throws InterruptedException {
8181
leaseGovernor = new TestingLeaseGovernor();
8282

8383
socketServer = DefaultReactiveSocket.fromServerConnection(
84-
serverConnection, setup -> new RequestHandler() {
84+
serverConnection, (setup, rs) -> new RequestHandler() {
8585

8686
@Override
8787
public Publisher<Payload> handleRequestResponse(Payload payload) {

src/test/java/io/reactivesocket/ReactiveSocketTest.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,10 @@
1515
*/
1616
package io.reactivesocket;
1717

18-
import static io.reactivesocket.ConnectionSetupPayload.*;
19-
import static io.reactivesocket.TestUtil.*;
20-
import static io.reactivex.Observable.*;
21-
import static org.junit.Assert.*;
22-
23-
import java.util.concurrent.CountDownLatch;
24-
import java.util.concurrent.TimeUnit;
25-
import java.util.concurrent.atomic.AtomicBoolean;
26-
import java.util.concurrent.atomic.AtomicReference;
27-
18+
import io.reactivesocket.lease.FairLeaseGovernor;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.observables.ConnectableObservable;
21+
import io.reactivex.subscribers.TestSubscriber;
2822
import org.junit.After;
2923
import org.junit.Before;
3024
import org.junit.Test;
@@ -33,14 +27,28 @@
3327
import org.junit.experimental.theories.Theory;
3428
import org.junit.runner.RunWith;
3529
import org.reactivestreams.Publisher;
36-
37-
import io.reactivesocket.lease.FairLeaseGovernor;
38-
import io.reactivex.disposables.Disposable;
39-
import io.reactivex.observables.ConnectableObservable;
40-
import io.reactivex.subscribers.TestSubscriber;
4130< 10000 /td>
import org.reactivestreams.Subscriber;
4231
import org.reactivestreams.Subscription;
4332

33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicReference;
37+
38+
import static io.reactivesocket.ConnectionSetupPayload.HONOR_LEASE;
39+
import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS;
40+
import static io.reactivesocket.TestUtil.byteToString;
41+
import static io.reactivesocket.TestUtil.utf8EncodedPayload;
42+
import static io.reactivex.Observable.empty;
43+
import static io.reactivex.Observable.error;
44+
import static io.reactivex.Observable.fromPublisher;
45+
import static io.reactivex.Observable.interval;
46+
import static io.reactivex.Observable.just;
47+
import static io.reactivex.Observable.range;
48+
import static org.junit.Assert.assertEquals;
49+
import static org.junit.Assert.assertFalse;
50+
import static org.junit.Assert.assertTrue;
51+
4452
@RunWith(Theories.class)
4553
public class ReactiveSocketTest {
4654

@@ -64,7 +72,7 @@ public void setup() {
6472
fireAndForgetOrMetadataPush = new CountDownLatch(1);
6573
lastServerErrorCountDown = new CountDownLatch(1);
6674

67-
socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() {
75+
socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, (setup,rs) -> new RequestHandler() {
6876

6977
@Override
7078
public Publisher<Payload> handleRequestResponse(Payload payload) {

src/test/java/io/reactivesocket/TestFlowControlRequestN.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public static void setup() throws InterruptedException {
315315
clientConnection.connectToServerConnection(serverConnection, false);
316316

317317

318-
socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() {
318+
socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, (setup,rs) -> new RequestHandler() {
319319

320320
@Override
321321
public Publisher<Payload> handleRequestStream(Payload payload) {

src/test/java/io/reactivesocket/TestTransportRequestN.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void setup(TestConnectionWithControlledRequestN clientConnection, TestCon
172172
clientConnection.connectToServerConnection(serverConnection, false);
173173
lastServerErrorCountDown = new CountDownLatch(1);
174174

175-
socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() {
175+
socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, (setup,rs) -> new RequestHandler() {
176176

177177
@Override
178178
public Publisher<Payload> handleRequestResponse(Payload payload) {

src/test/java/io/reactivesocket/internal/ResponderTest.java

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,57 @@
1515
*/
1616
package io.reactivesocket.internal;
1717

18-
import static io.reactivesocket.TestUtil.*;
19-
20-
import io.reactivesocket.*;
21-
import org.junit.Test;
22-
import org.reactivestreams.Subscription;
23-
24-
import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR;
18+
import io.reactivesocket.Frame;
19+
import io.reactivesocket.FrameType;
20+
import io.reactivesocket.LatchedCompletable;
21+
import io.reactivesocket.Payload;
22+
import io.reactivesocket.ReactiveSocket;
23+
import io.reactivesocket.RequestHandler;
24+
import io.reactivesocket.TestConnection;
2525
import io.reactivex.Observable;
2626
import io.reactivex.schedulers.Schedulers;
2727
import io.reactivex.schedulers.TestScheduler;
2828
import io.reactivex.subjects.ReplaySubject;
29+
import org.junit.Test;
30+
import org.mockito.Mockito;
31+
import org.reactivestreams.Subscription;
2932

3033
import java.util.List;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336
import java.util.function.Consumer;
3437

38+
import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR;
39+
import static io.reactivesocket.TestUtil.byteToString;
40+
import static io.reactivesocket.TestUtil.utf8EncodedPayload;
41+
import static io.reactivesocket.TestUtil.utf8EncodedRequestFrame;
42+
import static io.reactivex.Observable.error;
43+
import static io.reactivex.Observable.interval;
44+
import static io.reactivex.Observable.just;
45+
import static io.reactivex.Observable.never;
46+
import static io.reactivex.Observable.range;
3547
import static org.junit.Assert.assertEquals;
3648
import static org.junit.Assert.assertFalse;
3749
import static org.junit.Assert.assertTrue;
38-
import static io.reactivex.Observable.*;
3950

4051
public class ResponderTest
4152
{
4253
final static Consumer<Throwable> ERROR_HANDLER = Throwable::printStackTrace;
4354

4455
@Test(timeout=2000)
4556
public void testRequestResponseSuccess() throws InterruptedException {
46-
TestConnection conn = establishConnection();
57+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
58+
TestConnection conn = establishConnection();
4759
LatchedCompletable lc = new LatchedCompletable(1);
48-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
49-
.withRequestResponse(request ->
50-
just(utf8EncodedPayload(byteToString(request.getData()) + " world", null))).build(),
51-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
60+
Responder.createServerResponder(conn,
61+
(setup, rs) ->
62+
new RequestHandler.Builder().withRequestResponse(
63+
request ->
64+
just(utf8EncodedPayload(byteToString(request.getData()) + " world", null))).build(),
65+
NULL_LEASE_GOVERNOR,
66+
ERROR_HANDLER,
67+
lc,
68+
reactiveSocket);
5269
lc.await();
5370

5471
ReplaySubject<Frame> cachedResponses = captureResponses(conn);
@@ -69,11 +86,12 @@ public void testRequestResponseSuccess() throws InterruptedException {
6986

7087
@Test(timeout=2000)
7188
public void testRequestResponseError() throws InterruptedException {
89+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
7290
TestConnection conn = establishConnection();
7391
LatchedCompletable lc = new LatchedCompletable(1);
74-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
92+
Responder.createServerResponder(conn, (setup, rs) -> new RequestHandler.Builder()
7593
.withRequestResponse(request -> Observable.<Payload>error(new Exception("Request Not Found"))).build(),
76-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
94+
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket);
7795
lc.await();
7896

7997
Observable<Frame> cachedResponses = captureResponses(conn);
@@ -91,16 +109,17 @@ public void testRequestResponseError() throws InterruptedException {
91109

92110
@Test(timeout=2000)
93111
public void testRequestResponseCancel() throws InterruptedException {
112+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
94113
AtomicBoolean unsubscribed = new AtomicBoolean();
95114
Observable<Payload> delayed = never()
96115
.cast(Payload.class)
97116
.doOnCancel(() -> unsubscribed.set(true));
98117

99118
TestConnection conn = establishConnection();
100119
LatchedCompletable lc = new LatchedCompletable(1);
101-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
120+
Responder.createServerResponder(conn, (setup, rs) -> new RequestHandler.Builder()
102121
.withRequestResponse(request -> delayed).build(),
103-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
122+
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket);
104123
lc.await();
105124

106125
ReplaySubject<Frame> cachedResponses = captureResponses(conn);
@@ -118,12 +137,13 @@ public void testRequestResponseCancel() throws InterruptedException {
118137

119138
@Test(timeout=2000)
120139
public void testRequestStreamSuccess() throws InterruptedException {
140+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
121141
TestConnection conn = establishConnection();
122142
LatchedCompletable lc = new LatchedCompletable(1);
123-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
143+
Responder.createServerResponder(conn, (setup, rs) -> new RequestHandler.Builder()
124144
.withRequestStream(
125145
request -> range(Integer.parseInt(byteToString(request.getData())), 10).map(i -> utf8EncodedPayload(i + "!", null))).build(),
126-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
146+
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket);
127147
lc.await();
128148

129149
ReplaySubject<Frame> cachedResponses = captureResponses(conn);
@@ -151,13 +171,14 @@ public void testRequestStreamSuccess() throws InterruptedException {
151171

152172
@Test(timeout=2000)
153173
public void testRequestStreamError() throws InterruptedException {
174+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
154175
TestConnection conn = establishConnection();
155176
LatchedCompletable lc = new LatchedCompletable(1);
156-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
177+
Responder.createServerResponder(conn, (setup,rs) -> new RequestHandler.Builder()
157178
.withRequestStream(request -> range(Integer.parseInt(byteToString(request.getData())), 3)
158179
.map(i -> utf8EncodedPayload(i + "!", null))
159180
.concatWith(error(new Exception("Error Occurred!")))).build(),
160-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
181+
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket);
161182
lc.await();
162183

163184
ReplaySubject<Frame> cachedResponses = captureResponses(conn);
@@ -185,12 +206,13 @@ public void testRequestStreamError() throws InterruptedException {
185206

186207
@Test(timeout=2000)
187208
public void testRequestStreamCancel() throws InterruptedException {
209+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
188210
TestConnection conn = establishConnection();
189211
TestScheduler ts = Schedulers.test();
190212
LatchedCompletable lc = new LatchedCompletable(1);
191-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
213+
Responder.createServerResponder(conn, (setup,rs) -> new RequestHandler.Builder()
192214
.withRequestStream(request -> interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> utf8EncodedPayload(i + "!", null))).build(),
193-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
215+
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket);
194216
lc.await();
195217

196218
ReplaySubject<Frame> cachedResponses = captureResponses(conn);
@@ -226,12 +248,13 @@ public void testRequestStreamCancel() throws InterruptedException {
226248

227249
@Test(timeout=2000)
228250
public void testMultiplexedStreams() throws InterruptedException {
251+
ReactiveSocket reactiveSocket = Mockito.mock(ReactiveSocket.class);
229252
TestScheduler ts = Schedulers.test();
230253
TestConnection conn = establishConnection();
231254
LatchedCompletable lc = new LatchedCompletable(1);
232-
Responder.createServerResponder(conn, setup -> new RequestHandler.Builder()
255+
Responder.createServerResponder(conn, (setup,rs) -> new RequestHandler.Builder()
233256
.withRequestStream(request -> interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> utf8EncodedPayload(i + "_" + byteToString(request.getData()), null))).build(),
234-
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc);
257+
NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket);
235258
lc.await();
236259

237260
ReplaySubject<Frame> cachedResponses = captureResponses(conn);

0 commit comments

Comments
 (0)
0