8000 Refresh sockets on `select` in LoadBalancedRSocketMono before checkin… · rwinch/rsocket-java@1b87e1f · GitHub 8000
[go: up one dir, main page]

Skip to content

Commit 1b87e1f

Browse files
jareddellittrobertroeser
authored andcommitted
Refresh sockets on select in LoadBalancedRSocketMono before checking for active sockets (rsocket#623)
Signed-off-by: Jared Dellitt <jared@dwolla.com>
1 parent 6405898 commit 1b87e1f

File tree

2 files changed

+33
-48
lines changed

2 files changed

+33
-48
lines changed

rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,11 @@ public synchronized double availability() {
374374
}
375375

376376
private synchronized RSocket select() {
377+
refreshSockets();
378+
377379
if (activeSockets.isEmpty()) {
378380
return FAILING_REACTIVE_SOCKET;
379381
}
380-
refreshSockets();
381382

382383
int size = activeSockets.size();
383384
if (size == 1) {

rsocket-load-balancer/src/test/java/io/rsocket/client/LoadBalancedRSocketMonoTest.java

Lines changed: 31 additions & 47 deletions
< 6D47 tr class="diff-line-row">
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,24 @@
1919
import io.rsocket.Payload;
2020
import io.rsocket.RSocket;
2121
import io.rsocket.client.filter.RSocketSupplier;
22-
import io.rsocket.util.EmptyPayload;
23-
import java.net.InetSocketAddress;
24-
import java.net.SocketAddress;
2522
import java.util.Arrays;
23+
import java.util.Collections;
2624
import java.util.List;
27-
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.CompletableFuture;
2826
import java.util.function.Function;
2927
import org.junit.Assert;
3028
import org.junit.Test;
3129
import org.mockito.Mockito;
3230
import org.reactivestreams.Publisher;
33-
import org.reactivestreams.Subscriber;
34-
import org.reactivestreams.Subscription;
3531
import reactor.core.publisher.Flux;
3632
import reactor.core.publisher.Mono;
3733

3834
public class LoadBalancedRSocketMonoTest {
3935

4036
@Test(timeout = 10_000L)
4137
public void testNeverSelectFailingFactories() throws InterruptedException {
42-
InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000);
43-
InetSocketAddress local1 = InetSocketAddress.createUnresolved("localhost", 7001);
44-
4538
TestingRSocket socket = new TestingRSocket(Function.identity());
46-
RSocketSupplier failing = failingClient(local0);
39+
RSocketSupplier failing = failingClient();
4740
RSocketSupplier succeeding = succeedingFactory(socket);
4841
List<RSocketSupplier> factories = Arrays.asList(failing, succeeding);
4942

@@ -52,9 +45,6 @@ public void testNeverSelectFailingFactories() throws InterruptedException {
5245

5346
@Test(timeout = 10_000L)
5447
public void testNeverSelectFailingSocket() throws InterruptedException {
55-
InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000);
56-
InetSocketAddress local1 = InetSocketAddress.createUnresolved("localhost", 7001);
57-
5848
TestingRSocket socket = new TestingRSocket(Function.identity());
5949
TestingRSocket failingSocket =
6050
new TestingRSocket(Function.identity()) {
@@ -76,6 +66,33 @@ public double availability() {
7666
testBalancer(clients);
7767
}
7868

69+
@Test(timeout = 10_000L)
70+
public void testRefreshesSocketsOnSelectBeforeReturningFailedAfterNewFactoriesDelivered() {
71+
TestingRSocket socket = new TestingRSocket(Function.identity());
72+
73+
CompletableFuture<RSocketSupplier> laterSupplier = new CompletableFuture<>();
74+
Flux<List<RSocketSupplier>> factories =
75+
Flux.create(
76+
s -> {
77+
s.next(Collections.emptyList());
78+
79+
laterSupplier.handle(
80+
(RSocketSupplier result, Throwable t) -> {
81+
s.next(Collections.singletonList(result));
82+
return null;
83+
});
84+
});
85+
86+
LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono.create(factories);
87+
88+
Assert.assertEquals(0.0, balancer.availability(), 0);
89+
90+
laterSupplier.complete(succeedingFactory(socket));
91+
balancer.rSocketMono.block();
92+
93+
Assert.assertEquals(1.0, balancer.availability(), 0);
94+
}
95+
7996
private void testBalancer(List<RSocketSupplier> factories) throws InterruptedException {
8097
Publisher<List<RSocketSupplier>> src =
8198
s -> {
@@ -92,39 +109,6 @@ private void testBalancer(List<RSocketSupplier> factories) throws InterruptedExc
92109
Flux.range(0, 100).flatMap(i -> balancer).blockLast();
93110
}
94111

95-
private void makeAcall(RSocket balancer) throws InterruptedException {
96-
CountDownLatch latch = new CountDownLatch(1);
97-
98-
balancer
99-
.requestResponse(EmptyPayload.INSTANCE)
100-
.subscribe(
101-
new Subscriber<Payload>() {
102-
@Override
103-
public void onSubscribe(Subscription s) {
104-
s.request(1L);
105-
}
106-
107-
@Override
108-
public void onNext(Payload payload) {
109-
System.out.println("Successfully receiving a response");
110-
}
111-
112-
@Override
113-
public void onError(Throwable t) {
114-
t.printStackTrace();
115-
Assert.assertTrue(false);
116-
latch.countDown();
117-
}
118-
119-
@Override
120-
public void onComplete() {
121-
latch.countDown();
122-
}
123-
});
124-
125-
latch.await();
126-
}
127-
128112
private static RSocketSupplier succeedingFactory(RSocket socket) {
129113
RSocketSupplier mock = Mockito.mock(RSocketSupplier.class);
130114

@@ -135,7 +119,7 @@ private static RSocketSupplier succeedingFactory(RSocket socket) {
135119
return mock;
136120
}
137121

138-
private static RSocketSupplier failingClient(SocketAddress sa) {
122+
private static RSocketSupplier failingClient() {
139123
RSocketSupplier mock = Mockito.mock(RSocketSupplier.class);
140124

141125
Mockito.when(mock.availability()).thenReturn(0.0);

0 commit comments

Comments
 (0)
0