Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,61 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Procedure to get cluster configuration(s).
*
* <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
*
* <ul>
* <li>A specific configuration key
* <li>multiple configurations
* <li>All configurations (when key parameter is null or empty)
* </ul>
*
* <p>Usage examples:
*
* <pre>
* -- Get a specific configuration
* CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
* CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
*
* -- Get multiple configurations
* CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
*
* -- Get all cluster configurations
* CALL sys.get_cluster_config();
* CALL sys.get_cluster_configs();
* </pre>
*/
public class GetClusterConfigProcedure extends ProcedureBase {
public class GetClusterConfigsProcedure extends ProcedureBase {

@ProcedureHint(
output =
@DataTypeHint(
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
public Row[] call(ProcedureContext context) throws Exception {
return getConfigs(null);
return getConfigs();
}

@ProcedureHint(
argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
isVarArgs = true,
output =
@DataTypeHint(
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
public Row[] call(ProcedureContext context, String configKey) throws Exception {
public Row[] call(ProcedureContext context, String... configKey) throws Exception {
return getConfigs(configKey);
}

private Row[] getConfigs(@Nullable String configKey) throws Exception {
private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
try {
// Get all cluster configurations
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();

List<Row> results = new ArrayList<>();

if (configKey == null || configKey.isEmpty()) {
if (configKeys == null || configKeys.length == 0) {
// Return all configurations
for (ConfigEntry entry : configs) {
results.add(
Expand All @@ -87,17 +94,23 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception {
entry.source() != null ? entry.source().name() : "UNKNOWN"));
}
} else {
// Find specific configuration
for (ConfigEntry entry : configs) {
if (entry.key().equals(configKey)) {
// Find configurations
// The order of the results is the same as that of the key.
Map<String, ConfigEntry> configEntryMap =
configs.stream()
.collect(Collectors.toMap(ConfigEntry::key, Function.identity()));
for (String key : configKeys) {
ConfigEntry entry = configEntryMap.get(key);
if (null != entry) {
results.add(
Row.of(
entry.key(),
entry.value(),
entry.source() != null
? entry.source().name()
: "UNKNOWN"));
break;
} else {
results.add(Row.of());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ private enum ProcedureEnum {
ADD_ACL("sys.add_acl", AddAclProcedure.class),
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
List_ACL("sys.list_acl", ListAclProcedure.class),
SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class),
SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class),
GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class),
RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.procedure;

import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.ArrayList;
import java.util.List;

/**
* Procedure to reset cluster configuration dynamically.
*
* <p>This procedure allows modifying dynamic cluster configurations. The changes are:
*
* <ul>
* <li>Validated by the CoordinatorServer before persistence
* <li>Persisted in ZooKeeper for durability
* <li>Applied to all relevant servers (Coordinator and TabletServers)
* <li>Survive server restarts
* </ul>
*
* <p>Usage examples:
*
* <pre>
* -- reset a configuration
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
*
* -- reset multiple configurations at one time
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format');
*
* </pre>
*
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
* change and reject it if the configuration cannot be reset dynamically.
*/
public class ResetClusterConfigsProcedure extends ProcedureBase {

@ProcedureHint(
argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
isVarArgs = true)
public String[] call(ProcedureContext context, String... configKeys) throws Exception {
try {
// Validate config key
if (configKeys.length == 0) {
throw new IllegalArgumentException(
"config_pairs cannot be null or empty. "
+ "Please specify valid configuration keys.");
}

List<AlterConfig> configList = new ArrayList<>();
StringBuilder resultMessage = new StringBuilder();

for (String key : configKeys) {
String configKey = key.trim();
if (configKey.isEmpty()) {
throw new IllegalArgumentException(
"Config key cannot be null or empty. "
+ "Please specify valid configuration key.");
}

String operationDesc = "deleted (reset to default)";

AlterConfig alterConfig =
new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
configList.add(alterConfig);
resultMessage.append(
String.format(
"Successfully %s configuration '%s'. ", operationDesc, configKey));
}

// Call Admin API to modify cluster configuration
// This will trigger validation on CoordinatorServer before persistence
admin.alterClusterConfigs(configList).get();

return new String[] {
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
};
} catch (IllegalArgumentException e) {
// Re-throw validation errors with original message
throw e;
} catch (Exception e) {
// Wrap other exceptions with more context
throw new RuntimeException(
String.format("Failed to reset cluster config: %s", e.getMessage()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.ArrayList;
import java.util.List;

/**
* Procedure to set or delete cluster configuration dynamically.
Expand All @@ -45,84 +44,76 @@
*
* <pre>
* -- Set a configuration
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
* CALL sys.set_cluster_config('datalake.format', 'paimon');
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
* CALL sys.set_cluster_configs('datalake.format', 'paimon');
*
* -- Set multiple configurations at one time
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
*
* -- Delete a configuration (reset to default)
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
* </pre>
*
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
* change and reject it if the configuration cannot be modified dynamically or if the new value is
* invalid.
*/
public class SetClusterConfigProcedure extends ProcedureBase {

@ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))})
public String[] call(ProcedureContext context, String configKey) throws Exception {
return performSet(configKey, null);
}
public class SetClusterConfigsProcedure extends ProcedureBase {

@ProcedureHint(
argument = {
@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "config_value", type = @DataTypeHint("STRING"))
})
public String[] call(ProcedureContext context, String configKey, String configValue)
throws Exception {
return performSet(configKey, configValue);
}

private String[] performSet(String configKey, @Nullable String configValue) throws Exception {

argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
isVarArgs = true)
public String[] call(ProcedureContext context, String... configPairs) throws Exception {
try {
// Validate config key
if (configKey == null || configKey.trim().isEmpty()) {
if (configPairs.length == 0) {
throw new IllegalArgumentException(
"Config key cannot be null or empty. "
+ "Please specify a valid configuration key.");
"config_pairs cannot be null or empty. "
+ "Please specify a valid configuration pairs.");
}

configKey = configKey.trim();

// Determine operation type
AlterConfigOpType opType;
String operationDesc;

if (configValue == null || configValue.trim().isEmpty()) {
// Delete operation - reset to default
opType = AlterConfigOpType.DELETE;
operationDesc = "deleted (reset to default)";
} else {
// Set operation
opType = AlterConfigOpType.SET;
operationDesc = String.format("set to '%s'", configValue);
if (configPairs.length % 2 != 0) {
throw new IllegalArgumentException(
"config_pairs must be set in pairs. "
+ "Please specify a valid configuration pairs.");
}
List<AlterConfig> configList = new ArrayList<>();
StringBuilder resultMessage = new StringBuilder();

for (int i = 0; i < configPairs.length; i += 2) {
String configKey = configPairs[i].trim();
if (configKey.isEmpty()) {
throw new IllegalArgumentException(
"Config key cannot be null or empty. "
+ "Please specify a valid configuration key.");
}
String configValue = configPairs[i + 1];

String operationDesc = String.format("set to '%s'", configValue);

// Construct configuration modification operation.
AlterConfig alterConfig =
new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
configList.add(alterConfig);
resultMessage.append(
String.format(
"Successfully %s configuration '%s'. ", operationDesc, configKey));
}

// Construct configuration modification operation.
AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType);

// Call Admin API to modify cluster configuration
// This will trigger validation on CoordinatorServer before persistence
admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
admin.alterClusterConfigs(configList).get();

return new String[] {
String.format(
"Successfully %s configuration '%s'. "
+ "The change is persisted in ZooKeeper and applied to all servers.",
operationDesc, configKey)
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
};

} catch (IllegalArgumentException e) {
// Re-throw validation errors with original message
throw e;
} catch (Exception e) {
// Wrap other exceptions with more context
throw new RuntimeException(
String.format(
"Failed to set cluster config '%s': %s", configKey, e.getMessage()),
e);
String.format("Failed to set cluster config: %s", e.getMessage()), e);
}
}
}
Loading
Loading