8000 Fix types in RSocketFactory (#381) · chakra-coder/rsocket-java@6debbb2 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6debbb2

Browse files
Ryland Degnanyschimke
authored andcommitted
Fix types in RSocketFactory (rsocket#381)
1 parent ab6a37f commit 6debbb2

File tree

4 files changed

+57
-76
lines changed

4 files changed

+57
-76
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 55 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@
3535
import reactor.core.publisher.Mono;
3636

3737
/** Factory for creating RSocket clients and servers. */
38-
public interface RSocketFactory {
38+
public class RSocketFactory {
3939
/**
4040
* Creates a factory that establishes client connections to other RSockets.
4141
*
4242
* @return a client factory
4343
*/
44-
static ClientRSocketFactory connect() {
44+
public static ClientRSocketFactory connect() {
4545
return new ClientRSocketFactory();
4646
}
4747

@@ -50,51 +50,51 @@ static ClientRSocketFactory connect() {
5050
*
5151
* @return a server factory.
5252
*/
53-
static ServerRSocketFactory receive() {
53+
public static ServerRSocketFactory receive() {
5454
return new ServerRSocketFactory();
5555
}
5656

57-
interface Start<T extends Closeable> {
57+
public interface Start<T extends Closeable> {
5858
Mono<T> start();
5959
}
6060

61-
interface SetupPayload<T> {
61+
public interface SetupPayload<T> {
6262
T setupPayload(Payload payload);
6363
}
6464

65-
interface Transport<T extends io.rsocket.transport.Transport, B extends Closeable> {
66-
Start<B> transport(Supplier<T> t);
65+
public interface Acceptor<T, A> {
66+
T acceptor(Supplier<A> acceptor);
6767

68-
default Start<B> transport(T t) {
69-
return transport(() -> t);
68+
default T acceptor(A acceptor) {
69+
return acceptor(() -> acceptor);
7070
}
7171
}
7272

73-
interface Acceptor<T extends io.rsocket.transport.Transport, A, B extends Closeable> {
74-
Transport<T, B> acceptor(Supplier<A> acceptor);
73+
public interface ClientTransportAcceptor {
74+
Start<RSocket> transport(Supplier<ClientTransport> transport);
7575

76-
default Transport<T, B> acceptor(A acceptor) {
77-
return acceptor(() -> acceptor);
76+
default Start<RSocket> transport(ClientTransport transport) {
77+
return transport(() -> transport);
78+
}
79+
}
80+
81+
public interface ServerTransportAcceptor {
82+
<T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> transport);
83+
84+
default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
85+
return transport(() -> transport);
7886
}
7987
}
8088

81-
interface Fragmentation<
82-
R extends Acceptor<T, A, B>,
83-
T extends io.rsocket.transport.Transport,
84-
A,
85-
B extends Closeable> {
86-
R fragment(int mtu);
89+
public interface Fragmentation<T> {
90+
T fragment(int mtu);
8791
}
8892

89-
interface ErrorConsumer<
90-
R extends Acceptor<T, A, B>,
91-
T extends io.rsocket.transport.Transport,
92-
A,
93-
B extends Closeable> {
94-
R errorConsumer(Consumer<Throwable> errorConsumer);
93+
public interface ErrorConsumer<T> {
94+
T errorConsumer(Consumer<Throwable> errorConsumer);
9595
}
9696

97-
interface KeepAlive<T> {
97+
public interface KeepAlive<T> {
9898
T keepAlive();
9999

100100
T keepAlive(Duration tickPeriod, Duration ackTimeout, int missedAcks);
@@ -106,27 +106,26 @@ interface KeepAlive<T> {
106106
T keepAliveMissedAcks(int missedAcks);
107107
}
108108

109-
interface MimeType<T> {
109+
public interface MimeType<T> {
110110
T mimeType(String metadataMimeType, String dataMimeType);
111111

112112
T dataMimeType(String dataMimeType);
113113

114114
T metadataMimeType(String metadataMimeType);
115115
}
116116

117-
class ClientRSocketFactory
118-
implements KeepAlive<ClientRSocketFactory>,
117+
public static class ClientRSocketFactory
118+
implements Acceptor<ClientTransportAcceptor, Function<RSocket, RSocket>>,
119+
ClientTransportAcceptor,
120+
KeepAlive<ClientRSocketFactory>,
119121
MimeType<ClientRSocketFactory>,
120-
Acceptor<ClientTransport, Function<RSocket, RSocket>, RSocket>,
121-
Transport<ClientTransport, RSocket>,
122-
Fragmentation<ClientRSocketFactory, ClientTransport, Function<RSocket, RSocket>, RSocket>,
123-
ErrorConsumer<ClientRSocketFactory, ClientTransport, Function<RSocket, RSocket>, RSocket>,
122+
Fragmentation<ClientRSocketFactory>,
123+
ErrorConsumer<ClientRSocketFactory>,
124124
SetupPayload<ClientRSocketFactory> {
125125

126126
private Supplier<Function<RSocket, RSocket>> acceptor =
127127
() -> rSocket -> new AbstractRSocket() {};
128128

129-
private Supplier<io.rsocket.transport.ClientTransport> transportClient;
130129
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
131130
private int mtu = 0;
132131
private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
@@ -209,25 +208,14 @@ public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
209208
}
210209

211210
@Override
212-
public Start<RSocket> transport(Supplier<io.rsocket.transport.ClientTransport> t) {
213-
return new ClientTransport().transport(t);
214-
}
215-
216-
protected class ClientTransport
217-
implements Transport<io.rsocket.transport.ClientTransport, RSocket> {
218-
@Override
219-
public Start<RSocket> transport(
220-
Supplier<io.rsocket.transport.ClientTransport> transportClient) {
221-
ClientRSocketFactory.this.transportClient = transportClient;
222-
return new StartClient();
223-
}
211+
public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
212+
return new StartClient(transportClient);
224213
}
225214

226215
@Override
227-
public Transport<io.rsocket.transport.ClientTransport, RSocket> acceptor(
228-
Supplier<Function<RSocket, RSocket>> acceptor) {
216+
public ClientTransportAcceptor acceptor(Supplier<Function<RSocket, RSocket>> acceptor) {
229217
this.acceptor = acceptor;
230-
return new ClientTransport();
218+
return StartClient::new;
231219
}
232220

233221
@Override
@@ -249,6 +237,12 @@ public ClientRSocketFactory setupPayload(Payload payload) {
249237
}
250238

251239
protected class StartClient implements Start<RSocket> {
240+
private final Supplier<ClientTransport> transportClient;
241+
242+
StartClient(Supplier<ClientTransport> transportClient) {
243+
this.transportClient = transportClient;
244+
}
245+
252246
@Override
253247
public Mono<RSocket> start() {
254248
return transportClient
@@ -305,13 +299,12 @@ public Mono<RSocket> start() {
305299
}
306300
}
307301

308-
class ServerRSocketFactory
309-
implements Acceptor<ServerTransport, SocketAcceptor, Closeable>,
310-
Fragmentation<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable>,
311-
ErrorConsumer<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable> {
302+
public static class ServerRSocketFactory
303+
implements Acceptor<ServerTransportAcceptor, SocketAcceptor>,
304+
Fragmentation<ServerRSocketFactory>,
305+
ErrorConsumer<ServerRSocketFactory> {
312306

313307
private Supplier<SocketAcceptor> acceptor;
314-
private Supplier<io.rsocket.transport.ServerTransport> transportServer;
315308
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
316309
private int mtu = 0;
317310
private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
@@ -334,10 +327,9 @@ public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
334327
}
335328

336329
@Override
337-
public Transport<io.rsocket.transport.ServerTransport, Closeable> acceptor(
338-
Supplier<SocketAcceptor> acceptor) {
330+
public ServerTransportAcceptor acceptor(Supplier<SocketAcceptor> acceptor) {
339331
this.acceptor = acceptor;
340-
return new ServerTransport();
332+
return ServerStart::new;
341333
}
342334

343335
@Override
@@ -352,18 +344,15 @@ public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
352344
return this;
353345
}
354346

355-
private class ServerTransport
356-
implements Transport<io.rsocket.transport.ServerTransport, Closeable> {
357-
@Override
358-
public Start transport(Supplier<io.rsocket.transport.ServerTransport> transportServer) {
359-
ServerRSocketFactory.this.transportServer = transportServer;
360-
return new ServerStart();
347+
private class ServerStart<T extends Closeable> implements Start<T> {
348+
private final Supplier<ServerTransport<T>> transportServer;
349+
350+
ServerStart(Supplier<ServerTransport<T>> transportServer) {
351+
this.transportServer = transportServer;
361352
}
362-
}
363353

364-
private class ServerStart implements Start {
365354
@Override
366-
public Mono<Closeable> start() {
355+
public Mono<T> start() {
367356
return transportServer
368357
.get()
369358
.start(

rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ public Flux<Payload> requestStream(Payload payload) {
122122
})
123123
.transport(serverTransport)
124124
.start()
125-
// TODO fix the Types through RSocketFactory.Start
126-
.cast(NettyContextCloseable.class)
127125
.block();
128126

129127
client =

rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.junit.Assert.assertFalse;
2121

2222
import io.rsocket.AbstractRSocket;
23-
import io.rsocket.Closeable;
2423
import io.rsocket.Payload;
2524
import io.rsocket.RSocket;
2625
import io.rsocket.RSocketFactory;
@@ -47,15 +46,11 @@ public class TcpIntegrationTest {
4746
@Before
4847
public void startup() {
4948
TcpServerTransport serverTransport = TcpServerTransport.create(0);
50-
RSocketFactory.Start<Closeable> transport =
49+
server =
5150
RSocketFactory.receive()
5251
.acceptor((setup, sendingSocket) -> Mono.just(new RSocketProxy(handler)))
53-
.transport(serverTransport);
54-
server =
55-
transport
52+
.transport(serverTransport)
5653
.start()
57-
// TODO fix the Types through RSocketFactory.Start
58-
.cast(NettyContextCloseable.class)
5954
.block();
6055
}
6156

rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public ClientSetupRule(
4949
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
5050
.transport(serverTransportSupplier.apply(address))
5151
.start()
52-
.map(s -> (S) s) // TODO fix casting
5352
.block();
5453

5554
this.clientConnector =

0 commit comments

Comments
 (0)
0