diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java similarity index 71% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java index 444f1dd13d..0bdcfaff8b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java @@ -30,6 +30,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Procedure to get cluster configuration(s). @@ -37,7 +40,7 @@ *

This procedure allows querying dynamic cluster configurations. It can retrieve: * *

* @@ -45,39 +48,43 @@ * *
  * -- Get a specific configuration
- * CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get multiple configurations
+ * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
  *
  * -- Get all cluster configurations
- * CALL sys.get_cluster_config();
+ * CALL sys.get_cluster_configs();
  * 
*/ -public class GetClusterConfigProcedure extends ProcedureBase { +public class GetClusterConfigsProcedure extends ProcedureBase { @ProcedureHint( output = @DataTypeHint( "ROW")) public Row[] call(ProcedureContext context) throws Exception { - return getConfigs(null); + return getConfigs(); } @ProcedureHint( - argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}, + argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))}, + isVarArgs = true, output = @DataTypeHint( "ROW")) - public Row[] call(ProcedureContext context, String configKey) throws Exception { - return getConfigs(configKey); + public Row[] call(ProcedureContext context, String... configKeys) throws Exception { + return getConfigs(configKeys); } - private Row[] getConfigs(@Nullable String configKey) throws Exception { + private Row[] getConfigs(@Nullable String... configKeys) throws Exception { try { // Get all cluster configurations Collection configs = admin.describeClusterConfigs().get(); List results = new ArrayList<>(); - if (configKey == null || configKey.isEmpty()) { + if (configKeys == null || configKeys.length == 0) { // Return all configurations for (ConfigEntry entry : configs) { results.add( @@ -87,9 +94,14 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception { entry.source() != null ? entry.source().name() : "UNKNOWN")); } } else { - // Find specific configuration - for (ConfigEntry entry : configs) { - if (entry.key().equals(configKey)) { + // Find configurations + // The order of the results is the same as that of the key. + Map configEntryMap = + configs.stream() + .collect(Collectors.toMap(ConfigEntry::key, Function.identity())); + for (String key : configKeys) { + ConfigEntry entry = configEntryMap.get(key); + if (null != entry) { results.add( Row.of( entry.key(), @@ -97,7 +109,6 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception { entry.source() != null ? entry.source().name() : "UNKNOWN")); - break; } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java index 12d4cf3675..7ebf73b818 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java @@ -70,8 +70,9 @@ private enum ProcedureEnum { ADD_ACL("sys.add_acl", AddAclProcedure.class), DROP_ACL("sys.drop_acl", DropAclProcedure.class), List_ACL("sys.list_acl", ListAclProcedure.class), - SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class), - GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class), + SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class), + GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class), + RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class), ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class), REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java new file mode 100644 index 0000000000..88e0c297b6 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to reset cluster configurations to their default values. + * + *

This procedure reverts the configurations to their initial system defaults. The changes are: + * + *

    + *
  • Validated by the CoordinatorServer before persistence + *
  • Persisted in ZooKeeper for durability + *
  • Applied to all relevant servers (Coordinator and TabletServers) + *
  • Survive server restarts + *
+ * + *

Usage examples: + * + *

+ * -- reset a configuration
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
+ *
+ * 
+ * + *

Note: In theory, an operation like Reset to default value should always succeed, + * as the default value should be a valid one + */ +public class ResetClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configKeys) throws Exception { + try { + // Validate config key + if (configKeys.length == 0) { + throw new IllegalArgumentException( + "config_keys cannot be null or empty. " + + "Please specify valid configuration keys."); + } + + List configList = new ArrayList<>(); + List resultMessage = new ArrayList<>(); + + for (String key : configKeys) { + String configKey = key.trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify a valid configuration key."); + } + + String operationDesc = "deleted (reset to default)"; + + AlterConfig alterConfig = + new AlterConfig(configKey, null, AlterConfigOpType.DELETE); + configList.add(alterConfig); + resultMessage.add( + String.format( + "Successfully %s configuration '%s'. ", operationDesc, configKey)); + } + + // Call Admin API to modify cluster configuration + // This will trigger validation on CoordinatorServer before persistence + admin.alterClusterConfigs(configList).get(); + + return resultMessage.toArray(new String[0]); + } catch (IllegalArgumentException e) { + // Re-throw validation errors with original message + throw e; + } catch (Exception e) { + // Wrap other exceptions with more context + throw new RuntimeException( + String.format("Failed to reset cluster config: %s", e.getMessage()), e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java deleted file mode 100644 index ee4dbcf37e..0000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.procedure; - -import org.apache.fluss.config.cluster.AlterConfig; -import org.apache.fluss.config.cluster.AlterConfigOpType; - -import org.apache.flink.table.annotation.ArgumentHint; -import org.apache.flink.table.annotation.DataTypeHint; -import org.apache.flink.table.annotation.ProcedureHint; -import org.apache.flink.table.procedure.ProcedureContext; - -import javax.annotation.Nullable; - -import java.util.Collections; - -/** - * Procedure to set or delete cluster configuration dynamically. - * - *

This procedure allows modifying dynamic cluster configurations. The changes are: - * - *

    - *
  • Validated by the CoordinatorServer before persistence - *
  • Persisted in ZooKeeper for durability - *
  • Applied to all relevant servers (Coordinator and TabletServers) - *
  • Survive server restarts - *
- * - *

Usage examples: - * - *

- * -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
- *
- * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
- * 
- * - *

Note: Not all configurations support dynamic changes. The server will validate the - * change and reject it if the configuration cannot be modified dynamically or if the new value is - * invalid. - */ -public class SetClusterConfigProcedure extends ProcedureBase { - - @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}) - public String[] call(ProcedureContext context, String configKey) throws Exception { - return performSet(configKey, null); - } - - @ProcedureHint( - argument = { - @ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")), - @ArgumentHint(name = "config_value", type = @DataTypeHint("STRING")) - }) - public String[] call(ProcedureContext context, String configKey, String configValue) - throws Exception { - return performSet(configKey, configValue); - } - - private String[] performSet(String configKey, @Nullable String configValue) throws Exception { - - try { - // Validate config key - if (configKey == null || configKey.trim().isEmpty()) { - throw new IllegalArgumentException( - "Config key cannot be null or empty. " - + "Please specify a valid configuration key."); - } - - configKey = configKey.trim(); - - // Determine operation type - AlterConfigOpType opType; - String operationDesc; - - if (configValue == null || configValue.trim().isEmpty()) { - // Delete operation - reset to default - opType = AlterConfigOpType.DELETE; - operationDesc = "deleted (reset to default)"; - } else { - // Set operation - opType = AlterConfigOpType.SET; - operationDesc = String.format("set to '%s'", configValue); - } - - // Construct configuration modification operation. - AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType); - - // Call Admin API to modify cluster configuration - // This will trigger validation on CoordinatorServer before persistence - admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get(); - - return new String[] { - String.format( - "Successfully %s configuration '%s'. " - + "The change is persisted in ZooKeeper and applied to all servers.", - operationDesc, configKey) - }; - - } catch (IllegalArgumentException e) { - // Re-throw validation errors with original message - throw e; - } catch (Exception e) { - // Wrap other exceptions with more context - throw new RuntimeException( - String.format( - "Failed to set cluster config '%s': %s", configKey, e.getMessage()), - e); - } - } -} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java new file mode 100644 index 0000000000..7246706af8 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to set or delete cluster configuration dynamically. + * + *

This procedure allows modifying dynamic cluster configurations. The changes are: + * + *

    + *
  • Validated by the CoordinatorServer before persistence + *
  • Persisted in ZooKeeper for durability + *
  • Applied to all relevant servers (Coordinator and TabletServers) + *
  • Survive server restarts + *
+ * + *

Usage examples: + * + *

+ * -- Set a configuration
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
+ * 
+ * + *

Note: Not all configurations support dynamic changes. The server will validate the + * change and reject it if the configuration cannot be modified dynamically or if the new value is + * invalid. + */ +public class SetClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configPairs) throws Exception { + try { + // Validate config key + if (configPairs.length == 0) { + throw new IllegalArgumentException( + "config_pairs cannot be null or empty. " + + "Please specify a valid configuration pairs."); + } + + if (configPairs.length % 2 != 0) { + throw new IllegalArgumentException( + "config_pairs must be set in pairs. " + + "Please specify a valid configuration pairs."); + } + List configList = new ArrayList<>(); + List resultMessage = new ArrayList<>(); + + for (int i = 0; i < configPairs.length; i += 2) { + String configKey = configPairs[i].trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify a valid configuration key."); + } + String configValue = configPairs[i + 1]; + + String operationDesc = String.format("set to '%s'", configValue); + + // Construct configuration modification operation. + AlterConfig alterConfig = + new AlterConfig(configKey, configValue, AlterConfigOpType.SET); + configList.add(alterConfig); + resultMessage.add( + String.format( + "Successfully %s configuration '%s'. ", operationDesc, configKey)); + } + + // Call Admin API to modify cluster configuration + // This will trigger validation on CoordinatorServer before persistence + admin.alterClusterConfigs(configList).get(); + + return resultMessage.toArray(new String[0]); + } catch (IllegalArgumentException e) { + // Re-throw validation errors with original message + throw e; + } catch (Exception e) { + // Wrap other exceptions with more context + throw new RuntimeException( + String.format("Failed to set cluster config: %s", e.getMessage()), e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index b764923050..b824111c3e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -100,9 +100,10 @@ void testShowProcedures() throws Exception { Arrays.asList( "+I[sys.add_acl]", "+I[sys.drop_acl]", - "+I[sys.get_cluster_config]", + "+I[sys.get_cluster_configs]", "+I[sys.list_acl]", - "+I[sys.set_cluster_config]", + "+I[sys.set_cluster_configs]", + "+I[sys.reset_cluster_configs]", "+I[sys.add_server_tag]", "+I[sys.remove_server_tag]"); // make sure no more results is unread. @@ -265,12 +266,12 @@ void testDisableAuthorization() throws Exception { } @Test - void testGetClusterConfig() throws Exception { + void testGetClusterConfigs() throws Exception { // Get specific config try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.get_cluster_config('%s')", + "Call %s.sys.get_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { @@ -283,9 +284,34 @@ void testGetClusterConfig() throws Exception { assertThat(row.getField(2)).isNotNull(); // config_source } + // Get multiple config + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_configs('%s', '%s')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + ConfigOptions.DATALAKE_FORMAT.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(2); + // the first row + Row row0 = results.get(0); + assertThat(row0.getField(0)) + .isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); + assertThat(row0.getField(1)).isEqualTo("100 mb"); + assertThat(row0.getField(2)).isNotNull(); // config_source + + // the second row + Row row1 = results.get(1); + assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key()); + assertThat(row1.getField(1)).isEqualTo("paimon"); + assertThat(row1.getField(2)).isNotNull(); // config_source + } + // Get all configs try (CloseableIterator resultIterator = - tEnv.executeSql(String.format("Call %s.sys.get_cluster_config()", CATALOG_NAME)) + tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME)) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); assertThat(results).isNotEmpty(); @@ -295,7 +321,7 @@ void testGetClusterConfig() throws Exception { try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.get_cluster_config('non.existent.config')", + "Call %s.sys.get_cluster_configs('non.existent.config')", CATALOG_NAME)) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); @@ -305,19 +331,19 @@ void testGetClusterConfig() throws Exception { // reset cluster configs. tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s')", + "Call %s.sys.reset_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .await(); } @Test - void testSetClusterConfig() throws Exception { + void testSetClusterConfigs() throws Exception { // Test setting a valid config try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s', '200MB')", + "Call %s.sys.set_cluster_configs('%s', '200MB')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { @@ -329,66 +355,169 @@ void testSetClusterConfig() throws Exception { .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); } + // Test setting multiple config + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s', '300MB', '%s', 'paimon')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + ConfigOptions.DATALAKE_FORMAT.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(2); + assertThat(results.get(0).getField(0)) + .asString() + .contains("Successfully set to '300MB'") + .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); + + assertThat(results.get(1).getField(0)) + .asString() + .contains("Successfully set to 'paimon'") + .contains(ConfigOptions.DATALAKE_FORMAT.key()); + } + // Verify the config was actually set try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.get_cluster_config('%s')", + "Call %s.sys.get_cluster_configs('%s')", CATALOG_NAME, ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); assertThat(results).hasSize(1); - assertThat(results.get(0).getField(1)).isEqualTo("200MB"); + assertThat(results.get(0).getField(1)).isEqualTo("300MB"); } // reset cluster configs. tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s')", + "Call %s.sys.reset_cluster_configs('%s', '%s')", CATALOG_NAME, - ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + ConfigOptions.DATALAKE_FORMAT.key())) .await(); } @Test - void testDeleteClusterConfig() throws Exception { + void testResetClusterConfigs() throws Exception { // First set a config tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s', 'paimon')", - CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key())) + "Call %s.sys.set_cluster_configs('%s', 'paimon', '%s', '200MB')", + CATALOG_NAME, + ConfigOptions.DATALAKE_FORMAT.key(), + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .await(); - // Delete the config (reset to default) - omitting the value parameter + // Delete the config (reset to default) try (CloseableIterator resultIterator = tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('%s')", - CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key())) + "Call %s.sys.reset_cluster_configs('%s', '%s')", + CATALOG_NAME, + ConfigOptions.DATALAKE_FORMAT.key(), + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) .collect()) { List results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).hasSize(1); + assertThat(results).hasSize(2); assertThat(results.get(0).getField(0)) .asString() .contains("Successfully deleted") .contains(ConfigOptions.DATALAKE_FORMAT.key()); + + assertThat(results.get(1).getField(0)) + .asString() + .contains("Successfully deleted") + .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); } } @Test - void testSetClusterConfigValidation() throws Exception { + void testSetClusterConfigsValidation() throws Exception { // Try to set an invalid config (not allowed for dynamic change) assertThatThrownBy( () -> tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('invalid.config.key', 'value')", + "Call %s.sys.set_cluster_configs('invalid.config.key', 'value')", CATALOG_NAME)) .await()) .rootCause() + // TODO: Fix misleading error: non-existent key reported as not allowed. .hasMessageContaining( "The config key invalid.config.key is not allowed to be changed dynamically"); + + // validation to ensure an even number of arguments are passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s')", + CATALOG_NAME, + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key())) + .await()) + .rootCause() + .hasMessageContaining( + "config_pairs must be set in pairs. Please specify a valid configuration pairs"); + + // Try to no parameters passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs()", + CATALOG_NAME)) + .await()) + .rootCause() + .hasMessageContaining( + "config_pairs cannot be null or empty. Please specify a valid configuration pairs"); + + // Try to mismatched key-value pairs in the input parameters. + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s', 'paimon')", + CATALOG_NAME, + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key())) + .await()) + .rootCause() + .hasMessageContaining( + "Could not parse value 'paimon' for key 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'"); + } + + @Test + void testResetClusterConfigsValidation() throws Exception { + // Try to reset an invalid config + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.reset_cluster_configs('invalid.config.key')", + CATALOG_NAME)) + .await()) + .rootCause() + // TODO: Fix misleading error: non-existent key reported as not allowed. + .hasMessageContaining( + "The config key invalid.config.key is not allowed to be changed dynamically"); + + // Try to no parameters passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.reset_cluster_configs()", + CATALOG_NAME)) + .await()) + .rootCause() + .hasMessageContaining( + "config_keys cannot be null or empty. Please specify valid configuration keys"); } @ParameterizedTest diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md index 95e8724eb0..eeab4496fd 100644 --- a/website/docs/engine-flink/procedures.md +++ b/website/docs/engine-flink/procedures.md @@ -162,23 +162,23 @@ CALL sys.list_acl( Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart. -### get_cluster_config +### get_cluster_configs Retrieve cluster configuration values. **Syntax:** ```sql --- Get a specific configuration -CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING') +-- Get multiple configurations +CALL [catalog_name.]sys.get_cluster_configs(config_keys => 'key1' [, 'key2', ...]) -- Get all cluster configurations -CALL [catalog_name.]sys.get_cluster_config() +CALL [catalog_name.]sys.get_cluster_configs() ``` **Parameters:** -- `config_key` (optional): The configuration key to retrieve. If omitted, returns all cluster configurations. +- `config_keys` (optional): The configuration keys to retrieve. If omitted, returns all cluster configurations. **Returns:** A table with columns: - `config_key`: The configuration key name @@ -192,35 +192,35 @@ CALL [catalog_name.]sys.get_cluster_config() USE fluss_catalog; -- Get a specific configuration -CALL sys.get_cluster_config( - config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +CALL sys.get_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +); + +-- Get multiple configuration +CALL sys.get_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format' ); -- Get all cluster configurations -CALL sys.get_cluster_config(); +CALL sys.get_cluster_configs(); ``` -### set_cluster_config +### set_cluster_configs -Set or delete a cluster configuration dynamically. +Set cluster configurations dynamically. **Syntax:** ```sql --- Set a configuration value -CALL [catalog_name.]sys.set_cluster_config( - config_key => 'STRING', - config_value => 'STRING' +-- Set configuration values +CALL [catalog_name.]sys.set_cluster_configs( + config_pairs => 'key1', 'value1' [, 'key2', 'value2' ...] ) - --- Delete a configuration (reset to default) -CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING') ``` **Parameters:** -- `config_key` (required): The configuration key to modify. -- `config_value` (optional): The new value to set. If omitted or empty, the configuration is deleted (reset to default). +- `config_pairs`(required): For key-value pairs in configuration items, the number of parameters must be even. **Important Notes:** @@ -236,20 +236,45 @@ CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING') USE fluss_catalog; -- Set RocksDB rate limiter -CALL sys.set_cluster_config( - config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', - config_value => '200MB' +CALL sys.set_cluster_configs( + config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB' ); --- Set datalake format -CALL sys.set_cluster_config( - config_key => 'datalake.format', - config_value => 'paimon' +-- Set RocksDB rate limiter and datalake format +CALL sys.set_cluster_configs( + config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 'datalake.format','paimon' ); +``` --- Delete a configuration (reset to default) -CALL sys.set_cluster_config( - config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' -); +### reset_cluster_configs + +reset cluster configurations dynamically. + +**Syntax:** + +```sql +-- reset configuration values +CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', ...]) ``` +**Parameters:** + +- `config_keys`(required): The configuration keys to reset. + + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Reset a specific configuration +CALL sys.reset_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +); + +-- Reset RocksDB rate limiter and datalake format +CALL sys.reset_cluster_configs( + config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format' +); +``` \ No newline at end of file