From 05d9c237a5cb06eef6c1333d4fd1cb02cb00528c Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Fri, 2 Jan 2026 20:42:26 +0800 Subject: [PATCH 1/4] Refactor CALL procedure "sys.xxx_cluster_config" to support multiple conifg keys Signed-off-by: Pei Yu <125331682@qq.com> --- ...e.java => GetClusterConfigsProcedure.java} | 37 ++++-- .../flink/procedure/ProcedureManager.java | 5 +- .../ResetClusterConfigsProcedure.java | 108 ++++++++++++++++++ ...e.java => SetClusterConfigsProcedure.java} | 97 +++++++--------- .../flink/procedure/FlinkProcedureITCase.java | 80 +++++++++---- 5 files changed, 240 insertions(+), 87 deletions(-) rename fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/{GetClusterConfigProcedure.java => GetClusterConfigsProcedure.java} (72%) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java rename fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/{SetClusterConfigProcedure.java => SetClusterConfigsProcedure.java} (50%) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java similarity index 72% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java index 444f1dd13d..ff2688bc86 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java @@ -30,6 +30,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Procedure to get cluster configuration(s). @@ -37,7 +40,7 @@ *

This procedure allows querying dynamic cluster configurations. It can retrieve: * *

* @@ -45,39 +48,43 @@ * *
  * -- Get a specific configuration
- * CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get multiple configurations
+ * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
  *
  * -- Get all cluster configurations
- * CALL sys.get_cluster_config();
+ * CALL sys.get_cluster_configs();
  * 
