8000 Client-side acceptor access to ConnectionSetupPayload (#636) · FTTF-git/rsocket-java@9065434 · GitHub 8000
[go: up one dir, main page]

Skip to content

Commit 9065434

Browse files
rstoyanchevrobertroeser
authored andcommitted
Client-side acceptor access to ConnectionSetupPayload (rsocket#636)
A client acceptor, especially if declared as a top-level class and a singleton can also benefit from access to the ConnectionSetupPayload. Signed-off-by: Rossen Stoyanchev <rstoyanchev@pivotal.io>
1 parent 71915d4 commit 9065434

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.rsocket.util.EmptyPayload;
4343
import java.time.Duration;
4444
import java.util.Objects;
45+
import java.util.function.BiFunction;
4546
import java.util.function.Consumer;
4647
import java.util.function.Function;
4748
import java.util.function.Supplier;
@@ -94,6 +95,8 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
9495
private Supplier<Function<RSocket, RSocket>> acceptor =
9596
() -> rSocket -> new AbstractRSocket() {};
9697

98+
private BiFunction<ConnectionSetupPayload, RSocket, RSocket> biAcceptor;
99+
97100
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
98101
private int mtu = 0;
99102
private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
@@ -242,6 +245,12 @@ public ClientTransportAcceptor acceptor(Supplier<Function<RSocket, RSocket>> acc
242245
return StartClient::new;
243246
}
244247

248+
public ClientTransportAcceptor acceptor(
249+
BiFunction<ConnectionSetupPayload, RSocket, RSocket> biAcceptor) {
250+
this.biAcceptor = biAcceptor;
251+
return StartClient::new;
252+
}
253+
245254
public ClientRSocketFactory fragment(int mtu) {
246255
this.mtu = mtu;
247256
return this;
@@ -293,9 +302,27 @@ public Mono<RSocket> start() {
293302
keepAliveTimeout(),
294303
keepAliveHandler);
295304

305+
ByteBuf setupFrame =
306+
SetupFrameFlyweight.encode(
307+
allocator,
308+
false,
309+
keepAliveTickPeriod(),
310+
keepAliveTimeout(),
311+
resumeToken,
312+
metadataMimeType,
313+
dataMimeType,
314+
setupPayload.sliceMetadata(),
315+
setupPayload.sliceData());
316+
296317
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
297318

298-
RSocket unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient);
319+
RSocket unwrappedServerSocket;
320+
if (biAcceptor != null) {
321+
ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
322+
unwrappedServerSocket = biAcceptor.apply(setup, wrappedRSocketClient);
323+
} else {
324+
unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient);
325+
}
299326

300327
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
301328

@@ -307,18 +334,6 @@ public Mono<RSocket> start() {
307334
payloadDecoder,
308335
errorConsumer);
309336

310-
ByteBuf setupFrame =
311-
SetupFrameFlyweight.encode(
312-
allocator,
313-
false,
314-
keepAliveTickPeriod(),
315-
keepAliveTimeout(),
316-
resumeToken,
317-
metadataMimeType,
318-
dataMimeType,
319-
setupPayload.sliceMetadata(),
320-
setupPayload.sliceData());
321-
322337
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
323338
});
324339
}

0 commit comments

Comments
 (0)
0