|
2 | 2 |
|
3 | 3 | import io.rsocket.internal.LimitableRequestPublisher;
|
4 | 4 | import io.rsocket.internal.SynchronizedIntObjectHashMap;
|
5 |
| -import java.util.concurrent.ThreadLocalRandom; |
6 | 5 |
|
7 | 6 | public class BackpressureUtils {
|
8 | 7 |
|
9 | 8 | public static void shareRequest(
|
10 | 9 | long requested,
|
11 | 10 | SynchronizedIntObjectHashMap<LimitableRequestPublisher> limitableSubscriptions) {
|
12 | 11 | try {
|
13 |
| - |
14 | 12 | if (limitableSubscriptions.isEmpty()) {
|
15 | 13 | return;
|
16 | 14 | }
|
17 |
| - ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); |
18 | 15 | Object[] activeSubscriptions;
|
19 |
| - int size; |
20 |
| - synchronized (limitableSubscriptions) { |
21 |
| - activeSubscriptions = limitableSubscriptions.getValuesCopy(); |
22 |
| - size = limitableSubscriptions.size(); |
23 |
| - } |
| 16 | + activeSubscriptions = limitableSubscriptions.getValuesCopy(); |
24 | 17 | int length = activeSubscriptions.length;
|
25 |
| - int randomStartIndex = threadLocalRandom.nextInt(0, size); |
26 |
| - long requestPerItem = requested / size; |
27 | 18 |
|
28 |
| - requestPerItem = requestPerItem == 0 ? 1L : requestPerItem; |
29 |
| - |
30 |
| - for (int i = 0; i < length && requested >= 0; i++) { |
31 |
| - LimitableRequestPublisher lrp = |
32 |
| - (LimitableRequestPublisher) activeSubscriptions[randomStartIndex]; |
| 19 | + for (int i = 0; i < length; i++) { |
| 20 | + LimitableRequestPublisher lrp = (LimitableRequestPublisher) activeSubscriptions[i]; |
33 | 21 | if (lrp != null) {
|
34 |
| - lrp.increaseInternalLimit(requestPerItem); |
35 |
| - requested -= requestPerItem; |
36 |
| - } |
37 |
| - randomStartIndex++; |
38 |
| - if (randomStartIndex == length) { |
39 |
| - randomStartIndex = 0; |
| 22 | + lrp.increaseInternalLimit(requested); |
40 | 23 | }
|
41 | 24 | }
|
42 | 25 | } catch (Throwable e) {
|
|
0 commit comments