42
42
import io .rsocket .util .EmptyPayload ;
43
43
import java .time .Duration ;
44
44
import java .util .Objects ;
45
+ import java .util .function .BiFunction ;
45
46
import java .util .function .Consumer ;
46
47
import java .util .function .Function ;
47
48
import java .util .function .Supplier ;
@@ -94,6 +95,8 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
94
95
private Supplier <Function <RSocket , RSocket >> acceptor =
95
96
() -> rSocket -> new AbstractRSocket () {};
96
97
98
+ private BiFunction <ConnectionSetupPayload , RSocket , RSocket > biAcceptor ;
99
+
97
100
private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
98
101
private int mtu = 0 ;
99
102
private PluginRegistry plugins = new PluginRegistry (Plugins .defaultPlugins ());
@@ -242,6 +245,12 @@ public ClientTransportAcceptor acceptor(Supplier<Function<RSocket, RSocket>> acc
242
245
return StartClient ::new ;
243
246
}
244
247
248
+ public ClientTransportAcceptor acceptor (
249
+ BiFunction <ConnectionSetupPayload , RSocket , RSocket > biAcceptor ) {
250
+ this .biAcceptor = biAcceptor ;
251
+ return StartClient ::new ;
252
+ }
253
+
245
254
public ClientRSocketFactory fragment (int mtu ) {
246
255
this .mtu = mtu ;
247
256
return this ;
@@ -293,9 +302,27 @@ public Mono<RSocket> start() {
293
302
keepAliveTimeout (),
294
303
keepAliveHandler );
295
304
305
+ ByteBuf setupFrame =
306
+ SetupFrameFlyweight .encode (
307
+ allocator ,
308
+ false ,
309
+ keepAliveTickPeriod (),
310
+ keepAliveTimeout (),
311
+ resumeToken ,
312
+ metadataMimeType ,
313
+ dataMimeType ,
314
+ setupPayload .sliceMetadata (),
315
+ setupPayload .sliceData ());
316
+
296
317
RSocket wrappedRSocketClient = plugins .applyClient (rSocketClient );
297
318
298
- RSocket unwrappedServerSocket = acceptor .get ().apply (wrappedRSocketClient );
319
+ RSocket unwrappedServerSocket ;
320
+ if (biAcceptor != null ) {
321
+ ConnectionSetupPayload setup = ConnectionSetupPayload .create (setupFrame );
322
+ unwrappedServerSocket = biAcceptor .apply (setup , wrappedRSocketClient );
323
+ } else {
324
+ unwrappedServerSocket = acceptor .get ().apply (wrappedRSocketClient );
325
+ }
299
326
300
327
RSocket wrappedRSocketServer = plugins .applyServer (unwrappedServerSocket );
301
328
@@ -307,18 +334,6 @@ public Mono<RSocket> start() {
307
334
payloadDecoder ,
308
335
errorConsumer );
309
336
310
- ByteBuf setupFrame =
311
- SetupFrameFlyweight .encode (
312
- allocator ,
313
- false ,
314
- keepAliveTickPeriod (),
315
- keepAliveTimeout (),
316
- resumeToken ,
317
- metadataMimeType ,
318
- dataMimeType ,
319
- setupPayload .sliceMetadata (),
320
- setupPayload .sliceData ());
321
-
322
337
return wrappedConnection .sendOne (setupFrame ).thenReturn (wrappedRSocketClient );
323
338
});
324
339
}
0 commit comments