From 6035422d0fe8033c9f1af28b66fa8e69598979bf Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 28 Dec 2025 13:00:24 +0000 Subject: [PATCH] Delta Join additional IT tests and docs improvement --- .../source/Flink22TableSourceITCase.java | 697 ++++++++++++++++++ website/docs/engine-flink/delta-joins.md | 19 +- 2 files changed, 707 insertions(+), 9 deletions(-) diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java index 7253720481..0f496742a2 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java @@ -20,6 +20,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.InternalRow; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.types.Row; @@ -33,6 +34,7 @@ import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT case for {@link FlinkTableSource} in Flink 2.2. 
*/ public class Flink22TableSourceITCase extends FlinkTableSourceITCase { @@ -234,6 +236,83 @@ void testDeltaJoinWithProjectionAndFilter() throws Exception { assertResultsIgnoreOrder(collected, expected, true); } + @Test + void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() throws Exception { + // When filtering on non-upsert-key columns with CDC sources, + // the optimizer can't use DeltaJoin + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_force_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.delete.behavior' = 'IGNORE' " + + ")", + leftTableName)); + + String rightTableName = "right_table_force_fail"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.delete.behavior' = 'IGNORE' " + + ")", + rightTableName)); + + String sinkTableName = "sink_table_force_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c1, d1, c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + // Use FORCE strategy - should throw exception + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key + String sql = + String.format( + "INSERT INTO %s SELECT 
* FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + @Test void testDeltaJoinWithLookupCache() throws Exception { tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); @@ -306,4 +385,622 @@ void testDeltaJoinWithLookupCache() throws Exception { List expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]"); assertResultsIgnoreOrder(collected, expected, true); } + + @Test + void testDeltaJoinWithPrimaryKeyTableNoDeletes() throws Exception { + // Test delta join with normal primary key tables (not first_row) using + // table.delete.behavior=IGNORE + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_normal_pk"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.delete.behavior' = 'IGNORE' " + + ")", + leftTableName)); + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_normal_pk"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.delete.behavior' = 'IGNORE' " + + ")", + rightTableName)); + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 
10000L), + row(2, "v4", 200L, 2, 20000L), + row(3, "v5", 400L, 4, 40000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_normal_pk"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " a2 int, " + + " b2 varchar, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, b1, c1, d1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[1, v1, 100, 1, 1, v1]", + "-U[1, v1, 100, 1, 1, v1]", + "+U[1, v1, 100, 1, 1, v1]", + "+I[2, v2, 200, 2, 2, v4]", + "-U[2, v2, 200, 2, 2, v4]", + "+U[2, v2, 200, 2, 2, v4]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinOnBucketKey() throws Exception { + // Test delta join on bucket key only (not full primary key) + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_bucket_key"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List rows1 = 
+ Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 100L, 2, 20000L), + row(3, "v3", 200L, 1, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_bucket_key"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + List rows2 = + Arrays.asList(row(10, "r1", 100L, 5, 50000L), row(20, "r2", 200L, 6, 60000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_bucket_key"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " a2 int, " + + " b2 varchar, " + + " primary key (a1, a2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + // Join on bucket key only (c1 = c2), not full primary key + String sql = + String.format( + "INSERT INTO %s SELECT a1, b1, c1, a2, b2 FROM %s INNER JOIN %s ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[=(c1, c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + // Each left row with c1=100 joins with the right row with c2=100 + // Each left row with c1=200 joins with the right row with c2=200 + List expected = + Arrays.asList( + "+I[1, v1, 100, 10, r1]", + "-U[1, v1, 100, 10, r1]", + "+U[1, 
v1, 100, 10, r1]", + "+I[2, v2, 100, 10, r1]", + "-U[2, v2, 100, 10, r1]", + "+U[2, v2, 100, 10, r1]", + "+I[3, v3, 200, 20, r2]", + "-U[3, v3, 200, 20, r2]", + "+U[3, v3, 200, 20, r2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenSourceHasDelete() throws Exception { + // When source can produce DELETE records, DeltaJoin is not applicable + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + // No merge-engine or delete.behavior - regular PK tables with full CDC + String leftTableName = "left_table_delete_force"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1' " + + ")", + leftTableName)); + + String rightTableName = "right_table_delete_force"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2' " + + ")", + rightTableName)); + + String sinkTableName = "sink_table_delete_force"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c1, d1, c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> 
tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWhenJoinKeyNotContainIndex() throws Exception { + // When join key doesn't include at least one complete index, DeltaJoin isn't applicable + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_no_idx_force"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + + String rightTableName = "right_table_no_idx_force"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + + String sinkTableName = "sink_table_no_idx_force"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (a1, a2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + // Join on a1 = a2, but index is on c1/c2 (bucket.key), not a1/a2 + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> 
tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithLeftJoin() throws Exception { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_left_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + + String rightTableName = "right_table_left_fail"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + + String sinkTableName = "sink_table_left_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " a2 int, " + + " primary key (c1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithRightJoin() throws Exception { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_right_fail"; + tEnv.executeSql( + String.format( 
+ "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + + String rightTableName = "right_table_right_fail"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + + String sinkTableName = "sink_table_right_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c2 bigint, " + + " a2 int, " + + " primary key (c2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithFullOuterJoin() throws Exception { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_full_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + + String rightTableName = "right_table_full_fail"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 
int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + + String sinkTableName = "sink_table_full_fail"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " c2 bigint, " + + " a2 int, " + + " primary key (c1, c2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, c2, a2 FROM %s FULL OUTER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithCascadeJoin() throws Exception { + // DeltaJoin requires that all inputs must come directly from supported upstream nodes + // (TableSourceScan, Exchange, DropUpdateBefore or Calc) + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String table1 = "cascade_table1"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + table1)); + + String table2 = "cascade_table2"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + table2)); + + String table3 = 
"cascade_table3"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a3 int, " + + " c3 bigint, " + + " d3 int, " + + " primary key (c3, d3) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c3', " + + " 'table.merge-engine' = 'first_row' " + + ")", + table3)); + + String sinkTableName = "cascade_sink"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " a2 int, " + + " c2 bigint, " + + " a3 int, " + + " c3 bigint, " + + " primary key (c1, c2, c3) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + // Cascade join: (table1 JOIN table2) JOIN table3 + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2, c2, a3, c3 " + + "FROM %s " + + "INNER JOIN %s ON c1 = c2 AND d1 = d2 " + + "INNER JOIN %s ON c1 = c3 AND d1 = d3", + sinkTableName, table1, table2, table3); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } } diff --git a/website/docs/engine-flink/delta-joins.md b/website/docs/engine-flink/delta-joins.md index deeb9ed74f..c831fe8656 100644 --- a/website/docs/engine-flink/delta-joins.md +++ b/website/docs/engine-flink/delta-joins.md @@ -169,24 +169,25 @@ There is a known issue ([FLINK-38399](https://issues.apache.org/jira/browse/FLIN #### Supported Features -- Support for optimizing a dual-stream join from CDC sources that do not include delete messages into a delta join. - - Disable delete on the source table to guarantee there is no delete message in the table, by adding the option `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the table. - - The source table is no more required to be a `first_row` merge engine table since this version. 
-- Support `Project` and `Filter` between source and delta join. -- Support cache in delta join. +- CDC sources are now supported in delta join, provided they do not produce DELETE messages. + - Set `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the source table to suppress deletes. + - The `'table.merge-engine' = 'first_row'` option is no longer required. +- Projection and filter operations are now supported between source and delta join. +- Lookup cache is now supported in delta join. #### Limitations - The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join. -- The join must be a INNER join. +- The join must be an INNER join. LEFT JOIN, RIGHT JOIN, and FULL OUTER JOIN are not supported. +- Cascade joins (e.g., `A JOIN B JOIN C`) are not supported. Each join input must come from a table source, optionally through a projection or filter, and not from the output of another join. - The downstream node of the join must support idempotent updates, typically it's an upsert sink and should not have a `SinkUpsertMaterializer` node before it. - - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink’s primary key does not fully cover the upstream update key. + - Flink planner automatically inserts a `SinkUpsertMaterializer` when the sink's primary key does not fully cover the upstream update key. - You can learn more details about `SinkUpsertMaterializer` by reading this [blog](https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-events). - Since delta join does not support to handle update-before messages, it is necessary to ensure that the entire pipeline can safely discard update-before messages. That means when consuming a CDC stream: - The join key used in the delta join must be part of the primary key. - The sink's primary key must be the same as the upstream update key. - - All filters must be applied on the upsert key. -- Neither filters nor projections should contain non-deterministic functions. 
+ - All filters (including non-equi join conditions) must be applied on the upsert key. +- Filters, projections, and non-equi join conditions must not contain non-deterministic functions. ## Future Plan