8000 Resume cleanup round (#624) · FTTF-git/rsocket-java@5101748 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5101748

Browse files
mostroverkhovrobertroeser
authored andcommitted
Resume cleanup round (rsocket#624)
* add ResumableKeepAlive tests Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * resume in-memory store: switch to Flux.generate Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * Revert "fix issue when long running channels eventually hang under load" This reverts commit 7c87f6c Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * fix issue when long running channels eventually hang. Replace rsocket#618 as It caused memory leak in event of resumption Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com>
1 parent 1b87e1f commit 5101748

File tree

11 files changed

+297
-118
lines changed

11 files changed

+297
-118
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -433,9 +433,6 @@ private boolean contains(int streamId) {
433433
protected void terminate() {
434434
lifecycle.setTerminationError(new ClosedChannelException());
435435

436-
if (keepAliveFramesAcceptor != null) {
437-
keepAliveFramesAcceptor.dispose();
438-
}
439436
try {
440437
receivers.values().forEach(this::cleanUpSubscriber);
441438
senders.values().forEach(this::cleanUpLimitableRequestPublisher);

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 0 additions & 3 deletions
< 8000 /tbody>
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,6 @@ public Mono<Void> onClose() {
280280
}
281281

282282
private void cleanup() {
283-
if (keepAliveFramesAcceptor != null) {
284-
keepAliveFramesAcceptor.dispose();
285-
}
286283
cleanUpSendingSubscriptions();
287284
cleanUpChannelProcessors();
288285

rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public DuplexConnection connection() {
5353

5454
@Override
5555
public KeepAliveHandler keepAliveHandler() {
56-
return new DefaultKeepAliveHandler();
56+
return new DefaultKeepAliveHandler(connection);
5757
}
5858

5959
@Override

rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public Mono<Void> acceptRSocketSetup(
6464
multiplexer.dispose();
6565
});
6666
} else {
67-
return then.apply(new DefaultKeepAliveHandler(), multiplexer);
67+
return then.apply(new DefaultKeepAliveHandler(multiplexer), multiplexer);
6868
}
6969
}
7070

@@ -132,7 +132,7 @@ public Mono<Void> acceptRSocketSetup(
132132
new ResumableKeepAliveHandler(connection),
133133
new ClientServerInputMultiplexer(connection));
134134
} else {
135-
return then.apply(new DefaultKeepAliveHandler(), multiplexer);
135+
return then.apply(new DefaultKeepAliveHandler(multiplexer), multiplexer);
136136
}
137137
}
138138

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveFramesAcceptor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,4 @@
55
public interface KeepAliveFramesAcceptor {
66

77
void receive(ByteBuf keepAliveFrame);
8-
9-
void dispose();
108
}

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.rsocket.keepalive;
22

33
import io.netty.buffer.ByteBuf;
4+
import io.rsocket.Closeable;
45
import io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
56
import io.rsocket.resume.ResumableDuplexConnection;
67
import java.util.function.Consumer;
@@ -13,12 +14,18 @@ KeepAliveFramesAcceptor start(
1314
Consumer<KeepAlive> onTimeout);
1415

1516
class DefaultKeepAliveHandler implements KeepAliveHandler {
17+
private final Closeable duplexConnection;
18+
19+
public DefaultKeepAliveHandler(Closeable duplexConnection) {
20+
this.duplexConnection = duplexConnection;
21+
}
1622

1723
@Override
1824
public KeepAliveFramesAcceptor start(
1925
KeepAliveSupport keepAliveSupport,
2026
Consumer<ByteBuf> onSendKeepAliveFrame,
2127
Consumer<KeepAlive> onTimeout) {
28+
duplexConnection.onClose().subscribe(v -> keepAliveSupport.stop());
2229
return keepAliveSupport
2330
.onSendKeepAliveFrame(onSendKeepAliveFrame)
2431
.onTimeout(onTimeout)
@@ -38,7 +45,8 @@ public KeepAliveFramesAcceptor start(
3845
KeepAliveSupport keepAliveSupport,
3946
Consumer<ByteBuf> onSendKeepAliveFrame,
4047
Consumer<KeepAlive> onTimeout) {
41-
resumableDuplexConnection.onResumed(keepAliveSupport::start);
48+
resumableDuplexConnection.onResume(keepAliveSupport::start);
49+
resumableDuplexConnection.onDisconnect(keepAliveSupport::stop);
4250
return keepAliveSupport
4351
.resumeState(resumableDuplexConnection)
4452
.onSendKeepAliveFrame(onSendKeepAliveFrame)

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import reactor.core.Disposable;
2828
import reactor.core.publisher.Flux;
2929

30-
public abstract class KeepAliveSupport implements Disposable, KeepAliveFramesAcceptor {
30+
public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
3131
final ByteBufAllocator allocator;
3232
private final Duration keepAliveInterval;
3333
private final Duration keepAliveTimeout;
@@ -50,13 +50,16 @@ private KeepAliveSupport(
5050

5151
public KeepAliveSupport start() {
5252
this.lastReceivedMillis = System.currentTimeMillis();
53-
startTicks();
53+
if (started.compareAndSet(false, true)) {
54+
ticksDisposable = Flux.interval(keepAliveInterval).subscribe(v -> onIntervalTick());
55+
}
5456
return this;
5557
}
5658

57-
@Override
58-
public void dispose() {
59-
stopTicks();
59+
public void stop() {
60+
if (started.compareAndSet(true, false)) {
61+
ticksDisposable.dispose();
62+
}
6063
}
6164

6265
@Override
@@ -106,7 +109,7 @@ void tryTimeout() {
106109
if (onTimeout != null) {
107110
onTimeout.accept(new KeepAlive(keepAliveInterval, keepAliveTimeout));
108111
}
109-
stopTicks();
112+
stop();
110113
}
111114
}
112115

@@ -118,18 +121,6 @@ long remoteLastReceivedPosition(ByteBuf keepAliveFrame) {
118121
return KeepAliveFrameFlyweight.lastPosition(keepAliveFrame);
119122
}
120123

121-
void startTicks() {
122-
if (started.compareAndSet(false, true)) {
123-
ticksDisposable = Flux.interval(keepAliveInterval).subscribe(v -> onIntervalTick());
124-
}
125-
}
126-
127-
void stopTicks() {
128-
if (started.compareAndSet(true, false)) {
129-
ticksDisposable.dispose();
130-
}
131-
}
132-
133124
public static final class ServerKeepAliveSupport extends KeepAliveSupport {
134125

135126
public ServerKeepAliveSupport(

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,22 @@ public void releaseFrames(long remoteImpliedPos) {
8484

8585
@Override
8686
public Flux<ByteBuf> resumeStream() {
87-
return Flux.create(
88-
s -> {
89-
int size = cachedFrames.size();
90-
int refCnt = upstreamFrameRefCnt;
91-
logger.debug("{} Resuming stream size: {}", tag, size);
92-
/*spsc queue has no iterator - iterating by consuming*/
93-
for (int i = 0; i < size; i++) {
87+
return Flux.generate(
88+
() -> new ResumeStreamState(cachedFrames.size(), upstreamFrameRefCnt),
89+
(state, sink) -> {
90+
if (state.next()) {
91+
/*spsc queue has no iterator - iterating by consuming*/
9492
ByteBuf frame = cachedFrames.poll();
95-
/*in the event of connection termination some frames
96-
* are not released on DuplexConnection*/
97-
if (frame.refCnt() == refCnt) {
93+
if (state.shouldRetain(frame)) {
9894
frame.retain();
9995
}
10096
cachedFrames.offer(frame);
101-
s.next(frame);
97+
sink.next(frame);
98+
} else {
99+
sink.complete();
100+
logger.debug("{} Resuming stream completed", tag);
102101
}
103-
s.complete();
104-
logger.debug("{} Resuming stream completed", tag);
102+
return state;
105103
});
106104
}
107105

@@ -177,11 +175,35 @@ void saveFrame(ByteBuf frame) {
177175
}
178176
}
179177

180-
private static Queue<ByteBuf> cachedFramesQueue(int size) {
178+
static class ResumeStreamState {
179+
private final int cacheSize;
180+
private final int expectedRefCnt;
181+
private int cacheCounter;
182+
183+
public ResumeStreamState(int cacheSize, int expectedRefCnt) {
184+
this.cacheSize = cacheSize;
185+
this.expectedRefCnt = expectedRefCnt;
186+
}
187+
188+
public boolean next() {
189+
if (cacheCounter < cacheSize) {
190+
cacheCounter++;
191+
return true;
192+
} else {
193+
return false;
194+
}
195+
}
196+
197+
public boolean shouldRetain(ByteBuf frame) {
198+
return frame.refCnt() == expectedRefCnt;
199+
}
200+
}
201+
202+
static Queue<ByteBuf> cachedFramesQueue(int size) {
181203
return Queues.<ByteBuf>get(size).get();
182204
}
183205

184-
private class FramesSubscriber implements Subscriber<ByteBuf> {
206+
class FramesSubscriber implements Subscriber<ByteBuf> {
185207
private final long firstRequestSize;
186208
private final long refillSize;
187209
private int received;

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
3838
private static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);
39+
private static final Throwable closedChannelException = new ClosedChannelException();
3940

4041
private final String tag;
4142
private final ResumableFramesStore resumableFramesStore;
@@ -64,11 +65,12 @@ public class ResumableDuplexConnection implements DuplexConnection, ResumeStateH
6465
resumeSaveStreamRequestListener.requests(),
6566
this::dispatch);
6667

67-
private volatile Runnable onResumedAction;
68+
private volatile Runnable onResume;
69+
private volatile Runnable onDisconnect;
6870
private volatile int state;
6971
private volatile Disposable resumedStreamDisposable = Disposables.disposed();
7072

71-
ResumableDuplexConnection(
73+
public ResumableDuplexConnection(
7274
String tag,
7375
DuplexConnection duplexConnection,
7476
ResumableFramesStore resumableFramesStore,
@@ -110,8 +112,12 @@ public void disconnect() {
110112
}
111113
}
112114

113-
public void onResumed(Runnable onResumedAction) {
114-
this.onResumedAction = onResumedAction;
115+
public void onDisconnect(Runnable onDisconnectAction) {
116+
this.onDisconnect = onDisconnectAction;
117+
}
118+
119+
public void onResume(Runnable onResumeAction) {
120+
this.onResume = onResumeAction;
115121
}
116122

117123
/*reconnected by session after error. After this downstream can receive frames,
@@ -284,9 +290,9 @@ private void doResume(
284290
.apply(impliedPositionOrError)
285291
.doOnSuccess(
286292
v -> {
287-
Runnable a = this.onResumedAction;
288-
if (a != null) {
289-
a.run();
293+
Runnable r = this.onResume;
294+
if (r != null) {
295+
r.run();
290296
}
291297
})
292298
.then(
@@ -336,10 +342,17 @@ private void onNewConnection(DuplexConnection connection) {
336342
private void disconnect(DuplexConnection connection) {
337343
/*do not report late disconnects on old connection if new one is available*/
338344
if (curConnection == connection && state != State.DISCONNECTED) {
339-
Throwable err = new ClosedChannelException();
345+
connection.dispose();
340346
state = State.DISCONNECTED;
341-
logger.debug("{} Inner connection disconnected: {}", tag, err.getClass().getSimpleName());
342-
connectionErrors.onNext(err);
347+
logger.debug(
348+
"{} Inner connection disconnected: {}",
349+
tag,
350+
closedChannelException.getClass().getSimpleName());
351+
connectionErrors.onNext(closedChannelException);
352+
Runnable r = this.onDisconnect;
353+
if (r != null) {
354+
r.run();
355+
}
343356
}
344357
}
345358

0 commit comments

Comments
 (0)
0