8000 Revert "Refactor PyDataFrame: Simplify methods and improve performance" · kosiew/datafusion-python@a5d224f · GitHub
[go: up one dir, main page]

Skip to content

Commit a5d224f

Browse files
committed
Revert "Refactor PyDataFrame: Simplify methods and improve performance"
This reverts commit 0e30af3.
1 parent 0e30af3 commit a5d224f

File tree

2 files changed

+830
-187
lines changed

2 files changed

+830
-187
lines changed

src/context.rs

Lines changed: 224 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -72,83 +72,32 @@ use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
7272
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
7373
use tokio::task::JoinHandle;
7474

75-
/// Display configuration for DataFrames
76-
#[pyclass(name = "DisplayConfig", module = "datafusion", subclass)]
77-
#[derive(Clone, Debug)]
78-
pub struct DisplayConfig {
79-
#[pyo3(get, set)]
80-
pub max_width: usize,
81-
#[pyo3(get, set)]
82-
pub max_rows: Option<usize>,
83-
#[pyo3(get, set)]
84-
pub show_nulls: bool,
85-
}
86-
87-
#[pymethods]
88-
impl DisplayConfig {
89-
#[new]
90-
pub fn new(
91-
max_width: Option<usize>,
92-
max_rows: Option<usize>,
93-
show_nulls: Option<bool>,
94-
) -> Self {
95-
Self {
96-
max_width: max_width.unwrap_or(80),
97-
max_rows,
98-
show_nulls: show_nulls.unwrap_or(false),
99-
}
100-
}
101-
}
102-
10375
/// Configuration options for a SessionContext
10476
#[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
10577
#[derive(Clone, Default)]
10678
pub struct PySessionConfig {
10779
pub config: SessionConfig,
108-
pub display_config: DisplayConfig,
10980
}
11081

11182
impl From<SessionConfig> for PySessionConfig {
11283
fn from(config: SessionConfig) -> Self {
113-
Self {
114-
config,
115-
display_config: DisplayConfig::new(Some(80), None, Some(false)),
116-
}
84+
Self { config }
11785
}
11886
}
11987

12088
#[pymethods]
12189
impl PySessionConfig {
122-
#[pyo3(signature = (config_options=None, display_config=None))]
90+
#[pyo3(signature = (config_options=None))]
12391
#[new]
124-
fn new(
125-
config_options: Option<HashMap<String, String>>,
126-
display_config: Option<DisplayConfig>,
127-
) -> Self {
92+
fn new(config_options: Option<HashMap< 9E7A String, String>>) -> Self {
12893
let mut config = SessionConfig::new();
12994
if let Some(hash_map) = config_options {
13095
for (k, v) in &hash_map {
13196
config = config.set(k, &ScalarValue::Utf8(Some(v.clone())));
13297
}
13398
}
13499

135-
Self {
136-
config,
137-
display_config: display_config
138-
.unwrap_or_else(|| DisplayConfig::new(Some(80), None, Some(false))),
139-
}
140-
}
141-
142-
// Get the display configuration
143-
pub fn get_display_config(&self) -> DisplayConfig {
144-
self.display_config.clone()
145-
}
146-
147-
// Set the display configuration
148-
pub fn with_display_config(&self, display_config: DisplayConfig) -> Self {
149-
let mut new_config = self.clone();
150-
new_config.display_config = display_config;
151-
new_config
100+
Self { config }
152101
}
153102

154103
fn with_create_default_catalog_and_schema(&self, enabled: bool) -> Self {
@@ -726,6 +675,226 @@ impl PySessionContext {
726675
)));
727676
}
728677

