@@ -615,11 +615,27 @@ impl Decoder {
615
615
self . tape_decoder . serialize ( rows)
616
616
}
617
617
618
+ /// True if the decoder is currently part way through decoding a record.
619
+ pub fn has_partial_record ( & self ) -> bool {
620
+ self . tape_decoder . has_partial_row ( )
621
+ }
622
+
623
+ /// The number of unflushed records, including the partially decoded record (if any).
624
+ pub fn len ( & self ) -> usize {
625
+ self . tape_decoder . num_buffered_rows ( )
626
+ }
627
+
628
+ /// True if there are no records to flush, i.e. [`Self::len`] is zero.
629
+ pub fn is_empty ( & self ) -> bool {
630
+ self . len ( ) == 0
631
+ }
632
+
618
633
/// Flushes the currently buffered data to a [`RecordBatch`]
619
634
///
620
- /// Returns `Ok(None)` if no buffered data
635
+ /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
621
636
///
622
- /// Note: if called part way through decoding a record, this will return an error
637
+ /// Note: This will return an error if called part way through decoding a record,
638
+ /// i.e. [`Self::has_partial_record`] is true.
623
639
pub fn flush ( & mut self ) -> Result < Option < RecordBatch > , ArrowError > {
624
640
let tape = self . tape_decoder . finish ( ) ?;
625
641
@@ -803,6 +819,20 @@ mod tests {
803
819
Field :: new( "e" , DataType :: Date64 , true ) ,
804
820
] ) ) ;
805
821
822
+ let mut decoder = ReaderBuilder :: new ( schema. clone ( ) ) . build_decoder ( ) . unwrap ( ) ;
823
+ assert ! ( decoder. is_empty( ) ) ;
824
+ assert_eq ! ( decoder. len( ) , 0 ) ;
825
+ assert ! ( !decoder. has_partial_record( ) ) ;
826
+ assert_eq ! ( decoder. decode( buf. as_bytes( ) ) . unwrap( ) , 221 ) ;
827
+ assert ! ( !decoder. is_empty( ) ) ;
828
+ assert_eq ! ( decoder. len( ) , 6 ) ;
829
+ assert ! ( !decoder. has_partial_record( ) ) ;
830
+ let batch = decoder. flush ( ) . unwrap ( ) . unwrap ( ) ;
831
+ assert_eq ! ( batch. num_rows( ) , 6 ) ;
832
+ assert ! ( decoder. is_empty( ) ) ;
833
+ assert_eq ! ( decoder. len( ) , 0 ) ;
834
+ assert ! ( !decoder. has_partial_record( ) ) ;
835
+
806
836
let batches = do_read ( buf, 1024 , false , false , schema) ;
807
837
assert_eq ! ( batches. len( ) , 1 ) ;
808
838
@@ -2158,6 +2188,14 @@ mod tests {
2158
2188
true ,
2159
2189
) ] ) ) ;
2160
2190
2191
+ let mut decoder = ReaderBuilder :: new ( schema. clone ( ) ) . build_decoder ( ) . unwrap ( ) ;
2192
+ let _ = decoder. decode ( r#"{"a": { "child":"# . as_bytes ( ) ) . unwrap ( ) ;
2193
+ assert ! ( decoder. tape_decoder. has_partial_row( ) ) ;
2194
+ assert_eq ! ( decoder. tape_decoder. num_buffered_rows( ) , 1 ) ;
2195
+ let _ = decoder. flush ( ) . unwrap_err ( ) ;
2196
+ assert ! ( decoder. tape_decoder. has_partial_row( ) ) ;
2197
+ assert_eq ! ( decoder. tape_decoder. num_buffered_rows( ) , 1 ) ;
2198
+
2161
2199
let parse_err = |s : & str | {
2162
2200
ReaderBuilder :: new ( schema. clone ( ) )
2163
2201
. build ( Cursor :: new ( s. as_bytes ( ) ) )
0 commit comments