configEntryMap =
+ configs.stream()
+ .collect(Collectors.toMap(ConfigEntry::key, Function.identity()));
+ for (String key : configKeys) {
+ ConfigEntry entry = configEntryMap.get(key);
+ if (null != entry) {
results.add(
Row.of(
entry.key(),
@@ -97,7 +109,6 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception {
entry.source() != null
? entry.source().name()
: "UNKNOWN"));
- break;
}
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
index 12d4cf3675..7ebf73b818 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
@@ -70,8 +70,9 @@ private enum ProcedureEnum {
ADD_ACL("sys.add_acl", AddAclProcedure.class),
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
List_ACL("sys.list_acl", ListAclProcedure.class),
- SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
- GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class),
+ SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class),
+ GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class),
+ RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class);
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
new file mode 100644
index 0000000000..88e0c297b6
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to reset cluster configurations to their default values.
+ *
+ * This procedure reverts the configurations to their initial system defaults. The changes are:
+ *
+ *
+ * - Validated by the CoordinatorServer before persistence
+ *
+ *   - Persisted in ZooKeeper for durability
+ *
+ *   - Applied to all relevant servers (Coordinator and TabletServers)
+ *
+ *   - Survive server restarts
+ *
+ *
+ * Usage examples:
+ *
+ *
+ * -- reset a configuration
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
+ *
+ *
+ *
+ * Note: In theory, an operation like resetting a config to its default value should always
+ * succeed, as the default value should be a valid one.
+ */
+public class ResetClusterConfigsProcedure extends ProcedureBase {
+
+ @ProcedureHint(
+ argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
+ isVarArgs = true)
+ public String[] call(ProcedureContext context, String... configKeys) throws Exception {
+ try {
+ // Validate config key
+ if (configKeys.length == 0) {
+ throw new IllegalArgumentException(
+ "config_keys cannot be null or empty. "
+ + "Please specify valid configuration keys.");
+ }
+
+ List<AlterConfig> configList = new ArrayList<>();
+ List<String> resultMessage = new ArrayList<>();
+
+ for (String key : configKeys) {
+ String configKey = key == null ? "" : key.trim();
+ if (configKey.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Config key cannot be null or empty. "
+ + "Please specify a valid configuration key.");
+ }
+
+ String operationDesc = "deleted (reset to default)";
+
+ AlterConfig alterConfig =
+ new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
+ configList.add(alterConfig);
+ resultMessage.add(
+ String.format(
+ "Successfully %s configuration '%s'. ", operationDesc, configKey));
+ }
+
+ // Call Admin API to modify cluster configuration
+ // This will trigger validation on CoordinatorServer before persistence
+ admin.alterClusterConfigs(configList).get();
+
+ return resultMessage.toArray(new String[0]);
+ } catch (IllegalArgumentException e) {
+ // Re-throw validation errors with original message
+ throw e;
+ } catch (Exception e) {
+ // Wrap other exceptions with more context
+ throw new RuntimeException(
+ String.format("Failed to reset cluster config: %s", e.getMessage()), e);
+ }
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
deleted file mode 100644
index ee4dbcf37e..0000000000
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.procedure;
-
-import org.apache.fluss.config.cluster.AlterConfig;
-import org.apache.fluss.config.cluster.AlterConfigOpType;
-
-import org.apache.flink.table.annotation.ArgumentHint;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.ProcedureHint;
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-
-/**
- * Procedure to set or delete cluster configuration dynamically.
- *
- * This procedure allows modifying dynamic cluster configurations. The changes are:
- *
- *
- * - Validated by the CoordinatorServer before persistence
- *
- Persisted in ZooKeeper for durability
- *
- Applied to all relevant servers (Coordinator and TabletServers)
- *
- Survive server restarts
- *
- *
- * Usage examples:
- *
- *
- * -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
- *
- * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
- *
- *
- * Note: Not all configurations support dynamic changes. The server will validate the
- * change and reject it if the configuration cannot be modified dynamically or if the new value is
- * invalid.
- */
-public class SetClusterConfigProcedure extends ProcedureBase {
-
- @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))})
- public String[] call(ProcedureContext context, String configKey) throws Exception {
- return performSet(configKey, null);
- }
-
- @ProcedureHint(
- argument = {
- @ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "config_value", type = @DataTypeHint("STRING"))
- })
- public String[] call(ProcedureContext context, String configKey, String configValue)
- throws Exception {
- return performSet(configKey, configValue);
- }
-
- private String[] performSet(String configKey, @Nullable String configValue) throws Exception {
-
- try {
- // Validate config key
- if (configKey == null || configKey.trim().isEmpty()) {
- throw new IllegalArgumentException(
- "Config key cannot be null or empty. "
- + "Please specify a valid configuration key.");
- }
-
- configKey = configKey.trim();
-
- // Determine operation type
- AlterConfigOpType opType;
- String operationDesc;
-
- if (configValue == null || configValue.trim().isEmpty()) {
- // Delete operation - reset to default
- opType = AlterConfigOpType.DELETE;
- operationDesc = "deleted (reset to default)";
- } else {
- // Set operation
- opType = AlterConfigOpType.SET;
- operationDesc = String.format("set to '%s'", configValue);
- }
-
- // Construct configuration modification operation.
- AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType);
-
- // Call Admin API to modify cluster configuration
- // This will trigger validation on CoordinatorServer before persistence
- admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
-
- return new String[] {
- String.format(
- "Successfully %s configuration '%s'. "
- + "The change is persisted in ZooKeeper and applied to all servers.",
- operationDesc, configKey)
- };
-
- } catch (IllegalArgumentException e) {
- // Re-throw validation errors with original message
- throw e;
- } catch (Exception e) {
- // Wrap other exceptions with more context
- throw new RuntimeException(
- String.format(
- "Failed to set cluster config '%s': %s", configKey, e.getMessage()),
- e);
- }
- }
-}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
new file mode 100644
index 0000000000..7246706af8
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to set or delete cluster configuration dynamically.
+ *
+ *
+ * This procedure allows modifying dynamic cluster configurations. The changes are:
+ *
+ *
+ * - Validated by the CoordinatorServer before persistence
+ *
+ *   - Persisted in ZooKeeper for durability
+ *
+ *   - Applied to all relevant servers (Coordinator and TabletServers)
+ *
+ *   - Survive server restarts
+ *
+ *
+ * Usage examples:
+ *
+ *
+ * -- Set a configuration
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
+ *
+ *
+ * Note: Not all configurations support dynamic changes. The server will validate the
+ * change and reject it if the configuration cannot be modified dynamically or if the new value is
+ * invalid.
+ */
+public class SetClusterConfigsProcedure extends ProcedureBase {
+
+ @ProcedureHint(
+ argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
+ isVarArgs = true)
+ public String[] call(ProcedureContext context, String... configPairs) throws Exception {
+ try {
+ // Validate config key
+ if (configPairs.length == 0) {
+ throw new IllegalArgumentException(
+ "config_pairs cannot be null or empty. "
+ + "Please specify a valid configuration pairs.");
+ }
+
+ if (configPairs.length % 2 != 0) {
+ throw new IllegalArgumentException(
+ "config_pairs must be set in pairs. "
+ + "Please specify a valid configuration pairs.");
+ }
+ List<AlterConfig> configList = new ArrayList<>();
+ List<String> resultMessage = new ArrayList<>();
+
+ for (int i = 0; i < configPairs.length; i += 2) {
+ String configKey = configPairs[i] == null ? "" : configPairs[i].trim();
+ if (configKey.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Config key cannot be null or empty. "
+ + "Please specify a valid configuration key.");
+ }
+ String configValue = configPairs[i + 1];
+
+ String operationDesc = String.format("set to '%s'", configValue);
+
+ // Construct configuration modification operation.
+ AlterConfig alterConfig =
+ new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
+ configList.add(alterConfig);
+ resultMessage.add(
+ String.format(
+ "Successfully %s configuration '%s'. ", operationDesc, configKey));
+ }
+
+ // Call Admin API to modify cluster configuration
+ // This will trigger validation on CoordinatorServer before persistence
+ admin.alterClusterConfigs(configList).get();
+
+ return resultMessage.toArray(new String[0]);
+ } catch (IllegalArgumentException e) {
+ // Re-throw validation errors with original message
+ throw e;
+ } catch (Exception e) {
+ // Wrap other exceptions with more context
+ throw new RuntimeException(
+ String.format("Failed to set cluster config: %s", e.getMessage()), e);
+ }
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index b764923050..b824111c3e 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -100,9 +100,10 @@ void testShowProcedures() throws Exception {
Arrays.asList(
"+I[sys.add_acl]",
"+I[sys.drop_acl]",
- "+I[sys.get_cluster_config]",
+ "+I[sys.get_cluster_configs]",
"+I[sys.list_acl]",
- "+I[sys.set_cluster_config]",
+ "+I[sys.set_cluster_configs]",
+ "+I[sys.reset_cluster_configs]",
"+I[sys.add_server_tag]",
"+I[sys.remove_server_tag]");
// make sure no more results is unread.
@@ -265,12 +266,12 @@ void testDisableAuthorization() throws Exception {
}
@Test
- void testGetClusterConfig() throws Exception {
+ void testGetClusterConfigs() throws Exception {
// Get specific config
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.get_cluster_config('%s')",
+ "Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
@@ -283,9 +284,34 @@ void testGetClusterConfig() throws Exception {
assertThat(row.getField(2)).isNotNull(); // config_source
}
+ // Get multiple config
+ try (CloseableIterator resultIterator =
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.get_cluster_configs('%s', '%s')",
+ CATALOG_NAME,
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ ConfigOptions.DATALAKE_FORMAT.key()))
+ .collect()) {
+ List results = CollectionUtil.iteratorToList(resultIterator);
+ assertThat(results).hasSize(2);
+ // the first row
+ Row row0 = results.get(0);
+ assertThat(row0.getField(0))
+ .isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+ assertThat(row0.getField(1)).isEqualTo("100 mb");
+ assertThat(row0.getField(2)).isNotNull(); // config_source
+
+ // the second row
+ Row row1 = results.get(1);
+ assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key());
+ assertThat(row1.getField(1)).isEqualTo("paimon");
+ assertThat(row1.getField(2)).isNotNull(); // config_source
+ }
+
// Get all configs
try (CloseableIterator resultIterator =
- tEnv.executeSql(String.format("Call %s.sys.get_cluster_config()", CATALOG_NAME))
+ tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).isNotEmpty();
@@ -295,7 +321,7 @@ void testGetClusterConfig() throws Exception {
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.get_cluster_config('non.existent.config')",
+ "Call %s.sys.get_cluster_configs('non.existent.config')",
CATALOG_NAME))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
@@ -305,19 +331,19 @@ void testGetClusterConfig() throws Exception {
// reset cluster configs.
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s')",
+ "Call %s.sys.reset_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.await();
}
@Test
- void testSetClusterConfig() throws Exception {
+ void testSetClusterConfigs() throws Exception {
// Test setting a valid config
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s', '200MB')",
+ "Call %s.sys.set_cluster_configs('%s', '200MB')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
@@ -329,66 +355,169 @@ void testSetClusterConfig() throws Exception {
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
}
+ // Test setting multiple config
+ try (CloseableIterator resultIterator =
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs('%s', '300MB', '%s', 'paimon')",
+ CATALOG_NAME,
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ ConfigOptions.DATALAKE_FORMAT.key()))
+ .collect()) {
+ List results = CollectionUtil.iteratorToList(resultIterator);
+ assertThat(results).hasSize(2);
+ assertThat(results.get(0).getField(0))
+ .asString()
+ .contains("Successfully set to '300MB'")
+ .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+
+ assertThat(results.get(1).getField(0))
+ .asString()
+ .contains("Successfully set to 'paimon'")
+ .contains(ConfigOptions.DATALAKE_FORMAT.key());
+ }
+
// Verify the config was actually set
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.get_cluster_config('%s')",
+ "Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME,
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
- assertThat(results.get(0).getField(1)).isEqualTo("200MB");
+ assertThat(results.get(0).getField(1)).isEqualTo("300MB");
}
// reset cluster configs.
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s')",
+ "Call %s.sys.reset_cluster_configs('%s', '%s')",
CATALOG_NAME,
- ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ ConfigOptions.DATALAKE_FORMAT.key()))
.await();
}
@Test
- void testDeleteClusterConfig() throws Exception {
+ void testResetClusterConfigs() throws Exception {
// First set a config
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s', 'paimon')",
- CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key()))
+ "Call %s.sys.set_cluster_configs('%s', 'paimon', '%s', '200MB')",
+ CATALOG_NAME,
+ ConfigOptions.DATALAKE_FORMAT.key(),
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.await();
- // Delete the config (reset to default) - omitting the value parameter
+ // Delete the config (reset to default)
try (CloseableIterator resultIterator =
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('%s')",
- CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key()))
+ "Call %s.sys.reset_cluster_configs('%s', '%s')",
+ CATALOG_NAME,
+ ConfigOptions.DATALAKE_FORMAT.key(),
+ ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
.collect()) {
List results = CollectionUtil.iteratorToList(resultIterator);
- assertThat(results).hasSize(1);
+ assertThat(results).hasSize(2);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully deleted")
.contains(ConfigOptions.DATALAKE_FORMAT.key());
+
+ assertThat(results.get(1).getField(0))
+ .asString()
+ .contains("Successfully deleted")
+ .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
}
}
@Test
- void testSetClusterConfigValidation() throws Exception {
+ void testSetClusterConfigsValidation() throws Exception {
// Try to set an invalid config (not allowed for dynamic change)
assertThatThrownBy(
() ->
tEnv.executeSql(
String.format(
- "Call %s.sys.set_cluster_config('invalid.config.key', 'value')",
+ "Call %s.sys.set_cluster_configs('invalid.config.key', 'value')",
CATALOG_NAME))
.await())
.rootCause()
+ // TODO: Fix misleading error: non-existent key reported as not allowed.
.hasMessageContaining(
"The config key invalid.config.key is not allowed to be changed dynamically");
+
+ // validation to ensure an even number of arguments are passed
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs('%s')",
+ CATALOG_NAME,
+ ConfigOptions
+ .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+ .key()))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "config_pairs must be set in pairs. Please specify a valid configuration pairs");
+
+ // Try to no parameters passed
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs()",
+ CATALOG_NAME))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "config_pairs cannot be null or empty. Please specify a valid configuration pairs");
+
+ // Try to mismatched key-value pairs in the input parameters.
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.set_cluster_configs('%s', 'paimon')",
+ CATALOG_NAME,
+ ConfigOptions
+ .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+ .key()))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "Could not parse value 'paimon' for key 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'");
+ }
+
+ @Test
+ void testResetClusterConfigsValidation() throws Exception {
+ // Try to reset an invalid config
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.reset_cluster_configs('invalid.config.key')",
+ CATALOG_NAME))
+ .await())
+ .rootCause()
+ // TODO: Fix misleading error: non-existent key reported as not allowed.
+ .hasMessageContaining(
+ "The config key invalid.config.key is not allowed to be changed dynamically");
+
+ // Try to no parameters passed
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "Call %s.sys.reset_cluster_configs()",
+ CATALOG_NAME))
+ .await())
+ .rootCause()
+ .hasMessageContaining(
+ "config_keys cannot be null or empty. Please specify valid configuration keys");
}
@ParameterizedTest
diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md
index 95e8724eb0..eeab4496fd 100644
--- a/website/docs/engine-flink/procedures.md
+++ b/website/docs/engine-flink/procedures.md
@@ -162,23 +162,23 @@ CALL sys.list_acl(
Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart.
-### get_cluster_config
+### get_cluster_configs
Retrieve cluster configuration values.
**Syntax:**
```sql
--- Get a specific configuration
-CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING')
+-- Get multiple configurations
+CALL [catalog_name.]sys.get_cluster_configs(config_keys => 'key1' [, 'key2', ...])
-- Get all cluster configurations
-CALL [catalog_name.]sys.get_cluster_config()
+CALL [catalog_name.]sys.get_cluster_configs()
```
**Parameters:**
-- `config_key` (optional): The configuration key to retrieve. If omitted, returns all cluster configurations.
+- `config_keys` (optional): The configuration keys to retrieve. If omitted, returns all cluster configurations.
**Returns:** A table with columns:
- `config_key`: The configuration key name
@@ -192,35 +192,35 @@ CALL [catalog_name.]sys.get_cluster_config()
USE fluss_catalog;
-- Get a specific configuration
-CALL sys.get_cluster_config(
- config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+CALL sys.get_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Get multiple configuration
+CALL sys.get_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'
);
-- Get all cluster configurations
-CALL sys.get_cluster_config();
+CALL sys.get_cluster_configs();
```
-### set_cluster_config
+### set_cluster_configs
-Set or delete a cluster configuration dynamically.
+Set cluster configurations dynamically.
**Syntax:**
```sql
--- Set a configuration value
-CALL [catalog_name.]sys.set_cluster_config(
- config_key => 'STRING',
- config_value => 'STRING'
+-- Set configuration values
+CALL [catalog_name.]sys.set_cluster_configs(
+ config_pairs => 'key1', 'value1' [, 'key2', 'value2' ...]
)
-
--- Delete a configuration (reset to default)
-CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
```
**Parameters:**
-- `config_key` (required): The configuration key to modify.
-- `config_value` (optional): The new value to set. If omitted or empty, the configuration is deleted (reset to default).
+- `config_pairs` (required): Alternating configuration keys and values; the total number of arguments must be even.
**Important Notes:**
@@ -236,20 +236,45 @@ CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
USE fluss_catalog;
-- Set RocksDB rate limiter
-CALL sys.set_cluster_config(
- config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec',
- config_value => '200MB'
+CALL sys.set_cluster_configs(
+ config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB'
);
--- Set datalake format
-CALL sys.set_cluster_config(
- config_key => 'datalake.format',
- config_value => 'paimon'
+-- Set RocksDB rate limiter and datalake format
+CALL sys.set_cluster_configs(
+ config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 'datalake.format','paimon'
);
+```
--- Delete a configuration (reset to default)
-CALL sys.set_cluster_config(
- config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
-);
+### reset_cluster_configs
+
+Reset cluster configurations to their default values dynamically.
+
+**Syntax:**
+
+```sql
+-- reset configuration values
+CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', ...])
```
+**Parameters:**
+
+- `config_keys` (required): The configuration keys to reset.
+
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different)
+USE fluss_catalog;
+
+-- Reset a specific configuration
+CALL sys.reset_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Reset RocksDB rate limiter and datalake format
+CALL sys.reset_cluster_configs(
+ config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'
+);
+```
\ No newline at end of file