35
35
import reactor .core .publisher .Mono ;
36
36
37
37
/** Factory for creating RSocket clients and servers. */
38
- public interface RSocketFactory {
38
+ public class RSocketFactory {
39
39
/**
40
40
* Creates a factory that establishes client connections to other RSockets.
41
41
*
42
42
* @return a client factory
43
43
*/
44
- static ClientRSocketFactory connect () {
44
+ public static ClientRSocketFactory connect () {
45
45
return new ClientRSocketFactory ();
46
46
}
47
47
@@ -50,51 +50,51 @@ static ClientRSocketFactory connect() {
50
50
*
51
51
* @return a server factory.
52
52
*/
53
- static ServerRSocketFactory receive () {
53
+ public static ServerRSocketFactory receive () {
54
54
return new ServerRSocketFactory ();
55
55
}
56
56
57
- interface Start <T extends Closeable > {
57
+ public interface Start <T extends Closeable > {
58
58
Mono <T > start ();
59
59
}
60
60
61
- interface SetupPayload <T > {
61
+ public interface SetupPayload <T > {
62
62
T setupPayload (Payload payload );
63
63
}
64
64
65
- interface Transport < T extends io . rsocket . transport . Transport , B extends Closeable > {
66
- Start < B > transport (Supplier <T > t );
65
+ public interface Acceptor < T , A > {
66
+ T acceptor (Supplier <A > acceptor );
67
67
68
- default Start < B > transport ( T t ) {
69
- return transport (() -> t );
68
+ default T acceptor ( A acceptor ) {
69
+ return acceptor (() -> acceptor );
70
70
}
71
71
}
72
72
73
- interface Acceptor < T extends io . rsocket . transport . Transport , A , B extends Closeable > {
74
- Transport < T , B > acceptor (Supplier <A > acceptor );
73
+ public interface ClientTransportAcceptor {
74
+ Start < RSocket > transport (Supplier <ClientTransport > transport );
75
75
76
- default Transport <T , B > acceptor (A acceptor ) {
77
- return acceptor (() -> acceptor );
76
+ default Start <RSocket > transport (ClientTransport transport ) {
77
+ return transport (() -> transport );
78
+ }
79
+ }
80
+
81
+ public interface ServerTransportAcceptor {
82
+ <T extends Closeable > Start <T > transport (Supplier <ServerTransport <T >> transport );
83
+
84
+ default <T extends Closeable > Start <T > transport (ServerTransport <T > transport ) {
85
+ return transport (() -> transport );
78
86
}
79
87
}
80
88
81
- interface Fragmentation <
82
- R extends Acceptor <T , A , B >,
83
- T extends io .rsocket .transport .Transport ,
84
- A ,
85
- B extends Closeable > {
86
- R fragment (int mtu );
89
+ public interface Fragmentation <T > {
90
+ T fragment (int mtu );
87
91
}
88
92
89
- interface ErrorConsumer <
90
- R extends Acceptor <T , A , B >,
91
- T extends io .rsocket .transport .Transport ,
92
- A ,
93
- B extends Closeable > {
94
- R errorConsumer (Consumer <Throwable > errorConsumer );
93
+ public interface ErrorConsumer <T > {
94
+ T errorConsumer (Consumer <Throwable > errorConsumer );
95
95
}
96
96
97
- interface KeepAlive <T > {
97
+ public interface KeepAlive <T > {
98
98
T keepAlive ();
99
99
100
100
T keepAlive (Duration tickPeriod , Duration ackTimeout , int missedAcks );
@@ -106,27 +106,26 @@ interface KeepAlive<T> {
106
106
T keepAliveMissedAcks (int missedAcks );
107
107
}
108
108
109
- interface MimeType <T > {
109
+ public interface MimeType <T > {
110
110
T mimeType (String metadataMimeType , String dataMimeType );
111
111
112
112
T dataMimeType (String dataMimeType );
113
113
114
114
T metadataMimeType (String metadataMimeType );
115
115
}
116
116
117
- class ClientRSocketFactory
118
- implements KeepAlive <ClientRSocketFactory >,
117
+ public static class ClientRSocketFactory
118
+ implements Acceptor <ClientTransportAcceptor , Function <RSocket , RSocket >>,
119
+ ClientTransportAcceptor ,
120
+ KeepAlive <ClientRSocketFactory >,
119
121
MimeType <ClientRSocketFactory >,
120
- Acceptor <ClientTransport , Function <RSocket , RSocket >, RSocket >,
121
- Transport <ClientTransport , RSocket >,
122
- Fragmentation <ClientRSocketFactory , ClientTransport , Function <RSocket , RSocket >, RSocket >,
123
- ErrorConsumer <ClientRSocketFactory , ClientTransport , Function <RSocket , RSocket >, RSocket >,
122
+ Fragmentation <ClientRSocketFactory >,
123
+ ErrorConsumer <ClientRSocketFactory >,
124
124
SetupPayload <ClientRSocketFactory > {
125
125
126
126
private Supplier <Function <RSocket , RSocket >> acceptor =
127
127
() -> rSocket -> new AbstractRSocket () {};
128
128
129
- private Supplier <io .rsocket .transport .ClientTransport > transportClient ;
130
129
private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
131
130
private int mtu = 0 ;
132
131
private PluginRegistry plugins = new PluginRegistry (Plugins .defaultPlugins ());
@@ -209,25 +208,14 @@ public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
209
208
}
210
209
211
210
@ Override
212
- public Start <RSocket > transport (Supplier <io .rsocket .transport .ClientTransport > t ) {
213
- return new ClientTransport ().transport (t );
214
- }
215
-
216
- protected class ClientTransport
217
- implements Transport <io .rsocket .transport .ClientTransport , RSocket > {
218
- @ Override
219
- public Start <RSocket > transport (
220
- Supplier <io .rsocket .transport .ClientTransport > transportClient ) {
221
- ClientRSocketFactory .this .transportClient = transportClient ;
222
- return new StartClient ();
223
- }
211
+ public Start <RSocket > transport (Supplier <ClientTransport > transportClient ) {
212
+ return new StartClient (transportClient );
224
213
}
225
214
226
215
@ Override
227
- public Transport <io .rsocket .transport .ClientTransport , RSocket > acceptor (
228
- Supplier <Function <RSocket , RSocket >> acceptor ) {
216
+ public ClientTransportAcceptor acceptor (Supplier <Function <RSocket , RSocket >> acceptor ) {
229
217
this .acceptor = acceptor ;
230
- return new ClientTransport () ;
218
+ return StartClient :: new ;
231
219
}
232
220
233
221
@ Override
@@ -249,6 +237,12 @@ public ClientRSocketFactory setupPayload(Payload payload) {
249
237
}
250
238
251
239
protected class StartClient implements Start <RSocket > {
240
+ private final Supplier <ClientTransport > transportClient ;
241
+
242
+ StartClient (Supplier <ClientTransport > transportClient ) {
243
+ this .transportClient = transportClient ;
244
+ }
245
+
252
246
@ Override
253
247
public Mono <RSocket > start () {
254
248
return transportClient
@@ -305,13 +299,12 @@ public Mono<RSocket> start() {
305
299
}
306
300
}
307
301
308
- class ServerRSocketFactory
309
- implements Acceptor <ServerTransport , SocketAcceptor , Closeable >,
310
- Fragmentation <ServerRSocketFactory , ServerTransport , SocketAcceptor , Closeable >,
311
- ErrorConsumer <ServerRSocketFactory , ServerTransport , SocketAcceptor , Closeable > {
302
+ public static class ServerRSocketFactory
303
+ implements Acceptor <ServerTransportAcceptor , SocketAcceptor >,
304
+ Fragmentation <ServerRSocketFactory >,
305
+ ErrorConsumer <ServerRSocketFactory > {
312
306
313
307
private Supplier <SocketAcceptor > acceptor ;
314
- private Supplier <io .rsocket .transport .ServerTransport > transportServer ;
315
308
private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
316
309
private int mtu = 0 ;
317
310
private PluginRegistry plugins = new PluginRegistry (Plugins .defaultPlugins ());
@@ -334,10 +327,9 @@ public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
334
327
}
335
328
336
329
@ Override
337
- public Transport <io .rsocket .transport .ServerTransport , Closeable > acceptor (
338
- Supplier <SocketAcceptor > acceptor ) {
330
+ public ServerTransportAcceptor acceptor (Supplier <SocketAcceptor > acceptor ) {
339
331
this .acceptor = acceptor ;
340
- return new ServerTransport () ;
332
+ return ServerStart :: new ;
341
333
}
342
334
343
335
@ Override
@@ -352,18 +344,15 @@ public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
352
344
return this ;
353
345
}
354
346
355
- private class ServerTransport
356
- implements Transport <io .rsocket .transport .ServerTransport , Closeable > {
357
- @ Override
358
- public Start transport (Supplier <io .rsocket .transport .ServerTransport > transportServer ) {
359
- ServerRSocketFactory .this .transportServer = transportServer ;
360
- return new ServerStart ();
347
+ private class ServerStart <T extends Closeable > implements Start <T > {
348
+ private final Supplier <ServerTransport <T >> transportServer ;
349
+
350
+ ServerStart (Supplier <ServerTransport <T >> transportServer ) {
351
+ this .transportServer = transportServer ;
361
352
}
362
- }
363
353
364
- private class ServerStart implements Start {
365
354
@ Override
366
- public Mono <Closeable > start () {
355
+ public Mono <T > start () {
367
356
return transportServer
368
357
.get ()
369
358
.start (
0 commit comments