15
15
*/
16
16
package io .reactivesocket ;
17
17
18
- import java .io .IOException ;
19
- import java .util .concurrent .CountDownLatch ;
20
- import java .util .concurrent .atomic .AtomicInteger ;
21
- import java .util .concurrent .atomic .AtomicReference ;
22
- import java .util .function .BiConsumer ;
23
- import java .util .function .Consumer ;
24
-
25
- import org .reactivestreams .Publisher ;
26
- import org .reactivestreams .Subscriber ;
27
- import org .reactivestreams .Subscription ;
28
-
29
18
import io .reactivesocket .internal .Requester ;
30
19
import io .reactivesocket .internal .Responder ;
31
20
import io .reactivesocket .internal .rx .CompositeCompletable ;
34
23
import io .reactivesocket .rx .Disposable ;
35
24
import io .reactivesocket .rx .Observable ;
36
25
import io .reactivesocket .rx .Observer ;
37
- import uk .co .real_logic .agrona .BitUtil ;
26
+ import org .agrona .BitUtil ;
27
+ import org .reactivestreams .Publisher ;
28
+ import org .reactivestreams .Subscriber ;
29
+ import org .reactivestreams .Subscription ;
30
+
31
+ import java .io .IOException ;
32
+ import java .util .concurrent .atomic .AtomicInteger ;
33
+ import java .util .function .Consumer ;
38
34
39
35
import static io .reactivesocket .LeaseGovernor .NULL_LEASE_GOVERNOR ;
40
36
41
37
/**
42
- * Interface for a connection that supports sending requests and receiving responses
43
- *
44
- * Created by servers for connections Created on demand for clients
38
+ * An implementation of {@link ReactiveSocket}
45
39
*/
46
- public class ReactiveSocket implements AutoCloseable {
40
+ public class ReactiveSocketImpl implements ReactiveSocket {
47
41
private static final RequestHandler EMPTY_HANDLER = new RequestHandler .Builder ().build ();
48
42
49
43
private static final Consumer <Throwable > DEFAULT_ERROR_STREAM = t -> {
@@ -62,7 +56,7 @@ public class ReactiveSocket implements AutoCloseable {
62
56
private final ConnectionSetupHandler responderConnectionHandler ;
63
57
private final LeaseGovernor leaseGovernor ;
64
58
65
- private ReactiveSocket (
59
+ private ReactiveSocketImpl (
66
60
DuplexConnection connection ,
67
61
boolean isServer ,
68
62
ConnectionSetupPayload serverRequestorSetupPayload ,
@@ -118,7 +112,7 @@ public static ReactiveSocket fromClientConnection(
118
112
}
119
113
final RequestHandler h = handler != null ? handler : EMPTY_HANDLER ;
120
114
Consumer <Throwable > es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM ;
121
- return new ReactiveSocket (connection , false , setup , h , null , NULL_LEASE_GOVERNOR , es );
115
+ return new ReactiveSocketImpl (connection , false , setup , h , null , NULL_LEASE_GOVERNOR , es );
122
116
}
123
117
124
118
/**
@@ -178,7 +172,7 @@ public static ReactiveSocket fromServerConnection(
178
172
LeaseGovernor leaseGovernor ,
179
173
Consumer <Throwable > errorConsumer
180
174
) {
181
- return new ReactiveSocket (connection , true , null , null , connectionHandler ,
175
+ return new ReactiveSocketImpl (connection , true , null , null , connectionHandler ,
182
176
leaseGovernor , errorConsumer );
183
177
}
184
178
@@ -192,31 +186,37 @@ public static ReactiveSocket fromServerConnection(
192
186
/**
193
187
* Initiate a request response exchange
194
188
*/
189
+ @ Override
195
190
public Publisher <Payload > requestResponse (final Payload payload ) {
196
191
assertRequester ();
197
192
return requester .requestResponse (payload );
198
193
}
199
194
195
+ @ Override
200
196
public Publisher <Void > fireAndForget (final Payload payload ) {
201
197
assertRequester ();
202
198
return requester .fireAndForget (payload );
203
199
}
204
200
201
+ @ Override
205
202
public Publisher <Payload > requestStream (final Payload payload ) {
206
203
assertRequester ();
207
204
return requester .requestStream (payload );
208
205
}
209
206
207
+ @ Override
210
208
public Publisher <Payload > requestSubscription (final Payload payload ) {
211
209
assertRequester ();
212
210
return requester .requestSubscription (payload );
213
211
}
214
212
213
+ @ Override
215
214
public Publisher <Payload > requestChannel (final Publisher <Payload > payloads ) {
216
215
assertRequester ();
217
216
return requester .requestChannel (payloads );
218
217
}
219
218
219
+ @ Override
220
220
public Publisher <Void > metadataPush (final Payload payload ) {
221
221
assertRequester ();
222
222
return requester .metadataPush (payload );
@@ -239,33 +239,20 @@ private void assertRequester() {
239
239
}
240
240
}
241
241
242
- /**
243
- * Client check for availability to send request based on lease
244
- *
245
- * @return 0.0 to 1.0 indicating availability of sending requests
246
- */
242
+ @ Override
247
243
public double availability () {
248
244
// TODO: can happen in either direction
249
245
assertRequester ();
250
246
return requester .availability ();
251
247
}
252
248
253
- /**
254
- * Server granting new lease information to client
255
- *
256
- * Initial lease semantics are that server waits for periodic granting of leases by server side.
257
- *
258
- * @param ttl
259
- * @param numberOfRequests
260
- */
249
+ @ Override
261
250
public void sendLease (int ttl , int numberOfRequests ) {
262
251
// TODO: can happen in either direction
263
252
responder .sendLease (ttl , numberOfRequests );
264
253
}
265
254
266
- /**
267
- * Start protocol processing on the given DuplexConnection.
268
- */
255
+ @ Override
269
256
public final void start (Completable c ) {
270
257
if (isServer ) {
271
258
responder = Responder .createServerResponder (
@@ -345,20 +332,12 @@ public void error(Throwable e) {
345
332
346
333
private final CompositeCompletable requesterReady = new CompositeCompletable ();
347
334
348
- /**
349
- * Invoked when Requester is ready with success or fail.
350
- *
351
- * @param c
352
- */
335
+ @ Override
353
336
public final void onRequestReady (Completable c ) {
354
337
requesterReady .add (c );
355
338
}
356
339
357
- /**
358
- * Invoked when Requester is ready. Non-null exception if error. Null if success.
359
- *
360
- * @param c
361
- */
340
+ @ Override
362
341
public final void onRequestReady (Consumer <Throwable > c ) {
363
342
requesterReady .add (new Completable () {
364
343
@ Override
@@ -458,36 +437,6 @@ public void addOutput(Frame f, Completable callback) {
458
437
459
438
};
460
439
461
- /**
462
- * Start and block the current thread until startup is finished.
463
- *
464
- * @throws RuntimeException
465
- * of InterruptedException
466
- */
467
- public final void startAndWait () {
468
- CountDownLatch latch = new CountDownLatch (1 );
469
- AtomicReference <Throwable > err = new AtomicReference <>();
470
- start (new Completable () {
471
- @ Override
472
- public void success () {
473
- latch .countDown ();
474
- }
475
-
476
- @ Override
477
- public void error (Throwable e ) {
478
- latch .countDown ();
479
- }
480
- });
481
- try {
482
- latch .await ();
483
- } catch (InterruptedException e ) {
484
- throw new RuntimeException (e );
485
- }
486
- if (err .get () != null ) {
487
- throw new RuntimeException (err .get ());
488
- }
489
- }
490
-
491
440
@ Override
492
441
public void close () throws Exception {
493
442
connection .close ();
@@ -500,6 +449,7 @@ public void close() throws Exception {
500
449
}
501
450
}
502
451
452
+ @ Override
503
453
public void shutdown () {
504
454
try {
505
455
close ();
0 commit comments