15
15
16
16
import static org .junit .Assert .*;
17
17
18
+ import com .google .common .base .Throwables ;
19
+ import com .google .common .cache .CacheBuilder ;
20
+ import com .google .common .cache .CacheLoader ;
21
+ import com .google .common .cache .LoadingCache ;
18
22
import io .rsocket .Payload ;
19
23
import io .rsocket .RSocket ;
20
- import io .rsocket .RSocketFactory ;
21
24
import io .rsocket .tckdrivers .common .ConsoleUtils ;
22
25
import io .rsocket .tckdrivers .common .EchoSubscription ;
23
26
import io .rsocket .tckdrivers .common .MySubscriber ;
24
27
import io .rsocket .tckdrivers .common .ParseChannel ;
25
28
import io .rsocket .tckdrivers .common .ParseChannelThread ;
26
29
import io .rsocket .tckdrivers .common .ParseMarble ;
27
- import io .rsocket .tckdrivers .common .TckIndividualTest ;
30
+ import io .rsocket .tckdrivers .common .TckClientTest ;
28
31
import io .rsocket .tckdrivers .common .Tuple ;
29
- import io .rsocket .uri .UriTransportRegistry ;
30
32
import io .rsocket .util .PayloadImpl ;
31
- import java .io .BufferedReader ;
32
- import java .io .File ;
33
- import java .io .FileNotFoundException ;
34
- import java .io .FileReader ;
35
33
import java .util .ArrayList ;
36
34
import java .util .HashMap ;
37
35
import java .util .Iterator ;
38
36
import java .util .List ;
39
37
import java .util .Map ;
40
38
import java .util .concurrent .CountDownLatch ;
39
+ import java .util .concurrent .ExecutionException ;
41
40
import java .util .concurrent .atomic .AtomicReference ;
42
41
import org .reactivestreams .Publisher ;
43
42
import org .reactivestreams .Subscriber ;
44
43
import org .reactivestreams .Subscription ;
44
+ import reactor .core .publisher .Mono ;
45
45
46
46
/**
47
47
* This class is the driver for the Java RSocket client. To use with class with the current Java
51
51
*/
52
52
public class JavaClientDriver {
53
53
54
- private final Map <String , MySubscriber <Payload >> payloadSubscribers ;
55
- private final Map <String , MySubscriber <Void >> fnfSubscribers ;
56
- private final Map <String , String > idToType ;
57
- private final String uri ;
58
- priv
F438
ate final Map <String , RSocket > clientMap ;
54
+ private final Map <String , MySubscriber <Payload >> payloadSubscribers = new HashMap <>();
55
+ private final Map <String , MySubscriber <Void >> fnfSubscribers = new HashMap <>();
56
+ private final Map <String , String > idToType = new HashMap <>();
59
57
private final String AGENT = "[CLIENT]" ;
58
+ private final LoadingCache <String , RSocket > clientMap ;
60
59
private ConsoleUtils consoleUtils = new ConsoleUtils (AGENT );
61
60
62
- public JavaClientDriver (String uri ) throws FileNotFoundException {
63
- this .payloadSubscribers = new HashMap <>();
64
- this .fnfSubscribers = new HashMap <>();
65
- this .idToType = new HashMap <>();
66
-
67
- this .clientMap = new HashMap <>();
68
- this .uri = uri ;
69
- }
70
-
71
- public enum TestResult {
72
- PASS ,
73
- FAIL ,
74
- CHANNEL
61
+ public JavaClientDriver (Mono <RSocket > clientBuilder ) {
62
+ clientMap =
63
+ CacheBuilder .newBuilder ()
64
+ .build (
65
+ new CacheLoader <String , RSocket >() {
66
+ @ Override
67
+ public RSocket load (String key ) throws Exception {
68
+ return clientBuilder .block ();
69
+ }
70
+ });
75
71
}
76
72
77
- /**
78
- * A function that creates a RSocket on a new TCP connection.
79
- *
80
- * @return a RSocket
81
- */
82
- public RSocket createClient () {
83
- return RSocketFactory .connect ()
84
- .transport (UriTransportRegistry .clientForUri (uri ))
85
- .start ()
86
- .block ();
87
- }
88
- /**
89
- * Parses through the commands for each test, and calls handlers that execute the commands.
90
- *
91
- * @param test the list of strings which makes up each test case
92
- * @param name the name of the test
93
- */
94
- public void runTest (List <String > test , String name ) throws Exception {
73
+ /** Parses through the commands for each test, and calls handlers that execute the commands. */
74
+ public void runTest (TckClientTest test ) {
95
75
List <String > id = new ArrayList <>();
96
- Iterator <String > iter = test .iterator ();
76
+ Iterator <String > iter = test .testLines (). iterator ();
97
77
boolean channelTest = false ; // tells whether this is a test for channel or not
98
78
while (iter .hasNext ()) {
99
79
String line = iter .next ();
@@ -109,7 +89,7 @@ public void runTest(List<String> test, String name) throws Exception {
109
89
break ;
110
90
case "channel" :
111
91
channelTest = true ;
112
- handleChannel (args , iter , name , true );
92
+ handleChannel (args , iter , test . name , true );
113
93
break ;
114
94
case "echochannel" :
115
95
handleEchoChannel (args );
@@ -180,25 +160,19 @@ public void runTest(List<String> test, String name) throws Exception {
180
160
/**
181
161
* A function that do a look up in the clientMap hashtable. If entry does not exist, it creates
182
162
* one.
183
- *
184
- * @param id
185
- * @return a RSocket
186
163
*/
187
164
private RSocket getClient (String id ) {
188
- RSocket client = clientMap . get ( id );
189
- if ( client == null ) {
190
- client = createClient ();
191
- clientMap . put ( id , client );
165
+ try {
166
+ return clientMap . get ( id );
167
+ } catch ( ExecutionException e ) {
168
+ throw Throwables . propagate ( e );
192
169
}
193
- return client ;
194
170
}
195
171
196
172
/**
197
173
* This function takes in the arguments for the subscribe command, and subscribes an instance of
198
174
* MySubscriber with an initial request of 0 (which means don't immediately make a request) to an
199
- * instance of the corresponding publisher
200
- *
201
- * @param args
175
+ * instance of the corresponding publisher.
202
176
*/
203
177
private void handleSubscribe (String [] args ) {
204
178
switch (args [2 ]) {
@@ -238,10 +212,6 @@ private void handleSubscribe(String[] args) {
238
212
* This function takes in an iterator that is parsing through the test, and collects all the parts
239
213
* that make up the channel functionality. It then create a thread that runs the test, which we
240
214
* wait to finish before proceeding with the other tests.
241
- *
242
- * @param args
243
- * @param iter
244
- * @param name
245
215
*/
246
216
private void handleChannel (String [] args , Iterator <String > iter , String name , boolean pass ) {
247
217
List <String > commands = new ArrayList <>();
@@ -289,8 +259,6 @@ public void subscribe(Subscriber<? super Payload> s) {
289
259
/**
290
260
* This handles echo tests. This sets up a channel connection with the EchoSubscription, which we
291
261
* pass to the MySubscriber.
292
- *
293
- * @param args
294
262
*/
295
263
private void handleEchoChannel (String [] args ) {
296
264
Payload initPayload = new PayloadImpl (args [2 ], args [3 ]);
@@ -490,13 +458,13 @@ private void handleCancel(String[] args) {
490
458
491
459
private void handleEOF () {
492
460
MySubscriber <Void > fnfsub = new MySubscriber <>(0L , AGENT );
493
- if (clientMap .size () > 0 ) {
494
- // Use any Client to send shutdown msg to the server
495
- RSocket fnfclient = clientMap .get (clientMap .keySet ().toArray ()[0 ]);
496
- Publisher <Void > fnfpub = fnfclient .fireAndForget (new PayloadImpl ("shutdown" , "shutdown" ));
497
- fnfpub .subscribe (fnfsub );
498
- fnfsub .request (1 );
499
-
1241
}
461
+ // if (clientMap.size() > 0) {
462
+ // // Use any Client to send shutdown msg to the server
463
+ // RSocket fnfclient = clientMap.get(clientMap.keySet().toArray()[0]);
464
+ // Publisher<Void> fnfpub = fnfclient.fireAndForget(new PayloadImpl("shutdown", "shutdown"));
465
+ // fnfpub.subscribe(fnfsub);
466
+ // fnfsub.request(1);
467
+ // }
500
468
}
501
469
502
470
/** A subscription for channel, it handles request(n) by sort of faking an initial payload. */
@@ -529,48 +497,4 @@ public void request(long n) {
529
497
if (m > 0 ) pm .request (m );
530
498
}
531
499
}
532
-
533
- /**
534
- * A function that parses the file and extract the individual tests
535
- *
536
- * @param file The file to read as input.
537
- * @return a list of TckIndividualTest.
538
- */
539
- public static List <TckIndividualTest > extractTests (File file ) throws Exception {
540
-
541
- BufferedReader reader = new BufferedReader (new FileReader (file ));
542
- List <TckIndividualTest > tests = new ArrayList <>();
543
- List <String > test = new ArrayList <>();
544
- String line = reader .readLine ();
545
- String testFile = file .getName ().replaceFirst (TckIndividualTest .clientPrefix , "" );
546
-
547
- //Parsing the input client file to read all the tests
548
- while (line != null ) {
549
- switch (line ) {
550
- case "!" :
551
- String name = "" ;
552
- if (test .size () > 1 ) {
553
- name = test .get (0 ).split ("%%" )[1 ];
554
- }
555
-
556
- TckIndividualTest tckTest = new TckIndividualTest (name , test , testFile );
557
- tests .add (tckTest );
558
- test = new ArrayList <>();
559
- break ;
560
- default :
561
- test .add (line );
562
- break ;
563
- }
564
- line = reader .readLine ();
565
- }
566
-
567
- if (test .size () > 0 ) {
568
- String name = "" ;
569
- name = test .get (0 ).split ("%%" )[1 ];
570
- TckIndividualTest tckTest = new TckIndividualTest (name , test , testFile );
571
- tests .add (tckTest );
572
- tests = tests .subList (1 , tests .size ()); // remove the first list, which is empty
573
- }
574
- return tests ;
575
- }
576
500
}
0 commit comments