8000 Allow frames without metadata (#354) · chakra-coder/rsocket-java@79dd245 · GitHub
[go: up one dir, main page]

Skip to content

Commit 79dd245

Browse files
authored
Allow frames without metadata (rsocket#354)
* allow null metadata, "" or non null * add isFlagSet check
1 parent b945400 commit 79dd245

18 files changed

+408
-249
lines changed

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.rsocket;
1717

18+
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
19+
1820
import io.rsocket.frame.SetupFrameFlyweight;
1921
import java.nio.ByteBuffer;
2022

@@ -35,7 +37,11 @@ public static ConnectionSetupPayload create(String metadataMimeType, String data
3537
public static Connecti 57AE onSetupPayload create(
3638
String metadataMimeType, String dataMimeType, Payload payload) {
3739
return new ConnectionSetupPayloadImpl(
38-
metadataMimeType, dataMimeType, payload.getData(), payload.getMetadata(), NO_FLAGS);
40+
metadataMimeType,
41+
dataMimeType,
42+
payload.getData(),
43+
payload.getMetadata(),
44+
payload.hasMetadata() ? FLAGS_M : 0);
3945
}
4046

4147
public static ConnectionSetupPayload create(
@@ -58,18 +64,21 @@ public static ConnectionSetupPayload create(final Frame setupFrame) {
5864

5965
public abstract String dataMimeType();
6066

61-
public int getFlags() {
62-
return HONOR_LEASE;
63-
}
67+
public abstract int getFlags();
6468

6569
public boolean willClientHonorLease() {
66-
return HONOR_LEASE == (getFlags() & HONOR_LEASE);
70+
return Frame.isFlagSet(getFlags(), HONOR_LEASE);
6771
}
6872

6973
public boolean doesClientRequestStrictInterpretation() {
7074
return STRICT_INTERPRETATION == (getFlags() & STRICT_INTERPRETATION);
7175
}
7276

77+
@Override
78+
public boolean hasMetadata() {
79+
return Frame.isFlagSet(getFlags(), FLAGS_M);
80+
}
81+
7382
private static final class ConnectionSetupPayloadImpl extends ConnectionSetupPayload {
7483

7584
private final String metadataMimeType;
@@ -89,6 +98,10 @@ public ConnectionSetupPayloadImpl(
8998
this.data = data;
9099
this.metadata = metadata;
91100
this.flags = flags;
101+
102+
if (!hasMetadata() && metadata.remaining() > 0) {
103+
throw new IllegalArgumentException("metadata flag incorrect");
104+
}
92105
}
93106

94107
@Override

rsocket-core/src/main/java/io/rsocket/Frame.java

Lines changed: 86 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
*/
1616
package io.rsocket;
1717

18-
import io.netty.buffer.ByteBuf;
19-
import io.netty.buffer.ByteBufAllocator;
20-
import io.netty.buffer.ByteBufHolder;
21-
import io.netty.buffer.Unpooled;
18+
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
19+
20+
import io.netty.buffer.*;
2221
import io.netty.util.IllegalReferenceCountException;
2322
import io.netty.util.Recycler;
2423
import io.netty.util.Recycler.Handle;
@@ -43,12 +42,7 @@
4342
* <p>This provides encoding, decoding and field accessors.
4443
*/
4544
public class Frame implements ByteBufHolder {
46-
47-
private static final Logger logger = LoggerFactory.getLogger(Frame.class);
48-
4945
public static final ByteBuffer NULL_BYTEBUFFER = ByteBuffer.allocateDirect(0);
50-
public static final int METADATA_MTU = 32 * 1024;
51-
public static final int DATA_MTU = 32 * 1024;
5246

5347
private static final Recycler<Frame> RECYCLER =
5448
new Recycler<Frame>() {
@@ -197,7 +191,9 @@ public boolean release(int decrement) {
197191
*/
198192
public ByteBuffer getMetadata() {
199193
final ByteBuf metadata = FrameHeaderFlyweight.sliceFrameMetadata(content);
200-
if (metadata.readableBytes() > 0) {
194+
if (metadata == null) {
195+
return NULL_BYTEBUFFER;
196+
} else if (metadata.readableBytes() > 0) {
201197
final ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.readableBytes());
202198
metadata.readBytes(buffer);
203199
buffer.flip();
@@ -266,6 +262,18 @@ public static Frame from(final ByteBuf content) {
266262
return frame;
267263
}
268264

265+
public static boolean isFlagSet(int flags, int checkedFlag) {
266+
return (flags & checkedFlag) == checkedFlag;
267+
}
268+
269+
public static int setFlag(int current, int toSet) {
270+
return current | toSet;
271+
}
272+
273+
public boolean hasMetadata() {
274+
return Frame.isFlagSet(this.flags(), FLAGS_M);
275+
}
276+
269277
public String getDataUtf8() {
270278
return StandardCharsets.UTF_8.decode(getData()).toString();
271279
}
@@ -290,7 +298,7 @@ public static Frame from(
290298
String dataMimeType,
291299
Payload payload) {
292300
final ByteBuf metadata =
293-
payload.getMetadata() != null
301+
payload.hasMetadata()
294302
? Unpooled.wrappedBuffer(payload.getMetadata())
295303
: Unpooled.EMPTY_BUFFER;
296304
final ByteBuf data =
@@ -367,7 +375,7 @@ public static Frame from(int streamId, final Throwable throwable, ByteBuf dataBu
367375
final Frame frame = RECYCLER.get();
368376
frame.content =
369377
ByteBufAllocator.DEFAULT.buffer(
370-
ErrorFrameFlyweight.computeFrameLength(0, dataBuffer.readableBytes()));
378+
ErrorFrameFlyweight.computeFrameLength(dataBuffer.readableBytes()));
371379
frame.content.writerIndex(
372380
ErrorFrameFlyweight.encode(frame.content, streamId, code, dataBuffer));
373381
return frame;
@@ -452,28 +460,30 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
452460
if (initialRequestN < 1) {
453461
throw new IllegalStateException("initial request n must be greater than 0");
454462
}
455-
final ByteBuf metadata =
456-
payload.getMetadata() != null
457-
? Unpooled.wrappedBuffer(payload.getMetadata())
458-
: Unpooled.EMPTY_BUFFER;
459-
final ByteBuf data =
460-
payload.getData() != null
461-
? Unpooled.wrappedBuffer(payload.getData())
462-
: Unpooled.EMPTY_BUFFER;
463+
final @Nullable ByteBuf metadata =
464+
payload.hasMetadata() ? Unpooled.wrappedBuffer(payload.getMetadata()) : null;
465+
final ByteBuf data = Unpooled.wrappedBuffer(payload.getData());
463466

464467
final Frame frame = RECYCLER.get();
465468
frame.content =
466469
ByteBufAllocator.DEFAULT.buffer(
467470
RequestFrameFlyweight.computeFrameLength(
468-
type, metadata.readableBytes(), data.readableBytes()));
471+
type, metadata != null ? metadata.readableBytes() : null, data.readableBytes()));
469472

470473
if (type.hasInitialRequestN()) {
471474
frame.content.writerIndex(
472475
RequestFrameFlyweight.encode(
473-
frame.content, streamId, 0, type, initialRequestN, metadata, data));
476+
frame.content,
477+
streamId,
478+
metadata != null ? FLAGS_M : 0,
479+
type,
480+
initialRequestN,
481+
metadata,
482+
data));
474483
} else {
475484
frame.content.writerIndex(
476-
RequestFrameFlyweight.encode(frame.content, streamId, 0, type, metadata, data));
485+
RequestFrameFlyweight.encode(
486+
frame.content, streamId, metadata != null ? FLAGS_M : 0, type, metadata, data));
477487
}
478488

479489
return frame;
@@ -482,7 +492,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
482492
public static Frame from(int streamId, FrameType type, int flags) {
483493
final Frame frame = RECYCLER.get();
484494
frame.content =
485-
ByteBufAllocator.DEFAULT.buffer(RequestFrameFlyweight.computeFrameLength(type, 0, 0));
495+
ByteBufAllocator.DEFAULT.buffer(RequestFrameFlyweight.computeFrameLength(type, null, 0));
486496
frame.content.writerIndex(
487497
RequestFrameFlyweight.encode(
488498
frame.content, streamId, flags, type, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER));
@@ -543,32 +553,27 @@ public static class PayloadFrame {
543553
private PayloadFrame() {}
544554

545555
public static Frame from(int streamId, FrameType type) {
546-
return from(streamId, type, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, 0);
556+
return from(streamId, type, null, Unpooled.EMPTY_BUFFER, 0);
547557
}
548558

549559
public static Frame from(int streamId, FrameType type, Payload payload) {
550-
return from(streamId, type, payload, 0);
560+
return from(streamId, type, payload, payload.hasMetadata() ? FLAGS_M : 0);
551561
}
552562

553563
public static Frame from(int streamId, FrameType type, Payload payload, int flags) {
554564
final ByteBuf metadata =
555-
payload.getMetadata() != null
556-
? Unpooled.wrappedBuffer(payload.getMetadata())
557-
: Unpooled.EMPTY_BUFFER;
558-
final ByteBuf data =
559-
payload.getData() != null
560-
? Unpooled.wrappedBuffer(payload.getData())
561-
: Unpooled.EMPTY_BUFFER;
565+
payload.hasMetadata() ? Unpooled.wrappedBuffer(payload.getMetadata()) : null;
566+
final ByteBuf data = Unpooled.wrappedBuffer(payload.getData());
562567
return from(streamId, type, metadata, data, flags);
563568
}
564569

565570
public static Frame from(
566-
int streamId, FrameType type, ByteBuf metadata, ByteBuf data, int flags) {
571+
int streamId, FrameType type, @Nullable ByteBuf metadata, ByteBuf data, int flags) {
567572
final Frame frame = RECYCLER.get();
568573
frame.content =
569574
ByteBufAllocator.DEFAULT.buffer(
570575
FrameHeaderFlyweight.computeFrameHeaderLength(
571-
type, metadata.readableBytes(), data.readableBytes()));
576+
type, metadata != null ? metadata.readableBytes() : null, data.readableBytes()));
572577
frame.content.writerIndex(
573578
FrameHeaderFlyweight.encode(frame.content, streamId, flags, type, metadata, data));
574579
return frame;
@@ -583,15 +588,10 @@ public static Frame from(int streamId) {
583588
final Frame frame = RECYCLER.get();
584589
frame.content =
585590
ByteBufAllocator.DEFAULT.buffer(
586-
FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, 0, 0));
591+
FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, null, 0));
587592
frame.content.writerIndex(
588593
FrameHeaderFlyweight.encode(
589-
frame.content,
590-
streamId,
591-
0,
592-
FrameType.CANCEL,
593-
Unpooled.EMPTY_BUFFER,
594-
Unpooled.EMPTY_BUFFER));
594+
frame.content, streamId, 0, FrameType.CANCEL, null, Unpooled.EMPTY_BUFFER));
595595
return frame;
596596
}
597597
}
@@ -636,58 +636,57 @@ public String toString() {
636636
long streamId = -1;
637637
String additionalFlags = "";
638638

639-
try {
640-
type = FrameHeaderFlyweight.frameType(content);
639+
type = FrameHeaderFlyweight.frameType(content);
640+
641+
@Nullable ByteBuf metadata = FrameHeaderFlyweight.sliceFrameMetadata(content);
641642

642-
ByteBuf metadata = FrameHeaderFlyweight.sliceFrameMetadata(content);
643+
if (metadata != null) {
643644
if (0 < metadata.readableBytes()) {
644645
payload.append(
645646
String.format("metadata: \"%s\" ", metadata.toString(StandardCharsets.UTF_8)));
646647
}
648+
}
647649

648-
ByteBuf data = FrameHeaderFlyweight.sliceFrameData(content);
649-
if (0 < data.readableBytes()) {
650-
payload.append(String.format("data: \"%s\" ", data.toString(StandardCharsets.UTF_8)));
651-
}
652-
653-
streamId = FrameHeaderFlyweight.streamId(content);
654-
655-
switch (type) {
656-
case LEASE:
657-
additionalFlags =
658-
" Permits: " + Lease.numberOfRequests(this) + " TTL: " + Lease.ttl(this);
659-
break;
660-
case REQUEST_N:
661-
additionalFlags = " RequestN: " + RequestN.requestN(this);
662-
break;
663-
case KEEPALIVE:
664-
additionalFlags = " Respond flag: " + Keepalive.hasRespondFlag(this);
665-
break;
666-
case REQUEST_STREAM:
667-
case REQUEST_CHANNEL:
668-
additionalFlags = " Initial Request N: " + Request.initialRequestN(this);
669-
break;
670-
case ERROR:
671-
additionalFlags = " Error code: " + Error.errorCode(this);
672-
break;
673-
case SETUP:
674-
int version = Setup.version(this);
675-
additionalFlags =
676-
" Version: "
677-
+ VersionFlyweight.toString(version)
678-
+ " keep-alive interval: "
679-
+ Setup.keepaliveInterval(this)
680-
+ " max lifetime: "
681-
+ Setup.maxLifetime(this)
682-
+ " metadata mime type: "
683-
+ Setup.metadataMimeType(this)
684-
+ " data mime type: "
685-
+ Setup.dataMimeType(this);
686-
break;
687-
}
688-
} catch (Exception e) {
689-
logger.error("Error generating toString, ignored.", e);
650+
ByteBuf data = FrameHeaderFlyweight.sliceFrameData(content);
651+
if (0 < data.readableBytes()) {
652+
payload.append(String.format("data: \"%s\" ", data.toString(StandardCharsets.UTF_8)));
653+
}
654+
655+
streamId = FrameHeaderFlyweight.streamId(content);
656+
657+
switch (type) {
658+
case LEASE:
659+
additionalFlags = " Permits: " + Lease.numberOfRequests(this) + " TTL: " + Lease.ttl(this);
660+
break;
661+
case REQUEST_N:
662+
additionalFlags = " RequestN: " + RequestN.requestN(this);
663+
break;
664+
case KEEPALIVE:
665+
additionalFlags = " Respond flag: " + Keepalive.hasRespondFlag(this);
666+
break;
667+
case REQUEST_STREAM:
668+
case REQUEST_CHANNEL:
669+
additionalFlags = " Initial Request N: " + Request.initialRequestN(this);
670+
break;
671+
case ERROR:
672+
additionalFlags = " Error code: " + Error.errorCode(this);
673+
break;
674+
case SETUP:
675+
int version = Setup.version(this);
676+
additionalFlags =
677+
" Version: "
678+
+ VersionFlyweight.toString(version)
679+
+ " keep-alive interval: "
680+
+ Setup.keepaliveInterval(this)
681+
+ " max lifetime: "
682+
+ Setup.maxLifetime(this)
683+
+ " metadata mime type: "
684+
+ Setup.metadataMimeType(this)
685+
+ " data mime type: "
686+
+ Setup.dataMimeType(this);
687+
break;
690688
}
689+
691690
return "Frame => Stream ID: "
692691
+ streamId
693692
+ " Type: "

rsocket-core/src/main/java/io/rsocket/Payload.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,24 @@
2020

2121
/** Payload of a {@link Frame}. */
2222
public interface Payload {
23+
/**
24+
* Returns whether the payload has metadata, useful for tell if metadata is empty or not present.
25+
*/
26+
boolean hasMetadata();
27+
28+
/**
29+
* Returns the Payload metadata. Always non-null, check {@link #hasMetadata()} to differentiate
30+
* null from "".
31+
*
32+
* @return payload metadata.
33+
*/
2334
ByteBuffer getMetadata();
2435

36+
/**
37+
* Returns the Payload data. Always non-null.
38+
*
39+
* @return payload data.
40+
*/
2541
ByteBuffer getData();
2642

2743
default String getMetadataUtf8() {

0 commit comments

Comments
 (0)
0