feat: rewrite subquery into dependent join logical plan #16016
Conversation
FYI @irenjj
pub struct DependentJoinRewriter {
    // each logical plan traversal will assign it an integer id
    current_id: usize,
    subquery_depth: usize,
I'm not very clear about the purpose of depth in the dependent join. In DuckDB, depth is bound together with correlated_columns; it's the depth of the subquery block, for example:
select e1.c1        <------ depth 0 Binder0(metadata) <------+
from employees e1                                            |
where e1.c1 > (                                              |
    select avg(f2)  <------ depth 1 Binder1(metadata)        |
    from item i2                                             |
    where i2.f2 = (                                          |
        select avg(d3)  <------ depth 2 Binder2(metadata)    |
        from salary s3                                       |
        where s3.d3 = e1.c3  -- c3.depth=0 ------------------+
    )
);
In DuckDB, the metadata (schema) is kept in the Binder and is split from the LogicalPlan, so it needs depth to track the metadata (schema); DataFusion, by contrast, keeps the schema in the LogicalPlan.
If we want the depth to be recognized at the column level, we can easily put a new field inside this structure and keep it inside LogicalPlan::DependentJoin:
#[derive(Debug, Hash, PartialEq, PartialOrd, Eq, Clone)]
struct ColumnAccess {
// node ids from root to the node that is referencing the column
stack: Vec<usize>,
// the node referencing the column
node_id: usize,
col: Column,
data_type: DataType,
}
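For illustration, a minimal sketch of that addition (the new field name is hypothetical, not a final API):

#[derive(Debug, Hash, PartialEq, PartialOrd, Eq, Clone)]
struct ColumnAccess {
    // node ids from root to the node that is referencing the column
    stack: Vec<usize>,
    // the node referencing the column
    node_id: usize,
    col: Column,
    data_type: DataType,
    // hypothetical new field: depth of the subquery block in which this
    // access happens, relative to the outermost query block (depth 0)
    subquery_depth: usize,
}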
I believe the main purpose of recording depth is to be able to find the corresponding schema through depth during the physical planning phase. It's difficult to achieve accurate recording in the optimizer because the schema is provided to the LogicalPlan through the PlannerContext in the logical planning phase.
Looks like I need to do more research into the details here. My assumption of how this decorrelation works is that all decorrelation logic happens within logical planning and should not leak anything into the physical planning phase.
if is_dependent_join_node {
    self.subquery_depth += 1
And I think there's an issue with the way depth is calculated. The correct logic should be to traverse query blocks from top to bottom; if a subquery block is found, increment depth by 1, keep that depth, and continue recursively traversing the subquery block (a small sketch below illustrates this rule).
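In pseudocode, a minimal sketch of that rule with a hypothetical QueryBlock type (not the PR's actual structures):

// Hypothetical structure, only to illustrate the depth rule described above.
struct QueryBlock {
    depth: usize,
    subqueries: Vec<QueryBlock>,
}

// Traverse query blocks from top to bottom; entering a subquery block
// increments depth by one, and that depth is kept while recursively
// traversing everything inside that block.
fn assign_depth(block: &mut QueryBlock, depth: usize) {
    block.depth = depth;
    for subquery in &mut block.subqueries {
        assign_depth(subquery, depth + 1);
    }
}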
Re "and if a subquery query block is found": is_dependent_join_node is set to true if one of its exprs is a subquery node, and the depth is only incremented when such a node is met (I put the diagram on top of the struct doc for better visualization).
Regarding the depth correctness, I also added a test case demonstrating what the temporary query plan looks like, using the rewrite_dependent_join_two_nested_subqueries function.
Given the query
Filter: outer_table.a > Int32(1) AND (<subquery>) = outer_table.a
Subquery:
Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv1.a)]]
Filter: inner_table_lv1.c = outer_ref(outer_table.c) AND (<subquery>) = Int32(1)
Subquery:
Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv2.a)]]
Filter: inner_table_lv2.a = outer_ref(outer_table.a) AND inner_table_lv2.b = outer_ref(inner_table_lv1.b)
TableScan: inner_table_lv2
TableScan: inner_table_lv1
TableScan: outer_table
it shall be rewritten into a DependentJoin node as
Projection: outer_table.a, outer_table.b, outer_table.c [a:UInt32, b:UInt32, c:UInt32]
Filter: outer_table.a > Int32(1) AND __scalar_sq_2.output = outer_table.a [a:UInt32, b:UInt32, c:UInt32, output:Int64]
DependentJoin on [outer_table.a, outer_table.c] with expr (<subquery>) depth 1 [a:UInt32, b:UInt32, c:UInt32, output:Int64]
TableScan: outer_table [a:UInt32, b:UInt32, c:UInt32]
Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv1.a)]] [count(inner_table_lv1.a):Int64]
Projection: inner_table_lv1.a, inner_table_lv1.b, inner_table_lv1.c [a:UInt32, b:UInt32, c:UInt32]
Filter: inner_table_lv1.c = outer_ref(outer_table.c) AND __scalar_sq_1.output = Int32(1) [a:UInt32, b:UInt32, c:UInt32, output:Int64]
DependentJoin on [inner_table_lv1.b] with expr (<subquery>) depth 2 [a:UInt32, b:UInt32, c:UInt32, output:Int64]
TableScan: inner_table_lv1 [a:UInt32, b:UInt32, c:UInt32]
Aggregate: groupBy=[[]], aggr=[[count(inner_table_lv2.a)]] [count(inner_table_lv2.a):Int64]
Filter: inner_table_lv2.a = outer_ref(outer_table.a) AND inner_table_lv2.b = outer_ref(inner_table_lv1.b) [a:UInt32, b:UInt32, c:UInt32]
TableScan: inner_table_lv2 [a:UInt32, b:UInt32, c:UInt32]
Thanks @duongcongtoai, I'll review the other code later, but regarding the depth issue, I don't think it's likely to be handled in the optimizer. I'll organize some questions and maybe we can get @alamb @jayzhan211 @xudong963 's input on this.
The difference between DataFusion and DuckDB in constructing logical plans is that DataFusion directly assigns the schema to the LogicalPlan during planning:

pub(super) fn parse_scalar_subquery(
    &self,
    subquery: Query,
    input_schema: &DFSchema,
    planner_context: &mut PlannerContext,
) -> Result<Expr> {
    let old_outer_query_schema =
        planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
    let sub_plan = self.query_to_plan(subquery, planner_context)?;
    let outer_ref_columns = sub_plan.all_out_ref_exprs();
    planner_context.set_outer_query_schema(old_outer_query_schema);
    ...

As you can see, it only takes the schema of the immediate outer query block, not of all enclosing blocks.
I think a possible implementation approach is to construct the PlannerContext at the outer layer, and then carry the PlannerContext information into the optimizer/physical_planner:

let mut planner_context = PlannerContext::new();
let plan = create_plan(ctx, statement, &mut planner_context).await?;
let adjusted = adjusted.with_plan(&plan);
// before:
let df = ctx.execute_logical_plan(plan).await?;
// after: also hand the planner context over
let df = ctx
    .execute_logical_plan(plan, Some(planner_context))
    .await?;
let physical_plan = df.create_physical_plan().await?;

It's unclear what breaking changes this would bring.
I agree with @irenjj; it seems like correlated subqueries with depth > 1 do not reach the optimizer, as they report an error before getting there.
Actually this error is thrown after all optimizers are executed; it is thrown because no existing optimizer is capable of handling nested subqueries. Now, whether the depth is recorded in the optimizer or in the planning phase, we eventually have to convert the plan into a DependentJoin, am I right? If in the future we have all the information we need in the planning phase, we can remove this "depth detection" logic from this optimizer and let it do what it does best (mainly decorrelation).
It's thrown in the planning phase; the error occurs because in the planning phase the planner can only get the schema info from the immediately enclosing query block.

Yes.

In fact, the optimizer doesn't need to be concerned with depth. When entering the optimizer, the subquery should already have the relevant column information, whether it's retrieved from a certain layer of Binder through depth (the DuckDB style) or through other methods.
Ahah, confirmed, given this query
How did you confirm that?
It was not executed.
My bad, the EmptyRelation is actually invoked for the queries that create the tables 😞

Indeed, the planning fails before reaching the optimizer for the select query.
Regarding providing the schema from the outer query down to the deeply nested subquery, can we do something like this:

pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<DataType>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// The query schema of the outer query plan, used to resolve the columns in subquery
    outer_query_schema: Option<DFSchemaRef>,
    outer_query_schemas: Vec<DFSchemaRef>, // <------------ newly added
}

pub(super) fn parse_scalar_subquery(
    &self,
    subquery: Query,
    input_schema: &DFSchema,
    planner_context: &mut PlannerContext,
) -> Result<Expr> {
    // push the latest outer schema onto the stack of schemas
    planner_context.append_outer_query_schema(Some(input_schema.clone().into()));
    ...
    let sub_plan = self.query_to_plan(subquery, planner_context)?;
    let outer_ref_columns = sub_plan.all_out_ref_exprs();
    // once we are done planning the subquery, pop its schema off the stack
    planner_context.pop_outer_query_schema();
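A minimal sketch of the two helpers assumed above, operating on the proposed outer_query_schemas stack (the method names are the hypothetical ones used in the snippet):

impl PlannerContext {
    // push the schema of the enclosing query block before planning a subquery
    pub fn append_outer_query_schema(&mut self, schema: Option<DFSchemaRef>) {
        if let Some(schema) = schema {
            self.outer_query_schemas.push(schema);
        }
    }

    // drop the innermost outer schema once its subquery is fully planned
    pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
        self.outer_query_schemas.pop()
    }
}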
// all the columns provided by the LHS being referenced
// in the RHS (and its nested child subqueries, if any). Note that not all
// outer_refs from the RHS are mentioned in this vector, because the RHS may
// reference columns provided somewhere above this join.
pub correlated_columns: Vec<Column>,
I think data_type is needed as well in correlated_columns.
Resolved, I added both depth and data_type: https://github.com/apache/datafusion/pull/16186/files

More sophisticated subquery planning can be implemented later, and maybe in parallel.
Thanks @duongcongtoai, here are some of my review comments. There are a few things I haven't fully figured out yet and will need some more time.
@@ -485,6 +485,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                object
            }
            LogicalPlan::DependentJoin(..) => todo!(),
Maybe we can add an empty json!() here.
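For example, a small sketch (assuming the json! macro from serde_json that the suggestion refers to):

LogicalPlan::DependentJoin(..) => {
    // empty object as a placeholder until DependentJoin gets a proper
    // JSON representation in this visitor
    json!({})
}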
// TODO: apply expr on the subquery
LogicalPlan::DependentJoin(..) => Ok(TreeNodeRecursion::Continue),
Ditto
Not sure what you mean, but I also updated the implementation here.
LogicalPlan::Projection(proj) => {
    for expr in &proj.expr {
        if contains_subquery(expr) {
            is_dependent_join_node = true;
            break;
        }
        expr.apply(|expr| {
            if let Expr::OuterReferenceColumn(data_type, col) = expr {
                self.mark_outer_column_access(new_id, data_type, col);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;
    }
}
It seems that break may prevent mark_outer_column_access. 🤔
Right, I didn't test this logic, so I didn't catch this issue 👍
Done, problem addressed. I also handle f_up for the case of Projection (also added a test case).
f.predicate
    .apply(|expr| {
        if let Expr::OuterReferenceColumn(data_type, col) = expr {
If it's a dependent join, the depth should be incremented by 1, right?
The depth is not incremented until this match block finishes, and the predicate.apply function will not traverse down into the exprs under a Subquery expr. For example, given the predicate where col_level1=1 and col_level2=(select count(*) from inner where inner_col=col_level1), this function only visits:

col_level1=1, col_level1, 1
col_level2=ScalarSubqueryExpr, col_level2, ScalarSubqueryExpr

It will not visit the expr inner_col=col_level1. That expr is visited when the f_down function visits the children of this node (and at that time the depth has already been incremented).
LogicalPlan::Subquery(subquery) => {
    let parent = self.stack.last().unwrap();
    let parent_node = self.nodes.get_mut(parent).unwrap();
    // the inserting sequence matters here:
    // when a parent has multiple subquery children at the same time,
    // we rely on the order in which the subquery children are visited
    // to later find back the corresponding subquery (if some parts of them
    // were rewritten in a lower node)
I think more considerations are needed for the subquery case, such as OuterRefColumn, depth, etc. How does DuckDB handle this?
We don't do much for LogicalPlan::Subquery here, because it contains little information about the OuterRefColumn or the depth. Notice how I create an empty entry for this node in the parent dependent_join_node:

parent_node
    .columns_accesses_by_subquery_id
    .insert(new_id, vec![]);

The same entry will be updated with the OuterRefColumn and depth should any such expr be found during the downward traversal (f_down). I think the diagram mentioned here describes how the traversal works:
↓1
↑12
┌────────────┐
│ FILTER │<--- DependentJoin rewrite
│ (1) │ happens here
└─────┬────┬─┘ Here we already have enough information
| | | of which node is accessing which column
| | | provided by "Table Scan t1" node
│ | | (for example node (6) below )
│ | |
│ | |
│ | |
↓2────┘ ↓6 └────↓10
↑5 ↑11 ↑11
┌───▼───┐ ┌──▼───┐ ┌───▼───────┐
│SUBQ1 │ │SUBQ2 │ │TABLE SCAN │
└──┬────┘ └──┬───┘ │ t1 │
| | └───────────┘
| |
| |
| ↓7
| ↑10
| ┌──▼───────┐
| │Filter │----> mark_outer_column_access(outer_ref)
| │outer_ref |
| │ (6) |
| └──┬───────┘
| |
↓3 ↓8
↑4 ↑9
┌──▼────┐ ┌──▼────┐
│SCAN t2│ │SCAN t2│
└───────┘ └───────┘
I can relate this section to a code block in DuckDB: https://github.com/duckdb/duckdb/blob/95d71ea4ec0fb3a151b18943fb532f7b5131dc95/src/planner/binder/query_node/plan_subquery.cpp#L294
unique_ptr<Expression> Binder::PlanSubquery(BoundSubqueryExpression &expr, unique_ptr<LogicalOperator> &root) {
...
auto plan = std::move(subquery_root);
unique_ptr<Expression> result_expression;
if (!expr.IsCorrelated()) {
result_expression = PlanUncorrelatedSubquery(*this, expr, root, std::move(plan));
} else {
result_expression = PlanCorrelatedSubquery(*this, expr, root, std::move(plan));
}
// finally, we recursively plan the nested subqueries (if there are any)
if (sub_binder->has_unplanned_dependent_joins) {
RecursiveDependentJoinPlanner plan(*this);
plan.VisitOperator(*root);
}
Let's use SUBQ2 in the diagram above as a demonstration. For DuckDB, plan = std::move(subquery_root) is equivalent to node (6), and root is equivalent to node (1). (I think the usage of root is quite misleading; it should be named "subquery_parent" instead, because it references the direct parent node of the subquery, not the root of the whole query plan.)

What PlanCorrelatedSubquery does is convert node (6) into a DelimJoin, given all the information in correlated_columns:
static unique_ptr<Expression> PlanCorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
unique_ptr<LogicalOperator> &root,
unique_ptr<LogicalOperator> plan) {
auto &correlated_columns = expr.binder->correlated_columns;
// FIXME: there should be a way of disabling decorrelation for ANY queries as well, but not for now...
bool perform_delim =
expr.subquery_type == SubqueryType::ANY ? true : PerformDuplicateElimination(binder, correlated_columns);
However, in our context, because we do not have such information provided by the planner, we have to perform an initial traversal to collect it. After we are done with the traversal, we have a tree that looks exactly like what DuckDB has, and the recursion can imitate the same logic (a rough sketch follows below).
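A rough sketch of that two-phase shape, under the assumption that DependentJoinRewriter implements DataFusion's TreeNodeRewriter trait with the f_down/f_up hooks discussed above (the constructor name here is hypothetical):

// Phase 1: a single f_down/f_up traversal records which node accesses
// which outer column (the information DuckDB's Binder already has at hand).
// Phase 2: with those accesses collected, subquery parents are rewritten
// into DependentJoin nodes, imitating DuckDB's recursive planning.
fn rewrite_into_dependent_join(plan: LogicalPlan) -> Result<LogicalPlan> {
    let mut rewriter = DependentJoinRewriter::new(); // hypothetical constructor
    Ok(plan.rewrite(&mut rewriter)?.data)
}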
// because the RHS may reference columns provided somewhere above this join.
// The depth of each correlated column should always be >= the current
// dependent join's subquery_depth
pub correlated_columns: Vec<(usize, Expr)>,
Since the decorrelation phase involves column matching, this structure might be better, as it could reduce the use of if let Expr::Column(col) = ...:

struct CorrelatedColumnInfo {
    depth: usize,
    data_type: DataType,
    column: Column,
    ...
}
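For example, a hypothetical lookup helper to show the intent:

// With a dedicated struct, correlated-column matching becomes a plain
// field comparison instead of destructuring an Expr each time.
fn find_correlated<'a>(
    correlated_columns: &'a [CorrelatedColumnInfo],
    col: &Column,
    depth: usize,
) -> Option<&'a CorrelatedColumnInfo> {
    correlated_columns
        .iter()
        .find(|info| info.depth == depth && &info.column == col)
}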
Which issue does this PR close?
This PR is part of a long story toward general-purpose subquery decorrelation. After some discussion in #5492, this PR proposes adding the following.
To avoid breaking existing tests and to allow smoother collaboration, the changes should happen in the following sequence:
1. Merge item 1 without integrating the new optimizer into the main flow (its behavior is tested in-code instead of via sqllogictests)
2. Start implementing more rewriting rules for the different query plans (aggregate, projection, filter, ...)
Note that we temporarily keep the old optimizers while integrating the new one.
This is a safe action, because existing sqllogictests will be correctly rewritten using the old optimizers, while more complex query plans are supported by the new one (on complex query plans the old optimizers will just back off).
3. Implement a DelimGet-removal optimizer similar to DuckDB's
4. Deprecate the old optimizers
Rationale for this change
Are these changes tested?
Are there any user-facing changes?