8000 Add Python wrapper for LogicalPlan::Union (#240) · chenqin/arrow-datafusion-python@4004fbe · GitHub
[go: up one dir, main page]

Skip to content

Commit 4004fbe

Browse files
authored
Add Python wrapper for LogicalPlan::Union (apache#240)
* add union * Make clippy happier
1 parent d1f6567 commit 4004fbe

File tree

5 files changed

+91
-1
lines changed

5 files changed

+91
-1
lines changed

datafusion/tests/test_imports.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
Aggregate,
4545
Sort,
4646
Analyze,
47+
Union,
4748
Like,
4849
ILike,
4950
SimilarTo,
@@ -105,6 +106,7 @@ def test_class_module_is_datafusion():
105106
Limit,
106107
Filter,
107108
Analyze,
109+
Union,
108110
Like,
109111
ILike,
110112
SimilarTo,

src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ impl PySessionContext {
605605
let plan = plan.plan.clone();
606606
let fut: JoinHandle<datafusion_common::Result<SendableRecordBatchStream>> =
607607
rt.spawn(async move { plan.execute(part, ctx) });
608-
let stream = wait_for_future(py, fut).map_err(|e| py_datafusion_err(e))?;
608+
let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
609609
Ok(PyRecordBatchStream::new(stream?))
610610
}
611611
}

src/expr.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub mod signature;
6767
pub mod sort;
6868
pub mod subquery;
6969
pub mod table_scan;
70+
pub mod union;
7071

7172
/// A PyExpr that can be used on a DataFrame
7273
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
@@ -266,5 +267,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
266267
m.add_class::<sort::PySort>()?;
267268
m.add_class::<analyze::PyAnalyze>()?;
268269
m.add_class::<empty_relation::PyEmptyRelation>()?;
270+
m.add_class::<union::PyUnion>()?;
269271
Ok(())
270272
}

src/expr/signature.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use datafusion_expr::{TypeSignature, Volatility};
1919
use pyo3::prelude::*;
2020

2121
#[pyclass(name = "Signature", module = "datafusion.expr", subclass)]
22+
#[allow(dead_code)]
2223
#[derive(Clone)]
2324
pub struct PySignature {
2425
type_signature: TypeSignature,

src/expr/union.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_expr::logical_plan::Union;
19+
use pyo3::prelude::*;
20+
use std::fmt::{self, Display, Formatter};
21+
22+
use crate::common::df_schema::PyDFSchema;
23+
use crate::expr::logical_node::LogicalNode;
24+
use crate::sql::logical::PyLogicalPlan;
25+
26+
#[pyclass(name = "Union", module = "datafusion.expr", subclass)]
27+
#[derive(Clone)]
28+
pub struct PyUnion {
29+
union_: Union,
30+
}
31+
32+
impl From<Union> for PyUnion {
33+
fn from(union_: Union) -> PyUnion {
34+
PyUnion { union_ }
35+
}
36+
}
37+
38+
impl From<PyUnion> for Union {
39+
fn from(union_: PyUnion) -> Self {
40+
union_.union_
41+
}
42+
}
43+
44+
impl Display for PyUnion {
45+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
46+
write!(
47+
f,
48+
"Union
49+
\nInputs: {:?}
50+
\nSchema: {:?}",
51+
&self.union_.inputs, &self.union_.schema,
52+
)
53+
}
54+
}
55+
56+
#[pymethods]
57+
impl PyUnion {
58+
/// Retrieves the input `LogicalPlan` to this `Union` node
59+
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
60+
Ok(Self::inputs(self))
61+
}
62+
63+
/// Resulting Schema for this `Union` node instance
64+
fn schema(&self) -> PyResult<PyDFSchema> {
65+
Ok(self.union_.schema.as_ref().clone().into())
66+
}
67+
68+
fn __repr__(&self) -> PyResult<String> {
69+
Ok(format!("Union({})", self))
70+
}
71+
72+
fn __name__(&self) -> PyResult<String> {
73+
Ok("Union".to_string())
74+
}
75+
}
76+
77+
impl LogicalNode for PyUnion {
78+
fn inputs(&self) -> Vec<PyLogicalPlan> {
79+
self.union_
80+
.inputs
81+
.iter()
82+
.map(|x| x.as_ref().clone().into())
83+
.collect()
84+
}
85+
}

0 commit comments

Comments
 (0)
0