@@ -72,83 +72,32 @@ use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
use tokio::task::JoinHandle;

-/// Display configuration for DataFrames
-#[pyclass(name = "DisplayConfig", module = "datafusion", subclass)]
-#[derive(Clone, Debug)]
-pub struct DisplayConfig {
-    #[pyo3(get, set)]
-    pub max_width: usize,
-    #[pyo3(get, set)]
-    pub max_rows: Option<usize>,
-    #[pyo3(get, set)]
-    pub show_nulls: bool,
-}
-
-#[pymethods]
-impl DisplayConfig {
-    #[new]
-    pub fn new(
-        max_width: Option<usize>,
-        max_rows: Option<usize>,
-        show_nulls: Option<bool>,
-    ) -> Self {
-        Self {
-            max_width: max_width.unwrap_or(80),
-            max_rows,
-            show_nulls: show_nulls.unwrap_or(false),
-        }
-    }
-}
-
/// Configuration options for a SessionContext
#[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
#[derive(Clone, Default)]
pub struct PySessionConfig {
    pub config: SessionConfig,
-    pub display_config: DisplayConfig,
}

impl From<SessionConfig> for PySessionConfig {
    fn from(config: SessionConfig) -> Self {
-        Self {
-            config,
-            display_config: DisplayConfig::new(Some(80), None, Some(false)),
-        }
+        Self { config }
    }
}

#[pymethods]
impl PySessionConfig {
-    #[pyo3(signature = (config_options=None, display_config=None))]
+    #[pyo3(signature = (config_options=None))]
    #[new]
-    fn new(
-        config_options: Option<HashMap<String, String>>,
-        display_config: Option<DisplayConfig>,
-    ) -> Self {
+    fn new(config_options: Option<HashMap<String, String>>) -> Self {
        let mut config = SessionConfig::new();
        if let Some(hash_map) = config_options {
            for (k, v) in &hash_map {
                config = config.set(k, &ScalarValue::Utf8(Some(v.clone())));
            }
        }

-        Self {
-            config,
-            display_config: display_config
-                .unwrap_or_else(|| DisplayConfig::new(Some(80), None, Some(false))),
-        }
-    }
-
-    // Get the display configuration
-    pub fn get_display_config(&self) -> DisplayConfig {
-        self.display_config.clone()
-    }
-
-    // Set the display configuration
-    pub fn with_display_config(&self, display_config: DisplayConfig) -> Self {
-        let mut new_config = self.clone();
-        new_config.display_config = display_config;
-        new_config
+        Self { config }
    }

    fn with_create_default_catalog_and_schema(&self, enabled: bool) -> Self {
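The net effect of the hunk above is that `SessionConfig` is now built from an optional dict of string options only; the `DisplayConfig` class and the `display_config` argument are removed. A minimal sketch of the resulting Python-side call, assuming the `datafusion` package exposes these pyclasses directly (the option key shown is illustrative):

# A minimal usage sketch, assuming `datafusion.SessionConfig` and
# `datafusion.SessionContext` wrap the pyclasses above; the option
# key is illustrative.
from datafusion import SessionConfig, SessionContext

config = SessionConfig({"datafusion.execution.batch_size": "4096"})  # config_options dict
ctx = SessionContext(config)  # no display_config argument anymore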
@@ -726,6 +675,226 @@ impl PySessionContext {
            )));
        }

+        let mut options = CsvReadOptions::new()
+            .has_header(has_header)
+            .delimiter(delimiter[0])
+            .schema_infer_max_records(schema_infer_max_records)
+            .file_extension(file_extension)
+            .file_compression_type(parse_file_compression_type(file_compression_type)?);
+        options.schema = schema.as_ref().map(|x| &x.0);
+
+        if path.is_instance_of::<PyList>() {
+            let paths = path.extract::<Vec<String>>()?;
+            let result = self.register_csv_from_multiple_paths(name, paths, options);
+            wait_for_future(py, result)?;
+        } else {
+            let path = path.extract::<String>()?;
+            let result = self.ctx.register_csv(name, &path, options);
+            wait_for_future(py, result)?;
+        }
+
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name,
+                        path,
+                        schema=None,
+                        schema_infer_max_records=1000,
+                        file_extension=".json",
+                        table_partition_cols=vec![],
+                        file_compression_type=None))]
+    pub fn register_json(
+        &mut self,
+        name: &str,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_compression_type: Option<String>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = NdJsonReadOptions::default()
+            .file_compression_type(parse_file_compression_type(file_compression_type)?)
+            .table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
+        options.schema = schema.as_ref().map(|x| &x.0);
+
+        let result = self.ctx.register_json(name, path, options);
+        wait_for_future(py, result)?;
+
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name,
+                        path,
+                        schema=None,
+                        file_extension=".avro",
+                        table_partition_cols=vec![]))]
+    pub fn register_avro(
+        &mut self,
+        name: &str,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = AvroReadOptions::default()
+            .table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
+        options.file_extension = file_extension;
+        options.schema = schema.as_ref().map(|x| &x.0);
+
+        let result = self.ctx.register_avro(name, path, options);
+        wait_for_future(py, result)?;
+
+        Ok(())
+    }
+
+    // Registers a PyArrow.Dataset
+    pub fn register_dataset(
+        &self,
+        name: &str,
+        dataset: &Bound<'_, PyAny>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
+        let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
+
+        self.ctx.register_table(name, table)?;
+
+        Ok(())
+    }
+
+    pub fn register_udf(&mut self, udf: PyScalarUDF) -> PyResult<()> {
+        self.ctx.register_udf(udf.function);
+        Ok(())
+    }
+
+    pub fn register_udaf(&mut self, udaf: PyAggregateUDF) -> PyResult<()> {
+        self.ctx.register_udaf(udaf.function);
+        Ok(())
+    }
+
+    pub fn register_udwf(&mut self, udwf: PyWindowUDF) -> PyResult<()> {
+        self.ctx.register_udwf(udwf.function);
+        Ok(())
+    }
+
+    #[pyo3(signature = (name="datafusion"))]
+    pub fn catalog(&self, name: &str) -> PyResult<PyCatalog> {
+        match self.ctx.catalog(name) {
+            Some(catalog) => Ok(PyCatalog::new(catalog)),
+            None => Err(PyKeyError::new_err(format!(
+                "Catalog with name {} doesn't exist.",
+                &name,
+            ))),
+        }
+    }
+
+    pub fn tables(&self) -> HashSet<String> {
+        self.ctx
+            .catalog_names()
+            .into_iter()
+            .filter_map(|name| self.ctx.catalog(&name))
+            .flat_map(move |catalog| {
+                catalog
+                    .schema_names()
+                    .into_iter()
+                    .filter_map(move |name| catalog.schema(&name))
+            })
+            .flat_map(|schema| schema.table_names())
+            .collect()
+    }
+
+    pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
+        let x = wait_for_future(py, self.ctx.table(name))
+            .map_err(|e| PyKeyError::new_err(e.to_string()))?;
+        Ok(PyDataFrame::new(x))
+    }
+
+    pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
+        Ok(self.ctx.table_exist(name)?)
+    }
+
+    pub fn empty_table(&self) -> PyDataFusionResult<PyDataFrame> {
+        Ok(PyDataFrame::new(self.ctx.read_empty()?))
+    }
+
+    pub fn session_id(&self) -> String {
+        self.ctx.session_id()
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
+    pub fn read_json(
+        &mut self,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_compression_type: Option<String>,
+        py: Python,
+    ) -> PyDataFusionResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+        let mut options = NdJsonReadOptions::default()
+            .table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
+            .file_compression_type(parse_file_compression_type(file_compression_type)?);
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
+        let df = if let Some(schema) = schema {
+            options.schema = Some(&schema.0);
+            let result = self.ctx.read_json(path, options);
+            wait_for_future(py, result)?
+        } else {
+            let result = self.ctx.read_json(path, options);
+            wait_for_future(py, result)?
+        };
+        Ok(PyDataFrame::new(df))
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (
+        path,
+        schema=None,
+        has_header=true,
+        delimiter=",",
+        schema_infer_max_records=1000,
+        file_extension=".csv",
+        table_partition_cols=vec![],
+        file_compression_type=None))]
+    pub fn read_csv(
+        &self,
+        path: &Bound<'_, PyAny>,
+        schema: Option<PyArrowType<Schema>>,
+        has_header: bool,
+        delimiter: &str,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_compression_type: Option<String>,
+        py: Python,
+    ) -> PyDataFusionResult<PyDataFrame> {
+        let delimiter = delimiter.as_bytes();
+        if delimiter.len() != 1 {
+            return Err(crate::errors::PyDataFusionError::PythonError(py_value_err(
+                "Delimiter must be a single character",
+            )));
+        };
+
        let mut options = CsvReadOptions::new()
            .has_header(has_header)
            .delimiter(delimiter[0])
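For orientation, here is a short usage sketch of the methods added in the second hunk, as they would be reached from Python. It assumes the `datafusion` package forwards to these pymethods unchanged; the table names and file paths are hypothetical.

# A hedged usage sketch; table names and paths are hypothetical.
from datafusion import SessionContext

ctx = SessionContext()

# register_csv accepts a single path or a list of paths (the PyList
# branch above) and enforces a single-character delimiter.
ctx.register_csv("sales", "data/sales.csv", has_header=True, delimiter=",")

# read_json bypasses registration and returns a DataFrame directly.
df = ctx.read_json("data/events.json", schema_infer_max_records=1000)

# Introspection helpers added in the same hunk.
assert ctx.table_exist("sales")
print(ctx.tables())      # table names across all catalogs and schemas
print(ctx.session_id())  # unique id for this session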