configEntryMap =
+ configs.stream()
+ .collect(Collectors.toMap(ConfigEntry::key, Function.identity()));
+ for (String key : configKeys) {
+ ConfigEntry entry = configEntryMap.get(key);
+ if (null != entry) {
results.add(
Row.of(
entry.key(),
@@ -97,7 +109,8 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception {
entry.source() != null
? entry.source().name()
: "UNKNOWN"));
- break;
+ } else {
+ results.add(Row.of());
}
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
index 12d4cf3675..7ebf73b818 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
@@ -70,8 +70,9 @@ private enum ProcedureEnum {
ADD_ACL("sys.add_acl", AddAclProcedure.class),
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
List_ACL("sys.list_acl", ListAclProcedure.class),
- SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
- GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class),
+ SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class),
+ GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class),
+ RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class);
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
new file mode 100644
index 0000000000..745e55f17c
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to reset cluster configuration dynamically.
+ *
+ * This procedure allows modifying dynamic cluster configurations. The changes are:
+ *
+ *
+ * - Validated by the CoordinatorServer before persistence
+ *
- Persisted in ZooKeeper for durability
+ *
- Applied to all relevant servers (Coordinator and TabletServers)
+ *
- Survive server restarts
+ *
+ *
+ * Usage examples:
+ *
+ *
+ * -- reset a configuration
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format');
+ *
+ *
+ *
+ * Note: Not all configurations support dynamic changes. The server will validate the
+ * change and reject it if the configuration cannot be reset dynamically.
+ */
+public class ResetClusterConfigsProcedure extends ProcedureBase {
+
+ @ProcedureHint(
+ argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
+ isVarArgs = true)
+ public String[] call(ProcedureContext context, String... configKeys) throws Exception {
+ try {
+ // Validate config key
+ if (configKeys.length == 0) {
+ throw new IllegalArgumentException(
+ "config_pairs cannot be null or empty. "
+ + "Please specify valid configuration keys.");
+ }
+
+ List configList = new ArrayList<>();
+ StringBuilder resultMessage = new StringBuilder();
+
+ for (String key : configKeys) {
+ String configKey = key.trim();
+ if (configKey.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Config key cannot be null or empty. "
+ + "Please specify valid configuration key.");
+ }
+
+ String operationDesc = "deleted (reset to default)";
+
+ AlterConfig alterConfig =
+ new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
+ configList.add(alterConfig);
+ resultMessage.append(
+ String.format(
+ "Successfully %s configuration '%s'. ", operationDesc, configKey));
+ }
+
+ // Call Admin API to modify cluster configuration
+ // This will trigger validation on CoordinatorServer before persistence
+ admin.alterClusterConfigs(configList).get();
+
+ return new String[] {
+ resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
+ };
+ } catch (IllegalArgumentException e) {
+ // Re-throw validation errors with original message
+ throw e;
+ } catch (Exception e) {
+ // Wrap other exceptions with more context
+ throw new RuntimeException(
+ String.format("Failed to reset cluster config: %s", e.getMessage()), e);
+ }
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
similarity index 50%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
index ee4dbcf37e..4a9e0266a8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
@@ -25,9 +25,8 @@
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
-import javax.annotation.Nullable;
-
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
/**
* Procedure to set or delete cluster configuration dynamically.
@@ -45,84 +44,76 @@
*
*
* -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
*
* -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
*
*
* Note: Not all configurations support dynamic changes. The server will validate the
* change and reject it if the configuration cannot be modified dynamically or if the new value is
* invalid.
*/
-public class SetClusterConfigProcedure extends ProcedureBase {
-
- @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))})
- public String[] call(ProcedureContext context, String configKey) throws Exception {
- return performSet(configKey, null);
- }
+public class SetClusterConfigsProcedure extends ProcedureBase {
@ProcedureHint(
- argument = {
- @ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "config_value", type = @DataTypeHint("STRING"))
- })
- public String[] call(ProcedureContext context, String configKey, String configValue)
- throws Exception {
- return performSet(configKey, configValue);
- }
-
- private String[] performSet(String configKey, @Nullable String configValue) throws Exception {
-
+ argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
+ isVarArgs = true)
+ public String[] call(ProcedureContext context, String... configPairs) throws Exception {
try {
// Validate config key
- if (configKey == null || configKey.trim().isEmpty()) {
+ if (configPairs.length == 0) {
throw new IllegalArgumentException(
- "Config key cannot be null or empty. "
- + "Please specify a valid configuration key.");
+ "config_pairs cannot be null or empty. "
+ + "Please specify a valid configuration pairs.");
}
- configKey = configKey.trim();
-
- // Determine operation type
- AlterConfigOpType opType;
- String operationDesc;
-
- if (configValue == null || configValue.trim().isEmpty()) {
- // Delete operation - reset to default
- opType = AlterConfigOpType.DELETE;
- operationDesc = "deleted (reset to default)";
- } else {
- // Set operation
- opType = AlterConfigOpType.SET;
- operationDesc = String.format("set to '%s'", configValue);
+ if (configPairs.length % 2 != 0) {
+ throw new IllegalArgumentException(
+ "config_pairs must be set in pairs. "
+ + "Please specify a valid configuration pairs.");
+ }
+ List configList = new ArrayList<>();
+ StringBuilder resultMessage = new StringBuilder();
+
+ for (int i = 0; i < configPairs.length; i += 2) {
+ String configKey = configPairs[i].trim();
+ if (configKey.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Config key cannot be null or empty. "
+ + "Please specify a valid configuration key.");
+ }
+ String configValue = configPairs[i + 1];
+
+ String operationDesc = String.format("set to '%s'", configValue);
+
+ // Construct configuration modification operation.
+ AlterConfig alterConfig =
+ new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
+ configList.add(alterConfig);
+ resultMessage.append(
+ String.format(
+ "Successfully %s configuration '%s'. ", operationDesc, configKey));
}
-
- // Construct configuration modification operation.
- AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType);
// Call Admin API to modify cluster configuration
// This will trigger validation on CoordinatorServer before persistence
- admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
+ admin.alterClusterConfigs(configList).get();
return new String[] {
- String.format(
- "Successfully %s configuration '%s'. "
- + "The change is persisted in ZooKeeper and applied to all servers.",
- operationDesc, configKey)
+ resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
};
-
} catch (IllegalArgumentException e) {
// Re-throw validation errors with original message
throw e;
} catch (Exception e) {
// Wrap other exceptions with more context
throw new RuntimeException(
- String.format(
- "Failed to set cluster config '%s': %s", configKey, e.getMessage()),
- e);
+ String.format("Failed to set cluster config: %s", e.getMessage()), e);
}
}
}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index b764923050..9d3ecfa812 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -100,9 +100,10 @@ void testShowProcedures() throws Exception {
Arrays.asList(
"+I[sys.add_acl]",
"+I[sys.drop_acl]",
- "+I[sys.get_cluster_config]",
+ "+I[sys.get_cluster_configs]",
"+I[sys.list_acl]",
- "+I[sys.set_cluster_config]",
+ "+I[sys.set_cluster_configs]",
+ "+I[sys.reset_cluster_configs]",
"+I[sys.add_server_tag]",
"+I[sys.remove_server_tag]");
// make sure no more results is unread.
@@ -265,12 +266,12 @@ void testDisableAuthorization() throws Exception {
}
@Test
- void testGetClusterConfig() throws Exception {
+ void testGetClusterConfigs() throws Exception {
// Get specific config
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.get_cluster_config('%s')",
+ "Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
@@ -285,7 +286,7 @@ void testGetClusterConfig() throws Exception {
// Get all configs
try (CloseableIterator resultIterator =
- tEnv.executeSql(String.format("Call %s.sys.get_cluster_config()", CATALOG_NAME))
+ tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).isNotEmpty();
@@ -295,7 +296,7 @@ void testGetClusterConfig() throws Exception {
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.get_cluster_config('non.existent.config')",
+ "Call %s.sys.get_cluster_configs('non.existent.config')",
CATALOG_NAME))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
@@ -305,19 +306,19 @@ void testGetClusterConfig() throws Exception {
// reset cluster configs.
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s')",
+ "Call %s.sys.reset_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.await();
}
@Test
- void testSetClusterConfig() throws Exception {
+ void testSetClusterConfigs() throws Exception {
// Test setting a valid config
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s', '200MB')",
+ "Call %s.sys.set_cluster_configs('%s', '200MB')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
@@ -329,50 +330,74 @@ void testSetClusterConfig() throws Exception {
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
}
+ // Test setting multiple config
+ try (CloseableIterator resultIterator =
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs('%s', '300MB', '%s', 'paimon')",
+ CATALOG_NAME,
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ ConfigOptions.DATALAKE_FORMAT.key()))
+ .collect()) {
+ List results = CollectionUtil.iteratorToList(resultIterator);
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0).getField(0))
+ .asString()
+ .contains("Successfully set to '300MB'")
+ .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())
+ .contains("Successfully set to 'paimon'")
+ .contains(ConfigOptions.DATALAKE_FORMAT.key());
+ }
+
// Verify the config was actually set
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.get_cluster_config('%s')",
+ "Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
- assertThat(results.get(0).getField(1)).isEqualTo("200MB");
+ assertThat(results.get(0).getField(1)).isEqualTo("300MB");
}
// reset cluster configs.
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s')",
+ "Call %s.sys.reset_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.await();
}
@Test
- void testDeleteClusterConfig() throws Exception {
+ void testResetClusterConfigs() throws Exception {
// First set a config
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s', 'paimon')",
- CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key()))
+ "Call %s.sys.set_cluster_configs('%s', 'paimon', '%s', '200MB')",
+ CATALOG_NAME,
+ ConfigOptions.DATALAKE_FORMAT.key(),
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.await();
- // Delete the config (reset to default) - omitting the value parameter
+ // Delete the config (reset to default)
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s')",
- CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key()))
+ "Call %s.sys.reset_cluster_configs('%s', '%s')",
+ CATALOG_NAME,
+ ConfigOptions.DATALAKE_FORMAT.key(),
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully deleted")
- .contains(ConfigOptions.DATALAKE_FORMAT.key());
+ .contains(ConfigOptions.DATALAKE_FORMAT.key())
+ .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
}
}
@@ -383,12 +408,27 @@ void testSetClusterConfigValidation() throws Exception {
() ->
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('invalid.config.key', 'value')",
+ "Call %s.sys.set_cluster_configs('invalid.config.key', 'value')",
CATALOG_NAME))
.await())
.rootCause()
.hasMessageContaining(
"The config key invalid.config.key is not allowed to be changed dynamically");
+
+ // validation to ensure an even number of arguments are passed
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs('%s')",
+ CATALOG_NAME,
+ ConfigOptions
+ .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+ .key()))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "config_pairs must be set in pairs. Please specify a valid configuration pairs.");
}
@ParameterizedTest
From 0ae691e52454c14283716e21c5c11e7190b54562 Mon Sep 17 00:00:00 2001
From: Pei Yu <125331682@qq.com>
Date: Mon, 5 Jan 2026 23:29:28 +0800
Subject: [PATCH 2/4] Refactor CALL procedure "sys.xxx_cluster_config" to
support multiple conifg keys
Signed-off-by: Pei Yu <125331682@qq.com>
---
.../procedure/GetClusterConfigsProcedure.java | 8 +-
.../ResetClusterConfigsProcedure.java | 22 ++--
.../procedure/SetClusterConfigsProcedure.java | 11 +-
.../flink/procedure/FlinkProcedureITCase.java | 101 ++++++++++++++++--
website/docs/engine-flink/procedures.md | 91 ++++++++++------
5 files changed, 171 insertions(+), 62 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java
index ff2688bc86..0bdcfaff8b 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java
@@ -68,13 +68,13 @@ public Row[] call(ProcedureContext context) throws Exception {
}
@ProcedureHint(
- argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
+ argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
isVarArgs = true,
output =
@DataTypeHint(
"ROW"))
- public Row[] call(ProcedureContext context, String... configKey) throws Exception {
- return getConfigs(configKey);
+ public Row[] call(ProcedureContext context, String... configKeys) throws Exception {
+ return getConfigs(configKeys);
}
private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
@@ -109,8 +109,6 @@ private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
entry.source() != null
? entry.source().name()
: "UNKNOWN"));
- } else {
- results.add(Row.of());
}
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
index 745e55f17c..6dafff8a4a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
@@ -29,9 +29,9 @@
import java.util.List;
/**
- * Procedure to reset cluster configuration dynamically.
+ * Procedure to reset cluster configurations to their default values.
*
- * This procedure allows modifying dynamic cluster configurations. The changes are:
+ *
This procedure reverts the configurations to their initial system defaults. The changes are:
*
*
* - Validated by the CoordinatorServer before persistence
@@ -47,12 +47,12 @@
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
*
* -- reset multiple configurations at one time
- * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format');
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
*
*
*
- *
Note: Not all configurations support dynamic changes. The server will validate the
- * change and reject it if the configuration cannot be reset dynamically.
+ *
Note: In theory, an operation like Reset to default value should always succeed,
+ * as the default value should be a valid one
*/
public class ResetClusterConfigsProcedure extends ProcedureBase {
@@ -64,19 +64,19 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
// Validate config key
if (configKeys.length == 0) {
throw new IllegalArgumentException(
- "config_pairs cannot be null or empty. "
+ "config_keys cannot be null or empty. "
+ "Please specify valid configuration keys.");
}
List configList = new ArrayList<>();
- StringBuilder resultMessage = new StringBuilder();
+ List resultMessage = new ArrayList();
for (String key : configKeys) {
String configKey = key.trim();
if (configKey.isEmpty()) {
throw new IllegalArgumentException(
"Config key cannot be null or empty. "
- + "Please specify valid configuration key.");
+ + "Please specify a valid configuration key.");
}
String operationDesc = "deleted (reset to default)";
@@ -84,7 +84,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
AlterConfig alterConfig =
new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
configList.add(alterConfig);
- resultMessage.append(
+ resultMessage.add(
String.format(
"Successfully %s configuration '%s'. ", operationDesc, configKey));
}
@@ -93,9 +93,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
// This will trigger validation on CoordinatorServer before persistence
admin.alterClusterConfigs(configList).get();
- return new String[] {
- resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
- };
+ return resultMessage.toArray(new String[0]);
} catch (IllegalArgumentException e) {
// Re-throw validation errors with original message
throw e;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
index 4a9e0266a8..7246706af8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
@@ -49,9 +49,6 @@
*
* -- Set multiple configurations at one time
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
- *
- * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
*
*
* Note: Not all configurations support dynamic changes. The server will validate the
@@ -78,7 +75,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
+ "Please specify a valid configuration pairs.");
}
List configList = new ArrayList<>();
- StringBuilder resultMessage = new StringBuilder();
+ List resultMessage = new ArrayList<>();
for (int i = 0; i < configPairs.length; i += 2) {
String configKey = configPairs[i].trim();
@@ -95,7 +92,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
AlterConfig alterConfig =
new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
configList.add(alterConfig);
- resultMessage.append(
+ resultMessage.add(
String.format(
"Successfully %s configuration '%s'. ", operationDesc, configKey));
}
@@ -104,9 +101,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
// This will trigger validation on CoordinatorServer before persistence
admin.alterClusterConfigs(configList).get();
- return new String[] {
- resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
- };
+ return resultMessage.toArray(new String[0]);
} catch (IllegalArgumentException e) {
// Re-throw validation errors with original message
throw e;
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index 9d3ecfa812..da40bca450 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -284,6 +284,31 @@ void testGetClusterConfigs() throws Exception {
assertThat(row.getField(2)).isNotNull(); // config_source
}
+ // Get multiple config
+ try (CloseableIterator resultIterator =
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.get_cluster_configs('%s', '%s')",
+ CATALOG_NAME,
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ ConfigOptions.DATALAKE_FORMAT.key()))
+ .collect()) {
+ List results = CollectionUtil.iteratorToList(resultIterator);
+ assertThat(results).hasSize(2);
+ // the first row
+ Row row0 = results.get(0);
+ assertThat(row0.getField(0))
+ .isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+ assertThat(row0.getField(1)).isEqualTo("100 mb");
+ assertThat(row0.getField(2)).isNotNull(); // config_source
+
+ // the second row
+ Row row1 = results.get(1);
+ assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key());
+ assertThat(row1.getField(1)).isEqualTo("paimon");
+ assertThat(row1.getField(2)).isNotNull(); // config_source
+ }
+
// Get all configs
try (CloseableIterator resultIterator =
tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME))
@@ -340,11 +365,14 @@ void testSetClusterConfigs() throws Exception {
ConfigOptions.DATALAKE_FORMAT.key()))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
- assertThat(results).hasSize(1);
+ assertThat(results).hasSize(2);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully set to '300MB'")
- .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())
+ .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+
+ assertThat(results.get(1).getField(0))
+ .asString()
.contains("Successfully set to 'paimon'")
.contains(ConfigOptions.DATALAKE_FORMAT.key());
}
@@ -365,9 +393,10 @@ void testSetClusterConfigs() throws Exception {
// reset cluster configs.
tEnv.executeSql(
String.format(
- "Call %s.sys.reset_cluster_configs('%s')",
+ "Call %s.sys.reset_cluster_configs('%s', '%s')",
CATALOG_NAME,
- ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ ConfigOptions.DATALAKE_FORMAT.key()))
.await();
}
@@ -392,11 +421,15 @@ void testResetClusterConfigs() throws Exception {
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
- assertThat(results).hasSize(1);
+ assertThat(results).hasSize(2);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully deleted")
- .contains(ConfigOptions.DATALAKE_FORMAT.key())
+ .contains(ConfigOptions.DATALAKE_FORMAT.key());
+
+ assertThat(results.get(1).getField(0))
+ .asString()
+ .contains("Successfully deleted")
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
}
}
@@ -428,7 +461,61 @@ void testSetClusterConfigValidation() throws Exception {
.await())
.rootCause()
.hasMessageContaining(
- "config_pairs must be set in pairs. Please specify a valid configuration pairs.");
+ "config_pairs must be set in pairs. Please specify a valid configuration pairs");
+
+ // Try to no parameters passed
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs()",
+ CATALOG_NAME))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "config_pairs cannot be null or empty. Please specify a valid configuration pairs");
+
+ // Try to mismatched key-value pairs in the input parameters.
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs('%s', 'paimon')",
+ CATALOG_NAME,
+ ConfigOptions
+ .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+ .key()))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "Could not parse value 'paimon' for key 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'");
+ }
+
+ @Test
+ void testResetClusterConfigValidation() throws Exception {
+ // Try to reset an invalid config
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.reset_cluster_configs('invalid.config.key')",
+ CATALOG_NAME))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "The config key invalid.config.key is not allowed to be changed dynamically");
+
+ // Try to no parameters passed
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.reset_cluster_configs()",
+ CATALOG_NAME))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "config_keys cannot be null or empty. Please specify valid configuration keys");
}
@ParameterizedTest
diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md
index 95e8724eb0..3f346c03c7 100644
--- a/website/docs/engine-flink/procedures.md
+++ b/website/docs/engine-flink/procedures.md
@@ -162,23 +162,23 @@ CALL sys.list_acl(
Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart.
-### get_cluster_config
+### get_cluster_configs
Retrieve cluster configuration values.
**Syntax:**
```sql
--- Get a specific configuration
-CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING')
+-- Get multiple configurations
+CALL [catalog_name.]sys.get_cluster_configs(config_keys => 'key1' [, 'key2', ...])
-- Get all cluster configurations
-CALL [catalog_name.]sys.get_cluster_config()
+CALL [catalog_name.]sys.get_cluster_configs()
```
**Parameters:**
-- `config_key` (optional): The configuration key to retrieve. If omitted, returns all cluster configurations.
+- `config_keys` (optional): The configuration keys to retrieve. If omitted, returns all cluster configurations.
**Returns:** A table with columns:
- `config_key`: The configuration key name
@@ -192,35 +192,35 @@ CALL [catalog_name.]sys.get_cluster_config()
USE fluss_catalog;
-- Get a specific configuration
-CALL sys.get_cluster_config(
- config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+CALL sys.get_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Get multiple configuration
+CALL sys.get_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'
);
-- Get all cluster configurations
-CALL sys.get_cluster_config();
+CALL sys.get_cluster_configs();
```
-### set_cluster_config
+### set_cluster_configs
-Set or delete a cluster configuration dynamically.
+Set cluster configurations dynamically.
**Syntax:**
```sql
--- Set a configuration value
-CALL [catalog_name.]sys.set_cluster_config(
- config_key => 'STRING',
- config_value => 'STRING'
+-- Set configuration values
+CALL [catalog_name.]sys.set_cluster_configs(
+ config_pairs => 'key1', 'value1' [, 'key2', 'value2' ...]
)
-
--- Delete a configuration (reset to default)
-CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
```
**Parameters:**
-- `config_key` (required): The configuration key to modify.
-- `config_value` (optional): The new value to set. If omitted or empty, the configuration is deleted (reset to default).
+- `config_pairs`(required): For key-value pairs in configuration items, the number of parameters must be even.
**Important Notes:**
@@ -236,20 +236,51 @@ CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
USE fluss_catalog;
-- Set RocksDB rate limiter
-CALL sys.set_cluster_config(
- config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec',
- config_value => '200MB'
+CALL sys.set_cluster_configs(
+ config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB'
);
--- Set datalake format
-CALL sys.set_cluster_config(
- config_key => 'datalake.format',
- config_value => 'paimon'
+-- Set RocksDB rate limiter and datalake format
+CALL sys.set_cluster_configs(
+ config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 'datalake.format','paimon'
);
+```
--- Delete a configuration (reset to default)
-CALL sys.set_cluster_config(
- config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
-);
+### reset_cluster_configs
+
+reset cluster configurations dynamically.
+
+**Syntax:**
+
+```sql
+-- reset configuration values
+CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', ...])
```
+**Parameters:**
+
+- `config_keys`(required): The configuration keys to reset.
+
+**Important Notes:**
+
+- Changes are validated before being applied and persisted in ZooKeeper
+- Changes are automatically applied to all servers (Coordinator and TabletServers)
+- Changes survive server restarts
+- Not all configurations support dynamic changes. The server will reject invalid modifications
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different)
+USE fluss_catalog;
+
+-- Reset a specific configuration
+CALL sys.reset_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Reset RocksDB rate limiter and datalake format
+CALL sys.reset_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'
+);
+```
\ No newline at end of file
From 053dd5da558e74be63993ea3cc7ea23895e4c489 Mon Sep 17 00:00:00 2001
From: Pei Yu <125331682@qq.com>
Date: Tue, 6 Jan 2026 21:50:09 +0800
Subject: [PATCH 3/4] fix comment
Signed-off-by: Pei Yu <125331682@qq.com>
---
.../fluss/flink/procedure/ResetClusterConfigsProcedure.java | 2 +-
website/docs/engine-flink/procedures.md | 6 ------
2 files changed, 1 insertion(+), 7 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
index 6dafff8a4a..88e0c297b6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
@@ -69,7 +69,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
}
List configList = new ArrayList<>();
- List resultMessage = new ArrayList();
+ List resultMessage = new ArrayList<>();
for (String key : configKeys) {
String configKey = key.trim();
diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md
index 3f346c03c7..eeab4496fd 100644
--- a/website/docs/engine-flink/procedures.md
+++ b/website/docs/engine-flink/procedures.md
@@ -261,12 +261,6 @@ CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', .
- `config_keys`(required): The configuration keys to reset.
-**Important Notes:**
-
-- Changes are validated before being applied and persisted in ZooKeeper
-- Changes are automatically applied to all servers (Coordinator and TabletServers)
-- Changes survive server restarts
-- Not all configurations support dynamic changes. The server will reject invalid modifications
**Example:**
From c1f8a3c8df7b28bfeadb133bb4e4533659f5374d Mon Sep 17 00:00:00 2001
From: peiyu <125331682@qq.com>
Date: Wed, 7 Jan 2026 15:42:05 +0800
Subject: [PATCH 4/4] fix comment
Signed-off-by: peiyu <125331682@qq.com>
---
.../apache/fluss/flink/procedure/FlinkProcedureITCase.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index da40bca450..b824111c3e 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -435,7 +435,7 @@ void testResetClusterConfigs() throws Exception {
}
@Test
- void testSetClusterConfigValidation() throws Exception {
+ void testSetClusterConfigsValidation() throws Exception {
// Try to set an invalid config (not allowed for dynamic change)
assertThatThrownBy(
() ->
@@ -445,6 +445,7 @@ void testSetClusterConfigValidation() throws Exception {
CATALOG_NAME))
.await())
.rootCause()
+ // TODO: Fix misleading error: non-existent key reported as not allowed.
.hasMessageContaining(
"The config key invalid.config.key is not allowed to be changed dynamically");
@@ -492,7 +493,7 @@ void testSetClusterConfigValidation() throws Exception {
}
@Test
- void testResetClusterConfigValidation() throws Exception {
+ void testResetClusterConfigsValidation() throws Exception {
// Try to reset an invalid config
assertThatThrownBy(
() ->
@@ -502,6 +503,7 @@ void testResetClusterConfigValidation() throws Exception {
CATALOG_NAME))
.await())
.rootCause()
+ // TODO: Fix misleading error: non-existent key reported as not allowed.
.hasMessageContaining(
"The config key invalid.config.key is not allowed to be changed dynamically");