26
26
import io .rsocket .tckdrivers .common .ParseMarble ;
27
27
import io .rsocket .tckdrivers .common .TckIndividualTest ;
28
28
import io .rsocket .tckdrivers .common .Tuple ;
29
- import io .rsocket .transport .ClientTransport ;
30
29
import io .rsocket .uri .UriTransportRegistry ;
31
30
import io .rsocket .util .PayloadImpl ;
32
31
import java .io .BufferedReader ;
40
39
import java .util .Map ;
41
40
import java .util .concurrent .CountDownLatch ;
42
41
import java .util .concurrent .atomic .AtomicReference ;
43
- import java .util .function .Supplier ;
44
42
import org .reactivestreams .Publisher ;
45
43
import org .reactivestreams .Subscriber ;
46
44
import org .reactivestreams .Subscription ;
@@ -56,26 +54,18 @@ public class JavaClientDriver {
56
54
private final Map <String , MySubscriber <Payload >> payloadSubscribers ;
57
55
private final Map <String , MySubscriber <Void >> fnfSubscribers ;
58
56
private final Map <String , String > idToType ;
59
- private Supplier <RSocket > createClient ;
60
57
private final String uri ;
58
+ private final Map <String , RSocket > clientMap ;
61
59
private final String AGENT = "[CLIENT]" ;
62
60
private ConsoleUtils consoleUtils = new ConsoleUtils (AGENT );
63
61
64
62
public JavaClientDriver (String uri ) throws FileNotFoundException {
65
63
this .payloadSubscribers = new HashMap <>();
66
64
this .fnfSubscribers = new HashMap <>();
67
65
this .idToType = new HashMap <>();
68
- this .uri = uri ;
69
-
70
- // creating a client object to run the test
71
- try {
72
-
73
- RSocket client = createClient (this .uri );
74
- this .createClient = () -> client ;
75
66
76
- } catch (Exception e ) {
77
- e .printStackTrace ();
78
- }
67
+ this .clientMap = new HashMap <>();
68
+ this .uri = uri ;
79
69
}
80
70
81
71
public enum TestResult {
@@ -89,10 +79,12 @@ public enum TestResult {
89
79
*
90
80
* @return a RSocket
91
81
*/
92
- public RSocket createClient (String uri ) {
93
- return RSocketFactory .connect ().transport (UriTransportRegistry .clientForUri (uri )).start ().block ();
82
+ public RSocket createClient () {
83
+ return RSocketFactory .connect ()
84
+ .transport (UriTransportRegistry .clientForUri (uri ))
85
+ .start ()
86
+ .block ();
94
87
}
95
-
96
88
/**
97
89
* Parses through the commands for each test, and calls handlers that execute the commands.
98
90
*
@@ -115,7 +107,7 @@ public void runTest(List<String> test, String name) throws Exception {
115
107
switch (args [1 ]) {
116
108
case "subscribe" :
117
109
handleSubscribe (args );
118
- id .add (args [3 ]);
110
+ id .add (args [0 ] + args [ 3 ]);
119
111
break ;
120
112
case "channel" :
121
113
channelTest = true ;
@@ -187,6 +179,22 @@ public void runTest(List<String> test, String name) throws Exception {
187
179
assertTrue ("There is no subscriber in this test" , (channelTest ) || (id .size () > 0 ));
188
180
}
189
181
182
+ /**
183
+ * A function that do a look up in the clientMap hashtable. If entry does not exist, it creates
184
+ * one.
185
+ *
186
+ * @param id
187
+ * @return a RSocket
188
+ */
189
+ private RSocket getClient (String id ) {
190
+ RSocket client = clientMap .get (id );
191
+ if (client == null ) {
192
+ client = createClient ();
193
+ clientMap .put (id , client );
194
+ }
195
+ return client ;
196
+ }
197
+
190
198
/**
191
199
* This function takes in the arguments for the subscribe command, and subscribes an instance of
192
200
* MySubscriber with an initial request of 0 (which means don't immediately make a request) to an
@@ -198,27 +206,27 @@ private void handleSubscribe(String[] args) {
198
206
switch (args [2 ]) {
199
207
case "rr" :
200
208
MySubscriber <Payload > rrsub = new MySubscriber <>(0L , AGENT );
201
- payloadSubscribers .put (args [3 ], rrsub );
202
- idToType .put (args [3 ], args [2 ]);
203
- RSocket rrclient = createClient . get ( );
209
+ payloadSubscribers .put (args [0 ] + args [ 3 ], rrsub );
210
+ idToType .put (args [0 ] + args [ 3 ], args [2 ]);
211
+ RSocket rrclient = getClient ( args [ 0 ] );
204
212
consoleUtils .info ("Sending RR with " + args [4 ] + " " + args [5 ]);
205
213
Publisher <Payload > rrpub = rrclient .requestResponse (new PayloadImpl (args [4 ], args [5 ]));
206
214
rrpub .subscribe (rrsub );
207
215
break ;
208
216
case "rs" :
209
217
MySubscriber <Payload > rssub = new MySubscriber <>(0L , AGENT );
210
- payloadSubscribers .put (args [3 ], rssub );
211
- idToType .put (args [3 ], args [2 ]);
212
- RSocket rsclient = createClient . get ( );
218
+ payloadSubscribers .put (args [0 ] + args [ 3 ], rssub );
219
+ idToType .put (args [0 ] + args [ 3 ], args [2 ]);
220
+ RSocket rsclient = getClient ( args [ 0 ] );
213
221
consoleUtils .info ("Sending RS with " + args [4 ] + " " + args [5 ]);
214
222
Publisher <Payload > rspub = rsclient .requestStream (new PayloadImpl (args [4 ], args [5 ]));
215
223
rspub .subscribe (rssub );
216
224
break ;
217
225
case "fnf" :
218
226
MySubscriber <Void > fnfsub = new MySubscriber <>(0L , AGENT );
219
- fnfSubscribers .put (args [3 ], fnfsub );
220
- idToType .put (args [3 ], args [2 ]);
221
- RSocket fnfclient = createClient . get ( );
227
+ fnfSubscribers .put (args [0 ] + args [ 3 ], fnfsub );
228
+ idToType .put (args [0 ] + args [ 3 ], args [2 ]);
229
+ RSocket fnfclient = getClient ( args [ 0 ] );
222
230
consoleUtils .info ("Sending fnf with " + args [4 ] + " " + args [5 ]);
223
231
Publisher <Void > fnfpub = fnfclient .fireAndForget (new PayloadImpl (args [4 ], args [5 ]));
224
232
fnfpub .subscribe (fnfsub );
@@ -254,7 +262,7 @@ private void handleChannel(String[] args, Iterator<String> iter, String name, bo
254
262
255
263
// we now create the publisher that the server will subscribe to with its own subscriber
256
264
// we want to give that subscriber a subscription that the client will use to send data to the server
257
- RSocket client = createClient . get ( );
265
+ RSocket client = getClient ( args [ 0 ] );
258
266
AtomicReference <ParseChannelThread > mypct = new AtomicReference <>();
259
267
Publisher <Payload > pub =
260
268
client .requestChannel (
@@ -289,7 +297,7 @@ public void subscribe(Subscriber<? super Payload> s) {
289
297
private void handleEchoChannel (String [] args ) {
290
298
Payload initPayload = new PayloadImpl (args [2 ], args [3 ]);
291
299
MySubscriber <Payload > testsub = new MySubscriber <>(1L , AGENT );
292
- RSocket client = createClient . get ( );
300
+ RSocket client = getClient ( args [ 0 ] );
293
301
Publisher <Payload > pub =
294
302
client .requestChannel (
295
303
new Publisher <Payload >() {
@@ -306,7 +314,7 @@ public void subscribe(Subscriber<? super Payload> s) {
306
314
307
315
private void handleAwaitTerminal (String [] args ) {
308
316
consoleUtils .info ("Awaiting at Terminal" );
309
- String id = args [3 ];
317
+ String id = args [0 ] + args [ 3 ];
310
318
311
319
assertNotEquals ("Could not find subscriber with given id" , idToType .get (id ), null );
312
320
@@ -322,7 +330,7 @@ private void handleAwaitTerminal(String[] args) {
322
330
private void handleAwaitAtLeast (String [] args ) {
323
331
consoleUtils .info ("Awaiting at Terminal for at least " + args [4 ]);
324
332
try {
325
- String id = args [3 ];
333
+ String id = args [0 ] + args [ 3 ];
326
334
MySubscriber <Payload > sub = payloadSubscribers .get (id );
327
335
sub .awaitAtLeast (Long .parseLong (args [4 ]));
328
336
} catch (InterruptedException e ) {
@@ -332,7 +340,7 @@ private void handleAwaitAtLeast(String[] args) {
332
340
333
341
private void handleAwaitNoEvents (String [] args ) {
334
342
try {
335
- String id = args [3 ];
343
+ String id = args [0 ] + args [ 3 ];
336
344
MySubscriber <Payload > sub = payloadSubscribers .get (id );
337
345
sub .awaitNoEvents (Long .parseLong (args [4 ]));
338
346
} catch (InterruptedException e ) {
@@ -341,7 +349,7 @@ private void handleAwaitNoEvents(String[] args) {
341
349
}
342
350
343
351
private void assertNoError (String [] args ) {
344
- String id = args [3 ];
352
+ String id = args [0 ] + args [ 3 ];
345
353
346
354
assertNotNull ("Could not find subscriber with given id" , idToType .get (id ));
347
355
if (idToType .get (id ).equals ("fnf" )) {
@@ -363,7 +371,7 @@ private void assertNoError(String[] args) {
363
371
364
372
private void assertError (String [] args ) {
365
373
consoleUtils .info ("Checking for error" );
366
- String id = args [3 ];
374
+ String id = args [0 ] + args [ 3 ];
367
375
assertNotNull ("Could not find subscriber with given id" , idToType .get (id ));
368
376
if (idToType .get (id ).equals ("fnf" )) {
369
377
MySubscriber <Void > sub = fnfSubscribers .get (id );
@@ -376,7 +384,7 @@ private void assertError(String[] args) {
376
384
377
385
private void assertReceived (String [] args ) {
378
386
consoleUtils .info ("Verify we received " + args [4 ]);
379
- String id = args [3 ];
387
+ String id = args [0 ] + args [ 3 ];
380
388
MySubscriber <Payload > sub = payloadSubscribers .get (id );
381
389
String [] values = args [4 ].split ("&&" );
382
390
List <Tuple <String , String >> assertList = new ArrayList <>();
@@ -388,7 +396,7 @@ private void assertReceived(String[] args) {
388
396
}
389
397
390
398
private void assertReceivedN (String [] args ) {
391
- String id = args [3 ];
399
+ String id = args [0 ] + args [ 3 ];
392
400
MySubscriber <Payload > sub = payloadSubscribers .get (id );
393
401
try {
394
402
sub .assertValueCount (Integer .parseInt (args [4 ]));
@@ -398,14 +406,14 @@ private void assertReceivedN(String[] args) {
398
406
}
399
407
400
408
private void assertReceivedAtLeast (String [] args ) {
401
- String id = args [3 ];
409
+ String id = args [0 ] + args [ 3 ];
402
410
MySubscriber <Payload > sub = payloadSubscribers .get (id );
403
411
sub .assertReceivedAtLeast (Integer .parseInt (args [4 ]));
404
412
}
405
413
406
414
private void assertCompleted (String [] args ) {
407
415
consoleUtils .info ("Handling onComplete" );
408
- String id = args [3 ];
416
+ String id = args [0 ] + args [ 3 ];
409
417
410
418
assertNotNull ("Could not find subscriber with given id" , idToType .get (id ));
411
419
if (idToType .get (id ).equals ("fnf" )) {
@@ -427,7 +435,7 @@ private void assertCompleted(String[] args) {
427
435
428
436
private void assertNoCompleted (String [] args ) {
429
437
consoleUtils .info ("Handling NO onComplete" );
430
- String id = args [3 ];
438
+ String id = args [0 ] + args [ 3 ];
431
439
432
440
assertNotNull ("Could not find subscriber with given id" , idToType .get (id ));
433
441
if (idToType .get (id ).equals ("fnf" )) {
@@ -448,14 +456,14 @@ private void assertNoCompleted(String[] args) {
448
456
}
449
457
450
458
private void assertCancelled (String [] args ) {
451
- String id = args [3 ];
459
+ String id = args [0 ] + args [ 3 ];
452
460
MySubscriber <Payload > sub = payloadSubscribers .get (id );
453
461
assertTrue (sub .isCancelled ());
454
462
}
455
463
456
464
private void handleRequest (String [] args ) {
457
465
Long num = Long .parseLong (args [2 ]);
458
- String id = args [3 ];
466
+ String id = args [0 ] + args [ 3 ];
459
467
460
468
assertNotNull ("Could not find subscriber with given id" , idToType .get (id ));
461
469
if (idToType .get (id ).equals ("fnf" )) {
@@ -470,24 +478,27 @@ private void handleRequest(String[] args) {
470
478
}
471
479
472
480
private void handleTake (String [] args ) {
473
- String id = args [3 ];
481
+ String id = args [0 ] + args [ 3 ];
474
482
Long num = Long .parseLong (args [2 ]);
475
483
MySubscriber <Payload > sub = payloadSubscribers .get (id );
476
484
sub .take (num );
477
485
}
478
486
479
487
private void handleCancel (String [] args ) {
480
- String id = args [2 ];
488
+ String id = args [0 ] + args [ 2 ];
481
489
MySubscriber <Payload > sub = payloadSubscribers .get (id );
482
490
sub .cancel ();
483
491
}
484
492
485
493
private void handleEOF () {
486
494
MySubscriber <Void > fnfsub = new MySubscriber <>(0L , AGENT );
487
- RSocket fnfclient = createClient .get ();
488
- Publisher <Void > fnfpub = fnfclient .fireAndForget (new PayloadImpl ("shutdown" , "shutdown" ));
489
- fnfpub .subscribe (fnfsub );
490
- fnfsub .request (1 );
495
+ if (clientMap .size () > 0 ) {
496
+ // Use any Client to send shutdown msg to the server
497
+ RSocket fnfclient = clientMap .get (clientMap .keySet ().toArray ()[0 ]);
498
+ Publisher <Void > fnfpub = fnfclient .fireAndForget (new PayloadImpl ("shutdown" , "shutdown" ));
499
+ fnfpub .subscribe (fnfsub );
500
+ fnfsub .request (1 );
501
+ }
491
502
}
492
503
493
504
/** A subscription for channel, it handles request(n) by sort of faking an initial payload. */
0 commit comments