15
15
*/
16
16
package io .rsocket ;
17
17
18
- import io .netty .buffer .ByteBuf ;
19
- import io .netty .buffer .ByteBufAllocator ;
20
- import io .netty .buffer .ByteBufHolder ;
21
- import io .netty .buffer .Unpooled ;
18
+ import static io .rsocket .frame .FrameHeaderFlyweight .FLAGS_M ;
19
+
20
+ import io .netty .buffer .*;
22
21
import io .netty .util .IllegalReferenceCountException ;
23
22
import io .netty .util .Recycler ;
24
23
import io .netty .util .Recycler .Handle ;
43
42
* <p>This provides encoding, decoding and field accessors.
44
43
*/
45
44
public class Frame implements ByteBufHolder {
46
-
47
- private static final Logger logger = LoggerFactory .getLogger (Frame .class );
48
-
49
45
public static final ByteBuffer NULL_BYTEBUFFER = ByteBuffer .allocateDirect (0 );
50
- public static final int METADATA_MTU = 32 * 1024 ;
51
- public static final int DATA_MTU = 32 * 1024 ;
52
46
53
47
private static final Recycler <Frame > RECYCLER =
54
48
new Recycler <Frame >() {
@@ -197,7 +191,9 @@ public boolean release(int decrement) {
197
191
*/
198
192
public ByteBuffer getMetadata () {
199
193
final ByteBuf metadata = FrameHeaderFlyweight .sliceFrameMetadata (content );
200
- if (metadata .readableBytes () > 0 ) {
194
+ if (metadata == null ) {
195
+ return NULL_BYTEBUFFER ;
196
+ } else if (metadata .readableBytes () > 0 ) {
201
197
final ByteBuffer buffer = ByteBuffer .allocateDirect (metadata .readableBytes ());
202
198
metadata .readBytes (buffer );
203
199
buffer .flip ();
@@ -266,6 +262,18 @@ public static Frame from(final ByteBuf content) {
266
262
return frame ;
267
263
}
268
264
265
+ public static boolean isFlagSet (int flags , int checkedFlag ) {
266
+ return (flags & checkedFlag ) == checkedFlag ;
267
+ }
268
+
269
+ public static int setFlag (int current , int toSet ) {
270
+ return current | toSet ;
271
+ }
272
+
273
+ public boolean hasMetadata () {
274
+ return Frame .isFlagSet (this .flags (), FLAGS_M );
275
+ }
276
+
269
277
public String getDataUtf8 () {
270
278
return StandardCharsets .UTF_8 .decode (getData ()).toString ();
271
279
}
@@ -290,7 +298,7 @@ public static Frame from(
290
298
String dataMimeType ,
291
299
Payload payload ) {
292
300
final ByteBuf metadata =
293
- payload .getMetadata () != null
301
+ payload .hasMetadata ()
294
302
? Unpooled .wrappedBuffer (payload .getMetadata ())
295
303
: Unpooled .EMPTY_BUFFER ;
296
304
final ByteBuf data =
@@ -367,7 +375,7 @@ public static Frame from(int streamId, final Throwable throwable, ByteBuf dataBu
367
375
final Frame frame = RECYCLER .get ();
368
376
frame .content =
369
377
ByteBufAllocator .DEFAULT .buffer (
370
- ErrorFrameFlyweight .computeFrameLength (0 , dataBuffer .readableBytes ()));
378
+ ErrorFrameFlyweight .computeFrameLength (dataBuffer .readableBytes ()));
371
379
frame .content .writerIndex (
372
380
ErrorFrameFlyweight .encode (frame .content , streamId , code , dataBuffer ));
373
381
return frame ;
@@ -452,28 +460,30 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
452
460
if (initialRequestN < 1 ) {
453
461
throw new IllegalStateException ("initial request n must be greater than 0" );
454
462
}
455
- final ByteBuf metadata =
456
- payload .getMetadata () != null
457
- ? Unpooled .wrappedBuffer (payload .getMetadata ())
458
- : Unpooled .EMPTY_BUFFER ;
459
- final ByteBuf data =
460
- payload .getData () != null
461
- ? Unpooled .wrappedBuffer (payload .getData ())
462
- : Unpooled .EMPTY_BUFFER ;
463
+ final @ Nullable ByteBuf metadata =
464
+ payload .hasMetadata () ? Unpooled .wrappedBuffer (payload .getMetadata ()) : null ;
465
+ final ByteBuf data = Unpooled .wrappedBuffer (payload .getData ());
463
466
464
467
final Frame frame = RECYCLER .get ();
465
468
frame .content =
466
469
ByteBufAllocator .DEFAULT .buffer (
467
470
RequestFrameFlyweight .computeFrameLength (
468
- type , metadata .readableBytes (), data .readableBytes ()));
471
+ type , metadata != null ? metadata .readableBytes () : null , data .readableBytes ()));
469
472
470
473
if (type .hasInitialRequestN ()) {
471
474
frame .content .writerIndex (
472
475
RequestFrameFlyweight .encode (
473
- frame .content , streamId , 0 , type , initialRequestN , metadata , data ));
476
+ frame .content ,
477
+ streamId ,
478
+ metadata != null ? FLAGS_M : 0 ,
479
+ type ,
480
+ initialRequestN ,
481
+ metadata ,
482
+ data ));
474
483
} else {
475
484
frame .content .writerIndex (
476
- RequestFrameFlyweight .encode (frame .content , streamId , 0 , type , metadata , data ));
485
+ RequestFrameFlyweight .encode (
486
+ frame .content , streamId , metadata != null ? FLAGS_M : 0 , type , metadata , data ));
477
487
}
478
488
479
489
return frame ;
@@ -482,7 +492,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
482
492
public static Frame from (int streamId , FrameType type , int flags ) {
483
493
final Frame frame = RECYCLER .get ();
484
494
frame .content =
485
- ByteBufAllocator .DEFAULT .buffer (RequestFrameFlyweight .computeFrameLength (type , 0 , 0 ));
495
+ ByteBufAllocator .DEFAULT .buffer (RequestFrameFlyweight .computeFrameLength (type , null , 0 ));
486
496
frame .content .writerIndex (
487
497
RequestFrameFlyweight .encode (
488
498
frame .content , streamId , flags , type , Unpooled .EMPTY_BUFFER , Unpooled .EMPTY_BUFFER ));
@@ -543,32 +553,27 @@ public static class PayloadFrame {
543
553
private PayloadFrame () {}
544
554
545
555
public static Frame from (int streamId , FrameType type ) {
546
- return from (streamId , type , Unpooled . EMPTY_BUFFER , Unpooled .EMPTY_BUFFER , 0 );
556
+ return from (streamId , type , null , Unpooled .EMPTY_BUFFER , 0 );
547
557
}
548
558
549
559
public static Frame from (int streamId , FrameType type , Payload payload ) {
550
- return from (streamId , type , payload , 0 );
560
+ return from (streamId , type , payload , payload . hasMetadata () ? FLAGS_M : 0 );
551
561
}
552
562
553
563
public static Frame from (int streamId , FrameType type , Payload payload , int flags ) {
554
564
final ByteBuf metadata =
555
- payload .getMetadata () != null
556
- ? Unpooled .wrappedBuffer (payload .getMetadata ())
557
- : Unpooled .EMPTY_BUFFER ;
558
- final ByteBuf data =
559
- payload .getData () != null
560
- ? Unpooled .wrappedBuffer (payload .getData ())
561
- : Unpooled .EMPTY_BUFFER ;
565
+ payload .hasMetadata () ? Unpooled .wrappedBuffer (payload .getMetadata ()) : null ;
566
+ final ByteBuf data = Unpooled .wrappedBuffer (payload .getData ());
562
567
return from (streamId , type , metadata , data , flags );
563
568
}
564
569
565
570
public static Frame from (
566
- int streamId , FrameType type , ByteBuf metadata , ByteBuf data , int flags ) {
571
+ int streamId , FrameType type , @ Nullable ByteBuf metadata , ByteBuf data , int flags ) {
567
572
final Frame frame = RECYCLER .get ();
568
573
frame .content =
569
574
ByteBufAllocator .DEFAULT .buffer (
570
575
FrameHeaderFlyweight .computeFrameHeaderLength (
571
- type , metadata .readableBytes (), data .readableBytes ()));
576
+ type , metadata != null ? metadata .readableBytes () : null , data .readableBytes ()));
572
577
frame .content .writerIndex (
573
578
FrameHeaderFlyweight .encode (frame .content , streamId , flags , type , metadata , data ));
574
579
return frame ;
@@ -583,15 +588,10 @@ public static Frame from(int streamId) {
583
588
final Frame frame = RECYCLER .get ();
584
589
frame .content =
585
590
ByteBufAllocator .DEFAULT .buffer (
586
- FrameHeaderFlyweight .computeFrameHeaderLength (FrameType .CANCEL , 0 , 0 ));
591
+ FrameHeaderFlyweight .computeFrameHeaderLength (FrameType .CANCEL , null , 0 ));
587
592
frame .content .writerIndex (
588
593
FrameHeaderFlyweight .encode (
589
- frame .content ,
590
- streamId ,
591
- 0 ,
592
- FrameType .CANCEL ,
593
- Unpooled .EMPTY_BUFFER ,
594
- Unpooled .EMPTY_BUFFER ));
594
+ frame .content , streamId , 0 , FrameType .CANCEL , null , Unpooled .EMPTY_BUFFER ));
595
595
return frame ;
596
596
}
597
597
}
@@ -636,58 +636,57 @@ public String toString() {
636
636
long streamId = -1 ;
637
637
String additionalFlags = "" ;
638
638
639
- try {
640
- type = FrameHeaderFlyweight .frameType (content );
639
+ type = FrameHeaderFlyweight .frameType (content );
640
+
641
+ @ Nullable ByteBuf metadata = FrameHeaderFlyweight .sliceFrameMetadata (content );
641
642
642
- ByteBuf metadata = FrameHeaderFlyweight . sliceFrameMetadata ( content );
643
+ if ( metadata != null ) {
643
644
if (0 < metadata .readableBytes ()) {
644
645
payload .append (
645
646
String .format ("metadata: \" %s\" " , metadata .toString (StandardCharsets .UTF_8 )));
646
647
}
648
+ }
647
649
648
- ByteBuf data = FrameHeaderFlyweight .sliceFrameData (content );
649
- if (0 < data .readableBytes ()) {
650
- payload .append (String .format ("data: \" %s\" " , data .toString (StandardCharsets .UTF_8 )));
651
- }
652
-
653
- streamId = FrameHeaderFlyweight .streamId (content );
654
-
655
- switch (type ) {
656
- case LEASE :
657
- additionalFlags =
658
- " Permits: " + Lease .numberOfRequests (this ) + " TTL: " + Lease .ttl (this );
659
- break ;
660
- case REQUEST_N :
661
- additionalFlags = " RequestN: " + RequestN .requestN (this );
662
- break ;
663
- case KEEPALIVE :
664
- additionalFlags = " Respond flag: " + Keepalive .hasRespondFlag (this );
665
- break ;
666
- case REQUEST_STREAM :
667
- case REQUEST_CHANNEL :
668
- additionalFlags = " Initial Request N: " + Request .initialRequestN (this );
669
- break ;
670
- case ERROR :
671
- additionalFlags = " Error code: " + Error .errorCode (this );
672
- break ;
673
- case SETUP :
674
- int version = Setup .version (this );
675
- additionalFlags =
676
- " Version: "
677
- + VersionFlyweight .toString (version )
678
- + " keep-alive interval: "
679
- + Setup .keepaliveInterval (this )
680
- + " max lifetime: "
681
- + Setup .maxLifetime (this )
682
- + " metadata mime type: "
683
- + Setup .metadataMimeType (this )
684
- + " data mime type: "
685
- + Setup .dataMimeType (this );
686
- break ;
687
- }
688
- } catch (Exception e ) {
689
- logger .error ("Error generating toString, ignored." , e );
650
+ ByteBuf data = FrameHeaderFlyweight .sliceFrameData (content );
651
+ if (0 < data .readableBytes ()) {
652
+ payload .append (String .format ("data: \" %s\" " , data .toString (StandardCharsets .UTF_8 )));
653
+ }
654
+
655
+ streamId = FrameHeaderFlyweight .streamId (content );
656
+
657
+ switch (type ) {
658
+ case LEASE :
659
+ additionalFlags = " Permits: " + Lease .numberOfRequests (this ) + " TTL: " + Lease .ttl (this );
660
+ break ;
661
+ case REQUEST_N :
662
+ additionalFlags = " RequestN: " + RequestN .requestN (this );
663
+ break ;
664
+ case KEEPALIVE :
665
+ additionalFlags = " Respond flag: " + Keepalive .hasRespondFlag (this );
666
+ break ;
667
+ case REQUEST_STREAM :
668
+ case REQUEST_CHANNEL :
669
+ additionalFlags = " Initial Request N: " + Request .initialRequestN (this );
670
+ break ;
671
+ case ERROR :
672
+ additionalFlags = " Error code: " + Error .errorCode (this );
673
+ break ;
674
+ case SETUP :
675
+ int version = Setup .version (this );
676
+ additionalFlags =
677
+ " Version: "
678
+ + VersionFlyweight .toString (version )
679
+ + " keep-alive interval: "
680
+ + Setup .keepaliveInterval (this )
681
+ + " max lifetime: "
682
+ + Setup .maxLifetime (this )
683
+ + " metadata mime type: "
684
+ + Setup .metadataMimeType (this )
685
+ + " data mime type: "
686
+ + Setup .dataMimeType (this );
687
+ break ;
690
688
}
689
+
691
690
return "Frame => Stream ID: "
692
691
+ streamId
693
692
+ " Type: "
0 commit comments