8000 Added support for multiple clients (#359) · chakra-coder/rsocket-java@0f3cde1 · GitHub
[go: up one dir, main page]

Skip to content
< 8000 /react-partial>

Commit 0f3cde1

Browse files
junaidkhalidyschimke
authored andcommitted
Added support for multiple clients (rsocket#359)
1 parent a0bc944 commit 0f3cde1

File tree

3 files changed

+141
-46
lines changed

3 files changed

+141
-46
lines changed

rsocket-tck-drivers/src/main/java/io/rsocket/tckdrivers/client/JavaClientDriver.java

Copy file name to clipboard
Lines changed: 57 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.rsocket.tckdrivers.common.ParseMarble;
2727
import io.rsocket.tckdrivers.common.TckIndividualTest;
2828
import io.rsocket.tckdrivers.common.Tuple;
29-
import io.rsocket.transport.ClientTransport;
3029
import io.rsocket.uri.UriTransportRegistry;
3130
import io.rsocket.util.PayloadImpl;
3231
import java.io.BufferedReader;
@@ -40,7 +39,6 @@
4039
import java.util.Map;
4140
import java.util.concurrent.CountDownLatch;
4241
import java.util.concurrent.atomic.AtomicReference;
43-
import java.util.function.Supplier;
4442
import org.reactivestreams.Publisher;
4543
import org.reactivestreams.Subscriber;
4644
import org.reactivestreams.Subscription;
@@ -56,26 +54,18 @@ public class JavaClientDriver {
5654
private final Map<String, MySubscriber<Payload>> payloadSubscribers;
5755
private final Map<String, MySubscriber<Void>> fnfSubscribers;
5856
private final Map<String, String> idToType;
59-
private Supplier<RSocket> createClient;
6057
private final String uri;
58+
private final Map<String, RSocket> clientMap;
6159
private final String AGENT = "[CLIENT]";
6260
private ConsoleUtils consoleUtils = new ConsoleUtils(AGENT);
6361

6462
public JavaClientDriver(String uri) throws FileNotFoundException {
6563
this.payloadSubscribers = new HashMap<>();
6664
this.fnfSubscribers = new HashMap<>();
6765
this.idToType = new HashMap<>();
68-
this.uri = uri;
69-
70-
// creating a client object to run the test
71-
try {
72-
73-
RSocket client = createClient(this.uri);
74-
this.createClient = () -> client;
7566

76-
} catch (Exception e) {
77-
e.printStackTrace();
78-
}
67+
this.clientMap = new HashMap<>();
68+
this.uri = uri;
7969
}
8070

8171
public enum TestResult {
@@ -89,10 +79,12 @@ public enum TestResult {
8979
*
9080
* @return a RSocket
9181
*/
92-
public RSocket createClient(String uri) {
93-
return RSocketFactory.connect().transport(UriTransportRegistry.clientForUri(uri)).start().block();
82+
public RSocket createClient() {
83+
return RSocketFactory.connect()
84+
.transport(UriTransportRegistry.clientForUri(uri))
85+
.start()
86+
.block();
9487
}
95-
9688
/**
9789
* Parses through the commands for each test, and calls handlers that execute the commands.
9890
*
@@ -115,7 +107,7 @@ public void runTest(List<String> test, String name) throws Exception {
115107
switch (args[1]) {
116108
case "subscribe":
117109
handleSubscribe(args);
118-
id.add(args[3]);
110+
id.add(args[0] + args[3]);
119111
break;
120112
case "channel":
121113
channelTest = true;
@@ -187,6 +179,22 @@ public void runTest(List<String> test, String name) throws Exception {
187179
assertTrue("There is no subscriber in this test", (channelTest) || (id.size() > 0));
188180
}
189181

182+
/**
183+
* A function that do a look up in the clientMap hashtable. If entry does not exist, it creates
184+
* one.
185+
*
186+
* @param id
187+
* @return a RSocket
188+
*/
189+
private RSocket getClient(String id) {
190+
RSocket client = clientMap.get(id);
191+
if (client == null) {
192+
client = createClient();
193+
clientMap.put(id, client);
194+
}
195+
return client;
196+
}
197+
190198
/**
191199
* This function takes in the arguments for the subscribe command, and subscribes an instance of
192200
* MySubscriber with an initial request of 0 (which means don't immediately make a request) to an
@@ -198,27 +206,27 @@ private void handleSubscribe(String[] args) {
198206
switch (args[2]) {
199207
case "rr":
200208
MySubscriber<Payload> rrsub = new MySubscriber<>(0L, AGENT);
201-
payloadSubscribers.put(args[3], rrsub);
202-
idToType.put(args[3], args[2]);
203-
RSocket rrclient = createClient.get();
209+
payloadSubscribers.put(args[0] + args[3], rrsub);
210+
idToType.put(args[0] + args[3], args[2]);
211+
RSocket rrclient = getClient(args[0]);
204212
consoleUtils.info("Sending RR with " + args[4] + " " + args[5]);
205213
Publisher<Payload> rrpub = rrclient.requestResponse(new PayloadImpl(args[4], args[5]));
206214
rrpub.subscribe(rrsub);
207215
break;
208216
case "rs":
209217
MySubscriber<Payload> rssub = new MySubscriber<>(0L, AGENT);
210-
payloadSubscribers.put(args[3], rssub);
211-
idToType.put(args[3], args[2]);
212-
RSocket rsclient = createClient.get();
218+
payloadSubscribers.put(args[0] + args[3], rssub);
219+
idToType.put(args[0] + args[3], args[2]);
220+
RSocket rsclient = getClient(args[0]);
213221
consoleUtils.info("Sending RS with " + args[4] + " " + args[5]);
214222
Publisher<Payload> rspub = rsclient.requestStream(new PayloadImpl(args[4], args[5]));
215223
rspub.subscribe(rssub);
216224
break;
217225
case "fnf":
218226
MySubscriber<Void> fnfsub = new MySubscriber<>(0L, AGENT);
219-
fnfSubscribers.put(args[3], fnfsub);
220-
idToType.put(args[3], args[2]);
221-
RSocket fnfclient = createClient.get();
227+
fnfSubscribers.put(args[0] + args[3], fnfsub);
228+
idToType.put(args[0] + args[3], args[2]);
229+
RSocket fnfclient = getClient(args[0]);
222230
consoleUtils.info("Sending fnf with " + args[4] + " " + args[5]);
223231
Publisher<Void> fnfpub = fnfclient.fireAndForget(new PayloadImpl(args[4], args[5]));
224232
fnfpub.subscribe(fnfsub);
@@ -254,7 +262,7 @@ private void handleChannel(String[] args, Iterator<String> iter, String name, bo
254262

255263
// we now create the publisher that the server will subscribe to with its own subscriber
256264
// we want to give that subscriber a subscription that the client will use to send data to the server
257-
RSocket client = createClient.get();
265+
RSocket client = getClient(args[0]);
258266
AtomicReference<ParseChannelThread> mypct = new AtomicReference<>();
259267
Publisher<Payload> pub =
260268
client.requestChannel(
@@ -289,7 +297,7 @@ public void subscribe(Subscriber<? super Payload> s) {
289297
private void handleEchoChannel(String[] args) {
290298
Payload initPayload = new PayloadImpl(args[2], args[3]);
291299
MySubscriber<Payload> testsub = new MySubscriber<>(1L, AGENT);
292-
RSocket client = createClient.get();
300+
RSocket client = getClient(args[0]);
293301
Publisher<Payload> pub =
294302
client.requestChannel(
295303
new Publisher<Payload>() {
@@ -306,7 +314,7 @@ public void subscribe(Subscriber<? super Payload> s) {
306314

307315
private void handleAwaitTerminal(String[] args) {
308316
consoleUtils.info("Awaiting at Terminal");
309-
String id = args[3];
317+
String id = args[0] + args[3];
310318

311319
assertNotEquals("Could not find subscriber with given id", idToType.get(id), null);
312320

@@ -322,7 +330,7 @@ private void handleAwaitTerminal(String[] args) {
322330
private void handleAwaitAtLeast(String[] args) {
323331
consoleUtils.info("Awaiting at Terminal for at least " + args[4]);
324332
try {
325-
String id = args[3];
333+
String id = args[0] + args[3];
326334
MySubscriber<Payload> sub = payloadSubscribers.get(id);
327335
sub.awaitAtLeast(Long.parseLong(args[4]));
328336
} catch (InterruptedException e) {
@@ -332,7 +340,7 @@ private void handleAwaitAtLeast(String[] args) {
332340

333341
private void handleAwaitNoEvents(String[] args) {
334342
try {
335-
String id = args[3];
343+
String id = args[0] + args[3];
336344
MySubscriber<Payload> sub = payloadSubscribers.get(id);
337345
sub.awaitNoEvents(Long.parseLong(args[4]));
338346
} catch (InterruptedException e) {
@@ -341,7 +349,7 @@ private void handleAwaitNoEvents(String[] args) {
341349
}
342350

343351
private void assertNoError(String[] args) {
344-
String id = args[3];
352+
String id = args[0] + args[3];
345353

346354
assertNotNull("Could not find subscriber with given id", idToType.get(id));
347355
if (idToType.get(id).equals("fnf")) {
@@ -363,7 +371,7 @@ private void assertNoError(String[] args) {
363371

364372
private void assertError(String[] args) {
365373
consoleUtils.info("Checking for error");
366-
String id = args[3];
374+
String id = args[0] + args[3];
367375
assertNotNull("Could not find subscriber with given id", idToType.get(id));
368376
if (idToType.get(id).equals("fnf")) {
369377
MySubscriber<Void> sub = fnfSubscribers.get(id);
@@ -376,7 +384,7 @@ private void assertError(String[] args) {
376384

377385
private void assertReceived(String[] args) {
378386
consoleUtils.info("Verify we received " + args[4]);
379-
String id = args[3];
387+
String id = args[0] + args[3];
380388
MySubscriber<Payload> sub = payloadSubscribers.get(id);
381389
String[] values = args[4].split("&&");
382390
List<Tuple<String, String>> assertList = new ArrayList<>();
@@ -388,7 +396,7 @@ private void assertReceived(String[] args) {
388396
}
389397

390398
private void assertReceivedN(String[] args) {
391-
String id = args[3];
399+
String id = args[0] + args[3];
392400
MySubscriber<Payload> sub = payloadSubscribers.get(id);
393401
try {
394402
sub.assertValueCount(Integer.parseInt(args[4]));
@@ -398,14 +406,14 @@ private void assertReceivedN(String[] args) {
398406
}
399407

400408
private void assertReceivedAtLeast(String[] args) {
401-
String id = args[3];
409+
String id = args[0] + args[3];
402410
MySubscriber<Payload> sub = payloadSubscribers.get(id);
403411
sub.assertReceivedAtLeast(Integer.parseInt(args[4]));
404412
}
405413

406414
private void assertCompleted(String[] args) {
407415
consoleUtils.info("Handling onComplete");
408-
String id = args[3];
416+
String id = args[0] + args[3];
409417

410418
assertNotNull("Could not find subscriber with given id", idToType.get(id));
411419
if (idToType.get(id).equals("fnf")) {
@@ -427,7 +435,7 @@ private void assertCompleted(String[] args) {
427435

428436
private void assertNoCompleted(String[] args) {
429437
consoleUtils.info("Handling NO onComplete");
430-
String id = args[3];
438+
String id = args[0] + args[3];
431439

432440
assertNotNull("Could not find subscriber with given id", idToType.get(id));
433441
if (idToType.get(id).equals("fnf")) {
@@ -448,14 +456,14 @@ private void assertNoCompleted(String[] args) {
448456
}
449457

450458
private void assertCancelled(String[] args) {
451-
String id = args[3];
459+
String id = args[0] + args[3];
452460
MySubscriber<Payload> sub = payloadSubscribers.get(id);
453461
assertTrue(sub.isCancelled());
454462
}
455463

456464
private void handleRequest(String[] args) {
457465
Long num = Long.parseLong(args[2]);
458-
String id = args[3];
466+
String id = args[0] + args[3];
459467

460468
assertNotNull("Could not find subscriber with given id", idToType.get(id));
461469
if (idToType.get(id).equals("fnf")) {
@@ -470,24 +478,27 @@ private void handleRequest(String[] args) {
470478
}
471479

472480
private void handleTake(String[] args) {
473-
String id = args[3];
481+
String id = args[0] + args[3];
474482
Long num = Long.parseLong(args[2]);
475483
MySubscriber<Payload> sub = payloadSubscribers.get(id);
476484
sub.take(num);
477485
}
478486

479487
private void handleCancel(String[] args) {
480-
String id = args[2];
488+
String id = args[0] + args[2];
481489
MySubscriber<Payload> sub = payloadSubscribers.get(id);
482490
sub.cancel();
483491
}
484492

485493
private void handleEOF() {
486494
MySubscriber<Void> fnfsub = new MySubscriber<>(0L, AGENT);
487-
RSocket fnfclient = createClient.get();
488-
Publisher<Void> fnfpub = fnfclient.fireAndForget(new PayloadImpl("shutdown", "shutdown"));
489-
fnfpub.subscribe(fnfsub);
490-
fnfsub.request(1);
495+
if (clientMap.size() > 0) {
496+
// Use any Client to send shutdown msg to the server
497+
RSocket fnfclient = clientMap.get(clientMap.keySet().toArray()[0]);
498+
Publisher<Void> fnfpub = fnfclient.fireAndForget(new PayloadImpl("shutdown", "shutdown"));
499+
fnfpub.subscribe(fnfsub);
500+
fnfsub.request(1);
501+
}
491502
}
492503

493504
/** A subscription for channel, it handles request(n) by sort of faking an initial payload. */
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
!
2+
name%%requestResponseConcurrent
3+
c1%%subscribe%%rr%%1%%i%%j
4+
c1%%subscribe%%rr%%2%%m%%n
5+
c1%%request%%1%%1
6+
c1%%request%%1%%2
7+
c1%%subscribe%%rr%%3%%k%%l
8+
c2%%subscribe%%rr%%4%%k%%l
9+
c1%%request%%1%%3
10+
c2%%request%%1%%4
11+
c1%%await%%atLeast%%3%%1%%100
12+
c2%%await%%atLeast%%4%%1%%100
13+
c1%%assert%%received%%1%%homer,simpson
14+
c1%%assert%%received%%2%%seymour,skinner
15+
c1%%assert%%received%%3%%bart,simpson
16+
c2%%assert%%received%%4%%bart,simpson
17+
!
18+
name%%r 10000 equestResponseSequential
19+
c3%%subscribe%%rr%%5%%i%%j
20+
c3%%subscribe%%rr%%6%%k%%l
21+
c3%%request%%1%%5
22+
c3%%subscribe%%rr%%7%%m%%n
23+
c3%%request%%1%%6
24+
c3%%request%%1%%7
25+
c3%%await%%atLeast%%5%%1%%100
26+
c3%%await%%atLeast%%6%%1%%100
27+
c3%%await%%atLeast%%7%%1%%100
28+
c3%%assert%%received%%5%%homer,simpson
29+
c3%%assert%%received%%6%%bart,simpson
30+
c3%%assert%%received%%7%%seymour,skinner
31+
c4%%subscribe%%rr%%8%%m%%n
32+
c4%%subscribe%%rr%%9%%k%%l
33+
c4%%request%%1%%8
34+
c4%%subscribe%%rr%%10%%i%%j
35+
c4%%request%%1%%9
36+
c4%%request%%1%%10
37+
c4%%await%%atLeast%%8%%1%%100v
38+
c4%%await%%atLeast%%9%%1%%100
39+
c4%%await%%atLeast%%10%%1%%100
40+
c4%%assert%%received%%10%%homer,simpson
41+
c4%%assert%%received%%9%%bart,simpson
42+
c4%%assert%%received%%8%%seymour,skinner
43+
!
44+
name%%requestStreamConcurrent
45+
c5%%subscribe%%rs%%11%%a%%b
46+
c5%%request%%3%%11
47+
c6%%subscribe%%rs%%12%%c%%d
48+
c6%%request%%3%%12
49+
c5%%await%%atLeast%%11%%3%%100
50+
c6%%await%%atLeast%%12%%3%%100
51+
c5%%assert%%no_error%%11
52+
c5%%assert%%received_n%%11%%3
53+
c6%%assert%%no_error%%12
54+
c6%%assert%%received_n%%12%%3
55+
c5%%request%%3%%11
56+
c6%%request%%3%%12
57+
c5%%await%%atLeast%%11%%6%%100
58+
c6%%await%%atLeast%%12%%6%%100
59+
c5%%assert%%received_n%%11%%6
60+
c6%%assert%%received_n%%12%%6
61+
c5%%await%%terminal%%11
62+
c6%%await%%terminal%%12
63+
c5%%assert%%completed%%11
64+
c6%%assert%%completed%%12
B6B4 65+
!
66+
name%%requestStreamSequential
67+
c7%%subscribe%%rs%%13%%a%%b
68+
c7%%request%%3%%13
69+
c7%%await%%atLeast%%13%%3%%100
70+
c7%%assert%%received%%13%%a,b&&c,d&&e,f
71+
c7%%assert%%no_error%%13
72+
c7%%assert%%received_n%%13%%3
73+
c8%%subscribe%%rs%%14%%c%%d
74+
c8%%request%%3%%14
75+
c8%%await%%atLeast%%14%%3%%100
76+
c8%%assert%%received%%14%%a,b&&c,d&&e,f
77+
c8%%assert%%no_error%%14
78+
c8%%assert%%received_n%%14%%3
79+
EOF
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
rs%%a%%b%%---a-----b-----c-----d--e--f---|&&{"a":{"a":"b"},"b":{"c":"d"},"c":{"e":"f"}}
2+
rs%%c%%d%%---a-----b-----c-----d--e--f---|&&{"a":{"a":"b"},"b":{"c":"d"},"c":{"e":"f"}}
3+
rr%%i%%j%%x-|&&{"x":{"homer":"simpson"}}
4+
rr%%k%%l%%y-|&&{"y":{"bart":"simpson"}}
5+
rr%%m%%n%%z-|&&{"z":{"seymour":"skinner"}}

0 commit comments

Comments
 (0)
0