10000 Revert refactor RequestHandler.handleChannel() to take two arguments · MStart/reactivesocket-java@500b6e6 · GitHub
[go: up one dir, main page]

Skip to content

Commit 500b6e6

Browse files
committed
Revert refactor RequestHandler.handleChannel() to take two arguments
1 parent adb7ce5 commit 500b6e6

File tree

8 files changed

+161
-210
lines changed

8 files changed

+161
-210
lines changed

build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ dependencies {
2525
testCompile 'org.mockito:mockito-core:1.8.5'
2626
}
2727

28-
2928
// support for snapshot/final releases via versioned branch names like 1.x
3029
nebulaRelease {
3130
addReleaseBranchPattern(/\d+\.\d+\.\d+/)
@@ -38,4 +37,4 @@ if (project.hasProperty('release.useLastTag')) {
3837

3938
test {
4039
testLogging.showStandardStreams = true
41-
}
40+
}

src/main/java/io/reactivesocket/RequestHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.reactivesocket.internal.PublisherUtils;
1919
import org.reactivestreams.Publisher;
2020

21-
import java.util.function.BiFunction;
2221
import java.util.function.Function;
2322

2423
public abstract class RequestHandler {
@@ -50,9 +49,10 @@ public abstract class RequestHandler {
5049
public abstract Publisher<Void> handleFireAndForget(final Payload payload);
5150

5251
/**
53-
* @return
52+
* @note The initialPayload will also be part of the inputs publisher.
53+
* It is there to simplify routing logic.
5454
*/
55-
public abstract Publisher<Payload> handleChannel(final Publisher<Payload> inputs);
55+
public abstract Publisher<Payload> handleChannel(Payload initialPayload, final Publisher<Payload> inputs);
5656

5757
public abstract Publisher<Void> handleMetadataPush(final Payload payload);
5858

@@ -125,7 +125,7 @@ public Publisher<Void> handleFireAndForget(Payload payload)
125125
return handleFireAndForget.apply(payload);
126126
}
127127

128-
public Publisher<Payload> handleChannel(Publisher<Payload> inputs)
128+
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs)
129129
{
130130
return handleRequestChannel.apply(inputs);
131131
}

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ public void request(long n) {
742742
channels.put(streamId, channelRequests);
743743
}
744744

745-
Publisher<Payload> responses = requestHandler.handleChannel(channelRequests);
745+
Publisher<Payload> responses = requestHandler.handleChannel(requestFrame, channelRequests);
746746
responses.subscribe(new Subscriber<Payload>() {
747747
@Override
748748
public void onSubscribe(Subscription s) {

src/perf/java/io/reactivesocket/ReactiveSocketPerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public Publisher<Void> handleFireAndForget(Payload payload) {
122122
}
123123

124124
@Override
125-
public Publisher<Payload> handleChannel(Publisher<Payload> inputs) {
125+
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
126126
return null;
127127
}
128128

src/test/java/io/reactivesocket/LeaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public Publisher<Void> handleFireAndForget(Payload payload) {
114114
*/
115115
@Override
116116
public Publisher<Payload> handleChannel(
117-
Publisher<Payload> inputs
117+
Payload initialPayload, Publisher<Payload> inputs
118118
) {
119119
return fromPublisher(inputs).map(p ->
120120
utf8EncodedPayload(byteToString(p.getData()) + "_echo", null));

src/test/java/io/reactivesocket/ReactiveSocketTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public Publisher<Void> handleFireAndForget(Payload payload) {
125125
* Use Payload.metadata for routing
126126
*/
127127
@Override
128-
public Publisher<Payload> handleChannel(Publisher<Payload> inputs) {
128+
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
129129
return new Publisher<Payload>() {
130130
@Override
131131
public void subscribe(Subscriber<? super Payload> subscriber) {

0 commit comments

Comments
 (0)
0