8000 created a reactive socket interface, pointed rxjava2 to the specific … · MStart/reactivesocket-java@ac77485 · GitHub
[go: up one dir, main page]

Skip to content

Commit ac77485

Browse files
committed
created a reactive socket interface, pointed rxjava2 to the specific version of rxjava2 to get the tests to work, updated Agrona to 0.4.13
1 parent f07d471 commit ac77485

27 files changed

+258
-172
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ repositories {
1818

1919
dependencies {
2020
compile 'org.reactivestreams:reactive-streams:1.0.0.final'
21-
compile 'uk.co.real-logic:Agrona:0.4.2'
21+
compile 'org.agrona:Agrona:0.4.13'
2222

23-
testCompile 'io.reactivex:rxjava:2.0.0-DP0-SNAPSHOT'
23+
testCompile 'io.reactivex:rxjava:2.0.0-DP0-20151003.214425-143'
2424
testCompile 'junit:junit-dep:4.10'
2525
testCompile 'org.mockito:mockito-core:1.8.5'
2626
}

src/main/java/io/reactivesocket/Frame.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import io.reactivesocket.internal.frame.RequestNFrameFlyweight;
2525
import io.reactivesocket.internal.frame.SetupFrameFlyweight;
2626
import io.reactivesocket.internal.frame.UnpooledFrame;
27-
import uk.co.real_logic.agrona.DirectBuffer;
28-
import uk.co.real_logic.agrona.MutableDirectBuffer;
27+
import org.agrona.DirectBuffer;
28+
import org.agrona.MutableDirectBuffer;
2929

3030
import java.nio.ByteBuffer;
3131
import java.nio.charset.Charset;
Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,108 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.reactivesocket;
217

18+
import io.reactivesocket.rx.Completable;
19+
import org.reactivestreams.Publisher;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.function.Consumer;
24+
325
/**
4-
* Created by rroeser on 3/25/16.
26+
* Interface for a connection that supports sending requests and receiving responses
527
*/
6-
public interface ReactiveSocket {
28+
public interface ReactiveSocket extends AutoCloseable {
29+
Publisher<Payload> requestResponse(final Payload payload);
30+
31+
Publisher<Void> fireAndForget(final Payload payload);
32+
33+
Publisher<Payload> requestStream(final Payload payload);
34+
35+
Publisher<Payload> requestSubscription(final Payload payload);
36+
37+
Publisher<Payload> requestChannel(final Publisher<Payload> payloads);
38+
39+
Publisher<Void> metadataPush(final Payload payload);
40+
41+
/**
42+
* Client check for availability to send request based on lease
43+
*
44+
* @return 0.0 to 1.0 indicating availability of sending requests
45+
*/
46+
double availability();
47+
48+
/**
49+
* Start protocol processing on the given DuplexConnection.
50+
*/
51+
void start(Completable c);
52+
53+
/**
54+
* Start and block the current thread until startup is finished.
55+
*
56+
* @throws RuntimeException
57+
* of InterruptedException
58+
*/
59+
default void startAndWait() {
60+
CountDownLatch latch = new CountDownLatch(1);
61+
AtomicReference<Throwable> err = new AtomicReference<>();
62+
start(new Completable() {
63+
@Override
64+
public void success() {
65+
latch.countDown();
66+
}
67+
68+
@Override
69+
public void error(Throwable e) {
70+
latch.countDown();
71+
}
72+
});
73+
try {
74+
latch.await();
75+
} catch (InterruptedException e) {
76+
throw new RuntimeException(e);
77+
}
78+
if (err.get() != null) {
79+
throw new RuntimeException(err.get());
80+
}
81+
}
82+
83+
/**
84+
* Invoked when Requester is ready. Non-null exception if error. Null if success.
85+
*
86+
* @param c
87+
*/
88+
void onRequestReady(Consumer<Throwable> c);
89+
90+
/**
91+
* Invoked when Requester is ready with success or fail.
92+
*
93+
* @param c
94+
*/
95+
void onRequestReady(Completable c);
96+
97+
/**
98+
* Server granting new lease information to client
99+
*
100+
* Initial lease semantics are that server waits for periodic granting of leases by server side.
101+
*
102+
* @param ttl
103+
* @param numberOfRequests
104+
*/
105+
void sendLease(int ttl, int numberOfRequests);
106+
107+
void shutdown();
7108
}

src/main/java/io/reactivesocket/ReactiveSocketFactory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.reactivesocket;
217

318
import org.reactivestreams.Publisher;

src/main/java/io/reactivesocket/ReactiveSocketImpl.java

Lines changed: 25 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,6 @@
1515
*/
1616
package io.reactivesocket;
1717

18-
import java.io.IOException;
19-
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.atomic.AtomicInteger;
21-
import java.util.concurrent.atomic.AtomicReference;
22-
import java.util.function.BiConsumer;
23-
import java.util.function.Consumer;
24-
25-
import org.reactivestreams.Publisher;
26-
import org.reactivestreams.Subscriber;
27-
import org.reactivestreams.Subscription;
28-
2918
import io.reactivesocket.internal.Requester;
3019
import io.reactivesocket.internal.Responder;
3120
import io.reactivesocket.internal.rx.CompositeCompletable;
@@ -34,16 +23,21 @@
3423
import io.reactivesocket.rx.Disposable;
3524
import io.reactivesocket.rx.Observable;
3625
import io.reactivesocket.rx.Observer;
37-
import uk.co.real_logic.agrona.BitUtil;
26+
import org.agrona.BitUtil;
27+
import org.reactivestreams.Publisher;
28+
import org.reactivestreams.Subscriber;
29+
import org.reactivestreams.Subscription;
30+
31+
import java.io.IOException;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.function.Consumer;
3834

3935
import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR;
4036

4137
/**
42-
* Interface for a connection that supports sending requests and receiving responses
43-
*
44-
* Created by servers for connections Created on demand for clients
38+
* An implementation of {@link ReactiveSocket}
4539
*/
46-
public class ReactiveSocket implements AutoCloseable {
40+
public class ReactiveSocketImpl implements ReactiveSocket {
4741
private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build();
4842

4943
private static final Consumer<Throwable> DEFAULT_ERROR_STREAM = t -> {
@@ -62,7 +56,7 @@ public class ReactiveSocket implements AutoCloseable {
6256
private final ConnectionSetupHandler responderConnectionHandler;
6357
private final LeaseGovernor leaseGovernor;
6458

65-
private ReactiveSocket(
59+
private ReactiveSocketImpl(
6660
DuplexConnection connection,
6761
boolean isServer,
6862
ConnectionSetupPayload serverRequestorSetupPayload,
@@ -118,7 +112,7 @@ public static ReactiveSocket fromClientConnection(
118112
}
119113
final RequestHandler h = handler != null ? handler : EMPTY_HANDLER;
120114
Consumer<Throwable> es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM;
121-
return new ReactiveSocket(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es);
115+
return new ReactiveSocketImpl(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es);
122116
}
123117

124118
/**
@@ -178,7 +172,7 @@ public static ReactiveSocket fromServerConnection(
178172
LeaseGovernor leaseGovernor,
179173
Consumer<Throwable> errorConsumer
180174
) {
181-
return new ReactiveSocket(connection, true, null, null, connectionHandler,
175+
return new ReactiveSocketImpl(connection, true, null, null, connectionHandler,
182176
leaseGovernor, errorConsumer);
183177
}
184178

@@ -192,31 +186,37 @@ public static ReactiveSocket fromServerConnection(
192186
/**
193187
* Initiate a request response exchange
194188
*/
189+
@Override
195190
public Publisher<Payload> requestResponse(final Payload payload) {
196191
assertRequester();
197192
return requester.requestResponse(payload);
198193
}
199194

195+
@Override
200196
public Publisher<Void> fireAndForget(final Payload payload) {
201197
assertRequester();
202198
return requester.fireAndForget(payload);
203199
}
204200

201+
@Override
205202
public Publisher<Payload> requestStream(final Payload payload) {
206203
assertRequester();
207204
return requester.requestStream(payload);
208205
}
209206

207+
@Override
210208
public Publisher<Payload> requestSubscription(final Payload payload) {
211209
assertRequester();
212210
return requester.requestSubscription(payload);
213211
}
214212

213+
@Override
215214
public Publisher<Payload> requestChannel(final Publisher<Payload> payloads) {
216215
assertRequester();
217216
return requester.requestChannel(payloads);
218217
}
219218

219+
@Override
220220
public Publisher<Void> metadataPush(final Payload payload) {
221221
assertRequester();
222222
return requester.metadataPush(payload);
@@ -239,33 +239,20 @@ private void assertRequester() {
239239
}
240240
}
241241

242-
/**
243-
* Client check for availability to send request based on lease
244-
*
245-
* @return 0.0 to 1.0 indicating availability of sending requests
246-
*/
242+
@Override
247243
public double availability() {
248244
// TODO: can happen in either direction
249245
assertRequester();
250246
return requester.availability();
251247
}
252248

253-
/**
254-
* Server granting new lease information to client
255-
*
256-
* Initial lease semantics are that server waits for periodic granting of leases by server side.
257-
*
258-
* @param ttl
259-
* @param numberOfRequests
260-
*/
249+
@Override
261250
public void sendLease(int ttl, int numberOfRequests) {
262251
// TODO: can happen in either direction
263252
responder.sendLease(ttl, numberOfRequests);
264253
}
265254

266-
/**
267-
* Start protocol processing on the given DuplexConnection.
268-
*/
255+
@Override
269256
public final void start(Completable c) {
270257
if (isServer) {
271258
responder = Responder.createServerResponder(
@@ -345,20 +332,12 @@ public void error(Throwable e) {
345332

346333
private final CompositeCompletable requesterReady = new CompositeCompletable();
347334

348-
/**
349-
* Invoked when Requester is ready with success or fail.
350-
*
351-
* @param c
352-
*/
335+
@Override
353336
public final void onRequestReady(Completable c) {
354337
requesterReady.add(c);
355338
}
356339

357-
/**
358-
* Invoked when Requester is ready. Non-null exception if error. Null if success.
359-
*
360-
* @param c
361-
*/
340+
@Override
362341
public final void onRequestReady(Consumer<Throwable> c) {
363342
requesterReady.add(new Completable() {
364343
@Override
@@ -458,36 +437,6 @@ public void addOutput(Frame f, Completable callback) {
458437

459438
};
460439

461-
/**
462-
* Start and block the current thread until startup is finished.
463-
*
464-
* @throws RuntimeException
465-
* of InterruptedException
466-
*/
467-
public final void startAndWait() {
468-
CountDownLatch latch = new CountDownLatch(1);
469-
AtomicReference<Throwable> err = new AtomicReference<>();
470-
start(new Completable() {
471-
@Override
472-
public void success() {
473-
latch.countDown();
474-
}
475-
476-
@Override
477-
public void error(Throwable e) {
478-
latch.countDown();
479-
}
480-
});
481-
try {
482-
latch.await();
483-
} catch (InterruptedException e) {
484-
throw new RuntimeException(e);
485-
}
486-
if (err.get() != null) {
487-
throw new RuntimeException(err.get());
488-
}
489-
}
490-
491440
@Override
492441
public void close() throws Exception {
493442
connection.close();
@@ -500,6 +449,7 @@ public void close() throws Exception {
500449
}
501450
}
502451

452+
@Override
503453
public void shutdown() {
504454
try {
505455
close();

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import io.reactivesocket.rx.Completable;
4545
import io.reactivesocket.rx.Disposable;
4646
import io.reactivesocket.rx.Observer;
47-
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
47+
import org.agrona.collections.Int2ObjectHashMap;
4848

4949
/**
5050
* Protocol implementation abstracted over a {@link DuplexConnection}.

0 commit comments

Comments
 (0)
0