15
15
*/
16
16
package io .reactivesocket .internal ;
17
17
18
- import static io .reactivesocket .TestUtil .* ;
19
-
20
- import io .reactivesocket .* ;
21
- import org . junit . Test ;
22
- import org . reactivestreams . Subscription ;
23
-
24
- import static io .reactivesocket .LeaseGovernor . NULL_LEASE_GOVERNOR ;
18
+ import io .reactivesocket .Frame ;
19
+ import io . reactivesocket . FrameType ;
20
+ import io .reactivesocket .LatchedCompletable ;
21
+ import io . reactivesocket . Payload ;
22
+ import io . reactivesocket . ReactiveSocket ;
23
+ import io . reactivesocket . RequestHandler ;
24
+ import io .reactivesocket .TestConnection ;
25
25
import io .reactivex .Observable ;
26
26
import io .reactivex .schedulers .Schedulers ;
27
27
import io .reactivex .schedulers .TestScheduler ;
28
28
import io .reactivex .subjects .ReplaySubject ;
29
+ import org .junit .Test ;
30
+ import org .mockito .Mockito ;
31
+ import org .reactivestreams .Subscription ;
29
32
30
33
import java .util .List ;
31
34
import java .util .concurrent .TimeUnit ;
32
35
import java .util .concurrent .atomic .AtomicBoolean ;
33
36
import java .util .function .Consumer ;
34
37
38
+ import static io .reactivesocket .LeaseGovernor .NULL_LEASE_GOVERNOR ;
39
+ import static io .reactivesocket .TestUtil .byteToString ;
40
+ import static io .reactivesocket .TestUtil .utf8EncodedPayload ;
41
+ import static io .reactivesocket .TestUtil .utf8EncodedRequestFrame ;
42
+ import static io .reactivex .Observable .error ;
43
+ import static io .reactivex .Observable .interval ;
44
+ import static io .reactivex .Observable .just ;
45
+ import static io .reactivex .Observable .never ;
46
+ import static io .reactivex .Observable .range ;
35
47
import static org .junit .Assert .assertEquals ;
36
48
import static org .junit .Assert .assertFalse ;
37
49
import static org .junit .Assert .assertTrue ;
38
- import static io .reactivex .Observable .*;
39
50
40
51
public class ResponderTest
41
52
{
42
53
final static Consumer <Throwable > ERROR_HANDLER = Throwable ::printStackTrace ;
43
54
44
55
@ Test (timeout =2000 )
45
56
public void testRequestResponseSuccess () throws InterruptedException {
46
- TestConnection conn = establishConnection ();
57
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
58
+ TestConnection conn = establishConnection ();
47
59
LatchedCompletable lc = new LatchedCompletable (1 );
48
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
49
- .withRequestResponse (request ->
50
- just (utf8EncodedPayload (byteToString (request .getData ()) + " world" , null ))).build (),
51
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
60
+ Responder .createServerResponder (conn ,
61
+ (setup , rs ) ->
62
+ new RequestHandler .Builder ().withRequestResponse (
63
+ request ->
64
+ just (utf8EncodedPayload (byteToString (request .getData ()) + " world" , null ))).build (),
65
+ NULL_LEASE_GOVERNOR ,
66
+ ERROR_HANDLER ,
67
+ lc ,
68
+ reactiveSocket );
52
69
lc .await ();
53
70
54
71
ReplaySubject <Frame > cachedResponses = captureResponses (conn );
@@ -69,11 +86,12 @@ public void testRequestResponseSuccess() throws InterruptedException {
69
86
70
87
@ Test (timeout =2000 )
71
88
public void testRequestResponseError () throws InterruptedException {
89
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
72
90
TestConnection conn = establishConnection ();
73
91
LatchedCompletable lc = new LatchedCompletable (1 );
74
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
92
+ Responder .createServerResponder (conn , ( setup , rs ) -> new RequestHandler .Builder ()
75
93
.withRequestResponse (request -> Observable .<Payload >error (new Exception ("Request Not Found" ))).build (),
76
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
94
+ NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc , reactiveSocket );
77
95
lc .await ();
78
96
79
97
Observable <Frame > cachedResponses = captureResponses (conn );
@@ -91,16 +109,17 @@ public void testRequestResponseError() throws InterruptedException {
91
109
92
110
@ Test (timeout =2000 )
93
111
public void testRequestResponseCancel () throws InterruptedException {
112
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
94
113
AtomicBoolean unsubscribed = new AtomicBoolean ();
95
114
Observable <Payload > delayed = never ()
96
115
.cast (Payload .class )
97
116
.doOnCancel (() -> unsubscribed .set (true ));
98
117
99
118
TestConnection conn = establishConnection ();
100
119
LatchedCompletable lc = new LatchedCompletable (1 );
101
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
120
+ Responder .createServerResponder (conn , ( setup , rs ) -> new RequestHandler .Builder ()
102
121
.withRequestResponse (request -> delayed ).build (),
103
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
122
+ NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc , reactiveSocket );
104
123
lc .await ();
105
124
106
125
ReplaySubject <Frame > cachedResponses = captureResponses (conn );
@@ -118,12 +137,13 @@ public void testRequestResponseCancel() throws InterruptedException {
118
137
119
138
@ Test (timeout =2000 )
120
139
public void testRequestStreamSuccess () throws InterruptedException {
140
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
121
141
TestConnection conn = establishConnection ();
122
142
LatchedCompletable lc = new LatchedCompletable (1 );
123
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
143
+ Responder .createServerResponder (conn , ( setup , rs ) -> new RequestHandler .Builder ()
124
144
.withRequestStream (
125
145
request -> range (Integer .parseInt (byteToString (request .getData ())), 10 ).map (i -> utf8EncodedPayload (i + "!" , null ))).build (),
126
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
146
+ NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc , reactiveSocket );
127
147
lc .await ();
128
148
129
149
ReplaySubject <Frame > cachedResponses = captureResponses (conn );
@@ -151,13 +171,14 @@ public void testRequestStreamSuccess() throws InterruptedException {
151
171
152
172
@ Test (timeout =2000 )
153
173
public void testRequestStreamError () throws InterruptedException {
174
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
154
175
TestConnection conn = establishConnection ();
155
176
LatchedCompletable lc = new LatchedCompletable (1 );
156
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
177
+ Responder .createServerResponder (conn , ( setup , rs ) -> new RequestHandler .Builder ()
157
178
.withRequestStream (request -> range (Integer .parseInt (byteToString (request .getData ())), 3 )
158
179
.map (i -> utf8EncodedPayload (i + "!" , null ))
159
180
.concatWith (error (new Exception ("Error Occurred!" )))).build (),
160
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
181
+ NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc , reactiveSocket );
161
182
lc .await ();
162
183
163
184
ReplaySubject <Frame > cachedResponses = captureResponses (conn );
@@ -185,12 +206,13 @@ public void testRequestStreamError() throws InterruptedException {
185
206
186
207
@ Test (timeout =2000 )
187
208
public void testRequestStreamCancel () throws InterruptedException {
209
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
188
210
TestConnection conn = establishConnection ();
189
211
TestScheduler ts = Schedulers .test ();
190
212
LatchedCompletable lc = new LatchedCompletable (1 );
191
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
213
+ Responder .createServerResponder (conn , ( setup , rs ) -> new RequestHandler .Builder ()
192
214
.withRequestStream (request -> interval (1000 , TimeUnit .MILLISECONDS , ts ).map (i -> utf8EncodedPayload (i + "!" , null ))).build (),
193
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
215
+ NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc , reactiveSocket );
194
216
lc .await ();
195
217
196
218
ReplaySubject <Frame > cachedResponses = captureResponses (conn );
@@ -226,12 +248,13 @@ public void testRequestStreamCancel() throws InterruptedException {
226
248
227
249
@ Test (timeout =2000 )
228
250
public void testMultiplexedStreams () throws InterruptedException {
251
+ ReactiveSocket reactiveSocket = Mockito .mock (ReactiveSocket .class );
229
252
TestScheduler ts = Schedulers .test ();
230
253
TestConnection conn = establishConnection ();
231
254
LatchedCompletable lc = new LatchedCompletable (1 );
232
- Responder .createServerResponder (conn , setup -> new RequestHandler .Builder ()
255
+ Responder .createServerResponder (conn , ( setup , rs ) -> new RequestHandler .Builder ()
233
256
.withRequestStream (request -> interval (1000 , TimeUnit .MILLISECONDS , ts ).map (i -> utf8EncodedPayload (i + "_" + byteToString (request .getData ()), null ))).build (),
234
- NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc );
257
+ NULL_LEASE_GOVERNOR , ERROR_HANDLER , lc , reactiveSocket );
235
258
lc .await ();
236
259
237
260
ReplaySubject <Frame > cachedResponses = captureResponses (conn );
0 commit comments