8000 Fix types in RSocketFactory by rdegnan · Pull Request #381 · rsocket/rsocket-java · GitHub
[go: up one dir, main page]

Skip to content

Fix types in RSocketFactory #381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix types in RSocketFactory
  • Loading branch information
Ryland Degnan committed Sep 12, 2017
commit d63ff1c387eaa074f1354c9ea911ce9f52ae1e74
121 changes: 55 additions & 66 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import reactor.core.publisher.Mono;

/** Factory for creating RSocket clients and servers. */
public interface RSocketFactory {
public class RSocketFactory {
/**
* Creates a factory that establishes client connections to other RSockets.
*
* @return a client factory
*/
static ClientRSocketFactory connect() {
public static ClientRSocketFactory connect() {
return new ClientRSocketFactory();
}

Expand All @@ -50,51 +50,51 @@ static ClientRSocketFactory connect() {
*
* @return a server factory.
*/
static ServerRSocketFactory receive() {
public static ServerRSocketFactory receive() {
return new ServerRSocketFactory();
}

interface Start<T extends Closeable> {
public interface Start<T extends Closeable> {
Mono<T> start();
}

interface SetupPayload<T> {
public interface SetupPayload<T> {
T setupPayload(Payload payload);
}

interface Transport<T extends io.rsocket.transport.Transport, B extends Closeable> {
Start<B> transport(Supplier<T> t);
public interface Acceptor<T, A> {
T acceptor(Supplier<A> acceptor);

default Start<B> transport(T t) {
return transport(() -> t);
default T acceptor(A acceptor) {
return acceptor(() -> acceptor);
}
}

interface Acceptor<T extends io.rsocket.transport.Transport, A, B extends Closeable> {
Transport<T, B> acceptor(Supplier<A> acceptor);
public interface ClientTransportAcceptor {
Start<RSocket> transport(Supplier<ClientTransport> transport);

default Transport<T, B> acceptor(A acceptor) {
return acceptor(() -> acceptor);
default Start<RSocket> transport(ClientTransport transport) {
return transport(() -> transport);
}
}

public interface ServerTransportAcceptor {
<T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> transport);

default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
return transport(() -> transport);
}
}

interface Fragmentation<
R extends Acceptor<T, A, B>,
T extends io.rsocket.transport.Transport,
A,
B extends Closeable> {
R fragment(int mtu);
public interface Fragmentation<T> {
T fragment(int mtu);
}

interface ErrorConsumer<
R extends Acceptor<T, A, B>,
T extends io.rsocket.transport.Transport,
A,
B extends Closeable> {
R errorConsumer(Consumer<Throwable> errorConsumer);
public interface ErrorConsumer<T> {
T errorConsumer(Consumer<Throwable> errorConsumer);
}

interface KeepAlive<T> {
public interface KeepAlive<T> {
T keepAlive();

T keepAlive(Duration tickPeriod, Duration ackTimeout, int missedAcks);
Expand All @@ -106,27 +106,26 @@ interface KeepAlive<T> {
T keepAliveMissedAcks(int missedAcks);
}

interface MimeType<T> {
public interface MimeType<T> {
T mimeType(String metadataMimeType, String dataMimeType);

T dataMimeType(String dataMimeType);

T metadataMimeType(String metadataMimeType);
}

class ClientRSocketFactory
implements KeepAlive<ClientRSocketFactory>,
public static class ClientRSocketFactory
implements Acceptor<ClientTransportAcceptor, Function<RSocket, RSocket>>,
ClientTransportAcceptor,
KeepAlive<ClientRSocketFactory>,
MimeType<ClientRSocketFactory>,
Acceptor<ClientTransport, Function<RSocket, RSocket>, RSocket>,
Transport<ClientTransport, RSocket>,
Fragmentation<ClientRSocketFactory, ClientTransport, Function<RSocket, RSocket>, RSocket>,
ErrorConsumer<ClientRSocketFactory, ClientTransport, Function<RSocket, RSocket>, RSocket>,
Fragmentation<ClientRSocketFactory>,
ErrorConsumer<ClientRSocketFactory>,
SetupPayload<ClientRSocketFactory> {

private Supplier<Function<RSocket, RSocket>> acceptor =
() -> rSocket -> new AbstractRSocket() {};

private Supplier<io.rsocket.transport.ClientTransport> transportClient;
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
private int mtu = 0;
private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
Expand Down Expand Up @@ -209,25 +208,14 @@ public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
}

@Override
public Start<RSocket> transport(Supplier<io.rsocket.transport.ClientTransport> t) {
return new ClientTransport().transport(t);
}

protected class ClientTransport
implements Transport<io.rsocket.transport.ClientTransport, RSocket> {
@Override
public Start<RSocket> transport(
Supplier<io.rsocket.transport.ClientTransport> transportClient) {
ClientRSocketFactory.this.transportClient = transportClient;
return new StartClient();
}
public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
return new StartClient(transportClient);
}

@Override
public Transport<io.rsocket.transport.ClientTransport, RSocket> acceptor(
Supplier<Function<RSocket, RSocket>> acceptor) {
public ClientTransportAcceptor acceptor(Supplier<Function<RSocket, RSocket>> acceptor) {
this.acceptor = acceptor;
return new ClientTransport();
return StartClient::new;
}

@Override
Expand All @@ -249,6 +237,12 @@ public ClientRSocketFactory setupPayload(Payload payload) {
}

protected class StartClient implements Start<RSocket> {
private final Supplier<ClientTransport> transportClient;

StartClient(Supplier<ClientTransport> transportClient) {
this.transportClient = transportClient;
}

@Override
public Mono<RSocket> start() {
return transportClient
Expand Down Expand Up @@ -305,13 +299,12 @@ public Mono<RSocket> start() {
}
}

class ServerRSocketFactory
implements Acceptor<ServerTransport, SocketAcceptor, Closeable>,
Fragmentation<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable>,
ErrorConsumer<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable> {
public static class ServerRSocketFactory
implements Acceptor<ServerTransportAcceptor, SocketAcceptor>,
Fragmentation<ServerRSocketFactory>,
ErrorConsumer<ServerRSocketFactory> {

private Supplier<SocketAcceptor> acceptor;
private Supplier<io.rsocket.transport.ServerTransport> transportServer;
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
private int mtu = 0;
private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
Expand All @@ -334,10 +327,9 @@ public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
}

@Override
public Transport<io.rsocket.transport.ServerTransport, Closeable> acceptor(
Supplier<SocketAcceptor> acceptor) {
public ServerTransportAcceptor acceptor(Supplier<SocketAcceptor> acceptor) {
this.acceptor = acceptor;
return new ServerTransport();
return ServerStart::new;
}

@Override
8000 Expand All @@ -352,18 +344,15 @@ public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
return this;
}

private class ServerTransport
implements Transport<io.rsocket.transport.ServerTransport, Closeable> {
@Override
public Start transport(Supplier<io.rsocket.transport.ServerTransport> transportServer) {
ServerRSocketFactory.this.transportServer = transportServer;
return new ServerStart();
private class ServerStart<T extends Closeable> implements Start<T> {
private final Supplier<ServerTransport<T>> transportServer;

ServerStart(Supplier<ServerTransport<T>> transportServer) {
this.transportServer = transportServer;
}
}

private class ServerStart implements Start {
@Override
public Mono<Closeable> start() {
public Mono<T> start() {
return transportServer
.get()
.start(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ public Flux<Payload> requestStream(Payload payload) {
})
.transport(serverTransport)
.start()
// TODO fix the Types through RSocketFactory.Start
.cast(NettyContextCloseable.class)
.block();

client =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.junit.Assert.assertFalse;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
Expand All @@ -47,15 +46,11 @@ public class TcpIntegrationTest {
@Before
public void startup() {
TcpServerTransport serverTransport = TcpServerTransport.create(0);
RSocketFactory.Start<Closeable> transport =
server =
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new RSocketProxy(handler)))
.transport(serverTransport);
server =
transport
.transport(serverTransport)
.start()
// TODO fix the Types through RSocketFactory.Start
.cast(NettyContextCloseable.class)
.block();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public ClientSetupRule(
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
.transport(serverTransportSupplier.apply(address))
.start()
.map(s -> (S) s) // TODO fix casting
.block();

this.clientConnector =
Expand Down
0