From afd28a60e3e1c2affddd2993e6c332b4b409114d Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 26 Mar 2025 22:12:36 +0800 Subject: [PATCH 01/11] add expr --- src/expr.rs | 43 ++++ src/expr/constraints.rs | 29 +++ src/expr/copy_to.rs | 132 +++++++++++ src/expr/create_catalog.rs | 94 ++++++++ src/expr/create_catalog_schema.rs | 96 ++++++++ src/expr/create_external_table.rs | 186 +++++++++++++++ src/expr/create_function.rs | 177 ++++++++++++++ src/expr/create_index.rs | 130 ++++++++++ src/expr/describe_table.rs | 71 ++++++ src/expr/dml.rs | 119 ++++++++++ src/expr/drop_catalog_schema.rs | 89 +++++++ src/expr/drop_function.rs | 75 ++++++ src/expr/drop_view.rs | 72 ++++++ src/expr/recursive_query.rs | 88 +++++++ src/expr/statement.rs | 378 ++++++++++++++++++++++++++++++ src/expr/values.rs | 58 +++++ src/sql/logical.rs | 61 ++++- 17 files changed, 1885 insertions(+), 13 deletions(-) create mode 100644 src/expr/constraints.rs create mode 100644 src/expr/copy_to.rs create mode 100644 src/expr/create_catalog.rs create mode 100644 src/expr/create_catalog_schema.rs create mode 100644 src/expr/create_external_table.rs create mode 100644 src/expr/create_function.rs create mode 100644 src/expr/create_index.rs create mode 100644 src/expr/describe_table.rs create mode 100644 src/expr/dml.rs create mode 100644 src/expr/drop_catalog_schema.rs create mode 100644 src/expr/drop_function.rs create mode 100644 src/expr/drop_view.rs create mode 100644 src/expr/recursive_query.rs create mode 100644 src/expr/statement.rs create mode 100644 src/expr/values.rs diff --git a/src/expr.rs b/src/expr.rs index 1e9983d42..542a83fe8 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -96,6 +96,21 @@ pub mod union; pub mod unnest; pub mod unnest_expr; pub mod window; +pub mod statement; +pub mod values; +pub mod dml; +pub mod create_external_table; +pub mod copy_to; +pub mod create_catalog_schema; +pub mod drop_view; +pub mod create_catalog; +pub mod drop_catalog_schema; +pub mod drop_function; +pub mod create_function; +pub mod create_index; +pub mod describe_table; +pub mod recursive_query; +pub mod constraints; use sort_expr::{to_sort_expressions, PySortExpr}; @@ -784,5 +799,33 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + Ok(()) } diff --git a/src/expr/constraints.rs b/src/expr/constraints.rs new file mode 100644 index 000000000..3f02ddbd0 --- /dev/null +++ b/src/expr/constraints.rs @@ -0,0 +1,29 @@ +use std::fmt::{self, Display, Formatter}; + +use datafusion::common::Constraints; +use pyo3::prelude::*; + +#[pyclass(name = "Constraints", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyConstraints { + pub constraints: Constraints, +} + +impl From for Constraints { + fn from(constraints: PyConstraints) -> Self { + constraints.constraints + } +} + +impl From for PyConstraints { + fn from(constraints: Constraints) -> Self { + PyConstraints { constraints } + } +} + +impl Display for PyConstraints { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "Constraints: {:?}", self.constraints) + } +} + diff --git a/src/expr/copy_to.rs b/src/expr/copy_to.rs new file mode 100644 index 000000000..ba8e89957 --- /dev/null +++ b/src/expr/copy_to.rs @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{collections::HashMap, fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::{common::file_options::file_type::FileType, logical_expr::dml::CopyTo}; +use pyo3::prelude::*; + +use crate::sql::logical::PyLogicalPlan; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "CopyTo", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCopyTo { + copy: CopyTo, +} + +impl From for CopyTo { + fn from(copy: PyCopyTo) -> Self { + copy.copy + } +} + +impl From for PyCopyTo { + fn from(copy: CopyTo) -> PyCopyTo { + PyCopyTo { copy } + } +} + +impl Display for PyCopyTo { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CopyTo: {:?}", self.copy.output_url) + } +} + +impl LogicalNode for PyCopyTo { + + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.copy.input).clone())] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PyCopyTo { + #[new] + pub fn new( + input: PyLogicalPlan, + output_url: String, + partition_by: Vec, + file_type: PyFileType, + options: HashMap) -> Self { + return PyCopyTo { + copy: CopyTo { + input: input.plan(), + output_url, + partition_by, + file_type: file_type.file_type, + options, + }, + } + } + + fn input(&self) -> PyLogicalPlan { + PyLogicalPlan::from((*self.copy.input).clone()) + } + + fn output_url(&self) -> String { + self.copy.output_url.clone() + } + + fn partition_by(&self) -> Vec { + self.copy.partition_by.clone() + } + + fn file_type(&self) -> PyFileType { + PyFileType { file_type: self.copy.file_type.clone() } + } + + fn options(&self) -> HashMap { + self.copy.options.clone() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CopyTo({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CopyTo".to_string()) + } +} + +#[pyclass(name = "FileType", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyFileType { + file_type: Arc, +} + +impl Display for PyFileType { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "FileType: {}", self.file_type) + } +} + +#[pymethods] +impl PyFileType { + fn __repr__(&self) -> PyResult { + Ok(format!("FileType({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("FileType".to_string()) + } +} diff --git a/src/expr/create_catalog.rs b/src/expr/create_catalog.rs new file mode 100644 index 000000000..47385af54 --- /dev/null +++ b/src/expr/create_catalog.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::CreateCatalog; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "CreateCatalog", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateCatalog { + create: CreateCatalog, +} + +impl From for CreateCatalog { + fn from(create: PyCreateCatalog) -> Self { + create.create + } +} + +impl From for PyCreateCatalog { + fn from(create: CreateCatalog) -> PyCreateCatalog { + PyCreateCatalog { create } + } +} + +impl Display for PyCreateCatalog { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateCatalog: {:?}", self.create.catalog_name) + } +} + +#[pymethods] +impl PyCreateCatalog { + + #[new] + pub fn new(catalog_name: String, if_not_exists: bool, schema: PyDFSchema) -> PyResult { + Ok(PyCreateCatalog { + create: CreateCatalog { + catalog_name, + if_not_exists, + schema: Arc::new(schema.into()), + } + }) + } + + pub fn catalog_name(&self) -> String { + self.create.catalog_name.clone() + } + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateCatalog({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateCatalog".to_string()) + } +} + +impl LogicalNode for PyCreateCatalog { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/create_catalog_schema.rs b/src/expr/create_catalog_schema.rs new file mode 100644 index 000000000..f256a6e04 --- /dev/null +++ b/src/expr/create_catalog_schema.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::CreateCatalogSchema; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "CreateCatalogSchema", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateCatalogSchema { + create: CreateCatalogSchema, +} + + +impl From for CreateCatalogSchema { + fn from(create: PyCreateCatalogSchema) -> Self { + create.create + } +} + +impl From for PyCreateCatalogSchema { + fn from(create: CreateCatalogSchema) -> PyCreateCatalogSchema { + PyCreateCatalogSchema { create } + } +} + +impl Display for PyCreateCatalogSchema { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateCatalogSchema: {:?}", self.create.schema_name) + } +} + + +#[pymethods] +impl PyCreateCatalogSchema { + + #[new] + pub fn new(schema_name: String, if_not_exists: bool, schema: PyDFSchema) -> PyResult { + Ok(PyCreateCatalogSchema { + create: CreateCatalogSchema { + schema_name, + if_not_exists, + schema: Arc::new(schema.into()), + } + }) + } + + pub fn schema_name(&self) -> String { + self.create.schema_name.clone() + } + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateCatalogSchema({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateCatalogSchema".to_string()) + } +} + +impl LogicalNode for PyCreateCatalogSchema { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/create_external_table.rs b/src/expr/create_external_table.rs new file mode 100644 index 000000000..0c4be1167 --- /dev/null +++ b/src/expr/create_external_table.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{expr::PyExpr, sql::logical::PyLogicalPlan}; +use std::{collections::HashMap, fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::CreateExternalTable; +use pyo3::prelude::*; + +use crate::common::df_schema::PyDFSchema; + +use super::{constraints::PyConstraints, logical_node::LogicalNode, sort_expr::PySortExpr}; + +#[pyclass(name = "CreateExternalTable", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateExternalTable { + create: CreateExternalTable, +} + +impl From for CreateExternalTable { + fn from(create: PyCreateExternalTable) -> Self { + create.create + } +} + +impl From for PyCreateExternalTable { + fn from(create: CreateExternalTable) -> PyCreateExternalTable { + PyCreateExternalTable { create } + } +} + +impl Display for PyCreateExternalTable { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateExternalTable: {:?}{}", self.create.name, self.create.constraints) + } +} + +#[pymethods] +impl PyCreateExternalTable { + #[new] + #[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))] + pub fn new( + schema: PyDFSchema, + name: String, + location: String, + file_type: String, + table_partition_cols: Vec, + if_not_exists: bool, + temporary: bool, + order_exprs: Vec>, + unbounded: bool, + options: HashMap, + constraints: PyConstraints, + column_defaults: HashMap, + definition: Option, + ) -> Self { + let create = CreateExternalTable { + schema: Arc::new(schema.into()), + name: name.into(), + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs: order_exprs + .into_iter() + .map(|vec| vec.into_iter().map(|s| s.into()).collect::>()) + .collect::>(), + unbounded, + options, + constraints: constraints.constraints, + column_defaults: column_defaults + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + }; + PyCreateExternalTable { create: create } + } + + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + + pub fn name(&self) -> PyResult { + Ok(self.create.name.to_string()) + } + + + pub fn location(&self) -> String { + self.create.location.clone() + } + + + pub fn file_type(&self) -> String { + self.create.file_type.clone() + } + + + pub fn table_partition_cols(&self) -> Vec { + self.create.table_partition_cols.clone() + } + + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + + pub fn temporary(&self) -> bool { + self.create.temporary + } + + + pub fn definition(&self) -> Option { + self.create.definition.clone() + } + + + pub fn order_exprs(&self) -> Vec> { + self.create.order_exprs.iter().map(|vec| vec.iter().map(|s| s.clone().into()).collect()).collect() + } + + + pub fn unbounded(&self) -> bool { + self.create.unbounded + } + + + pub fn options(&self) -> HashMap { + self.create.options.clone() + } + + + pub fn constraints(&self) -> PyConstraints { + PyConstraints { + constraints: self.create.constraints.clone(), + } + } + + + pub fn column_defaults(&self) -> HashMap { + self.create + .column_defaults + .iter() + .map(|(k, v)| (k.clone(), v.clone().into())) + .collect() + + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateExternalTable({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateExternalTable".to_string()) + } +} + + +impl LogicalNode for PyCreateExternalTable { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + diff --git a/src/expr/create_function.rs b/src/expr/create_function.rs new file mode 100644 index 000000000..c118511b3 --- /dev/null +++ b/src/expr/create_function.rs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::{CreateFunction, CreateFunctionBody, OperateFunctionArg, Volatility}; +use pyo3::prelude::*; + +use crate::common::{data_type::PyDataType, df_schema::PyDFSchema}; +use super::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; +use super::PyExpr; + +#[pyclass(name = "CreateFunction", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateFunction { + create: CreateFunction, +} + + +impl From for CreateFunction { + fn from(create: PyCreateFunction) -> Self { + create.create + } +} + +impl From for PyCreateFunction { + fn from(create: CreateFunction) -> PyCreateFunction { + PyCreateFunction { create } + } +} + +impl Display for PyCreateFunction { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateFunction: name {:?}", self.create.name) + } +} + +#[pyclass(name = "OperateFunctionArg", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyOperateFunctionArg { + arg: OperateFunctionArg, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "Volatility", module = "datafusion.expr")] +pub enum PyVolatility { + Immutable, + Stable, + Volatile, +} + +#[pyclass(name = "CreateFunctionBody", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateFunctionBody { + body: CreateFunctionBody +} + +#[pymethods] +impl PyCreateFunctionBody { + + + pub fn language(&self) -> Option { + self.body.language.as_ref().map(|language| language.to_string()) + } + + + pub fn behavior(&self) -> Option { + self.body.behavior.as_ref().map(|behavior| match behavior { + Volatility::Immutable => PyVolatility::Immutable, + Volatility::Stable => PyVolatility::Stable, + Volatility::Volatile => PyVolatility::Volatile, + }) + } + + pub fn function_body(&self) -> Option { + self.body.function_body.as_ref().map(|function_body| function_body.clone().into()) + } +} + +#[pymethods] +impl PyCreateFunction { + + #[new] + #[pyo3(signature = (or_replace, temporary, name, params, schema, return_type=None, args=None))] + pub fn new( + or_replace: bool, + temporary: bool, + name: String, + params: PyCreateFunctionBody, + schema: PyDFSchema, + return_type: Option, + args: Option>, + ) -> Self { + PyCreateFunction { + create: CreateFunction { + or_replace, + temporary, + name, + args: args.map(|args| args.into_iter().map(|arg| arg.arg).collect()), + return_type: return_type.map(|return_type| return_type.data_type), + params: params.body, + schema: Arc::new(schema.into()), + } + } + } + + + pub fn or_replace(&self) -> bool { + self.create.or_replace + } + + + pub fn temporary(&self) -> bool { + self.create.temporary + } + + + pub fn name(&self) -> String { + self.create.name.clone() + } + + + pub fn params(&self) -> PyCreateFunctionBody { + PyCreateFunctionBody { + body: self.create.params.clone() + } + } + + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + + pub fn return_type(&self) -> Option { + self.create.return_type.as_ref().map(|return_type| return_type.clone().into()) + } + + + pub fn args(&self) -> Option> { + self.create.args.as_ref().map(|args| args.iter().map(|arg| PyOperateFunctionArg { arg: arg.clone() }).collect()) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateFunction({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateFunction".to_string()) + } +} + + +impl LogicalNode for PyCreateFunction { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/create_index.rs b/src/expr/create_index.rs new file mode 100644 index 000000000..f2fae7558 --- /dev/null +++ b/src/expr/create_index.rs @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::CreateIndex; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, sort_expr::PySortExpr}; + +#[pyclass(name = "CreateIndex", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateIndex { + create: CreateIndex, +} + +impl From for CreateIndex { + fn from(create: PyCreateIndex) -> Self { + create.create + } +} + +impl From for PyCreateIndex { + fn from(create: CreateIndex) -> PyCreateIndex { + PyCreateIndex { create } + } +} + +impl Display for PyCreateIndex { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateIndex: {:?}", self.create.name) + } +} + +#[pymethods] +impl PyCreateIndex { + #[new] + #[pyo3(signature = (table, columns, unique, if_not_exists, schema, name=None, using=None))] + pub fn new( + table: String, + columns: Vec, + unique: bool, + if_not_exists: bool, + schema: PyDFSchema, + name: Option, + using: Option, + ) -> PyResult { + Ok(PyCreateIndex { + create: CreateIndex { + name, + table: table.into(), + using, + columns: columns.iter().map(|c| c.clone().into()).collect(), + unique, + if_not_exists, + schema: Arc::new(schema.into()) + } + }) + } + + + pub fn name(&self) -> Option { + self.create.name.clone() + } + + + pub fn table(&self) -> PyResult { + Ok(self.create.table.to_string()) + } + + + pub fn using(&self) -> Option { + self.create.using.clone() + } + + + pub fn columns(&self) -> Vec { + self.create.columns.iter().map(|c| c.clone().into()).collect() + } + + + pub fn unique(&self) -> bool { + self.create.unique + } + + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateIndex({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateIndex".to_string()) + } +} + +impl LogicalNode for PyCreateIndex { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } + +} diff --git a/src/expr/describe_table.rs b/src/expr/describe_table.rs new file mode 100644 index 000000000..63e75fd82 --- /dev/null +++ b/src/expr/describe_table.rs @@ -0,0 +1,71 @@ +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use arrow::{datatypes::Schema, pyarrow::PyArrowType}; +use datafusion::logical_expr::DescribeTable; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + + +#[pyclass(name = "DescribeTable", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDescribeTable { + describe: DescribeTable, +} + +impl Display for PyDescribeTable { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DescribeTable") + } +} + +#[pymethods] +impl PyDescribeTable { + #[new] + fn new(schema: PyArrowType, output_schema: PyDFSchema) -> Self { + Self { + describe: DescribeTable { schema: Arc::new(schema.0), output_schema: Arc::new(output_schema.into())} + } + } + + pub fn schema(&self) -> PyArrowType { + (*self.describe.schema).clone().into() + } + + pub fn output_schema(&self) -> PyDFSchema { + (*self.describe.output_schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DescribeTable({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("DescribeTable".to_string()) + } +} + + +impl From for DescribeTable { + fn from(describe: PyDescribeTable) -> Self { + describe.describe + } +} + +impl From for PyDescribeTable { + fn from(describe: DescribeTable) -> PyDescribeTable { + PyDescribeTable { describe } + } +} + +impl LogicalNode for PyDescribeTable { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/dml.rs b/src/expr/dml.rs new file mode 100644 index 000000000..9063c4e72 --- /dev/null +++ b/src/expr/dml.rs @@ -0,0 +1,119 @@ +use datafusion::logical_expr::dml::InsertOp; +use datafusion::logical_expr::{DmlStatement, WriteOp}; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + + +#[pyclass(name = "DmlStatement", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDmlStatement { + dml: DmlStatement +} + +impl From for DmlStatement { + fn from(dml: PyDmlStatement) -> Self { + dml.dml + } +} + +impl From for PyDmlStatement { + fn from(dml: DmlStatement) -> PyDmlStatement { + PyDmlStatement { dml } + } +} + +impl LogicalNode for PyDmlStatement { + + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.dml.input).clone())] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PyDmlStatement { + + pub fn table_name(&self) -> PyResult { + Ok(self.dml.table_name.to_string()) + } + + pub fn table_schema(&self) -> PyDFSchema { + (*self.dml.table_schema).clone().into() + } + + pub fn op(&self) -> PyWriteOp { + self.dml.op.clone().into() + } + + pub fn input(&self) -> PyLogicalPlan { + PyLogicalPlan{plan: self.dml.input.clone()} + } + + pub fn output_schema(&self) -> PyDFSchema { + (*self.dml.output_schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok("DmlStatement".to_string()) + } + + fn __name__(&self) -> PyResult { + Ok("DmlStatement".to_string()) + } +} + + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "WriteOp", module = "datafusion.expr")] +pub enum PyWriteOp { + Append, + Overwrite, + Replace, + + Update, + Delete, + Ctas, +} + +impl From for PyWriteOp { + fn from(write_op: WriteOp) -> Self { + match write_op { + WriteOp::Insert(InsertOp::Append) => PyWriteOp::Append, + WriteOp::Insert(InsertOp::Overwrite) => PyWriteOp::Overwrite, + WriteOp::Insert(InsertOp::Replace) => PyWriteOp::Replace, + + WriteOp::Update => PyWriteOp::Update, + WriteOp::Delete => PyWriteOp::Delete, + WriteOp::Ctas => PyWriteOp::Ctas, + } + } +} + +impl From for WriteOp { + + fn from(py: PyWriteOp) -> Self { + match py { + PyWriteOp::Append => WriteOp::Insert(InsertOp::Append), + PyWriteOp::Overwrite => WriteOp::Insert(InsertOp::Overwrite), + PyWriteOp::Replace => WriteOp::Insert(InsertOp::Replace), + + PyWriteOp::Update => WriteOp::Update, + PyWriteOp::Delete => WriteOp::Delete, + PyWriteOp::Ctas => WriteOp::Ctas, + } + } +} + +#[pymethods] +impl PyWriteOp { + fn name(&self) -> String { + let write_op: WriteOp = self.clone().into(); + write_op.name().to_string() + } +} diff --git a/src/expr/drop_catalog_schema.rs b/src/expr/drop_catalog_schema.rs new file mode 100644 index 000000000..ed2620c7c --- /dev/null +++ b/src/expr/drop_catalog_schema.rs @@ -0,0 +1,89 @@ +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::{common::SchemaReference, logical_expr::DropCatalogSchema, sql::TableReference}; +use pyo3::{exceptions::PyValueError, prelude::*}; + +use crate::common::df_schema::PyDFSchema; + +use super::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "DropCatalogSchema", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDropCatalogSchema { + drop: DropCatalogSchema, +} + +impl From for DropCatalogSchema { + fn from(drop: PyDropCatalogSchema) -> Self { + drop.drop + } +} + +impl From for PyDropCatalogSchema { + fn from(drop: DropCatalogSchema) -> PyDropCatalogSchema { + PyDropCatalogSchema { drop } + } +} + +impl Display for PyDropCatalogSchema { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DropCatalogSchema") + } +} + +fn parse_schema_reference(name: String) -> PyResult { + match name.into() { + TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }), + TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }), + TableReference::Full { catalog: _, schema: _, table: _ } => { + Err(PyErr::new::("Invalid schema specifier (has 3 parts)".to_string())) + } + } +} + +#[pymethods] +impl PyDropCatalogSchema { + #[new] + fn new(name: String, schema: PyDFSchema, if_exists: bool, cascade: bool) -> PyResult { + let name = parse_schema_reference(name)?; + Ok(PyDropCatalogSchema { drop: DropCatalogSchema { + name, + schema: Arc::new(schema.into()), + if_exists, + cascade + } }) + } + + fn name(&self) -> PyResult { + Ok(self.drop.name.to_string()) + } + + fn schema(&self) -> PyDFSchema { + (*self.drop.schema).clone().into() + } + + fn if_exists(&self) -> PyResult { + Ok(self.drop.if_exists) + } + + fn cascade(&self) -> PyResult { + Ok(self.drop.cascade) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DropCatalogSchema({})", self)) + } + +} + + +impl LogicalNode for PyDropCatalogSchema { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/drop_function.rs b/src/expr/drop_function.rs new file mode 100644 index 000000000..d3fe0550d --- /dev/null +++ b/src/expr/drop_function.rs @@ -0,0 +1,75 @@ +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::DropFunction; +use pyo3::prelude::*; + +use crate::common::df_schema::PyDFSchema; +use super::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "DropFunction", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDropFunction { + drop: DropFunction, +} + + +impl From for DropFunction { + fn from(drop: PyDropFunction) -> Self { + drop.drop + } +} + +impl From for PyDropFunction { + fn from(drop: DropFunction) -> PyDropFunction { + PyDropFunction { drop } + } +} + +impl Display for PyDropFunction { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DropFunction") + } +} + +#[pymethods] +impl PyDropFunction { + #[new] + fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { + Ok(PyDropFunction { drop: DropFunction { + name: name.into(), + schema: Arc::new(schema.into()), + if_exists, + } }) + } + fn name(&self) -> PyResult { + Ok(self.drop.name.clone()) + } + + fn schema(&self) -> PyDFSchema { + (*self.drop.schema).clone().into() + } + + fn if_exists(&self) -> PyResult { + Ok(self.drop.if_exists) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DropFunction({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("DropFunction".to_string()) + } +} + +impl LogicalNode for PyDropFunction { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + diff --git a/src/expr/drop_view.rs b/src/expr/drop_view.rs new file mode 100644 index 000000000..00e4625b5 --- /dev/null +++ b/src/expr/drop_view.rs @@ -0,0 +1,72 @@ +use std::{fmt::{self, Display, Formatter}, sync::Arc}; + +use datafusion::logical_expr::DropView; +use pyo3::prelude::*; + +use crate::common::df_schema::PyDFSchema; + +use super::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "DropView", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDropView { + drop: DropView, +} + + +impl From for DropView { + fn from(drop: PyDropView) -> Self { + drop.drop + } +} + +impl From for PyDropView { + fn from(drop: DropView) -> PyDropView { + PyDropView { drop } + } +} + +impl Display for PyDropView { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DropView: {name:?} if not exist:={if_exists}", name = self.drop.name, if_exists = self.drop.if_exists) + } +} + +#[pymethods] +impl PyDropView { + #[new] + fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { + Ok(PyDropView { drop: DropView { name: name.into(), schema: Arc::new(schema.into()), if_exists } }) + } + + fn name(&self) -> PyResult { + Ok(self.drop.name.to_string()) + } + + fn schema(&self) -> PyDFSchema { + (*self.drop.schema).clone().into() + } + + fn if_exists(&self) -> PyResult { + Ok(self.drop.if_exists) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DropView({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("DropView".to_string()) + } +} + +impl LogicalNode for PyDropView { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/recursive_query.rs b/src/expr/recursive_query.rs new file mode 100644 index 000000000..f5435613d --- /dev/null +++ b/src/expr/recursive_query.rs @@ -0,0 +1,88 @@ +use std::fmt::{self, Display, Formatter}; + +use datafusion::logical_expr::RecursiveQuery; +use pyo3::prelude::*; + +use crate::sql::logical::PyLogicalPlan; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "RecursiveQuery", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyRecursiveQuery { + query: RecursiveQuery +} + +impl From for RecursiveQuery { + fn from(query: PyRecursiveQuery) -> Self { + query.query + } +} + +impl From for PyRecursiveQuery { + fn from(query: RecursiveQuery) -> PyRecursiveQuery { + PyRecursiveQuery { query } + } +} + +impl Display for PyRecursiveQuery { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "RecursiveQuery {name:?} is_distinct:={is_distinct}", name = self.query.name, is_distinct = self.query.is_distinct) + } +} + +#[pymethods] +impl PyRecursiveQuery { + #[new] + fn new( + name: String, + static_term: PyLogicalPlan, + recursive_term: PyLogicalPlan, + is_distinct: bool) -> Self { + Self { + query: RecursiveQuery{ + name, + static_term: static_term.plan(), + recursive_term: recursive_term.plan(), + is_distinct + } + } + } + + fn name(&self) -> PyResult { + Ok(self.query.name.clone()) + } + + fn static_term(&self) -> PyLogicalPlan { + PyLogicalPlan::from((*self.query.static_term).clone()) + } + + fn recursive_term(&self) -> PyLogicalPlan { + PyLogicalPlan::from((*self.query.recursive_term).clone()) + } + + fn is_distinct(&self) -> PyResult { + Ok(self.query.is_distinct) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("RecursiveQuery({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("RecursiveQuery".to_string()) + } +} + +impl LogicalNode for PyRecursiveQuery { + fn inputs(&self) -> Vec { + vec![ + PyLogicalPlan::from((*self.query.static_term).clone()), + PyLogicalPlan::from((*self.query.recursive_term).clone()) + ] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/statement.rs b/src/expr/statement.rs new file mode 100644 index 000000000..78fe73194 --- /dev/null +++ b/src/expr/statement.rs @@ -0,0 +1,378 @@ +use datafusion::logical_expr::{Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart}; +use pyo3::prelude::*; + +use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, PyExpr}; + +#[pyclass(name = "TransactionStart", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyTransactionStart { + transaction_start: TransactionStart +} + +impl From for PyTransactionStart { + fn from(transaction_start: TransactionStart) -> PyTransactionStart { + PyTransactionStart { transaction_start } + } +} + +impl TryFrom for TransactionStart { + type Error = PyErr; + + fn try_from(py: PyTransactionStart) -> Result { + Ok(py.transaction_start) + } +} + +impl LogicalNode for PyTransactionStart { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TransactionAccessMode", module = "datafusion.expr")] +pub enum PyTransactionAccessMode { + ReadOnly, + ReadWrite, +} + +impl From for PyTransactionAccessMode { + fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode { + match access_mode { + TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly, + TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite, + } + } +} + +impl TryFrom for TransactionAccessMode { + type Error = PyErr; + + fn try_from(py: PyTransactionAccessMode) -> Result { + match py { + PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly), + PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TransactionIsolationLevel", module = "datafusion.expr")] +pub enum PyTransactionIsolationLevel { + ReadUncommitted, + ReadCommitted, + RepeatableRead, + Serializable, +} + +impl From for PyTransactionIsolationLevel { + fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel { + match isolation_level { + TransactionIsolationLevel::ReadUncommitted => PyTransactionIsolationLevel::ReadUncommitted, + TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted, + TransactionIsolationLevel::RepeatableRead => PyTransactionIsolationLevel::RepeatableRead, + TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable, + } + } +} + +impl TryFrom for TransactionIsolationLevel { + type Error = PyErr; + + fn try_from(value: PyTransactionIsolationLevel) -> Result { + match value { + PyTransactionIsolationLevel::ReadUncommitted => Ok(TransactionIsolationLevel::ReadUncommitted), + PyTransactionIsolationLevel::ReadCommitted => Ok(TransactionIsolationLevel::ReadCommitted), + PyTransactionIsolationLevel::RepeatableRead => Ok(TransactionIsolationLevel::RepeatableRead), + PyTransactionIsolationLevel::Serializable => Ok(TransactionIsolationLevel::Serializable), + } + } +} + +#[pymethods] +impl PyTransactionStart { + + #[new] + pub fn new(access_mode: PyTransactionAccessMode, isolation_level: PyTransactionIsolationLevel) -> PyResult { + let access_mode = access_mode.try_into()?; + let isolation_level = isolation_level.try_into()?; + Ok(PyTransactionStart { transaction_start: TransactionStart {access_mode, isolation_level} }) + } + + + pub fn access_mode(&self) -> PyResult { + Ok(self.transaction_start.access_mode.clone().into()) + } + + pub fn isolation_level(&self) -> PyResult { + Ok(self.transaction_start.isolation_level.clone().into()) + } +} + +#[pyclass(name = "TransactionEnd", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyTransactionEnd { + transaction_end: TransactionEnd +} + +impl From for PyTransactionEnd { + fn from(transaction_end: TransactionEnd) -> PyTransactionEnd { + PyTransactionEnd { transaction_end } + } +} + +impl TryFrom for TransactionEnd { + type Error = PyErr; + + fn try_from(py: PyTransactionEnd) -> Result { + Ok(py.transaction_end) + } +} + +impl LogicalNode for PyTransactionEnd { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TransactionConclusion", module = "datafusion.expr")] +pub enum PyTransactionConclusion { + Commit, + Rollback, +} + +impl From for PyTransactionConclusion { + fn from(value: TransactionConclusion) -> Self { + match value { + TransactionConclusion::Commit => PyTransactionConclusion::Commit, + TransactionConclusion::Rollback => PyTransactionConclusion::Rollback, + } + } +} + +impl TryFrom for TransactionConclusion { + type Error = PyErr; + + fn try_from(value: PyTransactionConclusion) -> Result { + match value { + PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit), + PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback), + } + } +} +#[pymethods] +impl PyTransactionEnd { + #[new] + pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult { + let conclusion = conclusion.try_into()?; + Ok(PyTransactionEnd { transaction_end: TransactionEnd {conclusion, chain} }) + } + + pub fn conclusion(&self) -> PyResult { + Ok(self.transaction_end.conclusion.clone().into()) + } + + pub fn chain(&self) -> bool { + self.transaction_end.chain + } +} + +#[pyclass(name = "SetVariable", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PySetVariable { + set_variable: SetVariable +} + +impl From for PySetVariable { + fn from(set_variable: SetVariable) -> PySetVariable { + PySetVariable { set_variable } + } +} + +impl TryFrom for SetVariable { + type Error = PyErr; + + fn try_from(py: PySetVariable) -> Result { + Ok(py.set_variable) + } +} + +impl LogicalNode for PySetVariable { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PySetVariable { + #[new] + pub fn new(variable: String, value: String) -> Self { + PySetVariable { set_variable: SetVariable{variable, value} } + } + + pub fn variable(&self) -> String { + self.set_variable.variable.clone() + } + + pub fn value(&self) -> String { + self.set_variable.value.clone() + } + +} + +#[pyclass(name = "Prepare", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyPrepare { + prepare: Prepare +} + +impl From for PyPrepare { + fn from(prepare: Prepare) -> PyPrepare { + PyPrepare { prepare } + } +} + +impl TryFrom for Prepare { + type Error = PyErr; + + fn try_from(py: PyPrepare) -> Result { + Ok(py.prepare) + } +} + +impl LogicalNode for PyPrepare { + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.prepare.input).clone())] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PyPrepare { + #[new] + pub fn new(name: String, data_types: Vec, input: PyLogicalPlan) -> Self { + let input = input.plan().clone(); + let data_types = data_types.into_iter().map(|data_type| data_type.try_into().unwrap()).collect(); + PyPrepare { prepare: Prepare {name, data_types, input} } + } + + pub fn name(&self) -> String { + self.prepare.name.clone() + } + + pub fn data_types(&self) -> Vec { + self.prepare.data_types.clone().into_iter().map(|t| t.into()).collect() + } + + pub fn input(&self) -> PyLogicalPlan { + PyLogicalPlan{plan: self.prepare.input.clone()} + } +} + + +#[pyclass(name = "Execute", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyExecute { + execute: Execute +} + +impl From for PyExecute { + fn from(execute: Execute) -> PyExecute { + PyExecute { execute } + } +} + +impl TryFrom for Execute { + type Error = PyErr; + + fn try_from(py: PyExecute) -> Result { + Ok(py.execute) + } +} + +impl LogicalNode for PyExecute { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PyExecute { + #[new] + pub fn new(name: String, parameters: Vec) -> Self { + let parameters = parameters.into_iter().map(|parameter| parameter.try_into().unwrap()).collect(); + PyExecute { execute: Execute {name, parameters} } + } + + pub fn name(&self) -> String { + self.execute.name.clone() + } + + pub fn parameters(&self) -> Vec { + self.execute.parameters.clone().into_iter().map(|t| t.into()).collect() + } +} + +#[pyclass(name = "Deallocate", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDeallocate { + deallocate: Deallocate +} + +impl From for PyDeallocate { + fn from(deallocate: Deallocate) -> PyDeallocate { + PyDeallocate { deallocate } + } +} + +impl TryFrom for Deallocate { + type Error = PyErr; + + fn try_from(py: PyDeallocate) -> Result { + Ok(py.deallocate) + } +} + +impl LogicalNode for PyDeallocate { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PyDeallocate { + #[new] + pub fn new(name: String) -> Self { + PyDeallocate { deallocate: Deallocate {name} } + } + + pub fn name(&self) -> String { + self.deallocate.name.clone() + } +} diff --git a/src/expr/values.rs b/src/expr/values.rs new file mode 100644 index 000000000..eeabbe496 --- /dev/null +++ b/src/expr/values.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use datafusion::logical_expr::Values; +use pyo3::{pyclass, Bound, IntoPyObjectExt, PyAny, PyErr, PyResult, Python}; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, PyExpr}; + +#[pyclass(name = "Values", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyValues { + values: Values +} + +impl From for PyValues { + fn from(values: Values) -> PyValues { + PyValues { values } + } +} + +impl TryFrom for Values { + type Error = PyErr; + + fn try_from(py: PyValues) -> Result { + Ok(py.values) + } +} + +impl LogicalNode for PyValues { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + Ok(self.clone().into_py(py)) + } +} + +#[pymethods] +impl PyValues { + + #[new] + pub fn new(schema: PyDFSchema, values: Vec>) -> PyResult { + let values = values.into_iter().map(|row| row.into_iter().map(|expr| expr.into()).collect()).collect(); + Ok(PyValues { values: Values {schema: Arc::new(schema.into()), values} }) + } + + pub fn schema(&self) -> PyResult { + Ok((*self.values.schema).clone().into()) + } + + pub fn values(&self) -> Vec> { + self.values.values.clone().into_iter().map(|row| row.into_iter().map(|expr| expr.into()).collect()).collect() + } +} + diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 1be33b75f..8bbb57d80 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -20,7 +20,21 @@ use std::sync::Arc; use crate::errors::PyDataFusionResult; use crate::expr::aggregate::PyAggregate; use crate::expr::analyze::PyAnalyze; +use crate::expr::copy_to::PyCopyTo; +use crate::expr::create_catalog::PyCreateCatalog; +use crate::expr::create_catalog_schema::PyCreateCatalogSchema; +use crate::expr::create_external_table::PyCreateExternalTable; +use crate::expr::create_function::PyCreateFunction; +use crate::expr::create_index::PyCreateIndex; +use crate::expr::create_memory_table::PyCreateMemoryTable; +use crate::expr::create_view::PyCreateView; +use crate::expr::describe_table::PyDescribeTable; use crate::expr::distinct::PyDistinct; +use crate::expr::dml::PyDmlStatement; +use crate::expr::drop_catalog_schema::PyDropCatalogSchema; +use crate::expr::drop_function::PyDropFunction; +use crate::expr::drop_table::PyDropTable; +use crate::expr::drop_view::PyDropView; use crate::expr::empty_relation::PyEmptyRelation; use crate::expr::explain::PyExplain; use crate::expr::extension::PyExtension; @@ -28,14 +42,19 @@ use crate::expr::filter::PyFilter; use crate::expr::join::PyJoin; use crate::expr::limit::PyLimit; use crate::expr::projection::PyProjection; +use crate::expr::recursive_query::PyRecursiveQuery; +use crate::expr::repartition::PyRepartition; use crate::expr::sort::PySort; +use crate::expr::statement::{PyDeallocate, PyExecute, PyPrepare, PySetVariable, PyTransactionEnd, PyTransactionStart}; use crate::expr::subquery::PySubquery; use crate::expr::subquery_alias::PySubqueryAlias; use crate::expr::table_scan::PyTableScan; +use crate::expr::union::PyUnion; use crate::expr::unnest::PyUnnest; +use crate::expr::values::PyValues; use crate::expr::window::PyWindowExpr; use crate::{context::PySessionContext, errors::py_unsupported_variant_err}; -use datafusion::logical_expr::LogicalPlan; +use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement}; use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec}; use prost::Message; use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes}; @@ -82,18 +101,34 @@ impl PyLogicalPlan { LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py), LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py), LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py), - LogicalPlan::Repartition(_) - | LogicalPlan::Union(_) - | LogicalPlan::Statement(_) - | LogicalPlan::Values(_) - | LogicalPlan::Dml(_) - | LogicalPlan::Ddl(_) - | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::RecursiveQuery(_) => Err(py_unsupported_variant_err(format!( - "Conversion of variant not implemented: {:?}", - self.plan - ))), + LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py), + LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py), + LogicalPlan::Statement(plan) => match plan { + Statement::TransactionStart(plan) => PyTransactionStart::from(plan.clone()).to_variant(py), + Statement::TransactionEnd(plan) => PyTransactionEnd::from(plan.clone()).to_variant(py), + Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py), + Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py), + Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py), + Statement::Deallocate(plan) => PyDeallocate::from(plan.clone()).to_variant(py), + }, + LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py), + LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py), + LogicalPlan::Ddl(plan) => match plan { + DdlStatement::CreateExternalTable(plan) => PyCreateExternalTable::from(plan.clone()).to_variant(py), + DdlStatement::CreateMemoryTable(plan) => PyCreateMemoryTable::from(plan.clone()).to_variant(py), + DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py), + DdlStatement::CreateCatalogSchema(plan) => PyCreateCatalogSchema::from(plan.clone()).to_variant(py), + DdlStatement::CreateCatalog(plan) => PyCreateCatalog::from(plan.clone()).to_variant(py), + DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py), + DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py), + DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py), + DdlStatement::DropCatalogSchema(plan) => PyDropCatalogSchema::from(plan.clone()).to_variant(py), + DdlStatement::CreateFunction(plan) => PyCreateFunction::from(plan.clone()).to_variant(py), + DdlStatement::DropFunction(plan) => PyDropFunction::from(plan.clone()).to_variant(py), + }, + LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py), + LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py), + LogicalPlan::RecursiveQuery(plan) => PyRecursiveQuery::from(plan.clone()).to_variant(py), } } From 088334b9268de4285137181eb5abe561ff00d086 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 26 Mar 2025 22:14:31 +0800 Subject: [PATCH 02/11] format --- src/expr.rs | 30 ++++---- src/expr/constraints.rs | 1 - src/expr/copy_to.rs | 16 +++-- src/expr/create_catalog.rs | 14 ++-- src/expr/create_catalog_schema.rs | 16 +++-- src/expr/create_external_table.rs | 34 ++++----- src/expr/create_function.rs | 57 ++++++++------- src/expr/create_index.rs | 23 +++--- src/expr/describe_table.rs | 14 ++-- src/expr/dml.rs | 15 ++-- src/expr/drop_catalog_schema.rs | 36 ++++++---- src/expr/drop_function.rs | 21 +++--- src/expr/drop_view.rs | 21 ++++-- src/expr/recursive_query.rs | 30 ++++---- src/expr/statement.rs | 116 ++++++++++++++++++++++-------- src/expr/values.rs | 25 +++++-- src/sql/logical.rs | 44 +++++++++--- 17 files changed, 325 insertions(+), 188 deletions(-) diff --git a/src/expr.rs b/src/expr.rs index 542a83fe8..a87e4fed0 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -64,10 +64,22 @@ pub mod case; pub mod cast; pub mod column; pub mod conditional_expr; +pub mod constraints; +pub mod copy_to; +pub mod create_catalog; +pub mod create_catalog_schema; +pub mod create_external_table; +pub mod create_function; +pub mod create_index; pub mod create_memory_table; pub mod create_view; +pub mod describe_table; pub mod distinct; +pub mod dml; +pub mod drop_catalog_schema; +pub mod drop_function; pub mod drop_table; +pub mod drop_view; pub mod empty_relation; pub mod exists; pub mod explain; @@ -83,34 +95,22 @@ pub mod literal; pub mod logical_node; pub mod placeholder; pub mod projection; +pub mod recursive_query; pub mod repartition; pub mod scalar_subquery; pub mod scalar_variable; pub mod signature; pub mod sort; pub mod sort_expr; +pub mod statement; pub mod subquery; pub mod subquery_alias; pub mod table_scan; pub mod union; pub mod unnest; pub mod unnest_expr; -pub mod window; -pub mod statement; pub mod values; -pub mod dml; -pub mod create_external_table; -pub mod copy_to; -pub mod create_catalog_schema; -pub mod drop_view; -pub mod create_catalog; -pub mod drop_catalog_schema; -pub mod drop_function; -pub mod create_function; -pub mod create_index; -pub mod describe_table; -pub mod recursive_query; -pub mod constraints; +pub mod window; use sort_expr::{to_sort_expressions, PySortExpr}; diff --git a/src/expr/constraints.rs b/src/expr/constraints.rs index 3f02ddbd0..d2480bac4 100644 --- a/src/expr/constraints.rs +++ b/src/expr/constraints.rs @@ -26,4 +26,3 @@ impl Display for PyConstraints { write!(f, "Constraints: {:?}", self.constraints) } } - diff --git a/src/expr/copy_to.rs b/src/expr/copy_to.rs index ba8e89957..2a0eb6253 100644 --- a/src/expr/copy_to.rs +++ b/src/expr/copy_to.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashMap, fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + collections::HashMap, + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::{common::file_options::file_type::FileType, logical_expr::dml::CopyTo}; use pyo3::prelude::*; @@ -49,7 +53,6 @@ impl Display for PyCopyTo { } impl LogicalNode for PyCopyTo { - fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.copy.input).clone())] } @@ -67,7 +70,8 @@ impl PyCopyTo { output_url: String, partition_by: Vec, file_type: PyFileType, - options: HashMap) -> Self { + options: HashMap, + ) -> Self { return PyCopyTo { copy: CopyTo { input: input.plan(), @@ -76,7 +80,7 @@ impl PyCopyTo { file_type: file_type.file_type, options, }, - } + }; } fn input(&self) -> PyLogicalPlan { @@ -92,7 +96,9 @@ impl PyCopyTo { } fn file_type(&self) -> PyFileType { - PyFileType { file_type: self.copy.file_type.clone() } + PyFileType { + file_type: self.copy.file_type.clone(), + } } fn options(&self) -> HashMap { diff --git a/src/expr/create_catalog.rs b/src/expr/create_catalog.rs index 47385af54..cc53283d4 100644 --- a/src/expr/create_catalog.rs +++ b/src/expr/create_catalog.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::logical_expr::CreateCatalog; use pyo3::prelude::*; @@ -50,15 +53,18 @@ impl Display for PyCreateCatalog { #[pymethods] impl PyCreateCatalog { - #[new] - pub fn new(catalog_name: String, if_not_exists: bool, schema: PyDFSchema) -> PyResult { + pub fn new( + catalog_name: String, + if_not_exists: bool, + schema: PyDFSchema, + ) -> PyResult { Ok(PyCreateCatalog { create: CreateCatalog { catalog_name, if_not_exists, schema: Arc::new(schema.into()), - } + }, }) } diff --git a/src/expr/create_catalog_schema.rs b/src/expr/create_catalog_schema.rs index f256a6e04..8bed75553 100644 --- a/src/expr/create_catalog_schema.rs +++ b/src/expr/create_catalog_schema.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::logical_expr::CreateCatalogSchema; use pyo3::prelude::*; @@ -30,7 +33,6 @@ pub struct PyCreateCatalogSchema { create: CreateCatalogSchema, } - impl From for CreateCatalogSchema { fn from(create: PyCreateCatalogSchema) -> Self { create.create @@ -49,18 +51,20 @@ impl Display for PyCreateCatalogSchema { } } - #[pymethods] impl PyCreateCatalogSchema { - #[new] - pub fn new(schema_name: String, if_not_exists: bool, schema: PyDFSchema) -> PyResult { + pub fn new( + schema_name: String, + if_not_exists: bool, + schema: PyDFSchema, + ) -> PyResult { Ok(PyCreateCatalogSchema { create: CreateCatalogSchema { schema_name, if_not_exists, schema: Arc::new(schema.into()), - } + }, }) } diff --git a/src/expr/create_external_table.rs b/src/expr/create_external_table.rs index 0c4be1167..659154972 100644 --- a/src/expr/create_external_table.rs +++ b/src/expr/create_external_table.rs @@ -16,7 +16,11 @@ // under the License. use crate::{expr::PyExpr, sql::logical::PyLogicalPlan}; -use std::{collections::HashMap, fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + collections::HashMap, + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::logical_expr::CreateExternalTable; use pyo3::prelude::*; @@ -45,7 +49,11 @@ impl From for PyCreateExternalTable { impl Display for PyCreateExternalTable { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "CreateExternalTable: {:?}{}", self.create.name, self.create.constraints) + write!( + f, + "CreateExternalTable: {:?}{}", + self.create.name, self.create.constraints + ) } } @@ -92,76 +100,66 @@ impl PyCreateExternalTable { PyCreateExternalTable { create: create } } - pub fn schema(&self) -> PyDFSchema { (*self.create.schema).clone().into() } - pub fn name(&self) -> PyResult { Ok(self.create.name.to_string()) } - pub fn location(&self) -> String { self.create.location.clone() } - pub fn file_type(&self) -> String { self.create.file_type.clone() } - pub fn table_partition_cols(&self) -> Vec { self.create.table_partition_cols.clone() } - pub fn if_not_exists(&self) -> bool { self.create.if_not_exists } - pub fn temporary(&self) -> bool { self.create.temporary } - pub fn definition(&self) -> Option { self.create.definition.clone() } - pub fn order_exprs(&self) -> Vec> { - self.create.order_exprs.iter().map(|vec| vec.iter().map(|s| s.clone().into()).collect()).collect() + self.create + .order_exprs + .iter() + .map(|vec| vec.iter().map(|s| s.clone().into()).collect()) + .collect() } - pub fn unbounded(&self) -> bool { self.create.unbounded } - pub fn options(&self) -> HashMap { self.create.options.clone() } - pub fn constraints(&self) -> PyConstraints { PyConstraints { constraints: self.create.constraints.clone(), } } - pub fn column_defaults(&self) -> HashMap { self.create .column_defaults .iter() .map(|(k, v)| (k.clone(), v.clone().into())) .collect() - } fn __repr__(&self) -> PyResult { @@ -173,7 +171,6 @@ impl PyCreateExternalTable { } } - impl LogicalNode for PyCreateExternalTable { fn inputs(&self) -> Vec { vec![] @@ -183,4 +180,3 @@ impl LogicalNode for PyCreateExternalTable { Ok(self.clone().into_py(py)) } } - diff --git a/src/expr/create_function.rs b/src/expr/create_function.rs index c118511b3..f86e5719c 100644 --- a/src/expr/create_function.rs +++ b/src/expr/create_function.rs @@ -15,15 +15,20 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt::{self, Display, Formatter}, sync::Arc}; - -use datafusion::logical_expr::{CreateFunction, CreateFunctionBody, OperateFunctionArg, Volatility}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::{ + CreateFunction, CreateFunctionBody, OperateFunctionArg, Volatility, +}; use pyo3::prelude::*; -use crate::common::{data_type::PyDataType, df_schema::PyDFSchema}; use super::logical_node::LogicalNode; -use crate::sql::logical::PyLogicalPlan; use super::PyExpr; +use crate::common::{data_type::PyDataType, df_schema::PyDFSchema}; +use crate::sql::logical::PyLogicalPlan; #[pyclass(name = "CreateFunction", module = "datafusion.expr", subclass)] #[derive(Clone)] @@ -31,7 +36,6 @@ pub struct PyCreateFunction { create: CreateFunction, } - impl From for CreateFunction { fn from(create: PyCreateFunction) -> Self { create.create @@ -67,18 +71,18 @@ pub enum PyVolatility { #[pyclass(name = "CreateFunctionBody", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyCreateFunctionBody { - body: CreateFunctionBody + body: CreateFunctionBody, } #[pymethods] impl PyCreateFunctionBody { - - pub fn language(&self) -> Option { - self.body.language.as_ref().map(|language| language.to_string()) + self.body + .language + .as_ref() + .map(|language| language.to_string()) } - pub fn behavior(&self) -> Option { self.body.behavior.as_ref().map(|behavior| match behavior { Volatility::Immutable => PyVolatility::Immutable, @@ -88,13 +92,15 @@ impl PyCreateFunctionBody { } pub fn function_body(&self) -> Option { - self.body.function_body.as_ref().map(|function_body| function_body.clone().into()) + self.body + .function_body + .as_ref() + .map(|function_body| function_body.clone().into()) } } #[pymethods] impl PyCreateFunction { - #[new] #[pyo3(signature = (or_replace, temporary, name, params, schema, return_type=None, args=None))] pub fn new( @@ -105,7 +111,7 @@ impl PyCreateFunction { schema: PyDFSchema, return_type: Option, args: Option>, - ) -> Self { + ) -> Self { PyCreateFunction { create: CreateFunction { or_replace, @@ -115,45 +121,45 @@ impl PyCreateFunction { return_type: return_type.map(|return_type| return_type.data_type), params: params.body, schema: Arc::new(schema.into()), - } + }, } } - pub fn or_replace(&self) -> bool { self.create.or_replace } - pub fn temporary(&self) -> bool { self.create.temporary } - pub fn name(&self) -> String { self.create.name.clone() } - pub fn params(&self) -> PyCreateFunctionBody { PyCreateFunctionBody { - body: self.create.params.clone() + body: self.create.params.clone(), } } - pub fn schema(&self) -> PyDFSchema { (*self.create.schema).clone().into() } - pub fn return_type(&self) -> Option { - self.create.return_type.as_ref().map(|return_type| return_type.clone().into()) + self.create + .return_type + .as_ref() + .map(|return_type| return_type.clone().into()) } - pub fn args(&self) -> Option> { - self.create.args.as_ref().map(|args| args.iter().map(|arg| PyOperateFunctionArg { arg: arg.clone() }).collect()) + self.create.args.as_ref().map(|args| { + args.iter() + .map(|arg| PyOperateFunctionArg { arg: arg.clone() }) + .collect() + }) } fn __repr__(&self) -> PyResult { @@ -165,7 +171,6 @@ impl PyCreateFunction { } } - impl LogicalNode for PyCreateFunction { fn inputs(&self) -> Vec { vec![] diff --git a/src/expr/create_index.rs b/src/expr/create_index.rs index f2fae7558..64e486eed 100644 --- a/src/expr/create_index.rs +++ b/src/expr/create_index.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::logical_expr::CreateIndex; use pyo3::prelude::*; @@ -69,42 +72,39 @@ impl PyCreateIndex { columns: columns.iter().map(|c| c.clone().into()).collect(), unique, if_not_exists, - schema: Arc::new(schema.into()) - } + schema: Arc::new(schema.into()), + }, }) } - pub fn name(&self) -> Option { self.create.name.clone() } - pub fn table(&self) -> PyResult { Ok(self.create.table.to_string()) } - pub fn using(&self) -> Option { self.create.using.clone() } - pub fn columns(&self) -> Vec { - self.create.columns.iter().map(|c| c.clone().into()).collect() + self.create + .columns + .iter() + .map(|c| c.clone().into()) + .collect() } - pub fn unique(&self) -> bool { self.create.unique } - pub fn if_not_exists(&self) -> bool { self.create.if_not_exists } - pub fn schema(&self) -> PyDFSchema { (*self.create.schema).clone().into() } @@ -126,5 +126,4 @@ impl LogicalNode for PyCreateIndex { fn to_variant(&self, py: Python) -> PyResult { Ok(self.clone().into_py(py)) } - } diff --git a/src/expr/describe_table.rs b/src/expr/describe_table.rs index 63e75fd82..c7155fdd9 100644 --- a/src/expr/describe_table.rs +++ b/src/expr/describe_table.rs @@ -1,4 +1,7 @@ -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use arrow::{datatypes::Schema, pyarrow::PyArrowType}; use datafusion::logical_expr::DescribeTable; @@ -8,7 +11,6 @@ use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; use super::logical_node::LogicalNode; - #[pyclass(name = "DescribeTable", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyDescribeTable { @@ -26,8 +28,11 @@ impl PyDescribeTable { #[new] fn new(schema: PyArrowType, output_schema: PyDFSchema) -> Self { Self { - describe: DescribeTable { schema: Arc::new(schema.0), output_schema: Arc::new(output_schema.into())} - } + describe: DescribeTable { + schema: Arc::new(schema.0), + output_schema: Arc::new(output_schema.into()), + }, + } } pub fn schema(&self) -> PyArrowType { @@ -47,7 +52,6 @@ impl PyDescribeTable { } } - impl From for DescribeTable { fn from(describe: PyDescribeTable) -> Self { describe.describe diff --git a/src/expr/dml.rs b/src/expr/dml.rs index 9063c4e72..9b55b7673 100644 --- a/src/expr/dml.rs +++ b/src/expr/dml.rs @@ -6,11 +6,10 @@ use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; use super::logical_node::LogicalNode; - #[pyclass(name = "DmlStatement", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyDmlStatement { - dml: DmlStatement + dml: DmlStatement, } impl From for DmlStatement { @@ -26,7 +25,6 @@ impl From for PyDmlStatement { } impl LogicalNode for PyDmlStatement { - fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.dml.input).clone())] } @@ -38,21 +36,22 @@ impl LogicalNode for PyDmlStatement { #[pymethods] impl PyDmlStatement { - pub fn table_name(&self) -> PyResult { Ok(self.dml.table_name.to_string()) } - pub fn table_schema(&self) -> PyDFSchema { + pub fn table_schema(&self) -> PyDFSchema { (*self.dml.table_schema).clone().into() } - pub fn op(&self) -> PyWriteOp { + pub fn op(&self) -> PyWriteOp { self.dml.op.clone().into() } pub fn input(&self) -> PyLogicalPlan { - PyLogicalPlan{plan: self.dml.input.clone()} + PyLogicalPlan { + plan: self.dml.input.clone(), + } } pub fn output_schema(&self) -> PyDFSchema { @@ -68,7 +67,6 @@ impl PyDmlStatement { } } - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] #[pyclass(eq, eq_int, name = "WriteOp", module = "datafusion.expr")] pub enum PyWriteOp { @@ -96,7 +94,6 @@ impl From for PyWriteOp { } impl From for WriteOp { - fn from(py: PyWriteOp) -> Self { match py { PyWriteOp::Append => WriteOp::Insert(InsertOp::Append), diff --git a/src/expr/drop_catalog_schema.rs b/src/expr/drop_catalog_schema.rs index ed2620c7c..2014ab913 100644 --- a/src/expr/drop_catalog_schema.rs +++ b/src/expr/drop_catalog_schema.rs @@ -1,4 +1,7 @@ -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::{common::SchemaReference, logical_expr::DropCatalogSchema, sql::TableReference}; use pyo3::{exceptions::PyValueError, prelude::*}; @@ -35,10 +38,17 @@ impl Display for PyDropCatalogSchema { fn parse_schema_reference(name: String) -> PyResult { match name.into() { TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }), - TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }), - TableReference::Full { catalog: _, schema: _, table: _ } => { - Err(PyErr::new::("Invalid schema specifier (has 3 parts)".to_string())) - } + TableReference::Partial { schema, table } => Ok(SchemaReference::Full { + schema: table, + catalog: schema, + }), + TableReference::Full { + catalog: _, + schema: _, + table: _, + } => Err(PyErr::new::( + "Invalid schema specifier (has 3 parts)".to_string(), + )), } } @@ -47,12 +57,14 @@ impl PyDropCatalogSchema { #[new] fn new(name: String, schema: PyDFSchema, if_exists: bool, cascade: bool) -> PyResult { let name = parse_schema_reference(name)?; - Ok(PyDropCatalogSchema { drop: DropCatalogSchema { - name, - schema: Arc::new(schema.into()), - if_exists, - cascade - } }) + Ok(PyDropCatalogSchema { + drop: DropCatalogSchema { + name, + schema: Arc::new(schema.into()), + if_exists, + cascade, + }, + }) } fn name(&self) -> PyResult { @@ -74,10 +86,8 @@ impl PyDropCatalogSchema { fn __repr__(&self) -> PyResult { Ok(format!("DropCatalogSchema({})", self)) } - } - impl LogicalNode for PyDropCatalogSchema { fn inputs(&self) -> Vec { vec![] diff --git a/src/expr/drop_function.rs b/src/expr/drop_function.rs index d3fe0550d..99a6cf566 100644 --- a/src/expr/drop_function.rs +++ b/src/expr/drop_function.rs @@ -1,10 +1,13 @@ -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::logical_expr::DropFunction; use pyo3::prelude::*; -use crate::common::df_schema::PyDFSchema; use super::logical_node::LogicalNode; +use crate::common::df_schema::PyDFSchema; use crate::sql::logical::PyLogicalPlan; #[pyclass(name = "DropFunction", module = "datafusion.expr", subclass)] @@ -13,7 +16,6 @@ pub struct PyDropFunction { drop: DropFunction, } - impl From for DropFunction { fn from(drop: PyDropFunction) -> Self { drop.drop @@ -36,11 +38,13 @@ impl Display for PyDropFunction { impl PyDropFunction { #[new] fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { - Ok(PyDropFunction { drop: DropFunction { - name: name.into(), - schema: Arc::new(schema.into()), - if_exists, - } }) + Ok(PyDropFunction { + drop: DropFunction { + name: name.into(), + schema: Arc::new(schema.into()), + if_exists, + }, + }) } fn name(&self) -> PyResult { Ok(self.drop.name.clone()) @@ -72,4 +76,3 @@ impl LogicalNode for PyDropFunction { Ok(self.clone().into_py(py)) } } - diff --git a/src/expr/drop_view.rs b/src/expr/drop_view.rs index 00e4625b5..78518c940 100644 --- a/src/expr/drop_view.rs +++ b/src/expr/drop_view.rs @@ -1,4 +1,7 @@ -use std::{fmt::{self, Display, Formatter}, sync::Arc}; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; use datafusion::logical_expr::DropView; use pyo3::prelude::*; @@ -14,7 +17,6 @@ pub struct PyDropView { drop: DropView, } - impl From for DropView { fn from(drop: PyDropView) -> Self { drop.drop @@ -29,7 +31,12 @@ impl From for PyDropView { impl Display for PyDropView { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "DropView: {name:?} if not exist:={if_exists}", name = self.drop.name, if_exists = self.drop.if_exists) + write!( + f, + "DropView: {name:?} if not exist:={if_exists}", + name = self.drop.name, + if_exists = self.drop.if_exists + ) } } @@ -37,7 +44,13 @@ impl Display for PyDropView { impl PyDropView { #[new] fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { - Ok(PyDropView { drop: DropView { name: name.into(), schema: Arc::new(schema.into()), if_exists } }) + Ok(PyDropView { + drop: DropView { + name: name.into(), + schema: Arc::new(schema.into()), + if_exists, + }, + }) } fn name(&self) -> PyResult { diff --git a/src/expr/recursive_query.rs b/src/expr/recursive_query.rs index f5435613d..667238df5 100644 --- a/src/expr/recursive_query.rs +++ b/src/expr/recursive_query.rs @@ -10,7 +10,7 @@ use super::logical_node::LogicalNode; #[pyclass(name = "RecursiveQuery", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyRecursiveQuery { - query: RecursiveQuery + query: RecursiveQuery, } impl From for RecursiveQuery { @@ -27,7 +27,12 @@ impl From for PyRecursiveQuery { impl Display for PyRecursiveQuery { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "RecursiveQuery {name:?} is_distinct:={is_distinct}", name = self.query.name, is_distinct = self.query.is_distinct) + write!( + f, + "RecursiveQuery {name:?} is_distinct:={is_distinct}", + name = self.query.name, + is_distinct = self.query.is_distinct + ) } } @@ -38,15 +43,16 @@ impl PyRecursiveQuery { name: String, static_term: PyLogicalPlan, recursive_term: PyLogicalPlan, - is_distinct: bool) -> Self { - Self { - query: RecursiveQuery{ - name, - static_term: static_term.plan(), - recursive_term: recursive_term.plan(), - is_distinct - } - } + is_distinct: bool, + ) -> Self { + Self { + query: RecursiveQuery { + name, + static_term: static_term.plan(), + recursive_term: recursive_term.plan(), + is_distinct, + }, + } } fn name(&self) -> PyResult { @@ -78,7 +84,7 @@ impl LogicalNode for PyRecursiveQuery { fn inputs(&self) -> Vec { vec![ PyLogicalPlan::from((*self.query.static_term).clone()), - PyLogicalPlan::from((*self.query.recursive_term).clone()) + PyLogicalPlan::from((*self.query.recursive_term).clone()), ] } diff --git a/src/expr/statement.rs b/src/expr/statement.rs index 78fe73194..37a42e819 100644 --- a/src/expr/statement.rs +++ b/src/expr/statement.rs @@ -1,4 +1,7 @@ -use datafusion::logical_expr::{Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart}; +use datafusion::logical_expr::{ + Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion, + TransactionEnd, TransactionIsolationLevel, TransactionStart, +}; use pyo3::prelude::*; use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan}; @@ -8,7 +11,7 @@ use super::{logical_node::LogicalNode, PyExpr}; #[pyclass(name = "TransactionStart", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyTransactionStart { - transaction_start: TransactionStart + transaction_start: TransactionStart, } impl From for PyTransactionStart { @@ -63,7 +66,12 @@ impl TryFrom for TransactionAccessMode { } #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -#[pyclass(eq, eq_int, name = "TransactionIsolationLevel", module = "datafusion.expr")] +#[pyclass( + eq, + eq_int, + name = "TransactionIsolationLevel", + module = "datafusion.expr" +)] pub enum PyTransactionIsolationLevel { ReadUncommitted, ReadCommitted, @@ -74,9 +82,13 @@ pub enum PyTransactionIsolationLevel { impl From for PyTransactionIsolationLevel { fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel { match isolation_level { - TransactionIsolationLevel::ReadUncommitted => PyTransactionIsolationLevel::ReadUncommitted, + TransactionIsolationLevel::ReadUncommitted => { + PyTransactionIsolationLevel::ReadUncommitted + } TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted, - TransactionIsolationLevel::RepeatableRead => PyTransactionIsolationLevel::RepeatableRead, + TransactionIsolationLevel::RepeatableRead => { + PyTransactionIsolationLevel::RepeatableRead + } TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable, } } @@ -87,25 +99,39 @@ impl TryFrom for TransactionIsolationLevel { fn try_from(value: PyTransactionIsolationLevel) -> Result { match value { - PyTransactionIsolationLevel::ReadUncommitted => Ok(TransactionIsolationLevel::ReadUncommitted), - PyTransactionIsolationLevel::ReadCommitted => Ok(TransactionIsolationLevel::ReadCommitted), - PyTransactionIsolationLevel::RepeatableRead => Ok(TransactionIsolationLevel::RepeatableRead), - PyTransactionIsolationLevel::Serializable => Ok(TransactionIsolationLevel::Serializable), + PyTransactionIsolationLevel::ReadUncommitted => { + Ok(TransactionIsolationLevel::ReadUncommitted) + } + PyTransactionIsolationLevel::ReadCommitted => { + Ok(TransactionIsolationLevel::ReadCommitted) + } + PyTransactionIsolationLevel::RepeatableRead => { + Ok(TransactionIsolationLevel::RepeatableRead) + } + PyTransactionIsolationLevel::Serializable => { + Ok(TransactionIsolationLevel::Serializable) + } } } } #[pymethods] impl PyTransactionStart { - #[new] - pub fn new(access_mode: PyTransactionAccessMode, isolation_level: PyTransactionIsolationLevel) -> PyResult { + pub fn new( + access_mode: PyTransactionAccessMode, + isolation_level: PyTransactionIsolationLevel, + ) -> PyResult { let access_mode = access_mode.try_into()?; let isolation_level = isolation_level.try_into()?; - Ok(PyTransactionStart { transaction_start: TransactionStart {access_mode, isolation_level} }) + Ok(PyTransactionStart { + transaction_start: TransactionStart { + access_mode, + isolation_level, + }, + }) } - pub fn access_mode(&self) -> PyResult { Ok(self.transaction_start.access_mode.clone().into()) } @@ -118,7 +144,7 @@ impl PyTransactionStart { #[pyclass(name = "TransactionEnd", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyTransactionEnd { - transaction_end: TransactionEnd + transaction_end: TransactionEnd, } impl From for PyTransactionEnd { @@ -176,7 +202,9 @@ impl PyTransactionEnd { #[new] pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult { let conclusion = conclusion.try_into()?; - Ok(PyTransactionEnd { transaction_end: TransactionEnd {conclusion, chain} }) + Ok(PyTransactionEnd { + transaction_end: TransactionEnd { conclusion, chain }, + }) } pub fn conclusion(&self) -> PyResult { @@ -191,7 +219,7 @@ impl PyTransactionEnd { #[pyclass(name = "SetVariable", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PySetVariable { - set_variable: SetVariable + set_variable: SetVariable, } impl From for PySetVariable { @@ -222,7 +250,9 @@ impl LogicalNode for PySetVariable { impl PySetVariable { #[new] pub fn new(variable: String, value: String) -> Self { - PySetVariable { set_variable: SetVariable{variable, value} } + PySetVariable { + set_variable: SetVariable { variable, value }, + } } pub fn variable(&self) -> String { @@ -232,13 +262,12 @@ impl PySetVariable { pub fn value(&self) -> String { self.set_variable.value.clone() } - } #[pyclass(name = "Prepare", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyPrepare { - prepare: Prepare + prepare: Prepare, } impl From for PyPrepare { @@ -270,8 +299,17 @@ impl PyPrepare { #[new] pub fn new(name: String, data_types: Vec, input: PyLogicalPlan) -> Self { let input = input.plan().clone(); - let data_types = data_types.into_iter().map(|data_type| data_type.try_into().unwrap()).collect(); - PyPrepare { prepare: Prepare {name, data_types, input} } + let data_types = data_types + .into_iter() + .map(|data_type| data_type.try_into().unwrap()) + .collect(); + PyPrepare { + prepare: Prepare { + name, + data_types, + input, + }, + } } pub fn name(&self) -> String { @@ -279,19 +317,25 @@ impl PyPrepare { } pub fn data_types(&self) -> Vec { - self.prepare.data_types.clone().into_iter().map(|t| t.into()).collect() + self.prepare + .data_types + .clone() + .into_iter() + .map(|t| t.into()) + .collect() } pub fn input(&self) -> PyLogicalPlan { - PyLogicalPlan{plan: self.prepare.input.clone()} + PyLogicalPlan { + plan: self.prepare.input.clone(), + } } } - #[pyclass(name = "Execute", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyExecute { - execute: Execute + execute: Execute, } impl From for PyExecute { @@ -322,8 +366,13 @@ impl LogicalNode for PyExecute { impl PyExecute { #[new] pub fn new(name: String, parameters: Vec) -> Self { - let parameters = parameters.into_iter().map(|parameter| parameter.try_into().unwrap()).collect(); - PyExecute { execute: Execute {name, parameters} } + let parameters = parameters + .into_iter() + .map(|parameter| parameter.try_into().unwrap()) + .collect(); + PyExecute { + execute: Execute { name, parameters }, + } } pub fn name(&self) -> String { @@ -331,14 +380,19 @@ impl PyExecute { } pub fn parameters(&self) -> Vec { - self.execute.parameters.clone().into_iter().map(|t| t.into()).collect() + self.execute + .parameters + .clone() + .into_iter() + .map(|t| t.into()) + .collect() } } #[pyclass(name = "Deallocate", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyDeallocate { - deallocate: Deallocate + deallocate: Deallocate, } impl From for PyDeallocate { @@ -369,7 +423,9 @@ impl LogicalNode for PyDeallocate { impl PyDeallocate { #[new] pub fn new(name: String) -> Self { - PyDeallocate { deallocate: Deallocate {name} } + PyDeallocate { + deallocate: Deallocate { name }, + } } pub fn name(&self) -> String { diff --git a/src/expr/values.rs b/src/expr/values.rs index eeabbe496..719a29127 100644 --- a/src/expr/values.rs +++ b/src/expr/values.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use datafusion::logical_expr::Values; -use pyo3::{pyclass, Bound, IntoPyObjectExt, PyAny, PyErr, PyResult, Python}; use pyo3::prelude::*; +use pyo3::{pyclass, Bound, IntoPyObjectExt, PyAny, PyErr, PyResult, Python}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -11,7 +11,7 @@ use super::{logical_node::LogicalNode, PyExpr}; #[pyclass(name = "Values", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyValues { - values: Values + values: Values, } impl From for PyValues { @@ -40,11 +40,18 @@ impl LogicalNode for PyValues { #[pymethods] impl PyValues { - #[new] pub fn new(schema: PyDFSchema, values: Vec>) -> PyResult { - let values = values.into_iter().map(|row| row.into_iter().map(|expr| expr.into()).collect()).collect(); - Ok(PyValues { values: Values {schema: Arc::new(schema.into()), values} }) + let values = values + .into_iter() + .map(|row| row.into_iter().map(|expr| expr.into()).collect()) + .collect(); + Ok(PyValues { + values: Values { + schema: Arc::new(schema.into()), + values, + }, + }) } pub fn schema(&self) -> PyResult { @@ -52,7 +59,11 @@ impl PyValues { } pub fn values(&self) -> Vec> { - self.values.values.clone().into_iter().map(|row| row.into_iter().map(|expr| expr.into()).collect()).collect() + self.values + .values + .clone() + .into_iter() + .map(|row| row.into_iter().map(|expr| expr.into()).collect()) + .collect() } } - diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 8bbb57d80..73d53434b 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -45,7 +45,9 @@ use crate::expr::projection::PyProjection; use crate::expr::recursive_query::PyRecursiveQuery; use crate::expr::repartition::PyRepartition; use crate::expr::sort::PySort; -use crate::expr::statement::{PyDeallocate, PyExecute, PyPrepare, PySetVariable, PyTransactionEnd, PyTransactionStart}; +use crate::expr::statement::{ + PyDeallocate, PyExecute, PyPrepare, PySetVariable, PyTransactionEnd, PyTransactionStart, +}; use crate::expr::subquery::PySubquery; use crate::expr::subquery_alias::PySubqueryAlias; use crate::expr::table_scan::PyTableScan; @@ -104,8 +106,12 @@ impl PyLogicalPlan { LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py), LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py), LogicalPlan::Statement(plan) => match plan { - Statement::TransactionStart(plan) => PyTransactionStart::from(plan.clone()).to_variant(py), - Statement::TransactionEnd(plan) => PyTransactionEnd::from(plan.clone()).to_variant(py), + Statement::TransactionStart(plan) => { + PyTransactionStart::from(plan.clone()).to_variant(py) + } + Statement::TransactionEnd(plan) => { + PyTransactionEnd::from(plan.clone()).to_variant(py) + } Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py), Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py), Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py), @@ -114,21 +120,37 @@ impl PyLogicalPlan { LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py), LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py), LogicalPlan::Ddl(plan) => match plan { - DdlStatement::CreateExternalTable(plan) => PyCreateExternalTable::from(plan.clone()).to_variant(py), - DdlStatement::CreateMemoryTable(plan) => PyCreateMemoryTable::from(plan.clone()).to_variant(py), + DdlStatement::CreateExternalTable(plan) => { + PyCreateExternalTable::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateMemoryTable(plan) => { + PyCreateMemoryTable::from(plan.clone()).to_variant(py) + } DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py), - DdlStatement::CreateCatalogSchema(plan) => PyCreateCatalogSchema::from(plan.clone()).to_variant(py), - DdlStatement::CreateCatalog(plan) => PyCreateCatalog::from(plan.clone()).to_variant(py), + DdlStatement::CreateCatalogSchema(plan) => { + PyCreateCatalogSchema::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateCatalog(plan) => { + PyCreateCatalog::from(plan.clone()).to_variant(py) + } DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py), DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py), DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py), - DdlStatement::DropCatalogSchema(plan) => PyDropCatalogSchema::from(plan.clone()).to_variant(py), - DdlStatement::CreateFunction(plan) => PyCreateFunction::from(plan.clone()).to_variant(py), - DdlStatement::DropFunction(plan) => PyDropFunction::from(plan.clone()).to_variant(py), + DdlStatement::DropCatalogSchema(plan) => { + PyDropCatalogSchema::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateFunction(plan) => { + PyCreateFunction::from(plan.clone()).to_variant(py) + } + DdlStatement::DropFunction(plan) => { + PyDropFunction::from(plan.clone()).to_variant(py) + } }, LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py), LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py), - LogicalPlan::RecursiveQuery(plan) => PyRecursiveQuery::from(plan.clone()).to_variant(py), + LogicalPlan::RecursiveQuery(plan) => { + PyRecursiveQuery::from(plan.clone()).to_variant(py) + } } } From 7bd654e430c4824b44385a00756ef6ee0a8ad52f Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 26 Mar 2025 22:15:11 +0800 Subject: [PATCH 03/11] clippy --- src/expr/copy_to.rs | 4 ++-- src/expr/create_external_table.rs | 4 ++-- src/expr/create_function.rs | 2 +- src/expr/describe_table.rs | 2 +- src/expr/dml.rs | 2 +- src/expr/drop_catalog_schema.rs | 2 +- src/expr/drop_function.rs | 4 ++-- src/expr/drop_view.rs | 2 +- src/expr/recursive_query.rs | 2 +- src/expr/statement.rs | 16 ++++++++-------- src/expr/values.rs | 4 ++-- src/sql/logical.rs | 2 +- 12 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/expr/copy_to.rs b/src/expr/copy_to.rs index 2a0eb6253..7425eecbd 100644 --- a/src/expr/copy_to.rs +++ b/src/expr/copy_to.rs @@ -72,7 +72,7 @@ impl PyCopyTo { file_type: PyFileType, options: HashMap, ) -> Self { - return PyCopyTo { + PyCopyTo { copy: CopyTo { input: input.plan(), output_url, @@ -80,7 +80,7 @@ impl PyCopyTo { file_type: file_type.file_type, options, }, - }; + } } fn input(&self) -> PyLogicalPlan { diff --git a/src/expr/create_external_table.rs b/src/expr/create_external_table.rs index 659154972..6a23cf69a 100644 --- a/src/expr/create_external_table.rs +++ b/src/expr/create_external_table.rs @@ -97,7 +97,7 @@ impl PyCreateExternalTable { .map(|(k, v)| (k, v.into())) .collect(), }; - PyCreateExternalTable { create: create } + PyCreateExternalTable { create } } pub fn schema(&self) -> PyDFSchema { @@ -176,7 +176,7 @@ impl LogicalNode for PyCreateExternalTable { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/create_function.rs b/src/expr/create_function.rs index f86e5719c..ea72aa790 100644 --- a/src/expr/create_function.rs +++ b/src/expr/create_function.rs @@ -176,7 +176,7 @@ impl LogicalNode for PyCreateFunction { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/describe_table.rs b/src/expr/describe_table.rs index c7155fdd9..bb1351221 100644 --- a/src/expr/describe_table.rs +++ b/src/expr/describe_table.rs @@ -69,7 +69,7 @@ impl LogicalNode for PyDescribeTable { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/dml.rs b/src/expr/dml.rs index 9b55b7673..677dbcd73 100644 --- a/src/expr/dml.rs +++ b/src/expr/dml.rs @@ -29,7 +29,7 @@ impl LogicalNode for PyDmlStatement { vec![PyLogicalPlan::from((*self.dml.input).clone())] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/drop_catalog_schema.rs b/src/expr/drop_catalog_schema.rs index 2014ab913..4c5273fbc 100644 --- a/src/expr/drop_catalog_schema.rs +++ b/src/expr/drop_catalog_schema.rs @@ -93,7 +93,7 @@ impl LogicalNode for PyDropCatalogSchema { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/drop_function.rs b/src/expr/drop_function.rs index 99a6cf566..8d3a6ee23 100644 --- a/src/expr/drop_function.rs +++ b/src/expr/drop_function.rs @@ -40,7 +40,7 @@ impl PyDropFunction { fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { Ok(PyDropFunction { drop: DropFunction { - name: name.into(), + name, schema: Arc::new(schema.into()), if_exists, }, @@ -72,7 +72,7 @@ impl LogicalNode for PyDropFunction { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/drop_view.rs b/src/expr/drop_view.rs index 78518c940..6764a4346 100644 --- a/src/expr/drop_view.rs +++ b/src/expr/drop_view.rs @@ -79,7 +79,7 @@ impl LogicalNode for PyDropView { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/recursive_query.rs b/src/expr/recursive_query.rs index 667238df5..d09c27c31 100644 --- a/src/expr/recursive_query.rs +++ b/src/expr/recursive_query.rs @@ -88,7 +88,7 @@ impl LogicalNode for PyRecursiveQuery { ] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/statement.rs b/src/expr/statement.rs index 37a42e819..568e13ef7 100644 --- a/src/expr/statement.rs +++ b/src/expr/statement.rs @@ -33,7 +33,7 @@ impl LogicalNode for PyTransactionStart { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } @@ -166,7 +166,7 @@ impl LogicalNode for PyTransactionEnd { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } @@ -241,7 +241,7 @@ impl LogicalNode for PySetVariable { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } @@ -289,7 +289,7 @@ impl LogicalNode for PyPrepare { vec![PyLogicalPlan::from((*self.prepare.input).clone())] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } @@ -301,7 +301,7 @@ impl PyPrepare { let input = input.plan().clone(); let data_types = data_types .into_iter() - .map(|data_type| data_type.try_into().unwrap()) + .map(|data_type| data_type.into()) .collect(); PyPrepare { prepare: Prepare { @@ -357,7 +357,7 @@ impl LogicalNode for PyExecute { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } @@ -368,7 +368,7 @@ impl PyExecute { pub fn new(name: String, parameters: Vec) -> Self { let parameters = parameters .into_iter() - .map(|parameter| parameter.try_into().unwrap()) + .map(|parameter| parameter.into()) .collect(); PyExecute { execute: Execute { name, parameters }, @@ -414,7 +414,7 @@ impl LogicalNode for PyDeallocate { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/expr/values.rs b/src/expr/values.rs index 719a29127..ecdde4479 100644 --- a/src/expr/values.rs +++ b/src/expr/values.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use datafusion::logical_expr::Values; use pyo3::prelude::*; -use pyo3::{pyclass, Bound, IntoPyObjectExt, PyAny, PyErr, PyResult, Python}; +use pyo3::{pyclass, PyErr, PyResult, Python}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -33,7 +33,7 @@ impl LogicalNode for PyValues { vec![] } - fn to_variant<'py>(&self, py: Python<'py>) -> PyResult { + fn to_variant(&self, py: Python<'_>) -> PyResult { Ok(self.clone().into_py(py)) } } diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 73d53434b..f2eb36a07 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -55,7 +55,7 @@ use crate::expr::union::PyUnion; use crate::expr::unnest::PyUnnest; use crate::expr::values::PyValues; use crate::expr::window::PyWindowExpr; -use crate::{context::PySessionContext, errors::py_unsupported_variant_err}; +use crate::context::PySessionContext; use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement}; use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec}; use prost::Message; From 4974c788244aaa2494ddc58ec78cc616057368a1 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 26 Mar 2025 22:25:09 +0800 Subject: [PATCH 04/11] add license --- src/expr/constraints.rs | 17 +++++++++++++++++ src/expr/describe_table.rs | 17 +++++++++++++++++ src/expr/dml.rs | 17 +++++++++++++++++ src/expr/drop_catalog_schema.rs | 17 +++++++++++++++++ src/expr/drop_function.rs | 17 +++++++++++++++++ src/expr/drop_view.rs | 17 +++++++++++++++++ src/expr/recursive_query.rs | 17 +++++++++++++++++ src/expr/statement.rs | 17 +++++++++++++++++ src/expr/values.rs | 17 +++++++++++++++++ src/sql/logical.rs | 2 +- 10 files changed, 154 insertions(+), 1 deletion(-) diff --git a/src/expr/constraints.rs b/src/expr/constraints.rs index d2480bac4..ee76310b1 100644 --- a/src/expr/constraints.rs +++ b/src/expr/constraints.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::fmt::{self, Display, Formatter}; use datafusion::common::Constraints; diff --git a/src/expr/describe_table.rs b/src/expr/describe_table.rs index bb1351221..6c43679a5 100644 --- a/src/expr/describe_table.rs +++ b/src/expr/describe_table.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{ fmt::{self, Display, Formatter}, sync::Arc, diff --git a/src/expr/dml.rs b/src/expr/dml.rs index 677dbcd73..2c5083d00 100644 --- a/src/expr/dml.rs +++ b/src/expr/dml.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{DmlStatement, WriteOp}; use pyo3::prelude::*; diff --git a/src/expr/drop_catalog_schema.rs b/src/expr/drop_catalog_schema.rs index 4c5273fbc..6b33b17ce 100644 --- a/src/expr/drop_catalog_schema.rs +++ b/src/expr/drop_catalog_schema.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{ fmt::{self, Display, Formatter}, sync::Arc, diff --git a/src/expr/drop_function.rs b/src/expr/drop_function.rs index 8d3a6ee23..4613fe064 100644 --- a/src/expr/drop_function.rs +++ b/src/expr/drop_function.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{ fmt::{self, Display, Formatter}, sync::Arc, diff --git a/src/expr/drop_view.rs b/src/expr/drop_view.rs index 6764a4346..097f3f667 100644 --- a/src/expr/drop_view.rs +++ b/src/expr/drop_view.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{ fmt::{self, Display, Formatter}, sync::Arc, diff --git a/src/expr/recursive_query.rs b/src/expr/recursive_query.rs index d09c27c31..0e8c0abd3 100644 --- a/src/expr/recursive_query.rs +++ b/src/expr/recursive_query.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::RecursiveQuery; diff --git a/src/expr/statement.rs b/src/expr/statement.rs index 568e13ef7..f8ee91951 100644 --- a/src/expr/statement.rs +++ b/src/expr/statement.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use datafusion::logical_expr::{ Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, diff --git a/src/expr/values.rs b/src/expr/values.rs index ecdde4479..33e4794d3 100644 --- a/src/expr/values.rs +++ b/src/expr/values.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use datafusion::logical_expr::Values; diff --git a/src/sql/logical.rs b/src/sql/logical.rs index f2eb36a07..b9064ba35 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -17,6 +17,7 @@ use std::sync::Arc; +use crate::context::PySessionContext; use crate::errors::PyDataFusionResult; use crate::expr::aggregate::PyAggregate; use crate::expr::analyze::PyAnalyze; @@ -55,7 +56,6 @@ use crate::expr::union::PyUnion; use crate::expr::unnest::PyUnnest; use crate::expr::values::PyValues; use crate::expr::window::PyWindowExpr; -use crate::context::PySessionContext; use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement}; use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec}; use prost::Message; From ba4696ffb93d44faf806d319a69d0c429da3065a Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 27 Mar 2025 08:43:51 +0800 Subject: [PATCH 05/11] update --- python/datafusion/common.py | 6 +++ src/common.rs | 3 ++ src/common/schema.rs | 89 +++++++++++++++++++++++++++++++ src/expr.rs | 2 - src/expr/constraints.rs | 45 ---------------- src/expr/copy_to.rs | 6 +-- src/expr/create_catalog.rs | 6 +-- src/expr/create_catalog_schema.rs | 6 +-- src/expr/create_external_table.rs | 11 ++-- src/expr/create_function.rs | 6 +-- src/expr/create_index.rs | 6 +-- src/expr/describe_table.rs | 6 +-- src/expr/dml.rs | 13 +++-- src/expr/drop_catalog_schema.rs | 6 +-- src/expr/drop_function.rs | 6 +-- src/expr/drop_view.rs | 6 +-- src/expr/recursive_query.rs | 6 +-- src/expr/statement.rs | 29 +++++----- src/expr/values.rs | 6 +-- 19 files changed, 161 insertions(+), 103 deletions(-) delete mode 100644 src/expr/constraints.rs diff --git a/python/datafusion/common.py b/python/datafusion/common.py index e762a993b..7f18cba5a 100644 --- a/python/datafusion/common.py +++ b/python/datafusion/common.py @@ -33,6 +33,9 @@ SqlTable = common_internal.SqlTable SqlType = common_internal.SqlType SqlView = common_internal.SqlView +TableType = common_internal.TableType +TableSource = common_internal.TableSource +Constraints = common_internal.Constraints __all__ = [ "DFSchema", @@ -47,6 +50,9 @@ "SqlTable", "SqlType", "SqlView", + "TableType", + "TableSource", + "Constraints", ] diff --git a/src/common.rs b/src/common.rs index 453bf67a4..88d2fdd5f 100644 --- a/src/common.rs +++ b/src/common.rs @@ -36,5 +36,8 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/common/schema.rs b/src/common/schema.rs index 66ce925ae..5a54fe333 100644 --- a/src/common/schema.rs +++ b/src/common/schema.rs @@ -15,14 +15,22 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::{self, Display, Formatter}; +use std::sync::Arc; use std::{any::Any, borrow::Cow}; +use arrow::datatypes::Schema; +use arrow::pyarrow::PyArrowType; use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::Constraints; +use datafusion::datasource::TableType; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableSource}; use pyo3::prelude::*; use datafusion::logical_expr::utils::split_conjunction; +use crate::sql::logical::PyLogicalPlan; + use super::{data_type::DataTypeMap, function::SqlFunction}; #[pyclass(name = "SqlSchema", module = "datafusion.common", subclass)] @@ -218,3 +226,84 @@ impl SqlStatistics { self.row_count } } + +#[pyclass(name = "Constraints", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyConstraints { + pub constraints: Constraints, +} + +impl From for Constraints { + fn from(constraints: PyConstraints) -> Self { + constraints.constraints + } +} + +impl From for PyConstraints { + fn from(constraints: Constraints) -> Self { + PyConstraints { constraints } + } +} + +impl Display for PyConstraints { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "Constraints: {:?}", self.constraints) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TableType", module = "datafusion.common")] +pub enum PyTableType { + Base, + View, + Temporary, +} + +impl From for datafusion::logical_expr::TableType { + fn from(table_type: PyTableType) -> Self { + match table_type { + PyTableType::Base => datafusion::logical_expr::TableType::Base, + PyTableType::View => datafusion::logical_expr::TableType::View, + PyTableType::Temporary => datafusion::logical_expr::TableType::Temporary, + } + } +} + +impl From for PyTableType { + fn from(table_type: TableType) -> Self { + match table_type { + datafusion::logical_expr::TableType::Base => PyTableType::Base, + datafusion::logical_expr::TableType::View => PyTableType::View, + datafusion::logical_expr::TableType::Temporary => PyTableType::Temporary, + } + } +} + +#[pyclass(name = "TableSource", module = "datafusion.common", subclass)] +#[derive(Clone)] +pub struct PyTableSource { + pub table_source: Arc, +} + +#[pymethods] +impl PyTableSource { + pub fn schema(&self) -> PyArrowType { + (*self.table_source.schema()).clone().into() + } + + pub fn constraints(&self) -> Option { + self.table_source.constraints().map(|c| PyConstraints { + constraints: c.clone(), + }) + } + + pub fn table_type(&self) -> PyTableType { + self.table_source.table_type().into() + } + + pub fn get_logical_plan(&self) -> Option { + self.table_source + .get_logical_plan() + .map(|plan| PyLogicalPlan::new(plan.into_owned())) + } +} diff --git a/src/expr.rs b/src/expr.rs index f332cd22a..d1312fb06 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -66,7 +66,6 @@ pub mod case; pub mod cast; pub mod column; pub mod conditional_expr; -pub mod constraints; pub mod copy_to; pub mod create_catalog; pub mod create_catalog_schema; @@ -810,7 +809,6 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/expr/constraints.rs b/src/expr/constraints.rs deleted file mode 100644 index ee76310b1..000000000 --- a/src/expr/constraints.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt::{self, Display, Formatter}; - -use datafusion::common::Constraints; -use pyo3::prelude::*; - -#[pyclass(name = "Constraints", module = "datafusion.expr", subclass)] -#[derive(Clone)] -pub struct PyConstraints { - pub constraints: Constraints, -} - -impl From for Constraints { - fn from(constraints: PyConstraints) -> Self { - constraints.constraints - } -} - -impl From for PyConstraints { - fn from(constraints: Constraints) -> Self { - PyConstraints { constraints } - } -} - -impl Display for PyConstraints { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "Constraints: {:?}", self.constraints) - } -} diff --git a/src/expr/copy_to.rs b/src/expr/copy_to.rs index 7425eecbd..ebfcb8ebc 100644 --- a/src/expr/copy_to.rs +++ b/src/expr/copy_to.rs @@ -22,7 +22,7 @@ use std::{ }; use datafusion::{common::file_options::file_type::FileType, logical_expr::dml::CopyTo}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -57,8 +57,8 @@ impl LogicalNode for PyCopyTo { vec![PyLogicalPlan::from((*self.copy.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_catalog.rs b/src/expr/create_catalog.rs index cc53283d4..f4ea0f517 100644 --- a/src/expr/create_catalog.rs +++ b/src/expr/create_catalog.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion::logical_expr::CreateCatalog; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -94,7 +94,7 @@ impl LogicalNode for PyCreateCatalog { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_catalog_schema.rs b/src/expr/create_catalog_schema.rs index 8bed75553..85f447e1e 100644 --- a/src/expr/create_catalog_schema.rs +++ b/src/expr/create_catalog_schema.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion::logical_expr::CreateCatalogSchema; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -94,7 +94,7 @@ impl LogicalNode for PyCreateCatalogSchema { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_external_table.rs b/src/expr/create_external_table.rs index 6a23cf69a..01ce7d0ca 100644 --- a/src/expr/create_external_table.rs +++ b/src/expr/create_external_table.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{expr::PyExpr, sql::logical::PyLogicalPlan}; +use crate::{common::schema::PyConstraints, expr::PyExpr, sql::logical::PyLogicalPlan}; use std::{ collections::HashMap, fmt::{self, Display, Formatter}, @@ -23,11 +23,11 @@ use std::{ }; use datafusion::logical_expr::CreateExternalTable; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::common::df_schema::PyDFSchema; -use super::{constraints::PyConstraints, logical_node::LogicalNode, sort_expr::PySortExpr}; +use super::{logical_node::LogicalNode, sort_expr::PySortExpr}; #[pyclass(name = "CreateExternalTable", module = "datafusion.expr", subclass)] #[derive(Clone)] @@ -59,6 +59,7 @@ impl Display for PyCreateExternalTable { #[pymethods] impl PyCreateExternalTable { + #[allow(clippy::too_many_arguments)] #[new] #[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))] pub fn new( @@ -176,7 +177,7 @@ impl LogicalNode for PyCreateExternalTable { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_function.rs b/src/expr/create_function.rs index ea72aa790..6f3c3f0ff 100644 --- a/src/expr/create_function.rs +++ b/src/expr/create_function.rs @@ -23,7 +23,7 @@ use std::{ use datafusion::logical_expr::{ CreateFunction, CreateFunctionBody, OperateFunctionArg, Volatility, }; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use super::logical_node::LogicalNode; use super::PyExpr; @@ -176,7 +176,7 @@ impl LogicalNode for PyCreateFunction { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_index.rs b/src/expr/create_index.rs index 64e486eed..13dadbc3f 100644 --- a/src/expr/create_index.rs +++ b/src/expr/create_index.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion::logical_expr::CreateIndex; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -123,7 +123,7 @@ impl LogicalNode for PyCreateIndex { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/describe_table.rs b/src/expr/describe_table.rs index 6c43679a5..5658a13f2 100644 --- a/src/expr/describe_table.rs +++ b/src/expr/describe_table.rs @@ -22,7 +22,7 @@ use std::{ use arrow::{datatypes::Schema, pyarrow::PyArrowType}; use datafusion::logical_expr::DescribeTable; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -86,7 +86,7 @@ impl LogicalNode for PyDescribeTable { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/dml.rs b/src/expr/dml.rs index 2c5083d00..251e336cc 100644 --- a/src/expr/dml.rs +++ b/src/expr/dml.rs @@ -17,8 +17,9 @@ use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{DmlStatement, WriteOp}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; +use crate::common::schema::PyTableSource; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; use super::logical_node::LogicalNode; @@ -46,8 +47,8 @@ impl LogicalNode for PyDmlStatement { vec![PyLogicalPlan::from((*self.dml.input).clone())] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } @@ -57,8 +58,10 @@ impl PyDmlStatement { Ok(self.dml.table_name.to_string()) } - pub fn table_schema(&self) -> PyDFSchema { - (*self.dml.table_schema).clone().into() + pub fn target(&self) -> PyResult { + Ok(PyTableSource { + table_source: self.dml.target.clone(), + }) } pub fn op(&self) -> PyWriteOp { diff --git a/src/expr/drop_catalog_schema.rs b/src/expr/drop_catalog_schema.rs index 6b33b17ce..b7420a99c 100644 --- a/src/expr/drop_catalog_schema.rs +++ b/src/expr/drop_catalog_schema.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion::{common::SchemaReference, logical_expr::DropCatalogSchema, sql::TableReference}; -use pyo3::{exceptions::PyValueError, prelude::*}; +use pyo3::{exceptions::PyValueError, prelude::*, IntoPyObjectExt}; use crate::common::df_schema::PyDFSchema; @@ -110,7 +110,7 @@ impl LogicalNode for PyDropCatalogSchema { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/drop_function.rs b/src/expr/drop_function.rs index 4613fe064..9fbd78fdc 100644 --- a/src/expr/drop_function.rs +++ b/src/expr/drop_function.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion::logical_expr::DropFunction; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use super::logical_node::LogicalNode; use crate::common::df_schema::PyDFSchema; @@ -89,7 +89,7 @@ impl LogicalNode for PyDropFunction { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/drop_view.rs b/src/expr/drop_view.rs index 097f3f667..1d1ab1e59 100644 --- a/src/expr/drop_view.rs +++ b/src/expr/drop_view.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion::logical_expr::DropView; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::common::df_schema::PyDFSchema; @@ -96,7 +96,7 @@ impl LogicalNode for PyDropView { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/recursive_query.rs b/src/expr/recursive_query.rs index 0e8c0abd3..65181f7d3 100644 --- a/src/expr/recursive_query.rs +++ b/src/expr/recursive_query.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::RecursiveQuery; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -105,7 +105,7 @@ impl LogicalNode for PyRecursiveQuery { ] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/statement.rs b/src/expr/statement.rs index f8ee91951..83774cda1 100644 --- a/src/expr/statement.rs +++ b/src/expr/statement.rs @@ -19,7 +19,7 @@ use datafusion::logical_expr::{ Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, }; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan}; @@ -50,8 +50,8 @@ impl LogicalNode for PyTransactionStart { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } @@ -94,6 +94,7 @@ pub enum PyTransactionIsolationLevel { ReadCommitted, RepeatableRead, Serializable, + Snapshot, } impl From for PyTransactionIsolationLevel { @@ -107,6 +108,7 @@ impl From for PyTransactionIsolationLevel { PyTransactionIsolationLevel::RepeatableRead } TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable, + TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot, } } } @@ -128,6 +130,7 @@ impl TryFrom for TransactionIsolationLevel { PyTransactionIsolationLevel::Serializable => { Ok(TransactionIsolationLevel::Serializable) } + PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot), } } } @@ -183,8 +186,8 @@ impl LogicalNode for PyTransactionEnd { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } @@ -258,8 +261,8 @@ impl LogicalNode for PySetVariable { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } @@ -306,8 +309,8 @@ impl LogicalNode for PyPrepare { vec![PyLogicalPlan::from((*self.prepare.input).clone())] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } @@ -374,8 +377,8 @@ impl LogicalNode for PyExecute { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } @@ -431,8 +434,8 @@ impl LogicalNode for PyDeallocate { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/values.rs b/src/expr/values.rs index 33e4794d3..fb2692230 100644 --- a/src/expr/values.rs +++ b/src/expr/values.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use datafusion::logical_expr::Values; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use pyo3::{pyclass, PyErr, PyResult, Python}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -50,8 +50,8 @@ impl LogicalNode for PyValues { vec![] } - fn to_variant(&self, py: Python<'_>) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } From fbe4e922be7d1b18d7a5f3e66015d919c237a5fc Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 27 Mar 2025 08:47:48 +0800 Subject: [PATCH 06/11] ruff --- python/datafusion/common.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/datafusion/common.py b/python/datafusion/common.py index 7f18cba5a..c689a816d 100644 --- a/python/datafusion/common.py +++ b/python/datafusion/common.py @@ -38,6 +38,7 @@ Constraints = common_internal.Constraints __all__ = [ + "Constraints", "DFSchema", "DataType", "DataTypeMap", @@ -50,9 +51,8 @@ "SqlTable", "SqlType", "SqlView", - "TableType", "TableSource", - "Constraints", + "TableType", ] From 088d3121c742d729a99ddff489268986a5aa4b6f Mon Sep 17 00:00:00 2001 From: Chen Chongchen Date: Thu, 27 Mar 2025 12:52:23 +0800 Subject: [PATCH 07/11] Update expr.py --- python/datafusion/expr.py | 50 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 2697d8143..c96f733b5 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -54,14 +54,29 @@ Case = expr_internal.Case Cast = expr_internal.Cast Column = expr_internal.Column +CopyTo = expr_internal.CopyTo +CreateCatalog = expr_internal.CreateCatalog +CreateCatalogSchema = expr_internal.CreateCatalogSchema +CreateExternalTable = expr_internal.CreateExternalTable +CreateFunction = expr_internal.CreateFunction +CreateFunctionBody = expr_internal.CreateFunctionBody +CreateIndex = expr_internal.CreateIndex CreateMemoryTable = expr_internal.CreateMemoryTable CreateView = expr_internal.CreateView +Deallocate = expr_internal.Deallocate +DescribeTable = expr_internal.DescribeTable Distinct = expr_internal.Distinct +DmlStatement = expr_internal.DmlStatement +DropCatalogSchema = expr_internal.DropCatalogSchema +DropFunction = expr_internal.DropFunction DropTable = expr_internal.DropTable +DropView = expr_internal.DropView EmptyRelation = expr_internal.EmptyRelation +Execute = expr_internal.Execute Exists = expr_internal.Exists Explain = expr_internal.Explain Extension = expr_internal.Extension +FileType = expr_internal.FileType Filter = expr_internal.Filter GroupingSet = expr_internal.GroupingSet Join = expr_internal.Join @@ -83,21 +98,31 @@ Literal = expr_internal.Literal Negative = expr_internal.Negative Not = expr_internal.Not +OperateFunctionArg = expr_internal.OperateFunctionArg Partitioning = expr_internal.Partitioning Placeholder = expr_internal.Placeholder +Prepare = expr_internal.Prepare Projection = expr_internal.Projection +RecursiveQuery = expr_internal.RecursiveQuery Repartition = expr_internal.Repartition ScalarSubquery = expr_internal.ScalarSubquery ScalarVariable = expr_internal.ScalarVariable +SetVariable = expr_internal.SetVariable SimilarTo = expr_internal.SimilarTo Sort = expr_internal.Sort Subquery = expr_internal.Subquery SubqueryAlias = expr_internal.SubqueryAlias TableScan = expr_internal.TableScan +TransactionAccessMode = expr_internal.TransactionAccessMode +TransactionConclusion = expr_internal.TransactionConclusion +TransactionEnd = expr_internal.TransactionEnd +TransactionIsolationLevel = expr_internal.TransactionIsolationLevel +TransactionStart = expr_internal.TransactionStart TryCast = expr_internal.TryCast Union = expr_internal.Union Unnest = expr_internal.Unnest UnnestExpr = expr_internal.UnnestExpr +Values = expr_internal.Values WindowExpr = expr_internal.WindowExpr __all__ = [ @@ -111,15 +136,30 @@ "CaseBuilder", "Cast", "Column", + "CopyTo", + "CreateCatalog", + "CreateCatalogSchema", + "CreateExternalTable", + "CreateFunction", + "CreateFunctionBody", + "CreateIndex", "CreateMemoryTable", "CreateView", + "Deallocate", + "DescribeTable", "Distinct", + "DmlStatement", + "DropCatalogSchema", + "DropFunction", "DropTable", + "DropView", "EmptyRelation", + "Execute", "Exists", "Explain", "Expr", "Extension", + "FileType", "Filter", "GroupingSet", "ILike", @@ -142,22 +182,32 @@ "Literal", "Negative", "Not", + "OperateFunctionArg", "Partitioning", "Placeholder", + "Prepare", "Projection", + "RecursiveQuery", "Repartition", "ScalarSubquery", "ScalarVariable", + "SetVariable", "SimilarTo", "Sort", "SortExpr", "Subquery", "SubqueryAlias", "TableScan", + "TransactionAccessMode", + "TransactionConclusion", + "TransactionEnd", + "TransactionIsolationLevel", + "TransactionStart", "TryCast", "Union", "Unnest", "UnnestExpr", + "Values", "Window", "WindowExpr", "WindowFrame", From 40052252c293ee752565f8a782ef3c2e67dce7f7 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Fri, 28 Mar 2025 21:05:58 +0800 Subject: [PATCH 08/11] add test --- python/tests/test_expr.py | 86 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/python/tests/test_expr.py b/python/tests/test_expr.py index 926e69845..3b75fb1a5 100644 --- a/python/tests/test_expr.py +++ b/python/tests/test_expr.py @@ -29,6 +29,15 @@ Projection, Sort, TableScan, + CopyTo, + CreateIndex, + DescribeTable, + DmlStatement, + DropCatalogSchema, + RecursiveQuery, + TransactionEnd, + TransactionStart, + Values, ) @@ -247,3 +256,80 @@ def test_fill_null(df): assert result.column(0) == pa.array([1, 2, 100]) assert result.column(1) == pa.array([4, 25, 6]) assert result.column(2) == pa.array([1234, 1234, 8]) + + +def test_copy_to(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + df = ctx.sql("COPY foo TO bar STORED AS CSV") + plan = df.logical_plan() + plan = plan.to_variant() + assert isinstance(plan, CopyTo) + + +def test_create_index(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + plan = ctx.sql("create index idx on foo (a)").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, CreateIndex) + + +def test_describe_table(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + plan = ctx.sql("describe foo").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, DescribeTable) + + +def test_dml_statement(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + plan = ctx.sql("insert into foo values (1, 2)").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, DmlStatement) + + +def drop_catalog_schema(): + ctx = SessionContext() + plan = ctx.sql("drop schema cat").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, DropCatalogSchema) + + +def test_recursive_query(): + ctx = SessionContext() + plan = ctx.sql( + """ + WITH RECURSIVE cte AS ( + SELECT 1 as n + UNION ALL + SELECT n + 1 FROM cte WHERE n < 5 + ) + SELECT * FROM cte; + """ + ).logical_plan() + plan = plan.inputs()[0].inputs()[0].to_variant() + assert isinstance(plan, RecursiveQuery) + + +def test_values(): + ctx = SessionContext() + plan = ctx.sql("values (1, 'foo'), (2, 'bar')").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, Values) + + +def test_transaction_start(): + ctx = SessionContext() + plan = ctx.sql("START TRANSACTION").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, TransactionStart) + + +def test_transaction_end(): + ctx = SessionContext() + plan = ctx.sql("COMMIT").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, TransactionEnd) From 66f8a38f7373f2499c4bc05ad48bec6dc0fa0c88 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Fri, 28 Mar 2025 22:19:22 +0800 Subject: [PATCH 09/11] ruff --- python/tests/test_expr.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/tests/test_expr.py b/python/tests/test_expr.py index 3b75fb1a5..b6493a906 100644 --- a/python/tests/test_expr.py +++ b/python/tests/test_expr.py @@ -23,18 +23,18 @@ AggregateFunction, BinaryExpr, Column, - Filter, - Limit, - Literal, - Projection, - Sort, - TableScan, CopyTo, CreateIndex, DescribeTable, DmlStatement, DropCatalogSchema, + Filter, + Limit, + Literal, + Projection, RecursiveQuery, + Sort, + TableScan, TransactionEnd, TransactionStart, Values, From 28577f8d32b600a537b159b6e6763c2a390de645 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 27 Apr 2025 10:06:40 -0400 Subject: [PATCH 10/11] Minor ruff whitespace change --- python/tests/test_expr.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tests/test_expr.py b/python/tests/test_expr.py index c5483af1e..110926140 100644 --- a/python/tests/test_expr.py +++ b/python/tests/test_expr.py @@ -334,6 +334,7 @@ def test_transaction_end(): plan = plan.to_variant() assert isinstance(plan, TransactionEnd) + def test_alias_with_metadata(df): df = df.select(col("a").alias("b", {"key": "value"})) assert df.schema().field("b").metadata == {b"key": b"value"} From 52d7269842cc6d2c47417472a936268360d8677e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 27 Apr 2025 10:25:15 -0400 Subject: [PATCH 11/11] Minor format change --- python/datafusion/expr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 15894e473..dc84fdd19 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -713,8 +713,8 @@ def log10(self) -> Expr: def initcap(self) -> Expr: """Set the initial letter of each word to capital. - Converts the first letter of each word in ``string`` to uppercase and the remaining - characters to lowercase. + Converts the first letter of each word in ``string`` to uppercase and the + remaining characters to lowercase. """ return F.initcap(self)