8000 fix: LoadBalancedRSocketMono and RSocketSupplierPool not in sync (#655) · FzNl/rsocket-java@fb5aa1d · GitHub
[go: up one dir, main page]

Skip to content

Commit fb5aa1d

Browse files
xiazuojierobertroeser
authored andcommitted
fix: LoadBalancedRSocketMono and RSocketSupplierPool not in sync (rsocket#655)
* fix: LoadBalancedRSocketMono#activeSockets and RSocketSupplierPool#leasedSuppliers are not in sync add: more logs to help debugging fix: log when factoryPool has new items Signed-off-by: 刘禅 <zuojie@alibaba-inc.com> * fix: format Signed-off-by: 刘禅 <zuojie@alibaba-inc.com>
1 parent 19f0c78 commit fb5aa1d

File tree

2 files changed

+45
-18
lines changed

2 files changed

+45
-18
lines changed

rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,6 @@ private synchronized void addSockets(int numberOfNewSocket) {
279279
if (optional.isPresent()) {
280280
RSocketSupplier supplier = optional.get();
281281
WeightedSocket socket = new WeightedSocket(supplier, lowerQuantile, higherQuantile);
282-
activeSockets.add(socket);
283282
} else {
284283
10000 break;
285284
}
@@ -356,7 +355,8 @@ private synchronized void quickSlowestRS() {
356355
}
357356

358357
if (slowest != null) {
359-
activeSockets.remove(slowest);
358+
logger.debug("Disposing slowest WeightedSocket {}", slowest);
359+
slowest.dispose();
360360
}
361361
}
362362

@@ -461,7 +461,7 @@ public synchronized String toString() {
461461

462462
@Override
463463
public void dispose() {
464-
synchronized (this) {;
464+
synchronized (this) {
465465
activeSockets.forEach(WeightedSocket::dispose);
466466
activeSockets.clear();
467467
onClose.onComplete();
@@ -573,12 +573,16 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
573573
this.interArrivalTime = new Ewma(1, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
574574
this.pendingStreams = new AtomicLong();
575575

576+
logger.debug("Creating WeightedSocket {} from factory {}", WeightedSocket.this, factory);
577+
576578
WeightedSocket.this
577579
.onClose()
578580
.doFinally(
579581
s -> {
580582
pool.accept(factory);
581583
activeSockets.remove(WeightedSocket.this);
584+
logger.debug(
585+
"Removed {} from factory {} from activeSockets", WeightedSocket.this, factory);
582586
refreshSockets();
583587
})
584588
.subscribe();
@@ -588,7 +592,11 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
588592
.retryBackoff(weightedSocketRetries, weightedSocketBackOff, weightedSocketMaxBackOff)
589593
.doOnError(
590594
throwable -> {
591-
logger.error("error while connecting {}", throwable);
595+
logger.error(
596+
"error while connecting {} from factory {}",
597+
WeightedSocket.this,
598+
factory,
599+
throwable);
592600
WeightedSocket.this.dispose();
593601
})
594602
.subscribe(
@@ -598,7 +606,8 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
598606
.onClose()
599607
.doFinally(
600608
signalType -> {
601-
System.out.println("RSocket closed");
609+
logger.info(
610+
"RSocket {} from factory {} closed", WeightedSocket.this, factory);
602611
WeightedSocket.this.dispose();
603612
})
604613
.subscribe();
@@ -608,7 +617,7 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
608617
.onClose()
609618
.doFinally(
610619
signalType -> {
611-
System.out.println("Factory closed");
620+
logger.info("Factory {} closed", factory);
612621
rSocket.dispose();
613622
})
614623
.subscribe();
@@ -618,20 +627,30 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
618627
.onClose()
619628
.doFinally(
620629
signalType -> {
621-
System.out.println("WeightedSocket closed");
630+
logger.info(
631+
"WeightedSocket {} from factory {} closed",
632+
WeightedSocket.this,
633+
factory);
622634
rSocket.dispose();
623635
})
624636
.subscribe();
625637

626-
synchronized (LoadBalancedRSocketMono.this) {
638+
/*synchronized (LoadBalancedRSocketMono.this) {
627639
if (activeSockets.size() >= targetAperture) {
628640
quickSlowestRS();
629641
pendingSockets -= 1;
630642
}
631-
}
632-
643+
}*/
633644
rSocketMono.onNext(rSocket);
634645
availability = 1.0;
646+
if (!WeightedSocket.this
647+
.isDisposed()) { // May be already disposed because of retryBackoff delay
648+
activeSockets.add(WeightedSocket.this);
649+
logger.debug(
650+
"Added WeightedSocket {} from factory {} to activeSockets",
651+
WeightedSocket.this,
652+
factory);
653+
}
635654
});
636655
}
637656

@@ -829,11 +848,11 @@ public long lastTimeUsedMillis() {
829848
*/
830849
private class LatencySubscriber<U> implements Subscriber<U> {
831850
private final Subscriber<U> child;
832-
private final LoadBalancedRSocketMono.WeightedSocket socket;
851+
private final WeightedSocket socket;
833852
private final AtomicBoolean done;
834853
private long start;
835854

836-
LatencySubscriber(Subscriber<U> child, LoadBalancedRSocketMono.WeightedSocket socket) {
855+
LatencySubscriber(Subscriber<U> child, WeightedSocket socket) {
837856
this.child = child;
838857
this.socket = socket;
839858
this.done = new AtomicBoolean(false);
@@ -893,9 +912,9 @@ public void onComplete() {
893912
*/
894913
private class CountingSubscriber<U> implements Subscriber<U> {
895914
private final Subscriber<U> child;
896-
private final LoadBalancedRSocketMono.WeightedSocket socket;
915+
private final WeightedSocket socket;
897916

898-
CountingSubscriber(Subscriber<U> child, LoadBalancedRSocketMono.WeightedSocket socket) {
917+
CountingSubscriber(Subscriber<U> child, WeightedSocket socket) {
899918
this.child = child;
900919
this.socket = socket;
901920
}
@@ -916,8 +935,8 @@ public void onError(Throwable t) {
916935
socket.pendingStreams.decrementAndGet();
917936
child.onError(t);
918937
if (t instanceof TransportException || t instanceof ClosedChannelException) {
919-
activeSockets.remove(socket);
920-
refreshSockets();
938+
logger.debug("Disposing {} from activeSockets because of error {}", socket, t);
939+
socket.dispose();
921940
}
922941
}
923942

rsocket-load-balancer/src/main/java/io/rsocket/client/RSocketSupplierPool.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ private synchronized void handleNewFactories(Collection<RSocketSupplier> newFact
8484
}
8585

8686
factoryPool.addAll(added);
87+
if (!added.isEmpty()) {
88+
changed = true;
89+
}
8790

8891
if (changed && logger.isDebugEnabled()) {
8992
StringBuilder msgBuilder = new StringBuilder();
@@ -104,8 +107,10 @@ private synchronized void handleNewFactories(Collection<RSocketSupplier> newFact
104107

105108
@Override
106109
public synchronized void accept(RSocketSupplier rSocketSupplier) {
107-
leasedSuppliers.remove(rSocketSupplier);
108-
if (!rSocketSupplier.isDisposed()) {
110+
boolean contained = leasedSuppliers.remove(rSocketSupplier);
111+
if (contained
112+
&& !rSocketSupplier
113+
.isDisposed()) { // only added leasedSupplier back to factoryPool if it's still there
109114
factoryPool.add(rSocketSupplier);
110115
}
111116
}
@@ -119,6 +124,7 @@ public synchronized Optional<RSocketSupplier> get() {
119124
if (rSocketSupplier.availability() > 0.0) {
120125
factoryPool.remove(0);
121126
leasedSuppliers.add(rSocketSupplier);
127+
logger.debug("Added {} to leasedSuppliers", rSocketSupplier);
122128
optional = Optional.of(rSocketSupplier);
123129
}
124130
} else if (poolSize > 1) {
@@ -143,10 +149,12 @@ public synchronized Optional<RSocketSupplier> get() {
143149
if (factory0.availability() > factory1.availability()) {
144150
factoryPool.remove(i0);
145151
leasedSuppliers.add(factory0);
152+
logger.debug("Added {} to leasedSuppliers", factory0);
146153
optional = Optional.of(factory0);
147154
} else {
148155
factoryPool.remove(i1);
149156
leasedSuppliers.add(factory1);
157+
logger.debug("Added {} to leasedSuppliers", factory1);
150158
optional = Optional.of(factory1);
151159
}
152160
}

0 commit comments

Comments
 (0)
0