
feat: rewrite subquery into dependent join logical plan #16016


Status: Open · wants to merge 56 commits into base: main

Conversation

@duongcongtoai (Contributor) commented May 10, 2025

Which issue does this PR close?

This PR is part of a longer effort toward general-purpose subquery decorrelation. Following the discussion in #5492, this PR proposes adding the following:

  • an optimizer that converts all subqueries into a dependent join logical plan (this is only a temporary plan) <---- this is what this PR aims to achieve
  • in the same optimizer, decorrelate the dependent join

To avoid breaking existing tests and to allow smoother collaboration, the changes should happen in the following sequence:

1. Merge item 1 without integrating the new optimizer into the main flow (behavior is tested in-code instead of via sqllogictests).
2. Start implementing more rewrite rules for the different query plan nodes (aggregate, projection, filter, ...).
Note that we temporarily keep the old optimizers while integrating the new one:

            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new()),
            Arc::new(GeneralSubqueryDecorrelation::new()), <----------- this is newly added

This is a safe step, because existing sqllogictests will still be rewritten correctly by the old optimizers, while more complex query plans are supported by the new one (on complex query plans the old optimizers simply back off); see the rule sketch after this list.
3. Implement a DelimGetRemove optimizer similar to DuckDB's.
4. Deprecate the old optimizers.
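
As a rough illustration of the integration point in step 2, here is a minimal sketch of what the new rule's skeleton could look like against DataFusion's OptimizerRule trait (the rule body is an assumption for illustration, not the PR's actual code):

use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};

/// Sketch only: rewrite subqueries into DependentJoin nodes, then
/// (in follow-up work) decorrelate them
#[derive(Debug, Default)]
pub struct GeneralSubqueryDecorrelation {}

impl OptimizerRule for GeneralSubqueryDecorrelation {
    fn name(&self) -> &str {
        "general_subquery_decorrelation"
    }

    fn supports_rewrite(&self) -> bool {
        true
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // 1. traverse the plan, assigning ids and recording outer column
        //    accesses per subquery (DependentJoinRewriter in this PR)
        // 2. replace each subquery-bearing node with a DependentJoin
        // 3. decorrelate the DependentJoin (later items in the list above)
        Ok(Transformed::no(plan))
    }
}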

Rationale for this change

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels May 10, 2025
@duongcongtoai duongcongtoai changed the title refactor: framework for subquery unnesting [WIP] refactor: framework for subquery decorrelation May 10, 2025
@alamb (Contributor) commented May 12, 2025

FYI @irenjj

@xudong963 xudong963 self-requested a review May 14, 2025 14:49
pub struct DependentJoinRewriter {
    // each logical plan traversal will assign it an integer id
    current_id: usize,
    subquery_depth: usize,
Contributor:

I'm not very clear on the purpose of depth in the dependent join. In DuckDB, depth is bound together with correlated_columns; it's the depth of the subquery block, for example:

select e1.c1                     <------ depth 0  Binder0(metadata) <------+
from employees e1                                                          |
where e1.c1 > (                                                            |
    select avg(f2)               <------ depth 1  Binder1(metadata)        |
    from item i2                                                           |
    where i2.f2 = (                                                        |
        select avg(d3)           <------ depth 2  Binder2(metadata)        |
        from salary s3                                                     |
        where s3.d3 = e1.c3 -- c3.depth=0 ---------------------------------+
    )
);

In DuckDB, the metadata (schema) is kept in the Binder and is split from the LogicalPlan, so it needs depth to track the metadata (schema); DataFusion, however, has the schema in the LogicalPlan.

Contributor Author:

If we want the depth to be recognized at the column level, we can easily put a new field inside this structure and keep it inside LogicalPlan::DependentJoin (sketched below):

#[derive(Debug, Hash, PartialEq, PartialOrd, Eq, Clone)]
struct ColumnAccess {
    // node ids from root to the node that is referencing the column
    stack: Vec<usize>,
    // the node referencing the column
    node_id: usize,
    col: Column,
    data_type: DataType,
}
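
For reference, a sketch of the same struct with that depth field added (hedged: the final field set lives in the PR diff, which later lands with both depth and data_type):

#[derive(Debug, Hash, PartialEq, PartialOrd, Eq, Clone)]
struct ColumnAccess {
    // node ids from root to the node that is referencing the column
    stack: Vec<usize>,
    // the node referencing the column
    node_id: usize,
    col: Column,
    data_type: DataType,
    // depth of the query block that provides this column (0 = outermost),
    // matching the c3.depth=0 annotation in the example above
    depth: usize,
}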

Contributor:

I believe the main purpose of recording depth is to be able to find the corresponding schema through depth during the physical planning phase. It's difficult to record it accurately in the optimizer, because the schema is provided to the LogicalPlan through the PlannerContext during the logical planning phase.

Contributor Author:

Looks like I need to do more research into the details here. My assumption about how this decorrelation works is that all decorrelation logic happens within logical planning and should not leak anything into the physical planning phase.

};

if is_dependent_join_node {
    self.subquery_depth += 1
Contributor:

And I think there's an issue with the way depth is calculated. The correct logic should be to traverse query blocks from top to bottom, and if a subquery query block is found, increment depth by 1 and keep the incremented depth while recursively traversing that subquery block.

Contributor Author:

and if a subquery query block is found

is_dependent_join_node is set to true if one of the node's exprs is a subquery, and the depth is only incremented when such a node is met (I put a diagram on top of the struct doc for better visualization).

Regarding depth correctness, I also added a test case demonstrating what the temporary query plan looks like, using the rewrite_dependent_join_two_nested_subqueries function.

Given the query

Filter: outer_table.a > Int32(1) AND (<subquery>) = outer_table.a
  Subquery:
    Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv1.a)]]
      Filter: inner_table_lv1.c = outer_ref(outer_table.c) AND (<subquery>) = Int32(1)
        Subquery:
          Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv2.a)]]
            Filter: inner_table_lv2.a = outer_ref(outer_table.a) AND inner_table_lv2.b = outer_ref(inner_table_lv1.b)
              TableScan: inner_table_lv2
        TableScan: inner_table_lv1
  TableScan: outer_table

it shall be rewritten into a DependentJoin node as:

Projection: outer_table.a, outer_table.b, outer_table.c [a:UInt32, b:UInt32, c:UInt32]
  Filter: outer_table.a > Int32(1) AND __scalar_sq_2.output = outer_table.a [a:UInt32, b:UInt32, c:UInt32, output:Int64]
    DependentJoin on [outer_table.a, outer_table.c] with expr (<subquery>) depth 1 [a:UInt32, b:UInt32, c:UInt32, output:Int64]
      TableScan: outer_table [a:UInt32, b:UInt32, c:UInt32]
      Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv1.a)]] [count(inner_table_lv1.a):Int64]
        Projection: inner_table_lv1.a, inner_table_lv1.b, inner_table_lv1.c [a:UInt32, b:UInt32, c:UInt32]
          Filter: inner_table_lv1.c = outer_ref(outer_table.c) AND __scalar_sq_1.output = Int32(1) [a:UInt32, b:UInt32, c:UInt32, output:Int64]
            DependentJoin on [inner_table_lv1.b] with expr (<subquery>) depth 2 [a:UInt32, b:UInt32, c:UInt32, output:Int64]
              TableScan: inner_table_lv1 [a:UInt32, b:UInt32, c:UInt32]
              Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv2.a)]] [count(inner_table_lv2.a):Int64]
                Filter: inner_table_lv2.a = outer_ref(outer_table.a) AND inner_table_lv2.b = outer_ref(inner_table_lv1.b) [a:UInt32, b:UInt32, c:UInt32]
                  TableScan: inner_table_lv2 [a:UInt32, b:UInt32, c:UInt32]

@irenjj (Contributor) commented May 25, 2025

Thanks @duongcongtoai, I'll review the other code later, but regarding the depth issue, I don't think it's likely to be handled in the optimizer. I'll organize some questions and maybe we can get @alamb @jayzhan211 @xudong963's input on this.

@irenjj (Contributor) commented May 25, 2025

The difference between DataFusion and DuckDB in constructing logical plans is: DataFusion directly assigns the schema to the LogicalPlan, while DuckDB keeps metadata information in the Binder. In DuckDB, an independent Binder is constructed for each query block layer. During plan construction, if correlated_columns are found and the required column information cannot be found in the current layer's Binder, it searches the upper-layer Binders; if found, the Binder depth is recorded in correlated_columns. Since the Binder and the LogicalPlan are separated, even if the planner stage unnests correlated subqueries (subquery -> join, so the depth information of subqueries changes), the corresponding columns can still be retrieved through depth during subsequent physical planning.

DataFusion instead puts schema information into the logical plan during logical plan construction, and only supports building logical plans for correlated subqueries of adjacent levels:

pub(super) fn parse_scalar_subquery(
    &self,
    subquery: Query,
    input_schema: &DFSchema,
    planner_context: &mut PlannerContext,
) -> Result<Expr> {
    let old_outer_query_schema =
        planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
    let sub_plan = self.query_to_plan(subquery, planner_context)?;
    let outer_ref_columns = sub_plan.all_out_ref_exprs();
    planner_context.set_outer_query_schema(old_outer_query_schema);
    ...

As you can see, it only takes input_schema, the schema of the immediately enclosing query block, to construct the plan. I first tried to address this by merging/layering input_schema each time parse_scalar_subquery is entered, enabling normal recognition and completion of logical plan construction in query_to_plan. However, the problem reappeared at physical planning: since the logical plan doesn't contain the schema required by outer_ref_col, it still reports errors.

But why do adjacent-level correlated subqueries run? They also construct outer_ref_col during construction, and similarly don't have the corresponding schema in the subquery. It works because, after optimizer decorrelation, correlated subqueries are converted to joins; when constructing physical plans, filters are built against filter_schema = left_schema + right_schema, and left_schema happens to be the adjacent upper level. So the ability to run normally is just a coincidence. DataFusion doesn't yet have the capability to read schemas across query blocks.

@irenjj (Contributor) commented May 25, 2025

I think a possible implementation approach is to construct the PlannerContext at the outer layer, and then carry the PlannerContext information into the optimizer/physical_planner:

let mut planner_context = PlannerContext::new();
let plan = create_plan(ctx, statement, &mut planner_context).await?;
let adjusted = adjusted.with_plan(&plan);

// before:
let df = ctx.execute_logical_plan(plan).await?;
// after: thread the PlannerContext through
let df = ctx
    .execute_logical_plan(plan, Some(planner_context))
    .await?;
let physical_plan = df.create_physical_plan().await?;

It's unclear what breaking changes this would bring.

@logan-keede (Contributor):

regarding the depth issue, I don't think it's likely to be handled in the optimizer.

I agree with @irenjj; it seems correlated subqueries with depth > 1 do not reach the optimizer, as they report Schema Error: No field named xyz.col (#15558) in the planning phase.
I think the first step would be to recognize the subquery's correlated columns and build the dependent join in the planning phase itself, or somehow store the relevant information in the logical plan so the dependent join can be built in the optimizer, as in #16174.

@duongcongtoai (Contributor Author) commented May 25, 2025

it seems correlated subqueries with depth > 1 do not reach the optimizer, as they report Schema Error: No field named xyz.col

Actually this error is thrown after all optimizers are executed; it appears because no existing optimizer is capable of handling nested subqueries.

Now, whether the depth is recorded in the optimizer or in the planning phase, we eventually have to convert the plan into a DependentJoin, am I right?

If in the future we have all the information we need in the planning phase, we can remove this "depth detection" logic from this optimizer and let it do what it does best (mainly decorrelation).

@irenjj (Contributor) commented May 25, 2025

Actually this error is thrown after all optimizers are executed; it appears because no existing optimizer is capable of handling nested subqueries.

It's thrown in the planning phase; the error appears because, during planning, the planner can only get schema info from the immediately enclosing query block.

Now, whether the depth is recorded in the optimizer or in the planning phase, we eventually have to convert the plan into a DependentJoin, am I right?

Yes.

If in the future we have all the information we need in the planning phase, we can remove this "depth detection" logic from this optimizer and let it do what it does best (mainly decorrelation).

In fact, the optimizer doesn't need to be concerned with depth. When entering the optimizer, the subquery should already have the relevant column information, whether it's retrieved from a certain layer of Binder through depth (DuckDB style) or through other methods.

@duongcongtoai (Contributor Author) commented May 25, 2025

It's thrown in the planning phase; the error appears because, during planning, the planner can only get schema info from the immediately enclosing query block.

Aha, confirmed. Given this query:

SELECT e1.employee_name, e1.salary
FROM employees e1
WHERE e1.salary > (
    SELECT AVG(e2.salary)
    FROM employees e2 
    WHERE e2.dept_id = e1.dept_id
    AND e2.salary > (
        SELECT AVG(e3.salary)
        FROM employees e3
        WHERE e3.dept_id = e1.dept_id
    )
);

The optimizer is actually invoked, but with a plan of EmptyRelation for some reason. We'd better do something in the planning phase!

@logan-keede (Contributor):

The optimizer is actually invoked, but with a plan of EmptyRelation for some reason. We'd better do something in the planning phase!

How did you confirm that?
I tried putting a debug statement here:

pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {

It was not executed.

@duongcongtoai (Contributor Author):

How did you confirm that?
I tried putting a debug statement here:

My bad, the EmptyRelation plan actually comes from the CREATE TABLE queries 😞

CREATE TABLE project_assignments (
    project_id INTEGER,
    employee_id INTEGER,
    priority INTEGER
);

Indeed, planning fails before reaching the optimizer for the SELECT query.

@duongcongtoai (Contributor Author) commented May 25, 2025

Regarding providing the schema from the outer query down to deeply nested subqueries, can we do something like this:

pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<DataType>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// The query schema of the outer query plan, used to resolve the columns in subquery
    outer_query_schema: Option<DFSchemaRef>,
    outer_query_schemas: Vec<DFSchemaRef>, <------------ newly added
}

    pub(super) fn parse_scalar_subquery(
        &self,
        subquery: Query,
        input_schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Expr> {
        planner_context.append_outer_query_schema(input_schema.clone().into()); <----- push "the latest outer schema" onto the stack of schemas
    ...
        let sub_plan = self.query_to_plan(subquery, planner_context)?;
        let outer_ref_columns = sub_plan.all_out_ref_exprs();
        planner_context.pop_outer_query_schema(); <-------- once we are done planning the subquery, remove its schema from the stack
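
A minimal sketch of the helpers this stack implies (hedged: resolve_outer_column and its depth convention are illustrative assumptions, not part of the proposal above):

use std::sync::Arc;

use datafusion_common::{Column, DFSchemaRef};

impl PlannerContext {
    /// push the schema of the query block we are about to descend into
    pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
        self.outer_query_schemas.push(schema);
    }

    /// pop it once planning of the subquery is finished
    pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
        self.outer_query_schemas.pop()
    }

    /// resolve a column against the stack, innermost block first; the
    /// offset from the top of the stack gives the relative depth
    pub fn resolve_outer_column(&self, col: &Column) -> Option<(usize, DFSchemaRef)> {
        self.outer_query_schemas
            .iter()
            .rev()
            .enumerate()
            .find(|(_, schema)| schema.has_column(col))
            .map(|(depth, schema)| (depth, Arc::clone(schema)))
    }
}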

// all the columns provided by the LHS that are referenced in the RHS
// (and its nested child subqueries, if any); note that not all outer_refs
// from the RHS are mentioned in this vector, because the RHS may reference
// columns provided somewhere above this join
pub correlated_columns: Vec<Column>,
Contributor:

I think data_type is needed as well in correlated_columns.

Contributor Author:

Resolved, I added both depth and data_type.

@duongcongtoai (Contributor Author):

https://github.com/apache/datafusion/pull/16186/files
I added this dummy fix to support multi-level outer ref columns; it is enough for us to continue with this story.

More sophisticated subquery planning can be implemented later, maybe in parallel.

@irenjj (Contributor) left a comment:

Thanks @duongcongtoai, here are some of my review comments. There are a few things I haven't fully figured out yet and will need some more time.

@@ -485,6 +485,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {

            object
        }
        LogicalPlan::DependentJoin(..) => todo!(),
Contributor:

Maybe we can add an empty json!() here.

Comment on lines 406 to 407
// TODO: apply expr on the subquery
LogicalPlan::DependentJoin(..) => Ok(TreeNodeRecursion::Continue),
Contributor:

Ditto

Contributor Author:

Not sure what you mean, but I also updated the implementation here.

Comment on lines 317 to 330
LogicalPlan::Projection(proj) => {
    for expr in &proj.expr {
        if contains_subquery(expr) {
            is_dependent_join_node = true;
            break;
        }
        expr.apply(|expr| {
            if let Expr::OuterReferenceColumn(data_type, col) = expr {
                self.mark_outer_column_access(new_id, data_type, col);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;
    }
}
Contributor:

It seems that break may prevent mark_outer_column_access.🤔

Contributor Author:

Right, I didn't test this logic, so I didn't catch this issue 👍

Contributor Author:

Done, the problem is addressed. I also handle f_up for the projection case (and added a test case).


f.predicate
    .apply(|expr| {
        if let Expr::OuterReferenceColumn(data_type, col) = expr {
Contributor:

If it's a dependent join, the depth should be incremented by 1, right?

Contributor Author:

The depth is not incremented until this match block finishes, and predicate.apply will not traverse down into the exprs under a Subquery expr. For example, given the predicate where col_level1=1 and col_level2=(select count(*) from inner where inner_col=col_level1), this function only visits:

  • col_level1=1, col_level1, 1
  • col_level2=ScalarSubqueryExpr, col_level2, ScalarSubqueryExpr

It will not visit the expr inner_col=col_level1; that happens when the f_down function visits the children of this node (by which time the depth has already been incremented). See the annotated snippet below.
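
For concreteness, the same visit written out with the boundary behavior spelled out in comments (the code mirrors the diff above; the comments are explanatory additions):

// predicate: col_level1 = 1 AND col_level2 = (<scalar subquery>)
f.predicate.apply(|expr| {
    // visited here: the AND, both comparisons, col_level1, 1, col_level2
    // and the ScalarSubquery expr itself -- but NOT inner_col = col_level1,
    // because Expr traversal stops at the LogicalPlan boundary of a Subquery
    if let Expr::OuterReferenceColumn(data_type, col) = expr {
        // recorded against the current depth; the increment for the nested
        // block only happens after the enclosing match block finishes
        self.mark_outer_column_access(new_id, data_type, col);
    }
    Ok(TreeNodeRecursion::Continue)
})?;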

Comment on lines +331 to +338
LogicalPlan::Subquery(subquery) => {
    let parent = self.stack.last().unwrap();
    let parent_node = self.nodes.get_mut(parent).unwrap();
    // the insertion sequence matters here:
    // when a parent has multiple subquery children at the same time,
    // we rely on the order in which the subquery children are visited
    // to later find the corresponding subquery (in case some part of it
    // was rewritten in a lower node)
Contributor:

I think more considerations are needed for the subquery case, such as OuterRefColumn, depth, etc. How does DuckDB handle this?

Contributor Author:

We don't do much for LogicalPlan::Subquery here, because it contains little information about the OuterRefColumn or depth (notice how I create an empty entry for this node in the parent dependent_join_node):

                parent_node
                    .columns_accesses_by_subquery_id
                    .insert(new_id, vec![]);

The same entry will be updated with the OuterRefColumn and depth if any such expr is found during the downward traversal (f_down). I think the diagram mentioned here describes how the traversal works:

                  ↓1   
                  ↑12   
             ┌────────────┐
             │  FILTER    │<--- DependentJoin rewrite
             │   (1)      │     happens here
             └─────┬────┬─┘     Here we already have enough information
             |     |    |       of which node is accessing which column
             |     |    |       provided by "Table Scan t1" node        
             │     |    |       (for example node (6) below )           
             │     |    |       
             │     |    |       
             │     |    |
       ↓2────┘     ↓6   └────↓10
       ↑5          ↑11         ↑11
   ┌───▼───┐    ┌──▼───┐   ┌───▼───────┐
   │SUBQ1  │    │SUBQ2 │   │TABLE SCAN │
   └──┬────┘    └──┬───┘   │    t1     │
      |            |       └───────────┘
      |            |
      |            |
      |            ↓7
      |            ↑10
      |         ┌──▼───────┐
      |         │Filter    │----> mark_outer_column_access(outer_ref)
      |         │outer_ref |
      |         │ (6)      |
      |         └──┬───────┘  
      |            |
     ↓3           ↓8
     ↑4           ↑9
  ┌──▼────┐    ┌──▼────┐
  │SCAN t2│    │SCAN t2│
  └───────┘    └───────┘

@duongcongtoai (Contributor Author) commented May 29, 2025

I can relate this section to a code block in DuckDB: https://github.com/duckdb/duckdb/blob/95d71ea4ec0fb3a151b18943fb532f7b5131dc95/src/planner/binder/query_node/plan_subquery.cpp#L294

unique_ptr<Expression> Binder::PlanSubquery(BoundSubqueryExpression &expr, unique_ptr<LogicalOperator> &root) {
	...
	auto plan = std::move(subquery_root);
	unique_ptr<Expression> result_expression;
	if (!expr.IsCorrelated()) {
		result_expression = PlanUncorrelatedSubquery(*this, expr, root, std::move(plan));
	} else {
		result_expression = PlanCorrelatedSubquery(*this, expr, root, std::move(plan));
	}
	// finally, we recursively plan the nested subqueries (if there are any)
	if (sub_binder->has_unplanned_dependent_joins) {
		RecursiveDependentJoinPlanner plan(*this);
		plan.VisitOperator(*root);
	}

Let's use SUBQ2 from the diagram above as a demonstration: for DuckDB, plan = std::move(subquery_root) is equivalent to node (6), and root is equivalent to node (1). (I think the use of root is quite misleading; it should be named "subquery_parent" instead, because it references the direct parent node of the subquery rather than the root of the whole query plan.)

What PlanCorrelatedSubquery does is convert node (6) into a DelimJoin, given all the information in correlated_columns:

static unique_ptr<Expression> PlanCorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
                                                     unique_ptr<LogicalOperator> &root,
                                                     unique_ptr<LogicalOperator> plan) {
	auto &correlated_columns = expr.binder->correlated_columns;
	// FIXME: there should be a way of disabling decorrelation for ANY queries as well, but not for now...
	bool perform_delim =
	    expr.subquery_type == SubqueryType::ANY ? true : PerformDuplicateElimination(binder, correlated_columns);

However, in our context, because the planner does not provide such information, we have to perform an initial traversal to collect it.

After we are done with the traversal, we have a tree that looks exactly like what DuckDB has, and the recursion can imitate the same logic. A rough sketch of the bottom-up half follows.
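
A hedged sketch of the f_up step of that idea (build_dependent_join is a hypothetical helper; the PR's actual logic lives in DependentJoinRewriter):

use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::LogicalPlan;

impl DependentJoinRewriter {
    // hypothetical f_up step: by the time we leave a node that owns
    // subqueries, f_down has already recorded every outer column access
    // against the owning subquery id, so the node can be replaced
    fn f_up(&mut self, plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        let id = self.stack.pop().expect("pushed during f_down");
        let accesses_by_subquery = match self.nodes.get(&id) {
            Some(node) if !node.columns_accesses_by_subquery_id.is_empty() => {
                node.columns_accesses_by_subquery_id.clone()
            }
            // not a dependent-join node: nothing to rewrite
            _ => return Ok(Transformed::no(plan)),
        };
        let mut rewritten = plan;
        for (subquery_id, accesses) in accesses_by_subquery {
            // hypothetical helper: builds DependentJoin(LHS = rewritten so
            // far, RHS = the subquery plan, correlated columns = accesses)
            rewritten = self.build_dependent_join(rewritten, subquery_id, &accesses)?;
        }
        self.subquery_depth -= 1; // leaving a dependent-join node
        Ok(Transformed::yes(rewritten))
    }
}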

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label May 28, 2025
@github-actions github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label May 28, 2025
// because the RHS may reference columns provided somewhere above this join.
// The depth of each correlated column should always be >= the current
// dependent join's subquery_depth
pub correlated_columns: Vec<(usize, Expr)>,
Contributor:

Since the decorrelation phase involves column matching, this structure might be better, as it could reduce the use of if let Expr::Column(col) =:

struct CorrelatedColumnInfo {
    depth: usize,
    data_type: DataType,
    column: Column,
    ...
}

Labels: core (Core DataFusion crate), logical-expr (Logical plan and expressions), optimizer (Optimizer rules), proto (Related to proto crate), sql (SQL Planner), substrait (Changes to the substrait crate)