feat: rewrite subquery into dependent join logical plan by duongcongtoai · Pull Request #16016 · apache/datafusion · GitHub

feat: rewrite subquery into dependent join logical plan #16016

Open
wants to merge 82 commits into base: main
Commits (82):
4ba36c0
chore: add test
duongcongtoai Feb 3, 2025
79eaca3
chore: more progress
duongcongtoai Feb 10, 2025
7ed0831
temp
duongcongtoai Mar 18, 2025
cc97879
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Mar 18, 2025
5096937
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Apr 10, 2025
68fd9ca
chore: some work
duongcongtoai Apr 16, 2025
ace332e
chore: some work on indexed algebra
duongcongtoai Apr 27, 2025
da8980c
chore: more progress
duongcongtoai May 4, 2025
483e3ac
chore: impl projection pull up
duongcongtoai May 4, 2025
f14b145
chore: complete unnesting simple subquery
duongcongtoai May 6, 2025
0cd8143
chore: correct join condition
duongcongtoai May 8, 2025
cc3e01c
chore: handle exist query
duongcongtoai May 8, 2025
9b5daa2
test: in sq test
duongcongtoai May 10, 2025
f26baf8
test: exist with no dependent column
duongcongtoai May 10, 2025
37852c1
test: exist with dependent columns
duongcongtoai May 10, 2025
2544478
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 10, 2025
e984a55
chore: remove redundant clone
duongcongtoai May 11, 2025
94aba08
feat: dummy implementation for aggregation
duongcongtoai May 13, 2025
0f039fe
feat: handle count bug
duongcongtoai May 15, 2025
898bdc4
feat: add sq alias step
duongcongtoai May 16, 2025
1a600b6
test: simple count decorrelate
duongcongtoai May 16, 2025
6ce21b3
chore: some work to support multiple subqueries per level
duongcongtoai May 17, 2025
67923d4
feat: support multiple subqueries decorrelation untested
duongcongtoai May 19, 2025
64538cc
feat: correct node rewriting rule
duongcongtoai May 19, 2025
957403f
fix: subquery alias
duongcongtoai May 19, 2025
a465459
fix: adjust test case expectation
duongcongtoai May 19, 2025
479ae64
feat: convert sq to dependent joins
duongcongtoai May 24, 2025
2171e52
feat: impl dependent join rewriter
duongcongtoai May 24, 2025
9d26437
chore: clean up unused function
duongcongtoai May 24, 2025
24d1223
chore: clean up debug slt
duongcongtoai May 24, 2025
3533cd1
chore: simple logical plan type for dependent join
duongcongtoai May 24, 2025
e1002f8
fix: recursive dependent join rewrite
duongcongtoai May 24, 2025
7ba92f1
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 24, 2025
e3c77d6
chore: some more note on further implementation
duongcongtoai May 24, 2025
1ae0926
chore: lint
duongcongtoai May 24, 2025
d15c2aa
chore: clippy
duongcongtoai May 24, 2025
e5baf2c
fix: test
duongcongtoai May 25, 2025
11dbb80
doc: draw diagram
duongcongtoai May 25, 2025
5856213
fix: proto
duongcongtoai May 25, 2025
a3f11a8
chore: revert unrelated change
duongcongtoai May 25, 2025
e2d9d14
chore: lint
duongcongtoai May 25, 2025
b298426
fix: subtrait
duongcongtoai May 25, 2025
cb1a757
fix: subtrait again
duongcongtoai May 25, 2025
baef066
fix: fail test
duongcongtoai May 25, 2025
a07b3b0
chore: clippy
duongcongtoai May 25, 2025
32db3a9
chore: add depth and data_type to correlated columns
duongcongtoai May 26, 2025
50d26f3
chore: rm snapshot
duongcongtoai May 26, 2025
b09e370
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai May 26, 2025
28dc7a4
feat: support alias and join
duongcongtoai May 26, 2025
cf830cb
feat: add lateral join fields to dependent join
duongcongtoai May 26, 2025
95994da
feat: rewrite lateral join
duongcongtoai May 27, 2025
9745a4f
feat: rewrite projection
duongcongtoai May 28, 2025
c2bf4d3
refactor: split rewrite logic
duongcongtoai May 28, 2025
c083501
feat: impl other api of logical plan for dependent join
duongcongtoai May 28, 2025
9512ccc
chore: rm debug file
duongcongtoai May 28, 2025
98d1c27
fix: not expose subquery expr for dependentjoin
duongcongtoai May 29, 2025
10f9aeb
chore: add data type to correlated column
duongcongtoai Jun 7, 2025
92bb175
fix: not expose subquery expr for dependentjoin
duongcongtoai May 29, 2025
29eff4b
spilt into rewrite_dependent_join & decorrelate_dependent_join
irenjj Jun 6, 2025
f4e332e
fix: cherry-pick conflict
duongcongtoai Jun 7, 2025
2a324bd
chore: move left over commit from feature branch
duongcongtoai Jun 7, 2025
f0c9f0b
chore: minor import format
duongcongtoai Jun 7, 2025
5e67945
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai Jun 7, 2025
e964d6e
chore: clippy
duongcongtoai Jun 7, 2025
309511c
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai Jun 7, 2025
2eb723e
fix: err msg
duongcongtoai Jun 7, 2025
b8a8de8
test: some more test cases
duongcongtoai Jun 7, 2025
a3d0b65
refactor: shared rewrite function
duongcongtoai Jun 7, 2025
8e858b4
refactor: remove all unwrap
duongcongtoai Jun 7, 2025
30300d1
fix: test expectation
duongcongtoai Jun 7, 2025
a93f901
fix subquery in join filter
irenjj Jun 8, 2025
4aed14f
rename
irenjj Jun 8, 2025
6f2ce78
add todo
irenjj Jun 8, 2025
7534a49
Merge pull request #9 from irenjj/subquery_in_join_filter
duongcongtoai Jun 8, 2025
c330c24
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai Jun 8, 2025
5be430a
chore: more constraint on correlated subquery in join filter
duongcongtoai Jun 8, 2025
7dc3dd9
Merge pull request #10 from duongcongtoai/dependent-join-multiple-sub…
duongcongtoai Jun 9, 2025
dc656f0
chore: try fix snapshot
duongcongtoai Jun 9, 2025
612ae33
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai Jun 9, 2025
f4c4ec0
chore: use normal assert
duongcongtoai Jun 10, 2025
f1f4529
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai Jun 10, 2025
0f5278f
fix: correct err assert
duongcongtoai Jun 10, 2025
test: exist with dependent columns
duongcongtoai committed May 10, 2025
commit 37852c1d556eb5893eebe007b2c6ba9c3118e03d
222 changes: 49 additions & 173 deletions datafusion/optimizer/src/decorrelate_general.rs
@@ -30,6 +30,7 @@ use crate::utils::has_all_column_refs;
use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};

use arrow::compute::kernels::cmp::eq;
+use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeContainer, TreeNodeRecursion,
TreeNodeRewriter, TreeNodeVisitor,
@@ -60,6 +61,7 @@ pub struct DependentJoinTracker {
stack: Vec<usize>,
// track for each column, the nodes/logical plan that reference to its within the tree
accessed_columns: IndexMap<Column, Vec<ColumnAccess>>,
+alias_generator: Arc<AliasGenerator>,
}

#[derive(Debug, Hash, PartialEq, PartialOrd, Eq, Clone)]
@@ -308,11 +310,7 @@ fn try_transform_subquery_to_join_expr(
..
}) => {
if inner_sq.clone() == *sq {
-let mark_predicate = if *negated {
-expr_fn::not(col("mark"))
-} else {
-col("mark")
-};
+let mark_predicate = if *negated { !col("mark") } else { col("mark") };
post_join_predicate = Some(mark_predicate);
return Ok(true);
}
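Note on the hunk above: the new one-liner relies on the expression type supporting the `!` operator, so that `!col("mark")` builds a negation node rather than evaluating a boolean. The sketch below shows that pattern with a hypothetical `MiniExpr` type and a hypothetical `col` helper. Neither is DataFusion's real `Expr` or `col`; they only illustrate how implementing `std::ops::Not` makes the two spellings equivalent.

```rust
use std::ops::Not;

// Hypothetical, heavily simplified expression tree (NOT DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum MiniExpr {
    Column(String),
    Not(Box<MiniExpr>),
}

// Hypothetical helper that mirrors the shape of `col("mark")` in the diff.
fn col(name: &str) -> MiniExpr {
    MiniExpr::Column(name.to_string())
}

// Implementing `std::ops::Not` lets `!expr` construct a `Not` node.
impl Not for MiniExpr {
    type Output = MiniExpr;

    fn not(self) -> MiniExpr {
        MiniExpr::Not(Box::new(self))
    }
}

// Same branching as the rewritten line: negate the mark column when the
// subquery predicate is negated (for example NOT EXISTS), otherwise use it as is.
fn mark_predicate(negated: bool) -> MiniExpr {
    if negated { !col("mark") } else { col("mark") }
}
```

Under these assumptions, `mark_predicate(true)` yields a `Not` node wrapping the `mark` column and `mark_predicate(false)` yields the bare column, which is the same result the removed five-line `if`/`else` produced.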
@@ -997,10 +995,11 @@ impl DependentJoinTracker {
}
}

-impl Default for DependentJoinTracker {
-fn default() -> Self {
+impl DependentJoinTracker {
+fn new(alias_generator: Arc<AliasGenerator>) -> Self {
return DependentJoinTracker {
root: None,
+alias_generator,
current_id: 0,
nodes: IndexMap::new(),
stack: vec![],
@@ -1179,7 +1178,7 @@ impl OptimizerRule for DependentJoinTracker {
mod tests {
use std::sync::Arc;

-use datafusion_common::{DFSchema, Result};
+use datafusion_common::{alias::AliasGenerator, DFSchema, Result};
use datafusion_expr::{
exists,
expr_fn::{self, col, not},
@@ -1197,6 +1196,45 @@ mod tests {
datatypes::{DataType as ArrowDataType, Field, Fields, Schema},
};
#[test]
+fn simple_decorrelate_with_exist_subquery_with_dependent_columns() -> Result<()> {
+let outer_table = test_table_scan_with_name("outer_table")?;
+let inner_table_lv1 = test_table_scan_with_name("inner_table_lv1")?;
+let sq_level1 = Arc::new(
+LogicalPlanBuilder::from(inner_table_lv1)
+.filter(
+col("inner_table_lv1.a")
+.eq(out_ref_col(ArrowDataType::UInt32, "outer_table.a"))
+.and(
+out_ref_col(ArrowDataType::UInt32, "outer_table.a")
+.gt(col("inner_table_lv1.c")),
+)
+.and(col("inner_table_lv1.b").eq(lit(1)))
+.and(
+out_ref_col(ArrowDataType::UInt32, "outer_table.b")
+.eq(col("inner_table_lv1.b")),
+),
+)?
+.project(vec![out_ref_col(ArrowDataType::UInt32, "outer_table.b")
+.alias("outer_b_alias")])?
+.build()?,
+);
+
+let input1 = LogicalPlanBuilder::from(outer_table.clone())
+.filter(col("outer_table.a").gt(lit(1)).and(exists(sq_level1)))?
+.build()?;
+let mut index = DependentJoinTracker::new(Arc::new(AliasGenerator::new()));
+index.build(&input1)?;
+let new_plan = index.root_dependent_join_elimination()?;
+let expected = "\
+Filter: outer_table.a > Int32(1) AND inner_table_lv1.mark\
+\n LeftMark Join: Filter: inner_table_lv1.a = outer_table.a AND outer_table.a > inner_table_lv1.c AND outer_table.b = inner_table_lv1.b\
+\n TableScan: outer_table\
+\n Filter: inner_table_lv1.b = Int32(1)\
+\n TableScan: inner_table_lv1";
+assert_eq!(expected, format!("{new_plan}"));
+Ok(())
+}
+#[test]
fn simple_decorrelate_with_exist_subquery_no_dependent_column() -> Result<()> {
let outer_table = test_table_scan_with_name("outer_table")?;
let inner_table_lv1 = test_table_scan_with_name("inner_table_lv1")?;
@@ -1210,7 +1248,7 @@ mod tests {
let input1 = LogicalPlanBuilder::from(outer_table.clone())
.filter(col("outer_table.a").gt(lit(1)).and(exists(sq_level1)))?
.build()?;
-let mut index = DependentJoinTracker::default();
+let mut index = DependentJoinTracker::new(Arc::new(AliasGenerator::new()));
index.build(&input1)?;
let new_plan = index.root_dependent_join_elimination()?;
let expected = "\
@@ -1241,7 +1279,7 @@ mod tests {
.and(in_subquery(col("outer_table.c"), sq_level1)),
)?
.build()?;
-let mut index = DependentJoinTracker::default();
+let mut index = DependentJoinTracker::new(Arc::new(AliasGenerator::new()));
index.build(&input1)?;
let new_plan = index.root_dependent_join_elimination()?;
let expected = "\
@@ -1285,7 +1323,7 @@ mod tests {
.and(in_subquery(col("outer_table.c"), sq_level1)),
)?
.build()?;
-let mut index = DependentJoinTracker::default();
+let mut index = DependentJoinTracker::new(Arc::new(AliasGenerator::new()));
index.build(&input1)?;
let new_plan = index.root_dependent_join_elimination()?;
let expected = "\
@@ -1297,166 +1335,4 @@ mod tests {
assert_eq!(expected, format!("{new_plan}"));
Ok(())
}
-#[test]
-fn play_unnest_simple_predicate_pull_up() -> Result<()> {
-// let mut framework = GeneralDecorrelation::default();
-
-let outer_table = test_table_scan_with_name("outer_table")?;
-let inner_table_lv1 = test_table_scan_with_name("inner_table_lv1")?;
-// let inner_table_lv2 = test_table_scan_with_name("inner_table_lv2")?;
-// let sq_level2 = Arc::new(
-// LogicalPlanBuilder::from(inner_table_lv2)
-// .filter(
-// out_ref_col(ArrowDataType::UInt32, "inner_table_lv1.b")
-// .eq(col("inner_table_lv2.b"))
-// .and(
-// out_ref_col(ArrowDataType::UInt32, "outer_table.c")
-// .eq(col("inner_table_lv2.c")),
-// ),
-// )?
-// .aggregate(Vec::<Expr>::new(), vec![count(col("inner_table_lv2.a"))])?
-// .build()?,
-// );
-let sq_level1 = Arc::new(
-LogicalPlanBuilder::from(inner_table_lv1)
-.filter(
-col("inner_table_lv1.a")
-.eq(out_ref_col(ArrowDataType::UInt32, "outer_table.a"))
-.and(
-out_ref_col(ArrowDataType::UInt32, "outer_table.a")
-.eq(lit(1)),
-),
-)?
-.aggregate(Vec::<Expr>::new(), vec![sum(col("inner_table_lv1.b"))])?
-.project(vec![sum(col("inner_table_lv1.b"))])?
-.build()?,
-);
-
-let input1 = LogicalPlanBuilder::from(outer_table.clone())
-.filter(
-col("outer_table.a")
-.gt(lit(1))
-.and(col("outer_table.b").gt(scalar_subquery(sq_level1))),
-)?
-.build()?;
-let mut index = DependentJoinTracker::default();
-index.build(&input1)?;
-let new_plan = index.root_dependent_join_elimination()?;
-println!("{}", new_plan);
-
-// let input2 = LogicalPlanBuilder::from(input.clone())
-// .filter(col("int_col").gt(lit(1)))?
-// .project(vec![col("string_col")])?
-// .build()?;
-
-// let mut b = GeneralDecorrelation::default();
-// b.build_algebra_index(input2)?;
-
-Ok(())
-}
-#[test]
-fn play_unnest() -> Result<()> {
-// let mut framework = GeneralDecorrelation::default();
-
-let outer_table = test_table_scan_with_name("outer_table")?;
-let inner_table_lv1 = test_table_scan_with_name("inner_table_lv1")?;
-// let inner_table_lv2 = test_table_scan_with_name("inner_table_lv2")?;
-// let sq_level2 = Arc::new(
-// LogicalPlanBuilder::from(inner_table_lv2)
-// .filter(
-// out_ref_col(ArrowDataType::UInt32, "inner_table_lv1.b")
-// .eq(col("inner_table_lv2.b"))
-// .and(
-// out_ref_col(ArrowDataType::UInt32, "outer_table.c")
-// .eq(col("inner_table_lv2.c")),
-// ),
-// )?
-// .aggregate(Vec::<Expr>::new(), vec![count(col("inner_table_lv2.a"))])?
-// .build()?,
-// );
-let sq_level1 = Arc::new(
-LogicalPlanBuilder::from(inner_table_lv1)
-.filter(
-col("inner_table_lv1.a")
-.eq(out_ref_col(ArrowDataType::UInt32, "outer_table.a")),
-)?
-.aggregate(Vec::<Expr>::new(), vec![sum(col("inner_table_lv1.b"))])?
-.project(vec![sum(col("inner_table_lv1.b"))])?
-.build()?,
-);
-
-let input1 = LogicalPlanBuilder::from(outer_table.clone())
-.filter(
-col("outer_table.a")
-.gt(lit(1))
-.and(col("outer_table.b").gt(scalar_subquery(sq_level1))),
-)?
-.build()?;
-let mut index = DependentJoinTracker::default();
-index.build(&input1)?;
-let new_plan = index.root_dependent_join_elimination()?;
-println!("{}", new_plan);
-
-// let input2 = LogicalPlanBuilder::from(input.clone())
-// .filter(col("int_col").gt(lit(1)))?
-// .project(vec![col("string_col")])?
-// .build()?;
-
-// let mut b = GeneralDecorrelation::default();
-// b.build_algebra_index(input2)?;
-
-Ok(())
-}
-
-// #[test]
-// fn todo() -> Result<()> {
-// let mut framework = GeneralDecorrelation::default();
-
-// let outer_table = test_table_scan_with_name("outer_table")?;
-// let inner_table_lv1 = test_table_scan_with_name("inner_table_lv1")?;
-// let inner_table_lv2 = test_table_scan_with_name("inner_table_lv2")?;
-// let sq_level2 = Arc::new(
-// LogicalPlanBuilder::from(inner_table_lv2)
-// .filter(
-// out_ref_col(ArrowDataType::UInt32, "inner_table_lv1.b")
-// .eq(col("inner_table_lv2.b"))
-// .and(
-// out_ref_col(ArrowDataType::UInt32, "outer_table.c")
-// .eq(col("inner_table_lv2.c")),
-// ),
-// )?
-// .aggregate(Vec::<Expr>::new(), vec![count(col("inner_table_lv2.a"))])?
-// .build()?,
-// );
-// let sq_level1 = Arc::new(
-// LogicalPlanBuilder::from(inner_table_lv1)
-// .filter(
-// col("inner_table_lv1.a")
-// .eq(out_ref_col(ArrowDataType::UInt32, "outer_table.a"))
-// .and(scalar_subquery(sq_level2).gt(lit(5))),
-// )?
-// .aggregate(Vec::<Expr>::new(), vec![sum(col("inner_table_lv1.b"))])?
-// .project(vec![sum(col("inner_table_lv1.b"))])?
-// .build()?,
-// );
-
-// let input1 = LogicalPlanBuilder::from(outer_table.clone())
-// .filter(
-// col("outer_table.a")
-// .gt(lit(1))
-// .and(col("outer_table.b").gt(scalar_subquery(sq_level1))),
-// )?
-// .build()?;
-// framework.build(&input1)?;
-
-// // let input2 = LogicalPlanBuilder::from(input.clone())
-// // .filter(col("int_col").gt(lit(1)))?
-// // .project(vec![col("string_col")])?
-// // .build()?;
-
-// // let mut b = GeneralDecorrelation::default();
-// // b.build_algebra_index(input2)?;
-
-// Ok(())
-// }
}
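
Note on the expected plan in `simple_decorrelate_with_exist_subquery_with_dependent_columns`: the test expects the correlated EXISTS to become a LeftMark join whose filter carries the three predicates that mention `outer_table`, while `inner_table_lv1.b = Int32(1)` stays below the join and the `mark` column is checked in the top filter. The sketch below models that plan shape on in-memory rows, assuming non-null `u32` columns `a`, `b`, `c`. The `Row` type and both functions are hypothetical illustrations in plain Rust, not DataFusion code, and SQL NULL semantics are ignored.

```rust
// Hypothetical row type; the three columns mirror the a/b/c columns used in the test.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    a: u32,
    b: u32,
    c: u32,
}

/// Left mark join: every outer row is emitted exactly once, paired with a
/// boolean "mark" that is true when at least one inner row satisfies `pred`.
fn left_mark_join<F>(outer: &[Row], inner: &[Row], pred: F) -> Vec<(Row, bool)>
where
    F: Fn(&Row, &Row) -> bool,
{
    outer
        .iter()
        .map(|o| (*o, inner.iter().any(|i| pred(o, i))))
        .collect()
}

/// Evaluates the plan shape from the expected output, bottom to top.
fn exists_with_dependent_columns(outer: &[Row], inner: &[Row]) -> Vec<Row> {
    // Filter: inner_table_lv1.b = 1 (no outer reference, so it stays below the join).
    let inner_filtered: Vec<Row> = inner.iter().copied().filter(|i| i.b == 1).collect();

    // LeftMark Join filter:
    // inner.a = outer.a AND outer.a > inner.c AND outer.b = inner.b
    let marked = left_mark_join(outer, &inner_filtered, |o, i| {
        i.a == o.a && o.a > i.c && o.b == i.b
    });

    // Top filter: outer_table.a > 1 AND mark
    marked
        .into_iter()
        .filter(|(o, mark)| o.a > 1 && *mark)
        .map(|(o, _)| o)
        .collect()
}
```

Because the mark join emits each outer row once regardless of how many inner rows match, the outer side is never duplicated, which is what makes it a fit for EXISTS and IN predicates.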