@@ -279,7 +279,6 @@ private synchronized void addSockets(int numberOfNewSocket) {
279
279
if (optional .isPresent ()) {
280
280
RSocketSupplier supplier = optional .get ();
281
281
WeightedSocket socket = new WeightedSocket (supplier , lowerQuantile , higherQuantile );
282
- activeSockets .add (socket );
283
282
} else {
284
283
10000
break ;
285
284
}
@@ -356,7 +355,8 @@ private synchronized void quickSlowestRS() {
356
355
}
357
356
358
357
if (slowest != null ) {
359
- activeSockets .remove (slowest );
358
+ logger .debug ("Disposing slowest WeightedSocket {}" , slowest );
359
+ slowest .dispose ();
360
360
}
361
361
}
362
362
@@ -461,7 +461,7 @@ public synchronized String toString() {
461
461
462
462
@ Override
463
463
public void dispose () {
464
- synchronized (this ) {;
464
+ synchronized (this ) {
465
465
activeSockets .forEach (WeightedSocket ::dispose );
466
466
activeSockets .clear ();
467
467
onClose .onComplete ();
@@ -573,12 +573,16 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
573
573
this .interArrivalTime = new Ewma (1 , TimeUnit .MINUTES , DEFAULT_INITIAL_INTER_ARRIVAL_TIME );
574
574
this .pendingStreams = new AtomicLong ();
575
575
576
+ logger .debug ("Creating WeightedSocket {} from factory {}" , WeightedSocket .this , factory );
577
+
576
578
WeightedSocket .this
577
579
.onClose ()
578
580
.doFinally (
579
581
s -> {
580
582
pool .accept (factory );
581
583
activeSockets .remove (WeightedSocket .this );
584
+ logger .debug (
585
+ "Removed {} from factory {} from activeSockets" , WeightedSocket .this , factory );
582
586
refreshSockets ();
583
587
})
584
588
.subscribe ();
@@ -588,7 +592,11 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
588
592
.retryBackoff (weightedSocketRetries , weightedSocketBackOff , weightedSocketMaxBackOff )
589
593
.doOnError (
590
594
throwable -> {
591
- logger .error ("error while connecting {}" , throwable );
595
+ logger .error (
596
+ "error while connecting {} from factory {}" ,
597
+ WeightedSocket .this ,
598
+ factory ,
599
+ throwable );
592
600
WeightedSocket .this .dispose ();
593
601
})
594
602
.subscribe (
@@ -598,7 +606,8 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
598
606
.onClose ()
599
607
.doFinally (
600
608
signalType -> {
601
- System .out .println ("RSocket closed" );
609
+ logger .info (
610
+ "RSocket {} from factory {} closed" , WeightedSocket .this , factory );
602
611
WeightedSocket .this .dispose ();
603
612
})
604
613
.subscribe ();
@@ -608,7 +617,7 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
608
617
.onClose ()
609
618
.doFinally (
610
619
signalType -> {
611
- System . out . println ("Factory closed" );
620
+ logger . info ("Factory {} closed" , factory );
612
621
rSocket .dispose ();
613
622
})
614
623
.subscribe ();
@@ -618,20 +627,30 @@ private class WeightedSocket extends AbstractRSocket implements LoadBalancerSock
618
627
.onClose ()
619
628
.doFinally (
620
629
signalType -> {
621
- System .out .println ("WeightedSocket closed" );
630
+ logger .info (
631
+ "WeightedSocket {} from factory {} closed" ,
632
+ WeightedSocket .this ,
633
+ factory );
622
634
rSocket .dispose ();
623
635
})
624
636
.subscribe ();
625
637
626
- synchronized (LoadBalancedRSocketMono .this ) {
638
+ /* synchronized (LoadBalancedRSocketMono.this) {
627
639
if (activeSockets.size() >= targetAperture) {
628
640
quickSlowestRS();
629
641
pendingSockets -= 1;
630
642
}
631
- }
632
-
643
+ }*/
633
644
rSocketMono .onNext (rSocket );
634
645
availability = 1.0 ;
646
+ if (!WeightedSocket .this
647
+ .isDisposed ()) { // May be already disposed because of retryBackoff delay
648
+ activeSockets .add (WeightedSocket .this );
649
+ logger .debug (
650
+ "Added WeightedSocket {} from factory {} to activeSockets" ,
651
+ WeightedSocket .this ,
652
+ factory );
653
+ }
635
654
});
636
655
}
637
656
@@ -829,11 +848,11 @@ public long lastTimeUsedMillis() {
829
848
*/
830
849
private class LatencySubscriber <U > implements Subscriber <U > {
831
850
private final Subscriber <U > child ;
832
- private final LoadBalancedRSocketMono . WeightedSocket socket ;
851
+ private final WeightedSocket socket ;
833
852
private final AtomicBoolean done ;
834
853
private long start ;
835
854
836
- LatencySubscriber (Subscriber <U > child , LoadBalancedRSocketMono . WeightedSocket socket ) {
855
+ LatencySubscriber (Subscriber <U > child , WeightedSocket socket ) {
837
856
this .child = child ;
838
857
this .socket = socket ;
839
858
this .done = new AtomicBoolean (false );
@@ -893,9 +912,9 @@ public void onComplete() {
893
912
*/
894
913
private class CountingSubscriber <U > implements Subscriber <U > {
895
914
private final Subscriber <U > child ;
896
- private final LoadBalancedRSocketMono . WeightedSocket socket ;
915
+ private final WeightedSocket socket ;
897
916
898
- CountingSubscriber (Subscriber <U > child , LoadBalancedRSocketMono . WeightedSocket socket ) {
917
+ CountingSubscriber (Subscriber <U > child , WeightedSocket socket ) {
899
918
this .child = child ;
900
919
this .socket = socket ;
901
920
}
@@ -916,8 +935,8 @@ public void onError(Throwable t) {
916
935
socket .pendingStreams .decrementAndGet ();
917
936
child .onError (t );
918
937
if (t instanceof TransportException || t instanceof ClosedChannelException ) {
919
- activeSockets . remove ( socket );
920
- refreshSockets ();
938
+ logger . debug ( "Disposing {} from activeSockets because of error {}" , socket , t );
939
+ socket . dispose ();
921
940
}
922
941
}
923
942
0 commit comments