@@ -20,10 +20,10 @@ use std::sync::Arc;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
+use datafusion::config::{ParquetOptions, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
-use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::prelude::*;
 use datafusion_common::UnnestOptions;
 use pyo3::exceptions::{PyTypeError, PyValueError};
@@ -350,7 +350,7 @@ impl PyDataFrame {
             cl.ok_or(PyValueError::new_err("compression_level is not defined"))
         }
 
-        let compression_type = match compression.to_lowercase().as_str() {
+        let _validated = match compression.to_lowercase().as_str() {
             "snappy" => Compression::SNAPPY,
             "gzip" => Compression::GZIP(
                 GzipLevel::try_new(compression_level.unwrap_or(6))
@@ -375,16 +375,20 @@ impl PyDataFrame {
             }
         };
 
-        let writer_properties = WriterProperties::builder()
-            .set_compression(compression_type)
-            .build();
+        let mut compression_string = compression.to_string();
+        if let Some(level) = compression_level {
+            compression_string.push_str(&format!("({level})"));
+        }
+
+        let mut options = TableParquetOptions::default();
+        options.global.compression = Some(compression_string);
 
         wait_for_future(
             py,
             self.df.as_ref().clone().write_parquet(
                 path,
                 DataFrameWriteOptions::new(),
-                Option::from(writer_properties),
+                Option::from(options),
             ),
         )?;
         Ok(())
@@ -397,7 +401,7 @@ impl PyDataFrame {
         self.df
             .as_ref()
             .clone()
-            .write_json(path, DataFrameWriteOptions::new()),
+            .write_json(path, DataFrameWriteOptions::new(), None),
         )?;
         Ok(())
     }
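The net effect of the parquet change: instead of building a `WriterProperties` through the parquet crate, the bindings now describe compression as a string such as `zstd(4)` on DataFusion's `TableParquetOptions` and pass that to `DataFrame::write_parquet`. A minimal sketch of that construction, assuming only the `TableParquetOptions` fields the diff itself touches (`global.compression` as an `Option<String>`); the helper name is illustrative, not part of the PR:

```rust
use datafusion::config::TableParquetOptions;

// Illustrative helper (not in the PR): encode the codec and optional level in the
// string form DataFusion's config layer parses, e.g. "snappy" or "zstd(4)".
fn parquet_options_with_compression(codec: &str, level: Option<u32>) -> TableParquetOptions {
    let mut compression = codec.to_string();
    if let Some(level) = level {
        compression.push_str(&format!("({level})"));
    }
    let mut options = TableParquetOptions::default();
    options.global.compression = Some(compression);
    options
}
```

The resulting options value is what the hunk above passes as the third argument to `DataFrame::write_parquet`; the earlier `Compression` match is kept (as `_validated`) only to reject unknown codecs or out-of-range levels before the string is built.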