10000 Add Python wrapper for LogicalPlan::Limit (#193) · andygrove/datafusion-python@b8ef9bf · GitHub
[go: up one dir, main page]

Skip to content

Commit b8ef9bf

Browse files
authored
Add Python wrapper for LogicalPlan::Limit (apache#193)
1 parent f934dc1 commit b8ef9bf

File tree

4 files changed

+101
-31
lines changed

4 files changed

+101
-31
lines changed

datafusion/tests/test_imports.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
Expr,
3636
Projection,
3737
TableScan,
38+
Limit,
3839
Aggregate,
3940
Sort,
4041
)
@@ -57,7 +58,7 @@ def test_class_module_is_datafusion():
5758
]:
5859
assert klass.__module__ == "datafusion"
5960

60-
for klass in [Expr, Projection, TableScan, Aggregate, Sort]:
61+
for klass in [Expr, Projection, TableScan, Aggregate, Sort, Limit]:
6162
assert klass.__module__ == "datafusion.expr"
6263

6364
for klass in [DFField, DFSchema]:

src/expr.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
2525
use datafusion::scalar::ScalarValue;
2626

2727
pub mod aggregate;
28+
pub mod limit;
2829
pub mod logical_node;
2930
pub mod projection;
3031
pub mod sort;
@@ -145,6 +146,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
145146
m.add_class::<PyExpr>()?;
146147
m.add_class::<table_scan::PyTableScan>()?;
147148
m.add_class::<projection::PyProjection>()?;
149+
m.add_class::<limit::PyLimit>()?;
148150
m.add_class::<aggregate::PyAggregate>()?;
149151
m.add_class::<sort::PySort>()?;
150152
Ok(())

src/expr/limit.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_expr::logical_plan::Limit;
19+
use pyo3::prelude::*;
20+
use std::fmt::{self, Display, Formatter};
21+
22+
use crate::common::df_schema::PyDFSchema;
23+
use crate::expr::logical_node::LogicalNode;
24+
use crate::sql::logical::PyLogicalPlan;
25+
26+
#[pyclass(name = "Limit", module = "datafusion.expr", subclass)]
27+
#[derive(Clone)]
28+
pub struct PyLimit {
29+
limit: Limit,
30+
}
31+
32+
impl From<Limit> for PyLimit {
33+
fn from(limit: Limit) -> PyLimit {
34+
PyLimit { limit }
35+
}
36+
}
37+
38+
impl From<PyLimit> for Limit {
39+
fn from(limit: PyLimit) -> Self {
40+
limit.limit
41+
}
42+
}
43+
44+
impl Display for PyLimit {
45+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
46+
write!(
47+
f,
48+
"Limit
49+
\nSkip: {}
50+
\nFetch: {:?}
51+
\nInput: {:?}",
52+
&self.limit.skip, &self.limit.fetch, &self.limit.input
53+
)
54+
}
55+
}
56+
57+
#[pymethods]
58+
impl PyLimit {
59+
/// Retrieves the skip value for this `Limit`
60+
fn skip(&self) -> usize {
61+
self.limit.skip
62+
}
63+
64+
/// Retrieves the fetch value for this `Limit`
65+
fn fetch(&self) -> Option<usize> {
66+
self.limit.fetch
67+
}
68+
69+
/// Retrieves the input `LogicalPlan` to this `Limit` node
70+
fn input(&self) -> PyLogicalPlan {
71+
PyLogicalPlan::from((*self.limit.input).clone())
72+
}
73+
74+
/// Resulting Schema for this `Limit` node instance
75+
fn schema(&self) -> PyResult<PyDFSchema> {
76+
Ok(self.limit.input.schema().as_ref().clone().into())
77+
}
78+
79+
fn __repr__(&self) -> PyResult<String> {
80+
Ok(format!("Limit({})", self))
81+
}
82+
}
83+
84+
impl LogicalNode for PyLimit {
85+
fn input(&self) -> Vec<PyLogicalPlan> {
86+
vec![PyLogicalPlan::from((*self.limit.input).clone())]
87+
}
88+
}

src/expr/projection.rs

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion_common::DataFusionError;
1918
use datafusion_expr::logical_plan::Projection;
2019
use pyo3::prelude::*;
2120
use std::fmt::{self, Display, Formatter};
2221

2322
use crate::common::df_schema::PyDFSchema;
24-
use crate::errors::py_runtime_err;
2523
use crate::expr::logical_node::LogicalNode;
2624
use crate::expr::PyExpr;
2725
use crate::sql::logical::PyLogicalPlan;
@@ -38,15 +36,9 @@ impl From<Projection> for PyProjection {
3836
}
3937
}
4038

41-
impl TryFrom<PyProjection> for Projection {
42-
type Error = DataFusionError;
43-
44-
fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
45-
Projection::try_new_with_schema(
46-
py_proj.projection.expr,
47-
py_proj.projection.input.clone(),
48-
py_proj.projection.schema,
49-
)
39+
impl From<PyProjection> for Projection {
40+
fn from(proj: PyProjection) -> Self {
41+
proj.projection
5042
}
5143
}
5244

@@ -66,8 +58,7 @@ impl Display for PyProjection {
6658
#[pymethods]
6759
impl PyProjection {
6860
/// Retrieves the expressions for this `Projection`
69-
#[pyo3(name = "projections")]
70-
fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
61+
fn projections(&self) -> PyResult<Vec<PyExpr>> {
7162
Ok(self
7263
.projection
7364
.expr
@@ -76,25 +67,13 @@ impl PyProjection {
7667
.collect())
7768
}
7869

79-
// Retrieves the input `LogicalPlan` to this `Projection` node
80-
#[pyo3(name = "input")]
81-
fn py_input(&self) -> PyResult<PyLogicalPlan> {
82-
// DataFusion make a loose guarantee that each Projection should have an input, however
83-
// we check for that hear since we are performing explicit index retrieval
84-
let inputs = LogicalNode::input(self);
85-
if !inputs.is_empty() {
86-
return Ok(inputs[0].clone());
87-
}
88-
89-
Err(py_runtime_err(format!(
90-
"Expected `input` field for Projection node: {}",
91-
self
92-
)))
70+
/// Retrieves the input `LogicalPlan` to this `Projection` node
71+
fn input(&self) -> PyLogicalPlan {
72+
PyLogicalPlan::from((*self.projection.input).clone())
9373
}
9474

95-
// Resulting Schema for this `Projection` node instance
96-
#[pyo3(name = "schema")]
97-
fn py_schema(&self) -> PyResult<PyDFSchema> {
75+
/// Resulting Schema for this `Projection` node instance
76+
fn schema(&self) -> PyResult<PyDFSchema> {
9877
Ok((*self.projection.schema).clone().into())
9978
}
10079

0 commit comments

Comments
 (0)
0