feat: upgrade dataframe write_parquet and write_json · Michael-J-Ward/datafusion-python@f9895a9 · GitHub
[go: up one dir, main page]

Skip to content

Commit f9895a9

Browse files
feat: upgrade dataframe write_parquet and write_json
The options to write_parquet changed. write_json has a new argument that I defaulted to None. We can expose that config later. Ref: apache/datafusion#9382
1 parent ccb2f99 commit f9895a9

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

src/dataframe.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use std::sync::Arc;
2020
use datafusion::arrow::datatypes::Schema;
2121
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
2222
use datafusion::arrow::util::pretty;
23+
use datafusion::config::{ParquetOptions, TableParquetOptions};
2324
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
2425
use datafusion::execution::SendableRecordBatchStream;
2526
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
26-
use datafusion::parquet::file::properties::WriterProperties;
2727
use datafusion::prelude::*;
2828
use datafusion_common::UnnestOptions;
2929
use pyo3::exceptions::{PyTypeError, PyValueError};
@@ -350,7 +350,7 @@ impl PyDataFrame {
350350
cl.ok_or(PyValueError::new_err("compression_level is not defined"))
351351
}
352352

353-
let compression_type = match compression.to_lowercase().as_str() {
353+
let _validated = match compression.to_lowercase().as_str() {
354354
"snappy" => Compression::SNAPPY,
355355
"gzip" => Compression::GZIP(
356356
GzipLevel::try_new(compression_level.unwrap_or(6))
@@ -375,16 +375,20 @@ impl PyDataFrame {
375375
}
376376
};
377377

378-
let writer_properties = WriterProperties::builder()
379-
.set_compression(compression_type)
380-
.build();
378+
let mut compression_string = compression.to_string();
379+
if let Some(level) = compression_level {
380+
compression_string.push_str(&format!("({level})"));
381+
}
382+
383+
let mut options = TableParquetOptions::default();
384+
options.global.compression = Some(compression_string);
381385

382386
wait_for_future(
383387
py,
384388
self.df.as_ref().clone().write_parquet(
385389
path,
386390
DataFrameWriteOptions::new(),
387-
Option::from(writer_properties),
391+
Option::from(options),
388392
),
389393
)?;
390394
Ok(())
@@ -397,7 +401,7 @@ impl PyDataFrame {
397401
self.df
398402
.as_ref()
399403
.clone()
400-
.write_json(path, DataFrameWriteOptions::new()),
404+
.write_json(path, DataFrameWriteOptions::new(), None),
401405
)?;
402406
Ok(())
403407
}

0 commit comments

Comments
 (0)
0