8000 partial · FTTF-git/rsocket-java@fce6fe6 · GitHub
[go: up one dir, main page]

Skip to content

Commit fce6fe6

Browse files
committed
partial
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent f1ac884 commit fce6fe6

File tree

6 files changed

+43
-31
lines changed

6 files changed

+43
-31
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public void accept(long n) {
331331
f,
332332
sendProcessor.available() == Long.MAX_VALUE
333333
? Integer.MAX_VALUE
334-
: Queues.XS_BUFFER_SIZE);
334+
: Queues.SMALL_BUFFER_SIZE);
335335
// Need to set this to one for first the frame
336336
wrapped.request(1);
337337
senders.put(streamId, wrapped);
@@ -417,6 +417,10 @@ protected void hookOnError(Throwable t) {
417417
if (sender != null) {
418418
sendersAsASet.remove(sender);
419419
sender.cancel();
420+
long requested = sender.getInternalRequested();
421+
if (requested > 0) {
422+
shareRequest(requested, sendersAsASet);
423+
}
420424
}
421425
});
422426
});
@@ -524,6 +528,10 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
524528
if (sender != null) {
525529
sendersAsASet.remove(sender);
526530
sender.cancel();
531+
long requested = sender.getInternalRequested();
532+
if (requested > 0) {
533+
shareRequest(requested, sendersAsASet);
534+
}
527535
}
528536
break;
529537
}

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
445445
frameFlux,
446446
sendProcessor.available() == Long.MAX_VALUE
447447
? Integer.MAX_VALUE
448-
: Queues.XS_BUFFER_SIZE);
448+
: Queues.SMALL_BUFFER_SIZE);
449449
sendingSubscriptions.put(streamId, payloads);
450450
sendingLimitableSubscriptions.add(payloads);
451451
payloads.request(
@@ -485,11 +485,10 @@ protected void hookFinally(SignalType type) {
485485
LimitableRequestPublisher subscription =
486486
(LimitableRequestPublisher) sendingSubscriptions.remove(streamId);
487487
sendingLimitableSubscriptions.remove(subscription);
488-
489-
// long requested = subscription.getInternalRequested();
490-
// if (requested > 0) {
491-
// shareRequest(requested, sendingLimitableSubscriptions);
492-
// }
488+
long requested = subscription.getInternalRequested();
489+
if (requested > 0) {
490+
shareRequest(requested, sendingLimitableSubscriptions);
491+
}
493492
}
494493
});
495494
}
@@ -534,13 +533,11 @@ private void handleCancelFrame(int streamId) {
534533
if (subscription instanceof LimitableRequestPublisher) {
535534
sendingLimitableSubscriptions.remove(subscription);
536535

537-
// if (limitableSubscription != null) {
538-
// limitableSubscription.cancel();
539-
// // long requested = limitableSubscription.getInternalRequested();
540-
// // if (requested > 0) {
541-
// // shareRequest(requested, sendingLimitableSubscriptions);
542-
// // }
543-
// }
536+
long requested =
537+
((LimitableRequestPublisher) subscription).getInternalRequested();
538+
if (requested > 0) {
539+
shareRequest(requested, sendingLimitableSubscriptions);
540+
}
544541
}
545542
}
546543
}

rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class LimitableRequestPublisher<T> extends Flux<T> implements Subscriptio
5252
private LimitableRequestPublisher(Publisher<T> source, int prefetch) {
5353
this.source = source;
5454
this.prefetch = prefetch;
55-
this.limit = prefetch / 2 + 1;
55+
this.limit = prefetch >> 2;
5656
}
5757

