8000 experiments · FTTF-git/rsocket-java@3e8e745 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3e8e745

Browse files
committed
experiments
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent e004512 commit 3e8e745

File tree

2 files changed

+8
-25
lines changed

2 files changed

+8
-25
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -556,10 +556,10 @@ private void handleCancelFrame(int streamId) {
556556

557557
if (limitableSubscription != null) {
558558
limitableSubscription.cancel();
559-
long requested = limitableSubscription.getInternalRequested();
560-
if (requested > 0) {
561-
shareRequest(requested, sendingLimitableSubscriptions);
562-
}
559+
// long requested = limitableSubscription.getInternalRequested();
560+
// if (requested > 0) {
561+
// shareRequest(requested, sendingLimitableSubscriptions);
562+
// }
563563
}
564564
} else {
565565
subscription.cancel();

rsocket-core/src/main/java/io/rsocket/util/BackpressureUtils.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,24 @@
22

33
import io.rsocket.internal.LimitableRequestPublisher;
44
import io.rsocket.internal.SynchronizedIntObjectHashMap;
5-
import java.util.concurrent.ThreadLocalRandom;
65

76
public class BackpressureUtils {
87

98
public static void shareRequest(
109
long requested,
1110
SynchronizedIntObjectHashMap<LimitableRequestPublisher> limitableSubscriptions) {
1211
try {
13-
1412
if (limitableSubscriptions.isEmpty()) {
1513
return;
1614
}
17-
ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
1815
Object[] activeSubscriptions;
19-
int size;
20-
synchronized (limitableSubscriptions) {
21-
activeSubscriptions = limitableSubscriptions.getValuesCopy();
22-
size = limitableSubscriptions.size();
23-
}
16+
activeSubscriptions = limitableSubscriptions.getValuesCopy();
2417
int length = activeSubscriptions.length;
25-
int randomStartIndex = threadLocalRandom.nextInt(0, size);
26-
long requestPerItem = requested / size;
2718

28-
requestPerItem = requestPerItem == 0 ? 1L : requestPerItem;
29-
30-
for (int i = 0; i < length && requested >= 0; i++) {
31-
LimitableRequestPublisher lrp =
32-
(LimitableRequestPublisher) activeSubscriptions[randomStartIndex];
19+
for (int i = 0; i < length; i++) {
20+
LimitableRequestPublisher lrp = (LimitableRequestPublisher) activeSubscriptions[i];
3321
if (lrp != null) {
34-
lrp.increaseInternalLimit(requestPerItem);
35-
requested -= requestPerItem;
36-
}
37-
randomStartIndex++;
38-
if (randomStartIndex == length) {
39-
randomStartIndex = 0;
22+
lrp.increaseInternalLimit(requested);
4023
}
4124
}
4225
} catch (Throwable e) {

0 commit comments

Comments
 (0)
0