From f807ab696b5977d4271d52acd44946424ad99340 Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Fri, 26 Dec 2025 17:57:42 +0800 Subject: [PATCH 1/9] Keep the null result in the reverse connection result --- .../datafusion-ext-plans/src/joins/bhj/semi_join.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs index 26841cee9..7b66c68ff 100644 --- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs @@ -189,11 +189,16 @@ impl Joiner for SemiJoiner

{ let mut hashes_idx = 0; for row_idx in 0..probed_batch.num_rows() { - if probed_valids + let key_valid = + probed_valids .as_ref() .map(|nb| nb.is_valid(row_idx)) - .unwrap_or(true) - { + .unwrap_or(true); + if P.mode == Anti && P.probe_is_join_side && !key_is_valid { + probed_joined.set(row_idx, true); + continue; + } + if key_valid { let map_value = map_values[hashes_idx]; hashes_idx += 1; From 76a497a7531c5f1457201d63d660912d8fea097e Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Fri, 26 Dec 2025 18:08:52 +0800 Subject: [PATCH 2/9] Keep the null result in the reverse connection result --- native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs index 7b66c68ff..c65577744 100644 --- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs @@ -189,7 +189,7 @@ impl Joiner for SemiJoiner

{ let mut hashes_idx = 0; for row_idx in 0..probed_batch.num_rows() { - let key_valid = + let key_is_valid = probed_valids .as_ref() .map(|nb| nb.is_valid(row_idx)) @@ -198,7 +198,7 @@ impl Joiner for SemiJoiner

{ probed_joined.set(row_idx, true); continue; } - if key_valid { + if key_is_valid { let map_value = map_values[hashes_idx]; hashes_idx += 1; From ff8305e0ea9a402c8b2b3c75a5fd881a63dc8991 Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 10:15:38 +0800 Subject: [PATCH 3/9] Keep the null result in the reverse connection result --- .../src/joins/bhj/semi_join.rs | 6 +--- .../datafusion-ext-plans/src/joins/test.rs | 33 +++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs index c65577744..f34da9b3b 100644 --- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs @@ -189,11 +189,7 @@ impl Joiner for SemiJoiner

{ let mut hashes_idx = 0; for row_idx in 0..probed_batch.num_rows() { - let key_is_valid = - probed_valids - .as_ref() - .map(|nb| nb.is_valid(row_idx)) - .unwrap_or(true); + let key_is_valid = probed_valids.as_ref().map(|nb| nb.is_valid(row_idx)).unwrap_or(true); if P.mode == Anti && P.probe_is_join_side && !key_is_valid { probed_joined.set(row_idx, true); continue; diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 9125ed53e..06b1cf881 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -600,6 +600,39 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn join_anti_with_null_keys() -> Result<()> { + for test_type in ALL_TEST_TYPE { + let left = build_table_i32_nullable( + ("a1", &vec![Some(1), Some(2), None, Some(4), Some(5)]), + ("b1", &vec![Some(4), Some(5), Some(6), None, Some(8)]), + ("c1", &vec![Some(7), Some(8), Some(9), Some(10), Some(11)]), + ); + let right = build_table_i32_nullable( + ("a2", &vec![Some(10), Some(20), Some(30)]), + ("b1", &vec![Some(4), Some(5), Some(7)]), + ("c2", &vec![Some(70), Some(80), Some(90)]), + ); + let on: JoinOn = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?), + Arc::new(Column::new_with_schema("b1", &right.schema())?), + )]; + + let (_, batches) = join_collect(test_type, left, right, on, LeftAnti).await?; + let expected = vec![ + "+----+----+----+", + "| a1 | b1 | c1 |", + "+----+----+----+", + "| 2 | 6 | 9 |", + "| 4 | | 10 |", + "| 5 | 8 | 11 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + } + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn join_with_duplicated_column_names() -> Result<()> { for test_type in ALL_TEST_TYPE { From c32b878f34d3f0ae1601e6cabcf7d5b2aea087c3 Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 10:38:48 +0800 Subject: [PATCH 4/9] Keep the null result in the reverse connection result --- .../datafusion-ext-plans/src/joins/bhj/semi_join.rs | 5 ++++- native-engine/datafusion-ext-plans/src/joins/test.rs | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs index f34da9b3b..41ebcf6fd 100644 --- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs @@ -189,7 +189,10 @@ impl Joiner for SemiJoiner

{ let mut hashes_idx = 0; for row_idx in 0..probed_batch.num_rows() { - let key_is_valid = probed_valids.as_ref().map(|nb| nb.is_valid(row_idx)).unwrap_or(true); + let key_is_valid = probed_valids + .as_ref() + .map(|nb| nb.is_valid(row_idx)) + .unwrap_or(true); if P.mode == Anti && P.probe_is_join_side && !key_is_valid { probed_joined.set(row_idx, true); continue; diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 06b1cf881..bc9ef3378 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -624,7 +624,6 @@ mod tests { "| a1 | b1 | c1 |", "+----+----+----+", "| 2 | 6 | 9 |", - "| 4 | | 10 |", "| 5 | 8 | 11 |", "+----+----+----+", ]; From 52296f93062929bdd695329d074cb1f6a3758101 Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 10:45:30 +0800 Subject: [PATCH 5/9] Keep the null result in the reverse connection result --- native-engine/datafusion-ext-plans/src/joins/test.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index bc9ef3378..efffca2d5 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -623,7 +623,8 @@ mod tests { "+----+----+----+", "| a1 | b1 | c1 |", "+----+----+----+", - "| 2 | 6 | 9 |", + "| | 6 | 9 |", + "| 4 | | 10 |", "| 5 | 8 | 11 |", "+----+----+----+", ]; From 38528c3510cdd77e4ea57dfa4859fcccc1dd929d Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 10:52:08 +0800 Subject: [PATCH 6/9] Keep the null result in the reverse connection result --- native-engine/datafusion-ext-plans/src/joins/test.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index efffca2d5..3a8262bb2 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -624,7 +624,6 @@ mod tests { "| a1 | b1 | c1 |", "+----+----+----+", "| | 6 | 9 |", - "| 4 | | 10 |", "| 5 | 8 | 11 |", "+----+----+----+", ]; From 01ebcb36f61bed5ec0ad429825765e3c9209146a Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 11:02:11 +0800 Subject: [PATCH 7/9] Keep the null result in the reverse connection result --- .../datafusion-ext-plans/src/joins/test.rs | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 3a8262bb2..717bbc752 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -602,28 +602,42 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn join_anti_with_null_keys() -> Result<()> { - for test_type in ALL_TEST_TYPE { - let left = build_table_i32_nullable( - ("a1", &vec![Some(1), Some(2), None, Some(4), Some(5)]), - ("b1", &vec![Some(4), Some(5), Some(6), None, Some(8)]), - ("c1", &vec![Some(7), Some(8), Some(9), Some(10), Some(11)]), - ); - let right = build_table_i32_nullable( - ("a2", &vec![Some(10), Some(20), Some(30)]), - ("b1", &vec![Some(4), Some(5), Some(7)]), - ("c2", &vec![Some(70), Some(80), Some(90)]), - ); - let on: JoinOn = vec![( - Arc::new(Column::new_with_schema("b1", &left.schema())?), - Arc::new(Column::new_with_schema("b1", &right.schema())?), - )]; + let left = build_table_i32_nullable( + ("a1", &vec![Some(1), Some(2), None, Some(4), Some(5)]), + ("b1", &vec![Some(4), Some(5), Some(6), None, Some(8)]), + ("c1", &vec![Some(7), Some(8), Some(9), Some(10), Some(11)]), + ); + let right = build_table_i32_nullable( + ("a2", &vec![Some(10), Some(20), Some(30)]), + ("b1", &vec![Some(4), Some(5), Some(7)]), + ("c2", &vec![Some(70), Some(80), Some(90)]), + ); + let on: JoinOn = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?), + Arc::new(Column::new_with_schema("b1", &right.schema())?), + )]; - let (_, batches) = join_collect(test_type, left, right, on, LeftAnti).await?; + for test_type in [BHJLeftProbed, SHJLeftProbed] { + let (_, batches) = join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?; + let expected = vec![ + "+----+----+----+", + "| a1 | b1 | c1 |", + "+----+----+----+", + "| | 6 | 9 |", + "| 5 | 8 | 11 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + } + + for test_type in [SMJ, BHJRightProbed, SHJRightProbed] { + let (_, batches) = join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |", "+----+----+----+", "| | 6 | 9 |", + "| 4 | | 10 |", "| 5 | 8 | 11 |", "+----+----+----+", ]; From 7f704acca7f310cea013c0f6d216c7476a166ff4 Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 11:13:56 +0800 Subject: [PATCH 8/9] Keep the null result in the reverse connection result --- native-engine/datafusion-ext-plans/src/joins/test.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 717bbc752..9b78394e2 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -618,7 +618,9 @@ mod tests { )]; for test_type in [BHJLeftProbed, SHJLeftProbed] { - let (_, batches) = join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?; + let (_, batches) = + join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti) + .await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |", @@ -631,7 +633,9 @@ mod tests { } for test_type in [SMJ, BHJRightProbed, SHJRightProbed] { - let (_, batches) = join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?; + let (_, batches) = + join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti) + .await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |", From 6ed3d318d7cccc14bd14e94ece5236b04c9f479c Mon Sep 17 00:00:00 2001 From: duanhao-jk Date: Mon, 29 Dec 2025 11:16:31 +0800 Subject: [PATCH 9/9] Keep the null result in the reverse connection result --- native-engine/datafusion-ext-plans/src/joins/test.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 9b78394e2..671ecd732 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -619,8 +619,7 @@ mod tests { for test_type in [BHJLeftProbed, SHJLeftProbed] { let (_, batches) = - join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti) - .await?; + join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |", @@ -634,8 +633,7 @@ mod tests { for test_type in [SMJ, BHJRightProbed, SHJRightProbed] { let (_, batches) = - join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti) - .await?; + join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |",