Add `read_json` to `SessionContext` (#56) · apache/datafusion-python@12bb587

Commit 12bb587

larskarg and Lars authored
Add read_json to SessionContext (#56)
* Expose read_json
* Add additional test cases
* Address review comments
* Fix Release Audit Tool error
* Fix fmt issues
* Add empty line to test data

Co-authored-by: Lars <lars@Larss-MacBook-Pro.local>
1 parent 940f118 commit 12bb587

File tree

3 files changed: +68 -1 lines changed

datafusion/tests/data_test_context/data.json
datafusion/tests/test_context.py
src/context.rs
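
What the new API looks like from Python: a minimal usage sketch (not part of the diff), assuming the SessionContext class exported by these bindings and a local newline-delimited JSON file named "example.json" (a placeholder path). The keyword defaults correspond to the #[args(...)] attribute in src/context.rs below.

    from datafusion import SessionContext  # assumed import path for these bindings

    ctx = SessionContext()
    # Read newline-delimited JSON into a DataFrame; the schema is inferred by default.
    df = ctx.read_json("example.json")
    batches = df.collect()  # list of pyarrow.RecordBatch, as in the tests below
    print(batches[0].schema)
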
datafusion/tests/data_test_context/data.json

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+{"A": "a", "B": 1}
+{"A": "b", "B": 2}
+{"A": "c", "B": 3}

datafusion/tests/test_context.py

Lines changed: 34 additions & 0 deletions

@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.

+import os
+
 import pyarrow as pa
 import pyarrow.dataset as ds

@@ -181,6 +183,38 @@ def test_table_exist(ctx):
     assert ctx.table_exist("t") is True


+def test_read_json(ctx):
+    path = os.path.dirname(os.path.abspath(__file__))
+
+    # Default
+    test_data_path = os.path.join(path, "data_test_context", "data.json")
+    df = ctx.read_json(test_data_path)
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].column(1) == pa.array([1, 2, 3])
+
+    # Schema
+    schema = pa.schema(
+        [
+            pa.field("A", pa.string(), nullable=True),
+        ]
+    )
+    df = ctx.read_json(test_data_path, schema=schema)
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].schema == schema
+
+    # File extension
+    test_data_path = os.path.join(path, "data_test_context", "data.json")
+    df = ctx.read_json(test_data_path, file_extension=".json")
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].column(1) == pa.array([1, 2, 3])
+
+
 def test_read_csv(ctx):
     csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
     csv_df.select(column("c1")).show()
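
The test above receives a ctx fixture that is not part of this diff; it presumably comes from the test suite's conftest.py. A hypothetical sketch of such a fixture, for orientation only (the import path and fixture body are assumptions, not code from this commit):

    import pytest

    from datafusion import SessionContext  # assumed import path for these bindings


    @pytest.fixture
    def ctx():
        # A fresh context per test keeps registered tables from leaking between tests.
        return SessionContext()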

src/context.rs

Lines changed: 31 additions & 1 deletion

@@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};

 use crate::catalog::{PyCatalog, PyTable};
 use crate::dataframe::PyDataFrame;

@@ -269,6 +269,36 @@ impl PySessionContext {
         Ok(self.ctx.session_id())
     }

+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        schema = "None",
+        schema_infer_max_records = "1000",
+        file_extension = "\".json\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_json(
+        &mut self,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<String>,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = NdJsonReadOptions::default().table_partition_cols(table_partition_cols);
+        options.schema = schema.map(|s| Arc::new(s.0));
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
+
+        let result = self.ctx.read_json(path, options);
+        let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+        Ok(PyDataFrame::new(df))
+    }
+
     #[allow(clippy::too_many_arguments)]
     #[args(
         schema = "None",
0 commit comments