diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
index b87965727a..164a2e71dc 100644
--- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
@@ -17,158 +17,14 @@
 package org.apache.fluss.flink.catalog;
 
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT case for catalog in Flink 2.2. */
 public class Flink22CatalogITCase extends FlinkCatalogITCase {
 
-    @Test
-    void testGetTableWithIndex() throws Exception {
-        String tableName = "table_with_pk_only";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a int, "
-                                + " b varchar, "
-                                + " c bigint, "
-                                + " primary key (a, b) NOT ENFORCED"
-                                + ") with ( "
-                                + " 'connector' = 'fluss' "
-                                + ")",
-                        tableName));
-        CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
-        Schema expectedSchema =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT().notNull())
-                        .column("b", DataTypes.STRING().notNull())
-                        .column("c", DataTypes.BIGINT())
-                        .primaryKey("a", "b")
-                        .index("a", "b")
-                        .build();
-        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
-
-        tableName = "table_with_prefix_bucket_key";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a int, "
-                                + " b varchar, "
-                                + " c bigint, "
-                                + " primary key (a, b) NOT ENFORCED"
-                                + ") with ( "
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'a'"
-                                + ")",
-                        tableName));
-
-        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
-        expectedSchema =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT().notNull())
-                        .column("b", DataTypes.STRING().notNull())
-                        .column("c", DataTypes.BIGINT())
-                        .primaryKey("a", "b")
-                        .index("a", "b")
-                        .index("a")
-                        .build();
-        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
-
-        tableName = "table_with_bucket_key_is_not_prefix_pk";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a int, "
-                                + " b varchar, "
-                                + " c bigint, "
-                                + " primary key (a, b) NOT ENFORCED"
-                                + ") with ( "
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'b'"
-                                + ")",
-                        tableName));
-
-        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
-        expectedSchema =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT().notNull())
-                        .column("b", DataTypes.STRING().notNull())
-                        .column("c", DataTypes.BIGINT())
-                        .primaryKey("a", "b")
-                        .index("a", "b")
-                        .build();
-        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
-
-        tableName = "table_with_partition_1";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a int, "
-                                + " b varchar, "
-                                + " c bigint, "
-                                + " dt varchar, "
-                                + " primary key (a, b, dt) NOT ENFORCED "
-                                + ") "
-                                + " partitioned by (dt) "
-                                + " with ( "
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'a'"
-                                + ")",
-                        tableName));
-
-        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
-        expectedSchema =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT().notNull())
-                        .column("b", DataTypes.STRING().notNull())
-                        .column("c", DataTypes.BIGINT())
-                        .column("dt", DataTypes.STRING().notNull())
-                        .primaryKey("a", "b", "dt")
-                        .index("a", "b", "dt")
-                        .index("a", "dt")
-                        .build();
-        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
-
-        tableName = "table_with_partition_2";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a int, "
-                                + " b varchar, "
-                                + " c bigint, "
-                                + " dt varchar, "
-                                + " primary key (dt, a, b) NOT ENFORCED "
-                                + ") "
-                                + " partitioned by (dt) "
-                                + " with ( "
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'a'"
-                                + ")",
-                        tableName));
-
-        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
-        expectedSchema =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT().notNull())
-                        .column("b", DataTypes.STRING().notNull())
-                        .column("c", DataTypes.BIGINT())
-                        .column("dt", DataTypes.STRING().notNull())
-                        .primaryKey("dt", "a", "b")
-                        .index("dt", "a", "b")
-                        .index("a", "dt")
-                        .build();
-        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
-    }
-
     @Override
     protected void addDefaultIndexKey(Schema.Builder schemaBuilder) {
         super.addDefaultIndexKey(schemaBuilder);
-
         Schema currentSchema = schemaBuilder.build();
         currentSchema.getPrimaryKey().ifPresent(pk -> schemaBuilder.index(pk.getColumnNames()));
     }
 }
diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
index 7253720481..507527fb32 100644
--- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
@@ -17,293 +17,5 @@
 package org.apache.fluss.flink.source;
 
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
 /** IT case for {@link FlinkTableSource} in Flink 2.2. */
-public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
-
-    @Test
-    void testDeltaJoin() throws Exception {
-        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
-        // to query the results of the sink table
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-
-        String leftTableName = "left_table";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " b1 varchar, "
-                                + " c1 bigint, "
-                                + " d1 int, "
-                                + " e1 bigint, "
-                                + " primary key (c1, d1) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c1', "
-                                // currently, delta join only support append-only source
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        leftTableName));
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v1", 300L, 3, 30000L),
-                        row(4, "v4", 400L, 4, 40000L));
-        // write records
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " a2 int, "
-                                + " b2 varchar, "
-                                + " c2 bigint, "
-                                + " d2 int, "
-                                + " e2 bigint, "
-                                + " primary key (c2, d2) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c2', "
-                                // currently, delta join only support append-only source
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        rightTableName));
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v3", 200L, 2, 20000L),
-                        row(3, "v4", 300L, 4, 30000L),
-                        row(4, "v4", 500L, 4, 50000L));
-        // write records
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " b1 varchar, "
-                                + " c1 bigint, "
-                                + " d1 int, "
-                                + " e1 bigint, "
-                                + " a2 int, "
-                                + " b2 varchar, "
-                                + " c2 bigint, "
-                                + " d2 int, "
-                                + " e2 bigint, "
-                                + " primary key (c1, d1, c2, d2) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss' "
-                                + ")",
-                        sinkTableName));
-
-        tEnv.getConfig()
-                .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
-                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
-                        "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
-                        "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithProjectionAndFilter() throws Exception {
-        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
-        // to query the results of the sink table
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-
-        String leftTableName = "left_table_proj";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " b1 varchar, "
-                                + " c1 bigint, "
-                                + " d1 int, "
-                                + " e1 bigint, "
-                                + " primary key (c1, d1) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c1', "
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        leftTableName));
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v1", 300L, 3, 30000L));
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table_proj";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " a2 int, "
-                                + " b2 varchar, "
-                                + " c2 bigint, "
-                                + " d2 int, "
-                                + " e2 bigint, "
-                                + " primary key (c2, d2) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c2', "
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        rightTableName));
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v3", 200L, 2, 20000L),
-                        row(3, "v4", 300L, 4, 30000L));
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table_proj";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " c1 bigint, "
-                                + " a2 int, "
-                                + " primary key (c1) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss' "
-                                + ")",
-                        sinkTableName));
-
-        tEnv.getConfig()
-                .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
-                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
-        // Test with projection and filter
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-
-    @Test
-    void testDeltaJoinWithLookupCache() throws Exception {
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-
-        String leftTableName = "left_table_cache";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " c1 bigint, "
-                                + " d1 int, "
-                                + " primary key (c1, d1) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c1', "
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        leftTableName));
-        List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
-        writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
-
-        String rightTableName = "right_table_cache";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " a2 int, "
-                                + " c2 bigint, "
-                                + " d2 int, "
-                                + " primary key (c2, d2) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c2', "
-                                + " 'table.merge-engine' = 'first_row', "
-                                + " 'lookup.cache' = 'partial', "
-                                + " 'lookup.partial-cache.max-rows' = '100' "
-                                + ")",
-                        rightTableName));
-        List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
-        writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
-
-        String sinkTableName = "sink_table_cache";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " a2 int, "
-                                + " primary key (a1) NOT ENFORCED" // Dummy PK
-                                + ") with ("
-                                + " 'connector' = 'fluss' "
-                                + ")",
-                        sinkTableName));
-
-        tEnv.getConfig()
-                .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
-                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
-        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-}
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index e544bfd1e9..5f6e2a048b 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -27,6 +27,7 @@
 import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.adapter.SchemaAdapter;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -48,6 +49,7 @@
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -69,6 +71,7 @@
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
+import static org.apache.fluss.flink.adapter.SchemaAdapter.supportIndex;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -878,6 +881,146 @@ void testCreateCatalogWithLakeProperties() throws Exception {
                 .containsEntry("table.datalake.paimon.jdbc.password", "pass");
     }
 
+    @Test
+    void testGetTableWithIndex() throws Exception {
+        Assumptions.assumeTrue(supportIndex());
+        String tableName = "table_with_pk_only";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " primary key (a, b) NOT ENFORCED"
+                                + ") with ( "
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        tableName));
+        CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+        Schema expectedSchema =
+                SchemaAdapter.withIndex(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.STRING().notNull())
+                                .column("c", DataTypes.BIGINT())
+                                .primaryKey("a", "b")
+                                .build(),
+                        Collections.singletonList(Arrays.asList("a", "b")));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_prefix_bucket_key";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " primary key (a, b) NOT ENFORCED"
+                                + ") with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'a'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+        expectedSchema =
+                SchemaAdapter.withIndex(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.STRING().notNull())
+                                .column("c", DataTypes.BIGINT())
+                                .primaryKey("a", "b")
+                                .build(),
+                        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("a")));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_bucket_key_is_not_prefix_pk";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " primary key (a, b) NOT ENFORCED"
+                                + ") with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'b'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+        expectedSchema =
+                SchemaAdapter.withIndex(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.STRING().notNull())
+                                .column("c", DataTypes.BIGINT())
+                                .primaryKey("a", "b")
+                                .build(),
+                        Collections.singletonList(Arrays.asList("a", "b")));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_partition_1";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " dt varchar, "
+                                + " primary key (a, b, dt) NOT ENFORCED "
+                                + ") "
+                                + " partitioned by (dt) "
+                                + " with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'a'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+        expectedSchema =
+                SchemaAdapter.withIndex(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.STRING().notNull())
+                                .column("c", DataTypes.BIGINT())
+                                .column("dt", DataTypes.STRING().notNull())
+                                .primaryKey("a", "b", "dt")
+                                .build(),
+                        Arrays.asList(Arrays.asList("a", "b", "dt"), Arrays.asList("a", "dt")));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_partition_2";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " dt varchar, "
+                                + " primary key (dt, a, b) NOT ENFORCED "
+                                + ") "
+                                + " partitioned by (dt) "
+                                + " with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'a'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
+        expectedSchema =
+                SchemaAdapter.withIndex(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.STRING().notNull())
+                                .column("c", DataTypes.BIGINT())
+                                .column("dt", DataTypes.STRING().notNull())
+                                .primaryKey("dt", "a", "b")
+                                .build(),
+                        Arrays.asList(Arrays.asList("dt", "a", "b"), Arrays.asList("a", "dt")));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+    }
+
     /**
      * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1,
      * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an
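One behavioral detail worth noting before the next file: the relocated delta join tests set the optimizer strategy through a string key rather than the typed OptimizerConfigOptions constant used by the deleted Flink 2.2 tests, since that constant only exists in Flink 2.2+. Both forms (taken verbatim from this patch) configure the same option; the string form resolves the key at runtime, so the common module keeps compiling against older Flink versions:

    // Typed form (only compiles against Flink 2.2+; used by the deleted tests):
    tEnv.getConfig()
            .set(
                    OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
                    OptimizerConfigOptions.DeltaJoinStrategy.FORCE);

    // String form (used by the relocated common tests; resolved at runtime):
    tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE");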
+ + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c1, d1, c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", + "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", + "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithProjectionAndFilter() throws Exception { + Assumptions.assumeTrue(supportIndex()); + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_proj"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 300L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " a2 int, " + + " primary key (c1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE"); + + // Test with projection and filter + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = 
Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithLookupCache() throws Exception { + Assumptions.assumeTrue(supportIndex()); + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_cache"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List rows1 = Arrays.asList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); + + String rightTableName = "right_table_cache"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row', " + + " 'lookup.cache' = 'partial', " + + " 'lookup.partial-cache.max-rows' = '100' " + + ")", + rightTableName)); + List rows2 = Arrays.asList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); + + String sinkTableName = "sink_table_cache"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " a2 int, " + + " primary key (a1) NOT ENFORCED" // Dummy PK + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig().set("table.optimizer.delta-join.strategy", "FORCE"); + + String sql = + String.format( + "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]"); + assertResultsIgnoreOrder(collected, expected, true); + } + private enum Caching { ENABLE_CACHE, DISABLE_CACHE