8000 Expose record boundary information in JSON decoder (#7092) · apache/arrow-rs@27d2a75 · GitHub
[go: up one dir, main page]

Skip to content

Commit 27d2a75

Browse files
authored
Expose record boundary information in JSON decoder (#7092)
* Expose record boundary information in JSON decoder * fix doc links
1 parent 19f01e3 commit 27d2a75

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

arrow-json/src/reader/mod.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,11 +615,27 @@ impl Decoder {
615615
self.tape_decoder.serialize(rows)
616616
}
617617

618+
/// True if the decoder is currently part way through decoding a record.
619+
pub fn has_partial_record(&self) -> bool {
620+
self.tape_decoder.has_partial_row()
621+
}
622+
623+
/// The number of unflushed records, including the partially decoded record (if any).
624+
pub fn len(&self) -> usize {
625+
self.tape_decoder.num_buffered_rows()
626+
}
627+
628+
/// True if there are no records to flush, i.e. [`Self::len`] is zero.
629+
pub fn is_empty(&self) -> bool {
630+
self.len() == 0
631+
}
632+
618633
/// Flushes the currently buffered data to a [`RecordBatch`]
619634
///
620-
/// Returns `Ok(None)` if no buffered data
635+
/// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
621636
///
622-
/// Note: if called part way through decoding a record, this will return an error
637+
/// Note: This will return an error if called part way through decoding a record,
638+
/// i.e. [`Self::has_partial_record`] is true.
623639
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
624640
let tape = self.tape_decoder.finish()?;
625641

@@ -803,6 +819,20 @@ mod tests {
803819
Field::new("e", DataType::Date64, true),
804820
]));
805821

822+
let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
823+
assert!(decoder.is_empty());
824+
assert_eq!(decoder.len(), 0);
825+
assert!(!decoder.has_partial_record());
826+
assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
827+
assert!(!decoder.is_empty());
828+
assert_eq!(decoder.len(), 6);
829+
assert!(!decoder.has_partial_record());
830+
let batch = decoder.flush().unwrap().unwrap();
831+
assert_eq!(batch.num_rows(), 6);
832+
assert!(decoder.is_empty());
833+
assert_eq!(decoder.len(), 0);
834+
assert!(!decoder.has_partial_record());
835+
806836
let batches = do_read(buf, 1024, false, false, schema);
807837
assert_eq!(batches.len(), 1);
808838

@@ -2158,6 +2188,14 @@ mod tests {
21582188
true,
21592189
)]));
21602190

2191+
let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2192+
let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2193+
assert!(decoder.tape_decoder.has_partial_row());
2194+
assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2195+
let _ = decoder.flush().unwrap_err();
2196+
assert!(decoder.tape_decoder.has_partial_row());
2197+
assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2198+
21612199
let parse_err = |s: &str| {
21622200
ReaderBuilder::new(schema.clone())
21632201
.build(Cursor::new(s.as_bytes()))

arrow-json/src/reader/tape.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,17 @@ impl TapeDecoder {
545545
Ok(())
546546
}
547547

548+
/// The number of buffered rows, including the partially decoded row (if any).
549+
pub fn num_buffered_rows(&self) -> usize {
550+
self.cur_row
551+
}
552+
553+
/// True if the decoder is part way through decoding a row. If so, calling [`Self::finish`]
554+
/// would return an error.
555+
pub fn has_partial_row(&self) -> bool {
556+
!self.stack.is_empty()
557+
}
558+
548559
/// Finishes the current [`Tape`]
549560
pub fn finish(&self) -> Result<Tape<'_>, ArrowError> {
550561
if let Some(b) = self.stack.last() {
@@ -726,8 +737,12 @@ mod tests {
726737
"#;
727738
let mut decoder = TapeDecoder::new(16, 2);
728739
decoder.decode(a.as_bytes()).unwrap();
740+
assert!(!decoder.has_partial_row());
741+
assert_eq!(decoder.num_buffered_rows(), 7);
729742

730743
let finished = decoder.finish().unwrap();
744+
assert!(!decoder.has_partial_row());
745+
assert_eq!(decoder.num_buffered_rows(), 7); // didn't call clear() yet
731746
assert_eq!(
732747
finished.elements,
733748
&[
@@ -820,7 +835,11 @@ mod tests {
820835
0, 5, 10, 13, 14, 17, 19, 22, 25, 28, 29, 30, 31, 32, 32, 32, 33, 34, 35, 41, 47,
821836
52, 55, 57, 58, 59, 62, 63, 63, 66, 69, 70, 71, 72, 73, 74, 75, 76, 77
822837
]
823-
)
838+
);
839+
840+
decoder.clear();
841+
assert!(!decoder.has_partial_row());
842+
assert_eq!(decoder.num_buffered_rows(), 0);
824843
}
825844

826845
#[test]
@@ -874,6 +893,8 @@ mod tests {
874893
// Test truncation
875894
let mut decoder = TapeDecoder::new(16, 2);
876895
decoder.decode(b"{\"he").unwrap();
896+
assert!(decoder.has_partial_row());
897+
assert_eq!(decoder.num_buffered_rows(), 1);
877898
let err = decoder.finish().unwrap_err().to_string();
878899
assert_eq!(err, "Json error: Truncated record whilst reading string");
879900

0 commit comments

Comments
 (0)
0