5858
public static <T> LimitableRequestPublisher<T> wrap(Publisher<T> source, int prefetch) {
@@ -110,7 +110,7 @@ public void internalRequest(long n) {
110110
internalRequested = r;
111111
}
112112

113-
if (p == 0 && r >= limit) {
113+
if (r >= limit) {
114114
requestN();
115115
}
116116
}

rsocket-core/src/main/java/io/rsocket/internal/SynchronizedObjectHashSet.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ private void rehash(final int newCapacity) {
201201
final T[] tempValues = (T[]) new Object[newCapacity];
202202
Arrays.fill(tempValues, MISSING_VALUE);
203203

204-
for (int i = 0; i < size; i++) {
204+
int length = values.length();
205+
for (int i = 0; i < length; i++) {
205206
T value = values.get(i);
206207
if (value != MISSING_VALUE) {
207208
int newHash = value.hashCode() & mask;
@@ -309,7 +310,8 @@ public synchronized boolean isEmpty() {
309310
@SuppressWarnings("unchecked")
310311
public synchronized void clear() {
311312
if (size > 0) {
312-
for (int i = 0; i < size; i++) {
313+
int lenght = values.length();
314+
for (int i = 0; i < lenght; i++) {
313315
values.set(i, (T) MISSING_VALUE);
314316
}
315317
size = 0;
@@ -424,7 +426,8 @@ public synchronized String toString() {
424426
final StringBuilder sb = new StringBuilder();
425427
sb.append('{');
426428

427-
for (int i = 0; i < size; i++) {
429+
int length = values.length();
430+
for (int i = 0; i < length; i++) {
428431
T value = values.get(i);
429432
if (value != MISSING_VALUE) {
430433
sb.append(value).append(", ");
@@ -476,7 +479,8 @@ public boolean equals(final Object other) {
476479
public synchronized int hashCode() {
477480
int hashCode = 0;
478481

479-
for (int i = 0; i < size; i++) {
482+
int length = values.length();
483+
for (int i = 0; i < length; i++) {
480484
T value = values.get(i);
481485
if (value != MISSING_VALUE) {
482486
hashCode += value.hashCode();

rsocket-core/src/main/java/io/rsocket/util/BackpressureUtils.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,32 @@ public static void shareRequest(
2727
}
2828
} else {
2929

30-
long prefetch = requested > length ? requested / length : 1;
31-
3230
int i = ThreadLocalRandom.current().nextInt(0, length);
3331
int count = 0;
32+
boolean any = false;
33+
34+
long minimumPrefetch = requested > length ? requested / length : 1;
3435

35-
while (requested <= 0) {
36+
while (requested > 0) {
3637
LimitableRequestPublisher subscription = values.get(i);
3738

3839
if (subscription != null) {
40+
any = true;
3941
if ((subscription.getExternalRequested() != 0
40-
&& subscription.getExternalRequested() <= subscription.getLimit())
42+
&& subscription.getInternalRequested() == 0)
4143
|| count >= length) {
42-
subscription.internalRequest(prefetch);
44+
int prefetch = subscription.getLimit();
45+
subscription.internalRequest(minimumPrefetch > prefetch ? minimumPrefetch : prefetch);
4346
requested -= prefetch;
44-
45-
if (requested < prefetch) {
46-
prefetch = requested;
47-
}
4847
}
4948
}
5049

5150
count++;
5251
i = ++i % length;
52+
53+
if (count >= length && !any) {
54+
return;
55+
}
5356
}
5457
}
5558

rsocket-core/src/test/java/io/rsocket/internal/LimitableRequestPublisherTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ class LimitableRequestPublisherTest {
1515
public void requestLimitRacingTest() throws InterruptedException {
1616
Queue<Long> requests = new ArrayDeque<>(10000);
1717
LimitableRequestPublisher<Object> limitableRequestPublisher =
18-
LimitableRequestPublisher.wrap(DirectProcessor.create().doOnRequest(requests::add), 0);
18+
LimitableRequestPublisher.wrap(DirectProcessor.create().doOnRequest(requests::add), 32);
1919

2020
Runnable request1 = () -> limitableRequestPublisher.request(1);
21-
Runnable request2 = () -> limitableRequestPublisher.increaseInternalLimit(2);
21+
Runnable request2 = () -> limitableRequestPublisher.internalRequest(2);
2222

2323
limitableRequestPublisher.subscribe();
2424

0 commit comments

Comments
 (0)
0