678+
let mut options = CsvReadOptions::new()
679+
.has_header(has_header)
680+
.delimiter(delimiter[0])
681+
.schema_infer_max_records(schema_infer_max_records)
682+
.file_extension(file_extension)
683+
.file_compression_type(parse_file_compression_type(file_compression_type)?);
684+
options.schema = schema.as_ref().map(|x| &x.0);
685+
686+
if path.is_instance_of::<PyList>() {
687+
let paths = path.extract::<Vec<String>>()?;
688+
let result = self.register_csv_from_multiple_paths(name, paths, options);
689+
wait_for_future(py, result)?;
690+
} else {
691+
let path = path.extract::<String>()?;
692+
let result = self.ctx.register_csv(name, &path, options);
693+
wait_for_future(py, result)?;
694+
}
695+
696+
Ok(())
697+
}
698+
699+
#[allow(clippy::too_many_arguments)]
700+
#[pyo3(signature = (name,
701+
path,
702+
schema=None,
703+
schema_infer_max_records=1000,
704+
file_extension=".json",
705+
table_partition_cols=vec![],
706+
file_compression_type=None))]
707+
pub fn register_json(
708+
&mut self,
709+
name: &str,
710+
path: PathBuf,
711+
schema: Option<PyArrowType<Schema>>,
712+
schema_infer_max_records: usi 10000 ze,
713+
file_extension: &str,
714+
table_partition_cols: Vec<(String, String)>,
715+
file_compression_type: Option<String>,
716+
py: Python,
717+
) -> PyDataFusionResult<()> {
718+
let path = path
719+
.to_str()
720+
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
721+
722+
let mut options = NdJsonReadOptions::default()
723+
.file_compression_type(parse_file_compression_type(file_compression_type)?)
724+
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
725+
options.schema_infer_max_records = schema_infer_max_records;
726+
options.file_extension = file_extension;
727+
options.schema = schema.as_ref().map(|x| &x.0);
728+
729+
let result = self.ctx.register_json(name, path, options);
730+
wait_for_future(py, result)?;
731+
732+
Ok(())
733+
}
734+
735+
#[allow(clippy::too_many_arguments)]
736+
#[pyo3(signature = (name,
737+
path,
738+
schema=None,
739+
file_extension=".avro",
740+
table_partition_cols=vec![]))]
741+
pub fn register_avro(
742+
&mut self,
743+
name: &str,
744+
path: PathBuf,
745+
schema: Option<PyArrowType<Schema>>,
746+
file_extension: &str,
747+
table_partition_cols: Vec<(String, String)>,
748+
py: Python,
749+
) -> PyDataFusionResult<()> {
750+
let path = path
751+
.to_str()
752+
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
753+
754+
let mut options = AvroReadOptions::default()
755+
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
756+
options.file_extension = file_extension;
757+
options.schema = schema.as_ref().map(|x| &x.0);
758+
759+
let result = self.ctx.register_avro(name, path, options);
760+
wait_for_future(py, result)?;
761+
762+
Ok(())
763+
}
764+
765+
// Registers a PyArrow.Dataset
766+
pub fn register_dataset(
767+
&self,
768+
name: &str,
769+
dataset: &Bound<'_, PyAny>,
770+
py: Python,
771+
) -> PyDataFusionResult<()> {
772+
let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
773+
774+
self.ctx.register_table(name, table)?;
775+
776+
Ok(())
777+
}
778+
779+
pub fn register_udf(&mut self, udf: PyScalarUDF) -> PyResult<()> {
780+
self.ctx.register_udf(udf.function);
781+
Ok(())
782+
}
783+
784+
pub fn register_udaf(&mut self, udaf: PyAggregateUDF) -> PyResult<()> {
785+
self.ctx.register_udaf(udaf.function);
786+
Ok(())
787+
}
788+
789+
pub fn register_udwf(&mut self, udwf: PyWindowUDF) -> PyResult<()> {
790+
self.ctx.register_udwf(udwf.function);
791+
Ok(())
792+
}
793+
794+
#[pyo3(signature = (name="datafusion"))]
795+
pub fn catalog(&self, name: &str) -> PyResult<PyCatalog> {
796+
match self.ctx.catalog(name) {
797+
Some(catalog) => Ok(PyCatalog::new(catalog)),
798+
None => Err(PyKeyError::new_err(format!(
799+
"Catalog with name {} doesn't exist.",
800+
&name,
801+
))),
802+
}
803+
}
804+
805+
pub fn tables(&self) -> HashSet<String> {
806+
self.ctx
807+
.catalog_names()
808+
.into_iter()
809+
.filter_map(|name| self.ctx.catalog(&name))
810+
.flat_map(move |catalog| {
811+
catalog
812+
.schema_names()
813+
.into_iter()
814+
.filter_map(move |name| catalog.schema(&name))
815+
})
816+
.flat_map(|schema| schema.table_names())
817+
.collect()
818+
}
819+
820+
pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
821+
let x = wait_for_future(py, self.ctx.table(name))
822+
.map_err(|e| PyKeyError::new_err(e.to_string()))?;
823+
Ok(PyDataFrame::new(x))
824+
}
825+
826+
pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
827+
Ok(self.ctx.table_exist(name)?)
828+
}
829+
830+
pub fn empty_table(&self) -> PyDataFusionResult<PyDataFrame> {
831+
Ok(PyDataFrame::new(self.ctx.read_empty()?))
832+
}
833+
834+
pub fn session_id(&self) -> String {
835+
self.ctx.session_id()
836+
}
837+
838+
#[allow(clippy::too_many_arguments)]
839+
#[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
840+
pub fn read_json(
841+
&mut self,
842+
path: PathBuf,
843+
schema: Option<PyArrowType<Schema>>,
844+
schema_infer_max_records: usize,
845+
file_extension: &str,
846+
table_partition_cols: Vec<(String, String)>,
847+
file_compression_type: Option<String>,
848+
py: Python,
849+
) -> PyDataFusionResult<PyDataFrame> {
850+
let path = path
851+
.to_str()
852+
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
853+
let mut options = NdJsonReadOptions::default()
854+
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
855+
.file_compression_type(parse_file_compression_type(file_compression_type)?);
856+
options.schema_infer_max_records = schema_infer_max_records;
857+
options.file_extension = file_extension;
858+
let df = if let Some(schema) = schema {
859+
options.schema = Some(&schema.0);
860+
let result = self.ctx.read_json(path, options);
861+
wait_for_future(py, result)?
862+
} else {
863+
let result = self.ctx.read_json(path, options);
864+
wait_for_future(py, result)?
865+
};
866+
Ok(PyDataFrame::new(df))
867+
}
868+
869+
#[allow(clippy::too_many_arguments)]
870+
#[pyo3(signature = (
871+
path,
872+
schema=None,
873+
has_header=true,
874+
delimiter=",",
875+
schema_infer_max_records=1000,
876+
file_extension=".csv",
877+
table_partition_cols=vec![],
878+
file_compression_type=None))]
879+
pub fn read_csv(
880+
&self,
881+
path: &Bound<'_, PyAny>,
882+
schema: Option<PyArrowType<Schema>>,
883+
has_header: bool,
884+
delimiter: &str,
885+
schema_infer_max_records: usize,
886+
file_extension: &str,
887+
table_partition_cols: Vec<(String, String)>,
888+
file_compression_type: Option<String>,
889+
py: Python,
890+
) -> PyDataFusionResult<PyDataFrame> {
891+
let delimiter = delimiter.as_bytes();
892+
if delimiter.len() != 1 {
893+
return Err(crate::errors::PyDataFusionError::PythonError(py_value_err(
894+
"Delimiter must be a single character",
895+
)));
896+
};
897+
729898
let mut options = CsvReadOptions::new()
730899
.has_header(has_header)
731900
.delimiter(delimiter[0])

0 commit comments

Comments
 (0)
0