diff --git a/python/datafusion/common.py b/python/datafusion/common.py index e762a993b..c689a816d 100644 --- a/python/datafusion/common.py +++ b/python/datafusion/common.py @@ -33,8 +33,12 @@ SqlTable = common_internal.SqlTable SqlType = common_internal.SqlType SqlView = common_internal.SqlView +TableType = common_internal.TableType +TableSource = common_internal.TableSource +Constraints = common_internal.Constraints __all__ = [ + "Constraints", "DFSchema", "DataType", "DataTypeMap", @@ -47,6 +51,8 @@ "SqlTable", "SqlType", "SqlView", + "TableSource", + "TableType", ] diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 3750eeb3f..9e58873d0 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -54,14 +54,29 @@ Case = expr_internal.Case Cast = expr_internal.Cast Column = expr_internal.Column +CopyTo = expr_internal.CopyTo +CreateCatalog = expr_internal.CreateCatalog +CreateCatalogSchema = expr_internal.CreateCatalogSchema +CreateExternalTable = expr_internal.CreateExternalTable +CreateFunction = expr_internal.CreateFunction +CreateFunctionBody = expr_internal.CreateFunctionBody +CreateIndex = expr_internal.CreateIndex CreateMemoryTable = expr_internal.CreateMemoryTable CreateView = expr_internal.CreateView +Deallocate = expr_internal.Deallocate +DescribeTable = expr_internal.DescribeTable Distinct = expr_internal.Distinct +DmlStatement = expr_internal.DmlStatement +DropCatalogSchema = expr_internal.DropCatalogSchema +DropFunction = expr_internal.DropFunction DropTable = expr_internal.DropTable +DropView = expr_internal.DropView EmptyRelation = expr_internal.EmptyRelation +Execute = expr_internal.Execute Exists = expr_internal.Exists Explain = expr_internal.Explain Extension = expr_internal.Extension +FileType = expr_internal.FileType Filter = expr_internal.Filter GroupingSet = expr_internal.GroupingSet Join = expr_internal.Join @@ -83,21 +98,31 @@ Literal = expr_internal.Literal Negative = expr_internal.Negative Not = expr_internal.Not +OperateFunctionArg = expr_internal.OperateFunctionArg Partitioning = expr_internal.Partitioning Placeholder = expr_internal.Placeholder +Prepare = expr_internal.Prepare Projection = expr_internal.Projection +RecursiveQuery = expr_internal.RecursiveQuery Repartition = expr_internal.Repartition ScalarSubquery = expr_internal.ScalarSubquery ScalarVariable = expr_internal.ScalarVariable +SetVariable = expr_internal.SetVariable SimilarTo = expr_internal.SimilarTo Sort = expr_internal.Sort Subquery = expr_internal.Subquery SubqueryAlias = expr_internal.SubqueryAlias TableScan = expr_internal.TableScan +TransactionAccessMode = expr_internal.TransactionAccessMode +TransactionConclusion = expr_internal.TransactionConclusion +TransactionEnd = expr_internal.TransactionEnd +TransactionIsolationLevel = expr_internal.TransactionIsolationLevel +TransactionStart = expr_internal.TransactionStart TryCast = expr_internal.TryCast Union = expr_internal.Union Unnest = expr_internal.Unnest UnnestExpr = expr_internal.UnnestExpr +Values = expr_internal.Values WindowExpr = expr_internal.WindowExpr __all__ = [ @@ -111,15 +136,30 @@ "CaseBuilder", "Cast", "Column", + "CopyTo", + "CreateCatalog", + "CreateCatalogSchema", + "CreateExternalTable", + "CreateFunction", + "CreateFunctionBody", + "CreateIndex", "CreateMemoryTable", "CreateView", + "Deallocate", + "DescribeTable", "Distinct", + "DmlStatement", + "DropCatalogSchema", + "DropFunction", "DropTable", + "DropView", "EmptyRelation", + "Execute", "Exists", "Explain", "Expr", "Extension", + "FileType", "Filter", "GroupingSet", "ILike", @@ -142,22 +182,32 @@ "Literal", "Negative", "Not", + "OperateFunctionArg", "Partitioning", "Placeholder", + "Prepare", "Projection", + "RecursiveQuery", "Repartition", "ScalarSubquery", "ScalarVariable", + "SetVariable", "SimilarTo", "Sort", "SortExpr", "Subquery", "SubqueryAlias", "TableScan", + "TransactionAccessMode", + "TransactionConclusion", + "TransactionEnd", + "TransactionIsolationLevel", + "TransactionStart", "TryCast", "Union", "Unnest", "UnnestExpr", + "Values", "Window", "WindowExpr", "WindowFrame", @@ -686,8 +736,8 @@ def log10(self) -> Expr: def initcap(self) -> Expr: """Set the initial letter of each word to capital. - Converts the first letter of each word in ``string`` - to uppercase and the remaining characters to lowercase. + Converts the first letter of each word in ``string`` to uppercase and the + remaining characters to lowercase. """ from . import functions as F diff --git a/python/tests/test_expr.py b/python/tests/test_expr.py index 3651b60d6..58a202724 100644 --- a/python/tests/test_expr.py +++ b/python/tests/test_expr.py @@ -23,12 +23,21 @@ AggregateFunction, BinaryExpr, Column, + CopyTo, + CreateIndex, + DescribeTable, + DmlStatement, + DropCatalogSchema, Filter, Limit, Literal, Projection, + RecursiveQuery, Sort, TableScan, + TransactionEnd, + TransactionStart, + Values, ) @@ -249,6 +258,83 @@ def test_fill_null(df): assert result.column(2) == pa.array([1234, 1234, 8]) +def test_copy_to(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + df = ctx.sql("COPY foo TO bar STORED AS CSV") + plan = df.logical_plan() + plan = plan.to_variant() + assert isinstance(plan, CopyTo) + + +def test_create_index(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + plan = ctx.sql("create index idx on foo (a)").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, CreateIndex) + + +def test_describe_table(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + plan = ctx.sql("describe foo").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, DescribeTable) + + +def test_dml_statement(): + ctx = SessionContext() + ctx.sql("CREATE TABLE foo (a int, b int)").collect() + plan = ctx.sql("insert into foo values (1, 2)").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, DmlStatement) + + +def drop_catalog_schema(): + ctx = SessionContext() + plan = ctx.sql("drop schema cat").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, DropCatalogSchema) + + +def test_recursive_query(): + ctx = SessionContext() + plan = ctx.sql( + """ + WITH RECURSIVE cte AS ( + SELECT 1 as n + UNION ALL + SELECT n + 1 FROM cte WHERE n < 5 + ) + SELECT * FROM cte; + """ + ).logical_plan() + plan = plan.inputs()[0].inputs()[0].to_variant() + assert isinstance(plan, RecursiveQuery) + + +def test_values(): + ctx = SessionContext() + plan = ctx.sql("values (1, 'foo'), (2, 'bar')").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, Values) + + +def test_transaction_start(): + ctx = SessionContext() + plan = ctx.sql("START TRANSACTION").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, TransactionStart) + + +def test_transaction_end(): + ctx = SessionContext() + plan = ctx.sql("COMMIT").logical_plan() + plan = plan.to_variant() + assert isinstance(plan, TransactionEnd) + + def test_col_getattr(): ctx = SessionContext() data = { diff --git a/src/common.rs b/src/common.rs index 453bf67a4..88d2fdd5f 100644 --- a/src/common.rs +++ b/src/common.rs @@ -36,5 +36,8 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/common/schema.rs b/src/common/schema.rs index 66ce925ae..5a54fe333 100644 --- a/src/common/schema.rs +++ b/src/common/schema.rs @@ -15,14 +15,22 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::{self, Display, Formatter}; +use std::sync::Arc; use std::{any::Any, borrow::Cow}; +use arrow::datatypes::Schema; +use arrow::pyarrow::PyArrowType; use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::Constraints; +use datafusion::datasource::TableType; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableSource}; use pyo3::prelude::*; use datafusion::logical_expr::utils::split_conjunction; +use crate::sql::logical::PyLogicalPlan; + use super::{data_type::DataTypeMap, function::SqlFunction}; #[pyclass(name = "SqlSchema", module = "datafusion.common", subclass)] @@ -218,3 +226,84 @@ impl SqlStatistics { self.row_count } } + +#[pyclass(name = "Constraints", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyConstraints { + pub constraints: Constraints, +} + +impl From for Constraints { + fn from(constraints: PyConstraints) -> Self { + constraints.constraints + } +} + +impl From for PyConstraints { + fn from(constraints: Constraints) -> Self { + PyConstraints { constraints } + } +} + +impl Display for PyConstraints { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "Constraints: {:?}", self.constraints) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TableType", module = "datafusion.common")] +pub enum PyTableType { + Base, + View, + Temporary, +} + +impl From for datafusion::logical_expr::TableType { + fn from(table_type: PyTableType) -> Self { + match table_type { + PyTableType::Base => datafusion::logical_expr::TableType::Base, + PyTableType::View => datafusion::logical_expr::TableType::View, + PyTableType::Temporary => datafusion::logical_expr::TableType::Temporary, + } + } +} + +impl From for PyTableType { + fn from(table_type: TableType) -> Self { + match table_type { + datafusion::logical_expr::TableType::Base => PyTableType::Base, + datafusion::logical_expr::TableType::View => PyTableType::View, + datafusion::logical_expr::TableType::Temporary => PyTableType::Temporary, + } + } +} + +#[pyclass(name = "TableSource", module = "datafusion.common", subclass)] +#[derive(Clone)] +pub struct PyTableSource { + pub table_source: Arc, +} + +#[pymethods] +impl PyTableSource { + pub fn schema(&self) -> PyArrowType { + (*self.table_source.schema()).clone().into() + } + + pub fn constraints(&self) -> Option { + self.table_source.constraints().map(|c| PyConstraints { + constraints: c.clone(), + }) + } + + pub fn table_type(&self) -> PyTableType { + self.table_source.table_type().into() + } + + pub fn get_logical_plan(&self) -> Option { + self.table_source + .get_logical_plan() + .map(|plan| PyLogicalPlan::new(plan.into_owned())) + } +} diff --git a/src/expr.rs b/src/expr.rs index 7d4aa8798..404e575f8 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -67,10 +67,21 @@ pub mod case; pub mod cast; pub mod column; pub mod conditional_expr; +pub mod copy_to; +pub mod create_catalog; +pub mod create_catalog_schema; +pub mod create_external_table; +pub mod create_function; +pub mod create_index; pub mod create_memory_table; pub mod create_view; +pub mod describe_table; pub mod distinct; +pub mod dml; +pub mod drop_catalog_schema; +pub mod drop_function; pub mod drop_table; +pub mod drop_view; pub mod empty_relation; pub mod exists; pub mod explain; @@ -86,18 +97,21 @@ pub mod literal; pub mod logical_node; pub mod placeholder; pub mod projection; +pub mod recursive_query; pub mod repartition; pub mod scalar_subquery; pub mod scalar_variable; pub mod signature; pub mod sort; pub mod sort_expr; +pub mod statement; pub mod subquery; pub mod subquery_alias; pub mod table_scan; pub mod union; pub mod unnest; pub mod unnest_expr; +pub mod values; pub mod window; use sort_expr::{to_sort_expressions, PySortExpr}; @@ -802,5 +816,32 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + Ok(()) } diff --git a/src/expr/copy_to.rs b/src/expr/copy_to.rs new file mode 100644 index 000000000..ebfcb8ebc --- /dev/null +++ b/src/expr/copy_to.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + collections::HashMap, + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::{common::file_options::file_type::FileType, logical_expr::dml::CopyTo}; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::sql::logical::PyLogicalPlan; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "CopyTo", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCopyTo { + copy: CopyTo, +} + +impl From for CopyTo { + fn from(copy: PyCopyTo) -> Self { + copy.copy + } +} + +impl From for PyCopyTo { + fn from(copy: CopyTo) -> PyCopyTo { + PyCopyTo { copy } + } +} + +impl Display for PyCopyTo { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CopyTo: {:?}", self.copy.output_url) + } +} + +impl LogicalNode for PyCopyTo { + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.copy.input).clone())] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PyCopyTo { + #[new] + pub fn new( + input: PyLogicalPlan, + output_url: String, + partition_by: Vec, + file_type: PyFileType, + options: HashMap, + ) -> Self { + PyCopyTo { + copy: CopyTo { + input: input.plan(), + output_url, + partition_by, + file_type: file_type.file_type, + options, + }, + } + } + + fn input(&self) -> PyLogicalPlan { + PyLogicalPlan::from((*self.copy.input).clone()) + } + + fn output_url(&self) -> String { + self.copy.output_url.clone() + } + + fn partition_by(&self) -> Vec { + self.copy.partition_by.clone() + } + + fn file_type(&self) -> PyFileType { + PyFileType { + file_type: self.copy.file_type.clone(), + } + } + + fn options(&self) -> HashMap { + self.copy.options.clone() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CopyTo({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CopyTo".to_string()) + } +} + +#[pyclass(name = "FileType", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyFileType { + file_type: Arc, +} + +impl Display for PyFileType { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "FileType: {}", self.file_type) + } +} + +#[pymethods] +impl PyFileType { + fn __repr__(&self) -> PyResult { + Ok(format!("FileType({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("FileType".to_string()) + } +} diff --git a/src/expr/create_catalog.rs b/src/expr/create_catalog.rs new file mode 100644 index 000000000..f4ea0f517 --- /dev/null +++ b/src/expr/create_catalog.rs @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::CreateCatalog; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "CreateCatalog", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateCatalog { + create: CreateCatalog, +} + +impl From for CreateCatalog { + fn from(create: PyCreateCatalog) -> Self { + create.create + } +} + +impl From for PyCreateCatalog { + fn from(create: CreateCatalog) -> PyCreateCatalog { + PyCreateCatalog { create } + } +} + +impl Display for PyCreateCatalog { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateCatalog: {:?}", self.create.catalog_name) + } +} + +#[pymethods] +impl PyCreateCatalog { + #[new] + pub fn new( + catalog_name: String, + if_not_exists: bool, + schema: PyDFSchema, + ) -> PyResult { + Ok(PyCreateCatalog { + create: CreateCatalog { + catalog_name, + if_not_exists, + schema: Arc::new(schema.into()), + }, + }) + } + + pub fn catalog_name(&self) -> String { + self.create.catalog_name.clone() + } + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateCatalog({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateCatalog".to_string()) + } +} + +impl LogicalNode for PyCreateCatalog { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/create_catalog_schema.rs b/src/expr/create_catalog_schema.rs new file mode 100644 index 000000000..85f447e1e --- /dev/null +++ b/src/expr/create_catalog_schema.rs @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::CreateCatalogSchema; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "CreateCatalogSchema", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateCatalogSchema { + create: CreateCatalogSchema, +} + +impl From for CreateCatalogSchema { + fn from(create: PyCreateCatalogSchema) -> Self { + create.create + } +} + +impl From for PyCreateCatalogSchema { + fn from(create: CreateCatalogSchema) -> PyCreateCatalogSchema { + PyCreateCatalogSchema { create } + } +} + +impl Display for PyCreateCatalogSchema { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateCatalogSchema: {:?}", self.create.schema_name) + } +} + +#[pymethods] +impl PyCreateCatalogSchema { + #[new] + pub fn new( + schema_name: String, + if_not_exists: bool, + schema: PyDFSchema, + ) -> PyResult { + Ok(PyCreateCatalogSchema { + create: CreateCatalogSchema { + schema_name, + if_not_exists, + schema: Arc::new(schema.into()), + }, + }) + } + + pub fn schema_name(&self) -> String { + self.create.schema_name.clone() + } + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateCatalogSchema({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateCatalogSchema".to_string()) + } +} + +impl LogicalNode for PyCreateCatalogSchema { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/create_external_table.rs b/src/expr/create_external_table.rs new file mode 100644 index 000000000..01ce7d0ca --- /dev/null +++ b/src/expr/create_external_table.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{common::schema::PyConstraints, expr::PyExpr, sql::logical::PyLogicalPlan}; +use std::{ + collections::HashMap, + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::CreateExternalTable; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::common::df_schema::PyDFSchema; + +use super::{logical_node::LogicalNode, sort_expr::PySortExpr}; + +#[pyclass(name = "CreateExternalTable", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateExternalTable { + create: CreateExternalTable, +} + +impl From for CreateExternalTable { + fn from(create: PyCreateExternalTable) -> Self { + create.create + } +} + +impl From for PyCreateExternalTable { + fn from(create: CreateExternalTable) -> PyCreateExternalTable { + PyCreateExternalTable { create } + } +} + +impl Display for PyCreateExternalTable { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "CreateExternalTable: {:?}{}", + self.create.name, self.create.constraints + ) + } +} + +#[pymethods] +impl PyCreateExternalTable { + #[allow(clippy::too_many_arguments)] + #[new] + #[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))] + pub fn new( + schema: PyDFSchema, + name: String, + location: String, + file_type: String, + table_partition_cols: Vec, + if_not_exists: bool, + temporary: bool, + order_exprs: Vec>, + unbounded: bool, + options: HashMap, + constraints: PyConstraints, + column_defaults: HashMap, + definition: Option, + ) -> Self { + let create = CreateExternalTable { + schema: Arc::new(schema.into()), + name: name.into(), + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs: order_exprs + .into_iter() + .map(|vec| vec.into_iter().map(|s| s.into()).collect::>()) + .collect::>(), + unbounded, + options, + constraints: constraints.constraints, + column_defaults: column_defaults + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + }; + PyCreateExternalTable { create } + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + pub fn name(&self) -> PyResult { + Ok(self.create.name.to_string()) + } + + pub fn location(&self) -> String { + self.create.location.clone() + } + + pub fn file_type(&self) -> String { + self.create.file_type.clone() + } + + pub fn table_partition_cols(&self) -> Vec { + self.create.table_partition_cols.clone() + } + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + pub fn temporary(&self) -> bool { + self.create.temporary + } + + pub fn definition(&self) -> Option { + self.create.definition.clone() + } + + pub fn order_exprs(&self) -> Vec> { + self.create + .order_exprs + .iter() + .map(|vec| vec.iter().map(|s| s.clone().into()).collect()) + .collect() + } + + pub fn unbounded(&self) -> bool { + self.create.unbounded + } + + pub fn options(&self) -> HashMap { + self.create.options.clone() + } + + pub fn constraints(&self) -> PyConstraints { + PyConstraints { + constraints: self.create.constraints.clone(), + } + } + + pub fn column_defaults(&self) -> HashMap { + self.create + .column_defaults + .iter() + .map(|(k, v)| (k.clone(), v.clone().into())) + .collect() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateExternalTable({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateExternalTable".to_string()) + } +} + +impl LogicalNode for PyCreateExternalTable { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/create_function.rs b/src/expr/create_function.rs new file mode 100644 index 000000000..6f3c3f0ff --- /dev/null +++ b/src/expr/create_function.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::{ + CreateFunction, CreateFunctionBody, OperateFunctionArg, Volatility, +}; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use super::logical_node::LogicalNode; +use super::PyExpr; +use crate::common::{data_type::PyDataType, df_schema::PyDFSchema}; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "CreateFunction", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateFunction { + create: CreateFunction, +} + +impl From for CreateFunction { + fn from(create: PyCreateFunction) -> Self { + create.create + } +} + +impl From for PyCreateFunction { + fn from(create: CreateFunction) -> PyCreateFunction { + PyCreateFunction { create } + } +} + +impl Display for PyCreateFunction { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateFunction: name {:?}", self.create.name) + } +} + +#[pyclass(name = "OperateFunctionArg", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyOperateFunctionArg { + arg: OperateFunctionArg, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "Volatility", module = "datafusion.expr")] +pub enum PyVolatility { + Immutable, + Stable, + Volatile, +} + +#[pyclass(name = "CreateFunctionBody", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateFunctionBody { + body: CreateFunctionBody, +} + +#[pymethods] +impl PyCreateFunctionBody { + pub fn language(&self) -> Option { + self.body + .language + .as_ref() + .map(|language| language.to_string()) + } + + pub fn behavior(&self) -> Option { + self.body.behavior.as_ref().map(|behavior| match behavior { + Volatility::Immutable => PyVolatility::Immutable, + Volatility::Stable => PyVolatility::Stable, + Volatility::Volatile => PyVolatility::Volatile, + }) + } + + pub fn function_body(&self) -> Option { + self.body + .function_body + .as_ref() + .map(|function_body| function_body.clone().into()) + } +} + +#[pymethods] +impl PyCreateFunction { + #[new] + #[pyo3(signature = (or_replace, temporary, name, params, schema, return_type=None, args=None))] + pub fn new( + or_replace: bool, + temporary: bool, + name: String, + params: PyCreateFunctionBody, + schema: PyDFSchema, + return_type: Option, + args: Option>, + ) -> Self { + PyCreateFunction { + create: CreateFunction { + or_replace, + temporary, + name, + args: args.map(|args| args.into_iter().map(|arg| arg.arg).collect()), + return_type: return_type.map(|return_type| return_type.data_type), + params: params.body, + schema: Arc::new(schema.into()), + }, + } + } + + pub fn or_replace(&self) -> bool { + self.create.or_replace + } + + pub fn temporary(&self) -> bool { + self.create.temporary + } + + pub fn name(&self) -> String { + self.create.name.clone() + } + + pub fn params(&self) -> PyCreateFunctionBody { + PyCreateFunctionBody { + body: self.create.params.clone(), + } + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + pub fn return_type(&self) -> Option { + self.create + .return_type + .as_ref() + .map(|return_type| return_type.clone().into()) + } + + pub fn args(&self) -> Option> { + self.create.args.as_ref().map(|args| { + args.iter() + .map(|arg| PyOperateFunctionArg { arg: arg.clone() }) + .collect() + }) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateFunction({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateFunction".to_string()) + } +} + +impl LogicalNode for PyCreateFunction { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/create_index.rs b/src/expr/create_index.rs new file mode 100644 index 000000000..13dadbc3f --- /dev/null +++ b/src/expr/create_index.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::CreateIndex; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, sort_expr::PySortExpr}; + +#[pyclass(name = "CreateIndex", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyCreateIndex { + create: CreateIndex, +} + +impl From for CreateIndex { + fn from(create: PyCreateIndex) -> Self { + create.create + } +} + +impl From for PyCreateIndex { + fn from(create: CreateIndex) -> PyCreateIndex { + PyCreateIndex { create } + } +} + +impl Display for PyCreateIndex { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CreateIndex: {:?}", self.create.name) + } +} + +#[pymethods] +impl PyCreateIndex { + #[new] + #[pyo3(signature = (table, columns, unique, if_not_exists, schema, name=None, using=None))] + pub fn new( + table: String, + columns: Vec, + unique: bool, + if_not_exists: bool, + schema: PyDFSchema, + name: Option, + using: Option, + ) -> PyResult { + Ok(PyCreateIndex { + create: CreateIndex { + name, + table: table.into(), + using, + columns: columns.iter().map(|c| c.clone().into()).collect(), + unique, + if_not_exists, + schema: Arc::new(schema.into()), + }, + }) + } + + pub fn name(&self) -> Option { + self.create.name.clone() + } + + pub fn table(&self) -> PyResult { + Ok(self.create.table.to_string()) + } + + pub fn using(&self) -> Option { + self.create.using.clone() + } + + pub fn columns(&self) -> Vec { + self.create + .columns + .iter() + .map(|c| c.clone().into()) + .collect() + } + + pub fn unique(&self) -> bool { + self.create.unique + } + + pub fn if_not_exists(&self) -> bool { + self.create.if_not_exists + } + + pub fn schema(&self) -> PyDFSchema { + (*self.create.schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("CreateIndex({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("CreateIndex".to_string()) + } +} + +impl LogicalNode for PyCreateIndex { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/describe_table.rs b/src/expr/describe_table.rs new file mode 100644 index 000000000..5658a13f2 --- /dev/null +++ b/src/expr/describe_table.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use arrow::{datatypes::Schema, pyarrow::PyArrowType}; +use datafusion::logical_expr::DescribeTable; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "DescribeTable", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDescribeTable { + describe: DescribeTable, +} + +impl Display for PyDescribeTable { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DescribeTable") + } +} + +#[pymethods] +impl PyDescribeTable { + #[new] + fn new(schema: PyArrowType, output_schema: PyDFSchema) -> Self { + Self { + describe: DescribeTable { + schema: Arc::new(schema.0), + output_schema: Arc::new(output_schema.into()), + }, + } + } + + pub fn schema(&self) -> PyArrowType { + (*self.describe.schema).clone().into() + } + + pub fn output_schema(&self) -> PyDFSchema { + (*self.describe.output_schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DescribeTable({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("DescribeTable".to_string()) + } +} + +impl From for DescribeTable { + fn from(describe: PyDescribeTable) -> Self { + describe.describe + } +} + +impl From for PyDescribeTable { + fn from(describe: DescribeTable) -> PyDescribeTable { + PyDescribeTable { describe } + } +} + +impl LogicalNode for PyDescribeTable { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/dml.rs b/src/expr/dml.rs new file mode 100644 index 000000000..251e336cc --- /dev/null +++ b/src/expr/dml.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::logical_expr::dml::InsertOp; +use datafusion::logical_expr::{DmlStatement, WriteOp}; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::common::schema::PyTableSource; +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "DmlStatement", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDmlStatement { + dml: DmlStatement, +} + +impl From for DmlStatement { + fn from(dml: PyDmlStatement) -> Self { + dml.dml + } +} + +impl From for PyDmlStatement { + fn from(dml: DmlStatement) -> PyDmlStatement { + PyDmlStatement { dml } + } +} + +impl LogicalNode for PyDmlStatement { + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.dml.input).clone())] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PyDmlStatement { + pub fn table_name(&self) -> PyResult { + Ok(self.dml.table_name.to_string()) + } + + pub fn target(&self) -> PyResult { + Ok(PyTableSource { + table_source: self.dml.target.clone(), + }) + } + + pub fn op(&self) -> PyWriteOp { + self.dml.op.clone().into() + } + + pub fn input(&self) -> PyLogicalPlan { + PyLogicalPlan { + plan: self.dml.input.clone(), + } + } + + pub fn output_schema(&self) -> PyDFSchema { + (*self.dml.output_schema).clone().into() + } + + fn __repr__(&self) -> PyResult { + Ok("DmlStatement".to_string()) + } + + fn __name__(&self) -> PyResult { + Ok("DmlStatement".to_string()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "WriteOp", module = "datafusion.expr")] +pub enum PyWriteOp { + Append, + Overwrite, + Replace, + + Update, + Delete, + Ctas, +} + +impl From for PyWriteOp { + fn from(write_op: WriteOp) -> Self { + match write_op { + WriteOp::Insert(InsertOp::Append) => PyWriteOp::Append, + WriteOp::Insert(InsertOp::Overwrite) => PyWriteOp::Overwrite, + WriteOp::Insert(InsertOp::Replace) => PyWriteOp::Replace, + + WriteOp::Update => PyWriteOp::Update, + WriteOp::Delete => PyWriteOp::Delete, + WriteOp::Ctas => PyWriteOp::Ctas, + } + } +} + +impl From for WriteOp { + fn from(py: PyWriteOp) -> Self { + match py { + PyWriteOp::Append => WriteOp::Insert(InsertOp::Append), + PyWriteOp::Overwrite => WriteOp::Insert(InsertOp::Overwrite), + PyWriteOp::Replace => WriteOp::Insert(InsertOp::Replace), + + PyWriteOp::Update => WriteOp::Update, + PyWriteOp::Delete => WriteOp::Delete, + PyWriteOp::Ctas => WriteOp::Ctas, + } + } +} + +#[pymethods] +impl PyWriteOp { + fn name(&self) -> String { + let write_op: WriteOp = self.clone().into(); + write_op.name().to_string() + } +} diff --git a/src/expr/drop_catalog_schema.rs b/src/expr/drop_catalog_schema.rs new file mode 100644 index 000000000..b7420a99c --- /dev/null +++ b/src/expr/drop_catalog_schema.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::{common::SchemaReference, logical_expr::DropCatalogSchema, sql::TableReference}; +use pyo3::{exceptions::PyValueError, prelude::*, IntoPyObjectExt}; + +use crate::common::df_schema::PyDFSchema; + +use super::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "DropCatalogSchema", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDropCatalogSchema { + drop: DropCatalogSchema, +} + +impl From for DropCatalogSchema { + fn from(drop: PyDropCatalogSchema) -> Self { + drop.drop + } +} + +impl From for PyDropCatalogSchema { + fn from(drop: DropCatalogSchema) -> PyDropCatalogSchema { + PyDropCatalogSchema { drop } + } +} + +impl Display for PyDropCatalogSchema { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DropCatalogSchema") + } +} + +fn parse_schema_reference(name: String) -> PyResult { + match name.into() { + TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }), + TableReference::Partial { schema, table } => Ok(SchemaReference::Full { + schema: table, + catalog: schema, + }), + TableReference::Full { + catalog: _, + schema: _, + table: _, + } => Err(PyErr::new::( + "Invalid schema specifier (has 3 parts)".to_string(), + )), + } +} + +#[pymethods] +impl PyDropCatalogSchema { + #[new] + fn new(name: String, schema: PyDFSchema, if_exists: bool, cascade: bool) -> PyResult { + let name = parse_schema_reference(name)?; + Ok(PyDropCatalogSchema { + drop: DropCatalogSchema { + name, + schema: Arc::new(schema.into()), + if_exists, + cascade, + }, + }) + } + + fn name(&self) -> PyResult { + Ok(self.drop.name.to_string()) + } + + fn schema(&self) -> PyDFSchema { + (*self.drop.schema).clone().into() + } + + fn if_exists(&self) -> PyResult { + Ok(self.drop.if_exists) + } + + fn cascade(&self) -> PyResult { + Ok(self.drop.cascade) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DropCatalogSchema({})", self)) + } +} + +impl LogicalNode for PyDropCatalogSchema { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/drop_function.rs b/src/expr/drop_function.rs new file mode 100644 index 000000000..9fbd78fdc --- /dev/null +++ b/src/expr/drop_function.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::DropFunction; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use super::logical_node::LogicalNode; +use crate::common::df_schema::PyDFSchema; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "DropFunction", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDropFunction { + drop: DropFunction, +} + +impl From for DropFunction { + fn from(drop: PyDropFunction) -> Self { + drop.drop + } +} + +impl From for PyDropFunction { + fn from(drop: DropFunction) -> PyDropFunction { + PyDropFunction { drop } + } +} + +impl Display for PyDropFunction { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DropFunction") + } +} + +#[pymethods] +impl PyDropFunction { + #[new] + fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { + Ok(PyDropFunction { + drop: DropFunction { + name, + schema: Arc::new(schema.into()), + if_exists, + }, + }) + } + fn name(&self) -> PyResult { + Ok(self.drop.name.clone()) + } + + fn schema(&self) -> PyDFSchema { + (*self.drop.schema).clone().into() + } + + fn if_exists(&self) -> PyResult { + Ok(self.drop.if_exists) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DropFunction({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("DropFunction".to_string()) + } +} + +impl LogicalNode for PyDropFunction { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/drop_view.rs b/src/expr/drop_view.rs new file mode 100644 index 000000000..1d1ab1e59 --- /dev/null +++ b/src/expr/drop_view.rs @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use datafusion::logical_expr::DropView; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::common::df_schema::PyDFSchema; + +use super::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "DropView", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDropView { + drop: DropView, +} + +impl From for DropView { + fn from(drop: PyDropView) -> Self { + drop.drop + } +} + +impl From for PyDropView { + fn from(drop: DropView) -> PyDropView { + PyDropView { drop } + } +} + +impl Display for PyDropView { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "DropView: {name:?} if not exist:={if_exists}", + name = self.drop.name, + if_exists = self.drop.if_exists + ) + } +} + +#[pymethods] +impl PyDropView { + #[new] + fn new(name: String, schema: PyDFSchema, if_exists: bool) -> PyResult { + Ok(PyDropView { + drop: DropView { + name: name.into(), + schema: Arc::new(schema.into()), + if_exists, + }, + }) + } + + fn name(&self) -> PyResult { + Ok(self.drop.name.to_string()) + } + + fn schema(&self) -> PyDFSchema { + (*self.drop.schema).clone().into() + } + + fn if_exists(&self) -> PyResult { + Ok(self.drop.if_exists) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("DropView({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("DropView".to_string()) + } +} + +impl LogicalNode for PyDropView { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/recursive_query.rs b/src/expr/recursive_query.rs new file mode 100644 index 000000000..65181f7d3 --- /dev/null +++ b/src/expr/recursive_query.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{self, Display, Formatter}; + +use datafusion::logical_expr::RecursiveQuery; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::sql::logical::PyLogicalPlan; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "RecursiveQuery", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyRecursiveQuery { + query: RecursiveQuery, +} + +impl From for RecursiveQuery { + fn from(query: PyRecursiveQuery) -> Self { + query.query + } +} + +impl From for PyRecursiveQuery { + fn from(query: RecursiveQuery) -> PyRecursiveQuery { + PyRecursiveQuery { query } + } +} + +impl Display for PyRecursiveQuery { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "RecursiveQuery {name:?} is_distinct:={is_distinct}", + name = self.query.name, + is_distinct = self.query.is_distinct + ) + } +} + +#[pymethods] +impl PyRecursiveQuery { + #[new] + fn new( + name: String, + static_term: PyLogicalPlan, + recursive_term: PyLogicalPlan, + is_distinct: bool, + ) -> Self { + Self { + query: RecursiveQuery { + name, + static_term: static_term.plan(), + recursive_term: recursive_term.plan(), + is_distinct, + }, + } + } + + fn name(&self) -> PyResult { + Ok(self.query.name.clone()) + } + + fn static_term(&self) -> PyLogicalPlan { + PyLogicalPlan::from((*self.query.static_term).clone()) + } + + fn recursive_term(&self) -> PyLogicalPlan { + PyLogicalPlan::from((*self.query.recursive_term).clone()) + } + + fn is_distinct(&self) -> PyResult { + Ok(self.query.is_distinct) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("RecursiveQuery({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("RecursiveQuery".to_string()) + } +} + +impl LogicalNode for PyRecursiveQuery { + fn inputs(&self) -> Vec { + vec![ + PyLogicalPlan::from((*self.query.static_term).clone()), + PyLogicalPlan::from((*self.query.recursive_term).clone()), + ] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} diff --git a/src/expr/statement.rs b/src/expr/statement.rs new file mode 100644 index 000000000..83774cda1 --- /dev/null +++ b/src/expr/statement.rs @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::logical_expr::{ + Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion, + TransactionEnd, TransactionIsolationLevel, TransactionStart, +}; +use pyo3::{prelude::*, IntoPyObjectExt}; + +use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, PyExpr}; + +#[pyclass(name = "TransactionStart", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyTransactionStart { + transaction_start: TransactionStart, +} + +impl From for PyTransactionStart { + fn from(transaction_start: TransactionStart) -> PyTransactionStart { + PyTransactionStart { transaction_start } + } +} + +impl TryFrom for TransactionStart { + type Error = PyErr; + + fn try_from(py: PyTransactionStart) -> Result { + Ok(py.transaction_start) + } +} + +impl LogicalNode for PyTransactionStart { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TransactionAccessMode", module = "datafusion.expr")] +pub enum PyTransactionAccessMode { + ReadOnly, + ReadWrite, +} + +impl From for PyTransactionAccessMode { + fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode { + match access_mode { + TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly, + TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite, + } + } +} + +impl TryFrom for TransactionAccessMode { + type Error = PyErr; + + fn try_from(py: PyTransactionAccessMode) -> Result { + match py { + PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly), + PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass( + eq, + eq_int, + name = "TransactionIsolationLevel", + module = "datafusion.expr" +)] +pub enum PyTransactionIsolationLevel { + ReadUncommitted, + ReadCommitted, + RepeatableRead, + Serializable, + Snapshot, +} + +impl From for PyTransactionIsolationLevel { + fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel { + match isolation_level { + TransactionIsolationLevel::ReadUncommitted => { + PyTransactionIsolationLevel::ReadUncommitted + } + TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted, + TransactionIsolationLevel::RepeatableRead => { + PyTransactionIsolationLevel::RepeatableRead + } + TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable, + TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot, + } + } +} + +impl TryFrom for TransactionIsolationLevel { + type Error = PyErr; + + fn try_from(value: PyTransactionIsolationLevel) -> Result { + match value { + PyTransactionIsolationLevel::ReadUncommitted => { + Ok(TransactionIsolationLevel::ReadUncommitted) + } + PyTransactionIsolationLevel::ReadCommitted => { + Ok(TransactionIsolationLevel::ReadCommitted) + } + PyTransactionIsolationLevel::RepeatableRead => { + Ok(TransactionIsolationLevel::RepeatableRead) + } + PyTransactionIsolationLevel::Serializable => { + Ok(TransactionIsolationLevel::Serializable) + } + PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot), + } + } +} + +#[pymethods] +impl PyTransactionStart { + #[new] + pub fn new( + access_mode: PyTransactionAccessMode, + isolation_level: PyTransactionIsolationLevel, + ) -> PyResult { + let access_mode = access_mode.try_into()?; + let isolation_level = isolation_level.try_into()?; + Ok(PyTransactionStart { + transaction_start: TransactionStart { + access_mode, + isolation_level, + }, + }) + } + + pub fn access_mode(&self) -> PyResult { + Ok(self.transaction_start.access_mode.clone().into()) + } + + pub fn isolation_level(&self) -> PyResult { + Ok(self.transaction_start.isolation_level.clone().into()) + } +} + +#[pyclass(name = "TransactionEnd", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyTransactionEnd { + transaction_end: TransactionEnd, +} + +impl From for PyTransactionEnd { + fn from(transaction_end: TransactionEnd) -> PyTransactionEnd { + PyTransactionEnd { transaction_end } + } +} + +impl TryFrom for TransactionEnd { + type Error = PyErr; + + fn try_from(py: PyTransactionEnd) -> Result { + Ok(py.transaction_end) + } +} + +impl LogicalNode for PyTransactionEnd { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(eq, eq_int, name = "TransactionConclusion", module = "datafusion.expr")] +pub enum PyTransactionConclusion { + Commit, + Rollback, +} + +impl From for PyTransactionConclusion { + fn from(value: TransactionConclusion) -> Self { + match value { + TransactionConclusion::Commit => PyTransactionConclusion::Commit, + TransactionConclusion::Rollback => PyTransactionConclusion::Rollback, + } + } +} + +impl TryFrom for TransactionConclusion { + type Error = PyErr; + + fn try_from(value: PyTransactionConclusion) -> Result { + match value { + PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit), + PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback), + } + } +} +#[pymethods] +impl PyTransactionEnd { + #[new] + pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult { + let conclusion = conclusion.try_into()?; + Ok(PyTransactionEnd { + transaction_end: TransactionEnd { conclusion, chain }, + }) + } + + pub fn conclusion(&self) -> PyResult { + Ok(self.transaction_end.conclusion.clone().into()) + } + + pub fn chain(&self) -> bool { + self.transaction_end.chain + } +} + +#[pyclass(name = "SetVariable", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PySetVariable { + set_variable: SetVariable, +} + +impl From for PySetVariable { + fn from(set_variable: SetVariable) -> PySetVariable { + PySetVariable { set_variable } + } +} + +impl TryFrom for SetVariable { + type Error = PyErr; + + fn try_from(py: PySetVariable) -> Result { + Ok(py.set_variable) + } +} + +impl LogicalNode for PySetVariable { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PySetVariable { + #[new] + pub fn new(variable: String, value: String) -> Self { + PySetVariable { + set_variable: SetVariable { variable, value }, + } + } + + pub fn variable(&self) -> String { + self.set_variable.variable.clone() + } + + pub fn value(&self) -> String { + self.set_variable.value.clone() + } +} + +#[pyclass(name = "Prepare", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyPrepare { + prepare: Prepare, +} + +impl From for PyPrepare { + fn from(prepare: Prepare) -> PyPrepare { + PyPrepare { prepare } + } +} + +impl TryFrom for Prepare { + type Error = PyErr; + + fn try_from(py: PyPrepare) -> Result { + Ok(py.prepare) + } +} + +impl LogicalNode for PyPrepare { + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.prepare.input).clone())] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PyPrepare { + #[new] + pub fn new(name: String, data_types: Vec, input: PyLogicalPlan) -> Self { + let input = input.plan().clone(); + let data_types = data_types + .into_iter() + .map(|data_type| data_type.into()) + .collect(); + PyPrepare { + prepare: Prepare { + name, + data_types, + input, + }, + } + } + + pub fn name(&self) -> String { + self.prepare.name.clone() + } + + pub fn data_types(&self) -> Vec { + self.prepare + .data_types + .clone() + .into_iter() + .map(|t| t.into()) + .collect() + } + + pub fn input(&self) -> PyLogicalPlan { + PyLogicalPlan { + plan: self.prepare.input.clone(), + } + } +} + +#[pyclass(name = "Execute", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyExecute { + execute: Execute, +} + +impl From for PyExecute { + fn from(execute: Execute) -> PyExecute { + PyExecute { execute } + } +} + +impl TryFrom for Execute { + type Error = PyErr; + + fn try_from(py: PyExecute) -> Result { + Ok(py.execute) + } +} + +impl LogicalNode for PyExecute { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PyExecute { + #[new] + pub fn new(name: String, parameters: Vec) -> Self { + let parameters = parameters + .into_iter() + .map(|parameter| parameter.into()) + .collect(); + PyExecute { + execute: Execute { name, parameters }, + } + } + + pub fn name(&self) -> String { + self.execute.name.clone() + } + + pub fn parameters(&self) -> Vec { + self.execute + .parameters + .clone() + .into_iter() + .map(|t| t.into()) + .collect() + } +} + +#[pyclass(name = "Deallocate", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyDeallocate { + deallocate: Deallocate, +} + +impl From for PyDeallocate { + fn from(deallocate: Deallocate) -> PyDeallocate { + PyDeallocate { deallocate } + } +} + +impl TryFrom for Deallocate { + type Error = PyErr; + + fn try_from(py: PyDeallocate) -> Result { + Ok(py.deallocate) + } +} + +impl LogicalNode for PyDeallocate { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PyDeallocate { + #[new] + pub fn new(name: String) -> Self { + PyDeallocate { + deallocate: Deallocate { name }, + } + } + + pub fn name(&self) -> String { + self.deallocate.name.clone() + } +} diff --git a/src/expr/values.rs b/src/expr/values.rs new file mode 100644 index 000000000..fb2692230 --- /dev/null +++ b/src/expr/values.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::logical_expr::Values; +use pyo3::{prelude::*, IntoPyObjectExt}; +use pyo3::{pyclass, PyErr, PyResult, Python}; + +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, PyExpr}; + +#[pyclass(name = "Values", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyValues { + values: Values, +} + +impl From for PyValues { + fn from(values: Values) -> PyValues { + PyValues { values } + } +} + +impl TryFrom for Values { + type Error = PyErr; + + fn try_from(py: PyValues) -> Result { + Ok(py.values) + } +} + +impl LogicalNode for PyValues { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) + } +} + +#[pymethods] +impl PyValues { + #[new] + pub fn new(schema: PyDFSchema, values: Vec>) -> PyResult { + let values = values + .into_iter() + .map(|row| row.into_iter().map(|expr| expr.into()).collect()) + .collect(); + Ok(PyValues { + values: Values { + schema: Arc::new(schema.into()), + values, + }, + }) + } + + pub fn schema(&self) -> PyResult { + Ok((*self.values.schema).clone().into()) + } + + pub fn values(&self) -> Vec> { + self.values + .values + .clone() + .into_iter() + .map(|row| row.into_iter().map(|expr| expr.into()).collect()) + .collect() + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 96561c434..198d68bdc 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -17,10 +17,25 @@ use std::sync::Arc; +use crate::context::PySessionContext; use crate::errors::PyDataFusionResult; use crate::expr::aggregate::PyAggregate; use crate::expr::analyze::PyAnalyze; +use crate::expr::copy_to::PyCopyTo; +use crate::expr::create_catalog::PyCreateCatalog; +use crate::expr::create_catalog_schema::PyCreateCatalogSchema; +use crate::expr::create_external_table::PyCreateExternalTable; +use crate::expr::create_function::PyCreateFunction; +use crate::expr::create_index::PyCreateIndex; +use crate::expr::create_memory_table::PyCreateMemoryTable; +use crate::expr::create_view::PyCreateView; +use crate::expr::describe_table::PyDescribeTable; use crate::expr::distinct::PyDistinct; +use crate::expr::dml::PyDmlStatement; +use crate::expr::drop_catalog_schema::PyDropCatalogSchema; +use crate::expr::drop_function::PyDropFunction; +use crate::expr::drop_table::PyDropTable; +use crate::expr::drop_view::PyDropView; use crate::expr::empty_relation::PyEmptyRelation; use crate::expr::explain::PyExplain; use crate::expr::extension::PyExtension; @@ -28,14 +43,20 @@ use crate::expr::filter::PyFilter; use crate::expr::join::PyJoin; use crate::expr::limit::PyLimit; use crate::expr::projection::PyProjection; +use crate::expr::recursive_query::PyRecursiveQuery; +use crate::expr::repartition::PyRepartition; use crate::expr::sort::PySort; +use crate::expr::statement::{ + PyDeallocate, PyExecute, PyPrepare, PySetVariable, PyTransactionEnd, PyTransactionStart, +}; use crate::expr::subquery::PySubquery; use crate::expr::subquery_alias::PySubqueryAlias; use crate::expr::table_scan::PyTableScan; +use crate::expr::union::PyUnion; use crate::expr::unnest::PyUnnest; +use crate::expr::values::PyValues; use crate::expr::window::PyWindowExpr; -use crate::{context::PySessionContext, errors::py_unsupported_variant_err}; -use datafusion::logical_expr::LogicalPlan; +use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement}; use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec}; use prost::Message; use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes}; @@ -82,18 +103,54 @@ impl PyLogicalPlan { LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py), LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py), LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py), - LogicalPlan::Repartition(_) - | LogicalPlan::Union(_) - | LogicalPlan::Statement(_) - | LogicalPlan::Values(_) - | LogicalPlan::Dml(_) - | LogicalPlan::Ddl(_) - | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::RecursiveQuery(_) => Err(py_unsupported_variant_err(format!( - "Conversion of variant not implemented: {:?}", - self.plan - ))), + LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py), + LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py), + LogicalPlan::Statement(plan) => match plan { + Statement::TransactionStart(plan) => { + PyTransactionStart::from(plan.clone()).to_variant(py) + } + Statement::TransactionEnd(plan) => { + PyTransactionEnd::from(plan.clone()).to_variant(py) + } + Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py), + Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py), + Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py), + Statement::Deallocate(plan) => PyDeallocate::from(plan.clone()).to_variant(py), + }, + LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py), + LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py), + LogicalPlan::Ddl(plan) => match plan { + DdlStatement::CreateExternalTable(plan) => { + PyCreateExternalTable::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateMemoryTable(plan) => { + PyCreateMemoryTable::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py), + DdlStatement::CreateCatalogSchema(plan) => { + PyCreateCatalogSchema::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateCatalog(plan) => { + PyCreateCatalog::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py), + DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py), + DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py), + DdlStatement::DropCatalogSchema(plan) => { + PyDropCatalogSchema::from(plan.clone()).to_variant(py) + } + DdlStatement::CreateFunction(plan) => { + PyCreateFunction::from(plan.clone()).to_variant(py) + } + DdlStatement::DropFunction(plan) => { + PyDropFunction::from(plan.clone()).to_variant(py) + } + }, + LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py), + LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py), + LogicalPlan::RecursiveQuery(plan) => { + PyRecursiveQuery::from(plan.clone()).to_variant(py) + } } }