From d6d63dbb12db3fce22c4a9bea10ef27dc61ef61d Mon Sep 17 00:00:00 2001 From: larskarg Date: Tue, 11 Oct 2022 01:49:18 +0000 Subject: [PATCH 1/6] Expose read_json --- datafusion/tests/data_test_context/data.jsonl | 3 ++ datafusion/tests/test_context.py | 13 +++++++ src/context.rs | 36 ++++++++++++++++++- 3 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 datafusion/tests/data_test_context/data.jsonl diff --git a/datafusion/tests/data_test_context/data.jsonl b/datafusion/tests/data_test_context/data.jsonl new file mode 100644 index 000000000..a20a1fbb9 --- /dev/null +++ b/datafusion/tests/data_test_context/data.jsonl @@ -0,0 +1,3 @@ +{"A": "a", "B": 1} +{"A": "b", "B": 2} +{"A": "c", "B": 3} \ No newline at end of file diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 324bbec6f..4dde98dba 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +import os + import pyarrow as pa import pyarrow.dataset as ds @@ -179,3 +181,14 @@ def test_table_exist(ctx): ctx.register_dataset("t", dataset) assert ctx.table_exist("t") is True + + +def test_read_json(ctx): + path = os.path.dirname(os.path.abspath(__file__)) + data_path = os.path.join(path, "data_test_context", "data.jsonl") + df = ctx.read_json(data_path, file_extension=".jsonl") + + result = df.collect() + + assert result[0].column(0) == pa.array(["a", "b", "c"]) + assert result[0].column(1) == pa.array([1, 2, 3]) diff --git a/src/context.rs b/src/context.rs index 25d08ef8e..36c3a57f2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -28,7 +28,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::datasource::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; -use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; +use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, NdJsonReadOptions}; use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; @@ -264,4 +264,38 @@ impl PySessionContext { fn session_id(&self) -> PyResult { Ok(self.ctx.session_id()) } + + #[allow(clippy::too_many_arguments)] + #[args( + schema = "None", + schema_infer_max_records = "1000", + file_extension = "\".json\"", + table_partition_cols = "vec![]", + )] + fn read_json( + &mut self, + path: PathBuf, + schema: Option, + schema_infer_max_records: usize, + file_extension: &str, + table_partition_cols: Vec, + py: Python + ) -> PyResult { + let path = path + .to_str() + .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; + + let mut options = NdJsonReadOptions::default() + .table_partition_cols(table_partition_cols); + options.schema = match schema { + Some(x) => Some(Arc::new(x)), + None => None + }; + options.schema_infer_max_records = schema_infer_max_records; + options.file_extension = file_extension; + + let result = self.ctx.read_json(path, options); + let df = wait_for_future(py, result).map_err(DataFusionError::from)?; + Ok(PyDataFrame::new(df)) + } } From bd79f7d18ba508ff4b1328fd07d2dc2ddfd0b753 Mon Sep 17 00:00:00 2001 From: larskarg Date: Wed, 12 Oct 2022 01:07:51 +0000 Subject: [PATCH 2/6] Add additional tests cases --- datafusion/tests/data_test_context/data.json | 3 +++ datafusion/tests/test_context.py | 27 +++++++++++++++++--- 2 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 datafusion/tests/data_test_context/data.json diff --git a/datafusion/tests/data_test_context/data.json b/datafusion/tests/data_test_context/data.json new file mode 100644 index 000000000..a20a1fbb9 --- /dev/null +++ b/datafusion/tests/data_test_context/data.json @@ -0,0 +1,3 @@ +{"A": "a", "B": 1} +{"A": "b", "B": 2} +{"A": "c", "B": 3} \ No newline at end of file diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 4dde98dba..3dbb2e96e 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -185,9 +185,30 @@ def test_table_exist(ctx): def test_read_json(ctx): path = os.path.dirname(os.path.abspath(__file__)) - data_path = os.path.join(path, "data_test_context", "data.jsonl") - df = ctx.read_json(data_path, file_extension=".jsonl") - + + # Default + test_data_path = os.path.join(path, "data_test_context", "data.json") + df = ctx.read_json(test_data_path) + result = df.collect() + + assert result[0].column(0) == pa.array(["a", "b", "c"]) + assert result[0].column(1) == pa.array([1, 2, 3]) + + # Schema + schema = pa.schema( + [ + pa.field("A", pa.string(), nullable=True), + ] + ) + df = ctx.read_json(test_data_path, schema=schema) + result = df.collect() + + assert result[0].column(0) == pa.array(["a", "b", "c"]) + assert result[0].schema == schema + + # File extension + test_data_path = os.path.join(path, "data_test_context", "data.jsonl") + df = ctx.read_json(test_data_path, file_extension=".jsonl") result = df.collect() assert result[0].column(0) == pa.array(["a", "b", "c"]) From a3301255d09d52e115b022f49d3df77fabb8d304 Mon Sep 17 00:00:00 2001 From: larskarg Date: Wed, 12 Oct 2022 16:11:03 +0000 Subject: [PATCH 3/6] Address review comments --- src/context.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/context.rs b/src/context.rs index 36c3a57f2..809b10d79 100644 --- a/src/context.rs +++ b/src/context.rs @@ -287,10 +287,7 @@ impl PySessionContext { let mut options = NdJsonReadOptions::default() .table_partition_cols(table_partition_cols); - options.schema = match schema { - Some(x) => Some(Arc::new(x)), - None => None - }; + options.schema = schema.map(Arc::new); options.schema_infer_max_records = schema_infer_max_records; options.file_extension = file_extension; From 1da292d8eb069bea0bd5cffce5ffe809104a9a7b Mon Sep 17 00:00:00 2001 From: Lars Date: Wed, 26 Oct 2022 20:34:09 -0500 Subject: [PATCH 4/6] Fix Release Audit Tool error --- datafusion/tests/data_test_context/data.jsonl | 3 --- datafusion/tests/test_context.py | 4 ++-- 2 files changed, 2 insertions(+), 5 deletions(-) delete mode 100644 datafusion/tests/data_test_context/data.jsonl diff --git a/datafusion/tests/data_test_context/data.jsonl b/datafusion/tests/data_test_context/data.jsonl deleted file mode 100644 index a20a1fbb9..000000000 --- a/datafusion/tests/data_test_context/data.jsonl +++ /dev/null @@ -1,3 +0,0 @@ -{"A": "a", "B": 1} -{"A": "b", "B": 2} -{"A": "c", "B": 3} \ No newline at end of file diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 7838f8739..50bdf4363 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -207,8 +207,8 @@ def test_read_json(ctx): assert result[0].schema == schema # File extension - test_data_path = os.path.join(path, "data_test_context", "data.jsonl") - df = ctx.read_json(test_data_path, file_extension=".jsonl") + test_data_path = os.path.join(path, "data_test_context", "data.json") + df = ctx.read_json(test_data_path, file_extension=".json") result = df.collect() assert result[0].column(0) == pa.array(["a", "b", "c"]) From 39ef2f5ae26d3572e8607c87f17a72f872e3f5f2 Mon Sep 17 00:00:00 2001 From: larskarg Date: Wed, 26 Oct 2022 20:52:10 -0500 Subject: [PATCH 5/6] Fix fmt issues --- src/context.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/context.rs b/src/context.rs index 8a02028fa..21b3f06c4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::datasource::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; -use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions, NdJsonReadOptions}; +use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions}; use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; @@ -274,7 +274,7 @@ impl PySessionContext { schema = "None", schema_infer_max_records = "1000", file_extension = "\".json\"", - table_partition_cols = "vec![]", + table_partition_cols = "vec![]" )] fn read_json( &mut self, @@ -283,18 +283,17 @@ impl PySessionContext { schema_infer_max_records: usize, file_extension: &str, table_partition_cols: Vec, - py: Python + py: Python, ) -> PyResult { let path = path .to_str() .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let mut options = NdJsonReadOptions::default() - .table_partition_cols(table_partition_cols); + let mut options = NdJsonReadOptions::default().table_partition_cols(table_partition_cols); options.schema = schema.map(|s| Arc::new(s.0)); options.schema_infer_max_records = schema_infer_max_records; options.file_extension = file_extension; - + let result = self.ctx.read_json(path, options); let df = wait_for_future(py, result).map_err(DataFusionError::from)?; Ok(PyDataFrame::new(df)) @@ -399,4 +398,4 @@ impl PySessionContext { let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?); Ok(df) } -} \ No newline at end of file +} From 7332b796aceeec4edd0c2b6354220557ca727c5d Mon Sep 17 00:00:00 2001 From: larskarg Date: Tue, 1 Nov 2022 20:24:36 -0500 Subject: [PATCH 6/6] Add empty line to test data --- datafusion/tests/data_test_context/data.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/tests/data_test_context/data.json b/datafusion/tests/data_test_context/data.json index a20a1fbb9..ff895b61f 100644 --- a/datafusion/tests/data_test_context/data.json +++ b/datafusion/tests/data_test_context/data.json @@ -1,3 +1,3 @@ {"A": "a", "B": 1} {"A": "b", "B": 2} -{"A": "c", "B": 3} \ No newline at end of file +{"A": "c", "B": 3}