8000 utf8 helpers on payload (#355) · chakra-coder/rsocket-java@b945400 · GitHub
[go: up one dir, main page]

Skip to content

Commit b945400

Browse files
authored
utf8 helpers on payload (rsocket#355)
1 parent 8e8b328 commit b945400

File tree

15 files changed

+44
-110
lines changed

15 files changed

+44
-110
lines changed

rsocket-core/src/main/java/io/rsocket/Frame.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,10 @@ public static Frame from(final ByteBuf content) {
266266
return frame;
267267
}
268268

269+
public String getDataUtf8() {
270+
return StandardCharsets.UTF_8.decode(getData()).toString();
271+
}
272+
269273
/* TODO:
270274
*
271275
* fromRequest(type, id, payload)

rsocket-core/src/main/java/io/rsocket/Payload.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,19 @@
1616
package io.rsocket;
1717

1818
import java.nio.ByteBuffer;
19+
import java.nio.charset.StandardCharsets;
1920

2021
/** Payload of a {@link Frame}. */
2122
public interface Payload {
2223
ByteBuffer getMetadata();
2324

2425
ByteBuffer getData();
26+
27+
default String getMetadataUtf8() {
28+
return StandardCharsets.UTF_8.decode(getMetadata()).toString();
29+
}
30+
31+
default String getDataUtf8() {
32+
return StandardCharsets.UTF_8.decode(getData()).toString();
33+
}
2534
}

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
472472
if (type == FrameType.ERROR) {
473473
// message for stream that has never existed, we have a problem with
474474
// the overall connection and must tear down
475-
String errorMessage = StandardCharsets.UTF_8.decode(frame.getData()).toString();
475+
String errorMessage = frame.getDataUtf8();
476476

477477
throw new IllegalStateException(
478478
"Client received error for non-existent stream: "

rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ private Exceptions() {}
2727
public static RuntimeException from(Frame frame) {
2828
final int errorCode = Frame.Error.errorCode(frame);
2929

30-
String message = StandardCharsets.UTF_8.decode(frame.getData()).toString();
30+
String message = frame.getDataUtf8();
3131
switch (errorCode) {
3232
case APPLICATION_ERROR:
3333
return new ApplicationException(message);

rsocket-core/src/test/java/io/rsocket/TestUtil.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

rsocket-core/src/test/java/io/rsocket/util/PayloadImplTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import static org.hamcrest.MatcherAssert.*;
1717
import static org.hamcrest.Matchers.*;
1818

19-
import io.rsocket.TestUtil;
2019
import org.junit.Test;
2120

2221
public class PayloadImplTest {
@@ -39,8 +38,8 @@ public void testReuseWithExternalMark() throws Exception {
3938
}
4039

4140
public void assertDataAndMetadata(PayloadImpl p) {
42-
assertThat("Unexpected data.", TestUtil.byteToString(p.getData()), equalTo(DATA_VAL));
41+
assertThat("Unexpected data.", p.getDataUtf8(), equalTo(DATA_VAL));
4342
assertThat(
44-
"Unexpected metadata.", TestUtil.byteToString(p.getMetadata()), equalTo(METADATA_VAL));
43+
"Unexpected metadata.", p.getMetadataUtf8(), equalTo(METADATA_VAL));
4544
}
4645
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public static void main(String[] args) {
4545

4646
socket
4747
.requestChannel(Flux.interval(Duration.ofMillis(1000)).map(i -> new PayloadImpl("Hello")))
48-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
48+
.map(Payload::getDataUtf8)
4949
.doOnNext(System.out::println)
5050
.take(10)
5151
.thenEmpty(socket.close())
@@ -60,7 +60,7 @@ public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiv
6060
@Override
6161
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
6262
return Flux.from(payloads)
63-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
63+
.map(Payload::getDataUtf8)
6464
.map(s -> "Echo: " + s)
6565
.map(PayloadImpl::new);
6666
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/duplex/DuplexClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public static void main(String[] args) {
3333
(setup, reactiveSocket) -> {
3434
reactiveSocket
3535
.requestStream(new PayloadImpl("Hello-Bidi"))
36-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
36+
.map(Payload::getDataUtf8)
3737
.log()
3838
.subscribe();
3939

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,21 @@ public Mono<Payload> requestResponse(Payload p) {
5858

5959
socket
6060
.requestResponse(new PayloadImpl("Hello"))
61-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
61+
.map(Payload::getDataUtf8)
6262
.onErrorReturn("error")
6363
.doOnNext(System.out::println)
6464
.block();
6565

6666
socket
6767
.requestResponse(new PayloadImpl("Hello"))
68-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
68+
.map(Payload::getDataUtf8)
6969
.onErrorReturn("error")
7070
.doOnNext(System.out::println)
7171
.block();
7272

7373
socket
7474
.requestResponse(new PayloadImpl("Hello"))
75-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
75+
.map(Payload::getDataUtf8)
7676
.onErrorReturn("error")
7777
.doOnNext(System.out::println)
7878
.block();

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/StreamingClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public static void main(String[] args) {
3939

4040
socket
4141
.requestStream(new PayloadImpl("Hello"))
42-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
42+
.map(Payload::getDataUtf8)
4343
.doOnNext(System.out::println)
4444
.take(10)
4545
.thenEmpty(socket.close())

rsocket-examples/src/test/java/io/rsocket/integration/TcpIntegrationTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public Flux<Payload> requestStream(Payload payload) {
102102

103103
Payload result = client.requestStream(new PayloadImpl("REQUEST", "META")).blockLast();
104104

105-
assertEquals("RESPONSE", StandardCharsets.UTF_8.decode(result.getData()).toString());
105+
assertEquals("RESPONSE", result.getDataUtf8());
106106
}
107107

108108
@Test(timeout = 5_000L)
@@ -119,7 +119,7 @@ public Flux<Payload> requestStream(Payload payload) {
119119

120120
Payload result = client.requestStream(new PayloadImpl("REQUEST", "META")).blockFirst();
121121

122-
assertEquals("", StandardCharsets.UTF_8.decode(result.getData()).toString());
122+
assertEquals("", result.getDataUtf8());
123123
}
124124

125125
@Test(timeout = 5_000L)
@@ -152,8 +152,8 @@ public Mono<Payload> requestResponse(Payload payload) {
152152
.onErrorReturn(new PayloadImpl("ERROR"))
153153
.block();
154154

155-
assertEquals("ERROR", StandardCharsets.UTF_8.decode(response1.getData()).toString());
156-
assertEquals("SUCCESS", StandardCharsets.UTF_8.decode(response2.getData()).toString());
155+
assertEquals("ERROR", response1.getDataUtf8());
156+
assertEquals("SUCCESS", response2.getDataUtf8());
157157
}
158158

159159
@Test(timeout = 5_000L)
@@ -168,8 +168,7 @@ public void testTwoConcurrentStreams() throws InterruptedException {
168168
new AbstractRSocket() {
169169
@Override
170170
public Flux<Payload> requestStream(Payload payload) {
171-
String s = StandardCharsets.UTF_8.decode(payload.getData()).toString();
172-
return map.get(s);
171+
return map.get(payload.getDataUtf8());
173172
}
174173
};
175174

rsocket-tck-drivers/src/main/java/io/rsocket/tckdrivers/common/MySubscriber.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,7 @@ public void onSubscribe(Subscription s) {
6464
@Override
6565
public void onNext(T t) {
6666
Payload p = (Payload) t;
67-
Tuple<String, String> tup =
68-
new Tuple<>(
69-
StandardCharsets.UTF_8.decode(p.getData()).toString(),
70-
StandardCharsets.UTF_8.decode(p.getMetadata()).toString());
67+
Tuple<String, String> tup = new Tuple<>(p.getDataUtf8(), p.getMetadataUtf8());
7168
consoleUtils.info("On NEXT got : " + tup.getK() + " " + tup.getV());
7269
if (isEcho) {
7370
echosub.add(tup);
@@ -129,9 +126,7 @@ public final void assertValues(List<Tuple<String, String>> values) {
129126
for (int i = 0; i < values.size(); i++) {
130127
Payload p = (Payload) this.values().get(i);
131128
Tuple<String, String> v =
132-
new Tuple<>(
133-
StandardCharsets.UTF_8.decode(p.getData()).toString(),
134-
StandardCharsets.UTF_8.decode(p.getMetadata()).toString());
129+
new Tuple<>(p.getDataUtf8(), p.getMetadataUtf8());
135130
Tuple<String, String> u = values.get(i);
136131
String msg = prefix + "Values at position %d differ; ";
137132
assertEquals(String.format(msg, i), u, v);
@@ -149,10 +144,7 @@ public final void assertValue(Tuple<String, String> value) {
149144
assertEquals(msg, 1, this.values.size());
150145

151146
Payload p = (Payload) values().get(0);
152-
Tuple<String, String> v =
153-
new Tuple<>(
154-
StandardCharsets.UTF_8.decode(p.getData()).toString(),
155-
StandardCharsets.UTF_8.decode(p.getMetadata()).toString());
147+
Tuple<String, String> v = new Tuple<>(p.getDataUtf8(), p.getMetadataUtf8());
156148
msg = prefix;
157149
assertEquals(msg, valueAndClass(value), valueAndClass(v));
158150

@@ -202,10 +194,7 @@ public final void take(long n) {
202194
public Tuple<String, String> getElement(int n) {
203195
assert (n < values.size());
204196
Payload p = (Payload) values().get(n);
205-
Tuple<String, String> tup =
206-
new Tuple<>(
207-
StandardCharsets.UTF_8.decode(p.getData()).toString(),
208-
StandardCharsets.UTF_8.decode(p.getMetadata()).toString());
197+
Tuple<String, String> tup = new Tuple<>(p.getDataUtf8(), p.getMetadataUtf8());
209198
return tup;
210199
}
211200

rsocket-tck-drivers/src/main/java/io/rsocket/tckdrivers/server/JavaServerDriver.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,7 @@ public final Mono<Void> fireAndForget(Payload payload) {
165165
return Mono.from(
166166
s -> {
167167
Tuple<String, String> initialPayload =
168-
new Tuple<>(
169-
StandardCharsets.UTF_8.decode(payload.getData()).toString(),
170-
StandardCharsets.UTF_8.decode(payload.getMetadata()).toString());
168+
new Tuple<>(payload.getDataUtf8(), payload.getMetadataUtf8());
171169
consoleUtils.initialPayload(
172170
"Received firenforget "
173171
+ initialPayload.getK()
@@ -190,9 +188,7 @@ public Mono<Payload> requestResponse(Payload payload) {
190188
return Mono.from(
191189
s -> {
192190
Tuple<String, String> initialPayload =
193-
new Tuple<>(
194-
StandardCharsets.UTF_8.decode(payload.getData()).toString(),
195-
StandardCharsets.UTF_8.decode(payload.getMetadata()).toString());
191+
new Tuple<>(payload.getDataUtf8(), payload.getMetadataUtf8());
196192
String marble = requestResponseMarbles.get(initialPayload);
197193
consoleUtils.initialPayload(
198194
"Received requestresponse "
@@ -213,9 +209,7 @@ public Flux<Payload> requestStream(Payload payload) {
213209
return Flux.from(
214210
s -> {
215211
Tuple<String, String> initialPayload =
216-
new Tuple<>(
217-
StandardCharsets.UTF_8.decode(payload.getData()).toString(),
218-
StandardCharsets.UTF_8.decode(payload.getMetadata()).toString());
212+
new Tuple<>(payload.getDataUtf8(), payload.getMetadataUtf8());
219213
String marble = requestStreamMarbles.get(initialPayload);
220214
consoleUtils.initialPayload(
221215
"Received Stream " + initialPayload.getK() + " " + initialPayload.getV());

rsocket-test/src/main/java/io/rsocket/test/BaseClientServerTest.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public void testRequestResponse1() {
6363
setup
6464
.getRSocket()
6565
.requestResponse(new PayloadImpl("hello", "metadata"))
66-
.map(
67-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
66+
.map(Payload::getDataUtf8))
6867
.doOnError(Throwable::printStackTrace)
6968
.count()
7069
.block();
@@ -81,8 +80,7 @@ public void testRequestResponse10() {
8180
setup
8281
.getRSocket()
8382
.requestResponse(new PayloadImpl("hello", "metadata"))
84-
.map(
85-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
83+
.map(Payload::getDataUtf8))
8684
.doOnError(Throwable::printStackTrace)
8785
.count()
8886
.block();
@@ -99,8 +97,7 @@ public void testRequestResponse100() {
9997
setup
10098
.getRSocket()
10199
.requestResponse(new PayloadImpl("hello", "metadata"))
102-
.map(
103-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
100+
.map(Payload::getDataUtf8))
104101
.doOnError(Throwable::printStackTrace)
105102
.count()
106103
.block();
@@ -117,8 +114,7 @@ public void testRequestResponse10_000() {
117114
setup
118115
.getRSocket()
119116
.requestResponse(new PayloadImpl("hello", "metadata"))
120-
.map(
121-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
117+
.map(Payload::getDataUtf8))
122118
.doOnError(Throwable::printStackTrace)
123119
.count()
124120
.block();

rsocket-transport-aeron/src/test/java/io/rsocket/aeron/ClientServerTest.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public void testRequestResponse1() {
6464
setup
6565
.getRSocket()
6666
.requestResponse(new PayloadImpl("hello", "metadata"))
67-
.map(
68-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
67+
.map(Payload::getDataUtf8))
6968
.doOnError(Throwable::printStackTrace)
7069
.count()
7170
.block();
@@ -82,8 +81,7 @@ public void testRequestResponse10() {
8281
setup
8382
.getRSocket()
8483
.requestResponse(new PayloadImpl("hello", "metadata"))
85-
.map(
86-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
84+
.map(Payload::getDataUtf8))
8785
.doOnError(Throwable::printStackTrace)
8886
.count()
8987
.block();
@@ -100,8 +98,7 @@ public void testRequestResponse100() {
10098
setup
10199
.getRSocket()
102100
.requestResponse(new PayloadImpl("hello", "metadata"))
103-
.map(
104-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
101+
.map(Payload::getDataUtf8))
105102
.doOnError(Throwable::printStackTrace)
106103
.count()
107104
.block();
@@ -118,8 +115,7 @@ public void testRequestResponse10_000() {
118115
setup
119116
.getRSocket()
120117
.requestResponse(new PayloadImpl("hello", "metadata"))
121-
.map(
122-
payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString()))
118+
.map(Payload::getDataUtf8))
123119
.doOnError(Throwable::printStackTrace)
124120
.count()
125121
.block();

0 commit comments

Comments
 (0)
0