*/ -public class GetClusterConfigProcedure extends ProcedureBase { +public class GetClusterConfigsProcedure extends ProcedureBase { @ProcedureHint( output = @DataTypeHint( "ROW")) public Row[] call(ProcedureContext context) throws Exception { - return getConfigs(null); + return getConfigs(); } @ProcedureHint( argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}, + isVarArgs = true, output = @DataTypeHint( "ROW")) - public Row[] call(ProcedureContext context, String configKey) throws Exception { + public Row[] call(ProcedureContext context, String... configKey) throws Exception { return getConfigs(configKey); } - private Row[] getConfigs(@Nullable String configKey) throws Exception { + private Row[] getConfigs(@Nullable String... configKeys) throws Exception { try { // Get all cluster configurations Collection configs = admin.describeClusterConfigs().get(); List results = new ArrayList<>(); - if (configKey == null || configKey.isEmpty()) { + if (configKeys == null || configKeys.length == 0) { // Return all configurations for (ConfigEntry entry : configs) { results.add( @@ -87,9 +94,14 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception { entry.source() != null ? entry.source().name() : "UNKNOWN")); } } else { - // Find specific configuration - for (ConfigEntry entry : configs) { - if (entry.key().equals(configKey)) { + // Find configurations + // The order of the results is the same as that of the key. + Map configEntryMap = + configs.stream() + .collect(Collectors.toMap(ConfigEntry::key, Function.identity())); + for (String key : configKeys) { + ConfigEntry entry = configEntryMap.get(key); + if (null != entry) { results.add( Row.of( entry.key(), @@ -97,7 +109,8 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception { entry.source() != null ? entry.source().name() : "UNKNOWN")); - break; + } else { + results.add(Row.of()); } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java index 12d4cf3675..7ebf73b818 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java @@ -70,8 +70,9 @@ private enum ProcedureEnum { ADD_ACL("sys.add_acl", AddAclProcedure.class), DROP_ACL("sys.drop_acl", DropAclProcedure.class), List_ACL("sys.list_acl", ListAclProcedure.class), - SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class), - GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class), + SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class), + GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class), + RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class), ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class), REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java new file mode 100644 index 0000000000..745e55f17c --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to reset cluster configuration dynamically. + * + *

This procedure allows modifying dynamic cluster configurations. The changes are: + * + *

    + *
  • Validated by the CoordinatorServer before persistence + *
  • Persisted in ZooKeeper for durability + *
  • Applied to all relevant servers (Coordinator and TabletServers) + *
  • Survive server restarts + *
+ * + *

Usage examples: + * + *

+ * -- reset a configuration
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format');
+ *
+ * 
+ * + *

Note: Not all configurations support dynamic changes. The server will validate the + * change and reject it if the configuration cannot be reset dynamically. + */ +public class ResetClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configKeys) throws Exception { + try { + // Validate config key + if (configKeys.length == 0) { + throw new IllegalArgumentException( + "config_pairs cannot be null or empty. " + + "Please specify valid configuration keys."); + } + + List configList = new ArrayList<>(); + StringBuilder resultMessage = new StringBuilder(); + + for (String key : configKeys) { + String configKey = key.trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify valid configuration key."); + } + + String operationDesc = "deleted (reset to default)"; + + AlterConfig alterConfig = + new AlterConfig(configKey, null, AlterConfigOpType.DELETE); + configList.add(alterConfig); + resultMessage.append( + String.format( + "Successfully %s configuration '%s'. ", operationDesc, configKey)); + } + + // Call Admin API to modify cluster configuration + // This will trigger validation on CoordinatorServer before persistence + admin.alterClusterConfigs(configList).get(); + + return new String[] { + resultMessage + "The change is persisted in ZooKeeper and applied to all servers." + }; + } catch (IllegalArgumentException e) { + // Re-throw validation errors with original message + throw e; + } catch (Exception e) { + // Wrap other exceptions with more context + throw new RuntimeException( + String.format("Failed to reset cluster config: %s", e.getMessage()), e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java similarity index 50% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java index ee4dbcf37e..4a9e0266a8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java @@ -25,9 +25,8 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; -import javax.annotation.Nullable; - -import java.util.Collections; +import java.util.ArrayList; +import java.util.List; /** * Procedure to set or delete cluster configuration dynamically. @@ -45,84 +44,76 @@ * *

  * -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
  *
  * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
  * 
* *

Note: Not all configurations support dynamic changes. The server will validate the * change and reject it if the configuration cannot be modified dynamically or if the new value is * invalid. */ -public class SetClusterConfigProcedure extends ProcedureBase { - - @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}) - public String[] call(ProcedureContext context, String configKey) throws Exception { - return performSet(configKey, null); - } +public class SetClusterConfigsProcedure extends ProcedureBase { @ProcedureHint( - argument = { - @ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")), - @ArgumentHint(name = "config_value", type = @DataTypeHint("STRING")) - }) - public String[] call(ProcedureContext context, String configKey, String configValue) - throws Exception { - return performSet(configKey, configValue); - } - - private String[] performSet(String configKey, @Nullable String configValue) throws Exception { - + argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configPairs) throws Exception { try { // Validate config key - if (configKey == null || configKey.trim().isEmpty()) { + if (configPairs.length == 0) { throw new IllegalArgumentException( - "Config key cannot be null or empty. " - + "Please specify a valid configuration key."); + "config_pairs cannot be null or empty. " + + "Please specify a valid configuration pairs."); } - configKey = configKey.trim(); - - // Determine operation type - AlterConfigOpType opType; - String operationDesc; - - if (configValue == null || configValue.trim().isEmpty()) { - // Delete operation - reset to default - opType = AlterConfigOpType.DELETE; - operationDesc = "deleted (reset to default)"; - } else { - // Set operation - opType = AlterConfigOpType.SET; - operationDesc = String.format("set to '%s'", configValue); + if (configPairs.length % 2 != 0) { + throw new IllegalArgumentException( + "config_pairs must be set in pairs. " + + "Please specify a valid configuration pairs."); + } + List configList = new ArrayList<>(); + StringBuilder resultMessage = new StringBuilder(); + + for (int i = 0; i < configPairs.length; i += 2) { + String configKey = configPairs[i].trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify a valid configuration key."); + } + String configValue = configPairs[i + 1]; + + String operationDesc = String.format("set to '%s'", configValue); + + // Construct configuration modification operation. + AlterConfig alterConfig = + new AlterConfig(configKey, configValue, AlterConfigOpType.SET); + configList.add(alterConfig); + resultMessage.append( + String.format( + "Successfully %s configuration '%s'. ", operationDesc, configKey)); } - - // Construct configuration modification operation. - AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType); // Call Admin API to modify cluster configuration // This will trigger validation on CoordinatorServer before persistence - admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get(); + admin.alterClusterConfigs(configList).get(); return new String[] { - String.format( - "Successfully %s configuration '%s'. " - + "The change is persisted in ZooKeeper and applied to all servers.", - operationDesc, configKey) + resultMessage + "The change is persisted in ZooKeeper and applied to all servers." }; - } catch (IllegalArgumentException e) { // Re-throw validation errors with original message throw e; } catch (Exception e) { // Wrap other exceptions with more context throw new RuntimeException( - String.format( - "Failed to set cluster config '%s': %s", configKey, e.getMessage()), - e); + String.format("Failed to set cluster config: %s", e.getMessage()), e); } } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index b764923050..9d3ecfa812 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -100,9 +100,10 @@ void testShowProcedures() throws Exception { Arrays.asList( "+I[sys.add_acl]", "+I[sys.drop_acl]", - "+I[sys.get_cluster_config]", + "+I[sys.get_cluster_configs]", "+I[sys.list_acl]", - "+I[sys.set_cluster_config]", + "+I[sys.set_cluster_configs]", + "+I[sys.reset_cluster_configs]", "+I[sys.add_server_tag]", "+I[sys.remove_server_tag]"); // make sure no more results is unread. @@ -265,12 +266,12 @@ void testDisableAuthorization() throws Exception { } @Test - void testGetClusterConfig() throws Exception { + void testGetClusterConfigs() throws Exception { // Get specific config try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.get_cluster_config('%s')", + "Call %s.sys.get_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { @@ -285,7 +286,7 @@ void testGetClusterConfig() throws Exception { // Get all configs try (CloseableIterator resultIterator = - tEnv.executeSql(String.format("Call %s.sys.get_cluster_config()", CATALOG_NAME)) + tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME)) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); assertThat(results).isNotEmpty(); @@ -295,7 +296,7 @@ void testGetClusterConfig() throws Exception { try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.get_cluster_config('non.existent.config')", + "Call %s.sys.get_cluster_configs('non.existent.config')", CATALOG_NAME)) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); @@ -305,19 +306,19 @@ void testGetClusterConfig() throws Exception { // reset cluster configs. tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s')", + "Call %s.sys.reset_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .await(); } @Test - void testSetClusterConfig() throws Exception { + void testSetClusterConfigs() throws Exception { // Test setting a valid config try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s', '200MB')", + "Call %s.sys.set_cluster_configs('%s', '200MB')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { @@ -329,50 +330,74 @@ void testSetClusterConfig() throws Exception { .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); } + // Test setting multiple config + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s', '300MB', '%s', 'paimon')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + ConfigOptions.DATALAKE_FORMAT.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat(results.get(0).getField(0)) + .asString() + .contains("Successfully set to '300MB'") + .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()) + .contains("Successfully set to 'paimon'") + .contains(ConfigOptions.DATALAKE_FORMAT.key()); + } + // Verify the config was actually set try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.get_cluster_config('%s')", + "Call %s.sys.get_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); assertThat(results).hasSize(1); - assertThat(results.get(0).getField(1)).isEqualTo("200MB"); + assertThat(results.get(0).getField(1)).isEqualTo("300MB"); } // reset cluster configs. tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s')", + "Call %s.sys.reset_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .await(); } @Test - void testDeleteClusterConfig() throws Exception { + void testResetClusterConfigs() throws Exception { // First set a config tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s', 'paimon')", - CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key())) + "Call %s.sys.set_cluster_configs('%s', 'paimon', '%s', '200MB')", + CATALOG_NAME, + ConfigOptions.DATALAKE_FORMAT.key(), + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .await(); - // Delete the config (reset to default) - omitting the value parameter + // Delete the config (reset to default) try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s')", - CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key())) + "Call %s.sys.reset_cluster_configs('%s', '%s')", + CATALOG_NAME, + ConfigOptions.DATALAKE_FORMAT.key(), + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); assertThat(results).hasSize(1); assertThat(results.get(0).getField(0)) .asString() .contains("Successfully deleted") - .contains(ConfigOptions.DATALAKE_FORMAT.key()); + .contains(ConfigOptions.DATALAKE_FORMAT.key()) + .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); } } @@ -383,12 +408,27 @@ void testSetClusterConfigValidation() throws Exception { () -> tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('invalid.config.key', 'value')", + "Call %s.sys.set_cluster_configs('invalid.config.key', 'value')", CATALOG_NAME)) .await()) .rootCause() .hasMessageContaining( "The config key invalid.config.key is not allowed to be changed dynamically"); + + // validation to ensure an even number of arguments are passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s')", + CATALOG_NAME, + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key())) + .await()) + .rootCause() + .hasMessageContaining( + "config_pairs must be set in pairs. Please specify a valid configuration pairs."); } @ParameterizedTest From 0ae691e52454c14283716e21c5c11e7190b54562 Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Mon, 5 Jan 2026 23:29:28 +0800 Subject: [PATCH 2/4] Refactor CALL procedure "sys.xxx_cluster_config" to support multiple conifg keys Signed-off-by: Pei Yu <125331682@qq.com> --- .../procedure/GetClusterConfigsProcedure.java | 8 +- .../ResetClusterConfigsProcedure.java | 22 ++-- .../procedure/SetClusterConfigsProcedure.java | 11 +- .../flink/procedure/FlinkProcedureITCase.java | 101 ++++++++++++++++-- website/docs/engine-flink/procedures.md | 91 ++++++++++------ 5 files changed, 171 insertions(+), 62 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java index ff2688bc86..0bdcfaff8b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java @@ -68,13 +68,13 @@ public Row[] call(ProcedureContext context) throws Exception { } @ProcedureHint( - argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}, + argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))}, isVarArgs = true, output = @DataTypeHint( "ROW")) - public Row[] call(ProcedureContext context, String... configKey) throws Exception { - return getConfigs(configKey); + public Row[] call(ProcedureContext context, String... configKeys) throws Exception { + return getConfigs(configKeys); } private Row[] getConfigs(@Nullable String... configKeys) throws Exception { @@ -109,8 +109,6 @@ private Row[] getConfigs(@Nullable String... configKeys) throws Exception { entry.source() != null ? entry.source().name() : "UNKNOWN")); - } else { - results.add(Row.of()); } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java index 745e55f17c..6dafff8a4a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java @@ -29,9 +29,9 @@ import java.util.List; /** - * Procedure to reset cluster configuration dynamically. + * Procedure to reset cluster configurations to their default values. * - *

This procedure allows modifying dynamic cluster configurations. The changes are: + *

This procedure reverts the configurations to their initial system defaults. The changes are: * *

    *
  • Validated by the CoordinatorServer before persistence @@ -47,12 +47,12 @@ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec'); * * -- reset multiple configurations at one time - * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format'); + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'); * * * - *

    Note: Not all configurations support dynamic changes. The server will validate the - * change and reject it if the configuration cannot be reset dynamically. + *

    Note: In theory, an operation like Reset to default value should always succeed, + * as the default value should be a valid one */ public class ResetClusterConfigsProcedure extends ProcedureBase { @@ -64,19 +64,19 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce // Validate config key if (configKeys.length == 0) { throw new IllegalArgumentException( - "config_pairs cannot be null or empty. " + "config_keys cannot be null or empty. " + "Please specify valid configuration keys."); } List configList = new ArrayList<>(); - StringBuilder resultMessage = new StringBuilder(); + List resultMessage = new ArrayList(); for (String key : configKeys) { String configKey = key.trim(); if (configKey.isEmpty()) { throw new IllegalArgumentException( "Config key cannot be null or empty. " - + "Please specify valid configuration key."); + + "Please specify a valid configuration key."); } String operationDesc = "deleted (reset to default)"; @@ -84,7 +84,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce AlterConfig alterConfig = new AlterConfig(configKey, null, AlterConfigOpType.DELETE); configList.add(alterConfig); - resultMessage.append( + resultMessage.add( String.format( "Successfully %s configuration '%s'. ", operationDesc, configKey)); } @@ -93,9 +93,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce // This will trigger validation on CoordinatorServer before persistence admin.alterClusterConfigs(configList).get(); - return new String[] { - resultMessage + "The change is persisted in ZooKeeper and applied to all servers." - }; + return resultMessage.toArray(new String[0]); } catch (IllegalArgumentException e) { // Re-throw validation errors with original message throw e; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java index 4a9e0266a8..7246706af8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java @@ -49,9 +49,6 @@ * * -- Set multiple configurations at one time * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon'); - * - * -- Delete a configuration (reset to default) - * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', ''); * * *

    Note: Not all configurations support dynamic changes. The server will validate the @@ -78,7 +75,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc + "Please specify a valid configuration pairs."); } List configList = new ArrayList<>(); - StringBuilder resultMessage = new StringBuilder(); + List resultMessage = new ArrayList<>(); for (int i = 0; i < configPairs.length; i += 2) { String configKey = configPairs[i].trim(); @@ -95,7 +92,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc AlterConfig alterConfig = new AlterConfig(configKey, configValue, AlterConfigOpType.SET); configList.add(alterConfig); - resultMessage.append( + resultMessage.add( String.format( "Successfully %s configuration '%s'. ", operationDesc, configKey)); } @@ -104,9 +101,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc // This will trigger validation on CoordinatorServer before persistence admin.alterClusterConfigs(configList).get(); - return new String[] { - resultMessage + "The change is persisted in ZooKeeper and applied to all servers." - }; + return resultMessage.toArray(new String[0]); } catch (IllegalArgumentException e) { // Re-throw validation errors with original message throw e; diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index 9d3ecfa812..da40bca450 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -284,6 +284,31 @@ void testGetClusterConfigs() throws Exception { assertThat(row.getField(2)).isNotNull(); // config_source } + // Get multiple config + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_configs('%s', '%s')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + ConfigOptions.DATALAKE_FORMAT.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(2); + // the first row + Row row0 = results.get(0); + assertThat(row0.getField(0)) + .isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); + assertThat(row0.getField(1)).isEqualTo("100 mb"); + assertThat(row0.getField(2)).isNotNull(); // config_source + + // the second row + Row row1 = results.get(1); + assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key()); + assertThat(row1.getField(1)).isEqualTo("paimon"); + assertThat(row1.getField(2)).isNotNull(); // config_source + } + // Get all configs try (CloseableIterator resultIterator = tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME)) @@ -340,11 +365,14 @@ void testSetClusterConfigs() throws Exception { ConfigOptions.DATALAKE_FORMAT.key())) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).hasSize(1); + assertThat(results).hasSize(2); assertThat(results.get(0).getField(0)) .asString() .contains("Successfully set to '300MB'") - .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()) + .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); + + assertThat(results.get(1).getField(0)) + .asString() .contains("Successfully set to 'paimon'") .contains(ConfigOptions.DATALAKE_FORMAT.key()); } @@ -365,9 +393,10 @@ void testSetClusterConfigs() throws Exception { // reset cluster configs. tEnv.executeSql( String.format( - "Call %s.sys.reset_cluster_configs('%s')", + "Call %s.sys.reset_cluster_configs('%s', '%s')", CATALOG_NAME, - ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + ConfigOptions.DATALAKE_FORMAT.key())) .await(); } @@ -392,11 +421,15 @@ void testResetClusterConfigs() throws Exception { ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).hasSize(1); + assertThat(results).hasSize(2); assertThat(results.get(0).getField(0)) .asString() .contains("Successfully deleted") - .contains(ConfigOptions.DATALAKE_FORMAT.key()) + .contains(ConfigOptions.DATALAKE_FORMAT.key()); + + assertThat(results.get(1).getField(0)) + .asString() + .contains("Successfully deleted") .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); } } @@ -428,7 +461,61 @@ void testSetClusterConfigValidation() throws Exception { .await()) .rootCause() .hasMessageContaining( - "config_pairs must be set in pairs. Please specify a valid configuration pairs."); + "config_pairs must be set in pairs. Please specify a valid configuration pairs"); + + // Try to no parameters passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs()", + CATALOG_NAME)) + .await()) + .rootCause() + .hasMessageContaining( + "config_pairs cannot be null or empty. Please specify a valid configuration pairs"); + + // Try to mismatched key-value pairs in the input parameters. + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s', 'paimon')", + CATALOG_NAME, + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key())) + .await()) + .rootCause() + .hasMessageContaining( + "Could not parse value 'paimon' for key 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'"); + } + + @Test + void testResetClusterConfigValidation() throws Exception { + // Try to reset an invalid config + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.reset_cluster_configs('invalid.config.key')", + CATALOG_NAME)) + .await()) + .rootCause() + .hasMessageContaining( + "The config key invalid.config.key is not allowed to be changed dynamically"); + + // Try to no parameters passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.reset_cluster_configs()", + CATALOG_NAME)) + .await()) + .rootCause() + .hasMessageContaining( + "config_keys cannot be null or empty. Please specify valid configuration keys"); } @ParameterizedTest diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md index 95e8724eb0..3f346c03c7 100644 --- a/website/docs/engine-flink/procedures.md +++ b/website/docs/engine-flink/procedures.md @@ -162,23 +162,23 @@ CALL sys.list_acl( Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart. -### get_cluster_config +### get_cluster_configs Retrieve cluster configuration values. **Syntax:** ```sql --- Get a specific configuration -CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING') +-- Get multiple configurations +CALL [catalog_name.]sys.get_cluster_configs(config_keys => 'key1' [, 'key2', ...]) -- Get all cluster configurations -CALL [catalog_name.]sys.get_cluster_config() +CALL [catalog_name.]sys.get_cluster_configs() ``` **Parameters:** -- `config_key` (optional): The configuration key to retrieve. If omitted, returns all cluster configurations. +- `config_keys` (optional): The configuration keys to retrieve. If omitted, returns all cluster configurations. **Returns:** A table with columns: - `config_key`: The configuration key name @@ -192,35 +192,35 @@ CALL [catalog_name.]sys.get_cluster_config() USE fluss_catalog; -- Get a specific configuration -CALL sys.get_cluster_config( - config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +CALL sys.get_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +); + +-- Get multiple configuration +CALL sys.get_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format' ); -- Get all cluster configurations -CALL sys.get_cluster_config(); +CALL sys.get_cluster_configs(); ``` -### set_cluster_config +### set_cluster_configs -Set or delete a cluster configuration dynamically. +Set cluster configurations dynamically. **Syntax:** ```sql --- Set a configuration value -CALL [catalog_name.]sys.set_cluster_config( - config_key => 'STRING', - config_value => 'STRING' +-- Set configuration values +CALL [catalog_name.]sys.set_cluster_configs( + config_pairs => 'key1', 'value1' [, 'key2', 'value2' ...] ) - --- Delete a configuration (reset to default) -CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING') ``` **Parameters:** -- `config_key` (required): The configuration key to modify. -- `config_value` (optional): The new value to set. If omitted or empty, the configuration is deleted (reset to default). +- `config_pairs`(required): For key-value pairs in configuration items, the number of parameters must be even. **Important Notes:** @@ -236,20 +236,51 @@ CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING') USE fluss_catalog; -- Set RocksDB rate limiter -CALL sys.set_cluster_config( - config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', - config_value => '200MB' +CALL sys.set_cluster_configs( + config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB' ); --- Set datalake format -CALL sys.set_cluster_config( - config_key => 'datalake.format', - config_value => 'paimon' +-- Set RocksDB rate limiter and datalake format +CALL sys.set_cluster_configs( + config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 'datalake.format','paimon' ); +``` --- Delete a configuration (reset to default) -CALL sys.set_cluster_config( - config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' -); +### reset_cluster_configs + +reset cluster configurations dynamically. + +**Syntax:** + +```sql +-- reset configuration values +CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', ...]) ``` +**Parameters:** + +- `config_keys`(required): The configuration keys to reset. + +**Important Notes:** + +- Changes are validated before being applied and persisted in ZooKeeper +- Changes are automatically applied to all servers (Coordinator and TabletServers) +- Changes survive server restarts +- Not all configurations support dynamic changes. The server will reject invalid modifications + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Reset a specific configuration +CALL sys.reset_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +); + +-- Reset RocksDB rate limiter and datalake format +CALL sys.reset_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format' +); +``` \ No newline at end of file From 053dd5da558e74be63993ea3cc7ea23895e4c489 Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Tue, 6 Jan 2026 21:50:09 +0800 Subject: [PATCH 3/4] fix comment Signed-off-by: Pei Yu <125331682@qq.com> --- .../fluss/flink/procedure/ResetClusterConfigsProcedure.java | 2 +- website/docs/engine-flink/procedures.md | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java index 6dafff8a4a..88e0c297b6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java @@ -69,7 +69,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce } List configList = new ArrayList<>(); - List resultMessage = new ArrayList(); + List resultMessage = new ArrayList<>(); for (String key : configKeys) { String configKey = key.trim(); diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md index 3f346c03c7..eeab4496fd 100644 --- a/website/docs/engine-flink/procedures.md +++ b/website/docs/engine-flink/procedures.md @@ -261,12 +261,6 @@ CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', . - `config_keys`(required): The configuration keys to reset. -**Important Notes:** - -- Changes are validated before being applied and persisted in ZooKeeper -- Changes are automatically applied to all servers (Coordinator and TabletServers) -- Changes survive server restarts -- Not all configurations support dynamic changes. The server will reject invalid modifications **Example:** From c1f8a3c8df7b28bfeadb133bb4e4533659f5374d Mon Sep 17 00:00:00 2001 From: peiyu <125331682@qq.com> Date: Wed, 7 Jan 2026 15:42:05 +0800 Subject: [PATCH 4/4] fix comment Signed-off-by: peiyu <125331682@qq.com> --- .../apache/fluss/flink/procedure/FlinkProcedureITCase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index da40bca450..b824111c3e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -435,7 +435,7 @@ void testResetClusterConfigs() throws Exception { } @Test - void testSetClusterConfigValidation() throws Exception { + void testSetClusterConfigsValidation() throws Exception { // Try to set an invalid config (not allowed for dynamic change) assertThatThrownBy( () -> @@ -445,6 +445,7 @@ void testSetClusterConfigValidation() throws Exception { CATALOG_NAME)) .await()) .rootCause() + // TODO: Fix misleading error: non-existent key reported as not allowed. .hasMessageContaining( "The config key invalid.config.key is not allowed to be changed dynamically"); @@ -492,7 +493,7 @@ void testSetClusterConfigValidation() throws Exception { } @Test - void testResetClusterConfigValidation() throws Exception { + void testResetClusterConfigsValidation() throws Exception { // Try to reset an invalid config assertThatThrownBy( () -> @@ -502,6 +503,7 @@ void testResetClusterConfigValidation() throws Exception { CATALOG_NAME)) .await()) .rootCause() + // TODO: Fix misleading error: non-existent key reported as not allowed. .hasMessageContaining( "The config key invalid.config.key is not allowed to be changed dynamically");