Skip to content

Conversation

@swuferhong
Copy link
Contributor

Purpose

[server] Add rebalance id to trace rebalance task

Linked issue: close #2285

Brief change log

Tests

API and Format

Documentation

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a rebalance ID field to trace rebalance tasks throughout the system. The implementation includes adding the rebalanceId field to RebalancePlan, updating serialization/deserialization logic, implementing comprehensive rebalance management functionality, and adding corresponding client APIs and Flink procedures.

Key Changes

  • Added rebalanceId field to RebalancePlan for task tracing
  • Implemented RebalanceManager for coordinating rebalance operations
  • Refactored replica leader election from enum-based to class-based strategy pattern
  • Added Flink procedures for rebalance operations (rebalance, cancel, list progress)

Reviewed changes

Copilot reviewed 72 out of 72 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
RebalancePlan.java Added rebalanceId field with getter and updated equals/hashCode
RebalancePlanJsonSerde.java Updated JSON serialization to include rebalance_id field
ZooKeeperClient.java Added updatePartitionAssignment and deleteRebalancePlan methods
RebalanceManager.java New manager class for coordinating rebalance operations
CoordinatorEventProcessor.java Added rebalance task execution logic and replica reassignment
ReplicaLeaderElection.java Refactored from enum to class-based strategy pattern
Multiple test files Added comprehensive test coverage for rebalance functionality
Flink procedures Added procedures for rebalance, cancel, and list progress operations
Config files Added TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT config option

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

public void registerRebalance(
String rebalanceId, Map<TableBucket, RebalancePlanForBucket> rebalancePlan) {
checkNotClosed();
registerTime = System.currentTimeMillis();
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field access (publicly accessible via this expression) is not protected by any monitor, but the class is annotated as @threadsafe.

Copilot uses AI. Check for mistakes.
/** Test for {@link GoalOptimizer}. */
public class GoalOptimizerTest {

private SortedSet<ServerModel> servers;
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contents of this container are never accessed.

Copilot uses AI. Check for mistakes.
} catch (Exception e) {
LOG.error(
"Failed to get rebalance plan from zookeeper, it will be treated as no"
+ "rebalance tasks.",
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This string appears to be missing a space after 'no'.

Suggested change
+ "rebalance tasks.",
+ " rebalance tasks.",

Copilot uses AI. Check for mistakes.
rebalancePlan =
rebalanceManager.generateRebalancePlan(rebalanceEvent.getGoalsByPriority());
} catch (Exception e) {
throw new RebalanceFailureException("Failed to generate rebalance plan.", e);
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
Comment on lines +1157 to +1161
throw new RebalanceFailureException(
"Rebalance task already exists. Please wait for it to finish or cancel it first.");
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
Comment on lines +162 to +163
throw new RebalanceFailureException(
"Rebalance task already exists. Please wait for it to finish or cancel it first.");
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
} catch (Exception e) {
LOG.error("Error when register rebalance plan to zookeeper.", e);
throw new RebalanceFailureException(
"Error when register rebalance plan to zookeeper.", e);
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Suggested change
"Error when register rebalance plan to zookeeper.", e);
"Error when register rebalance plan to zookeeper.");

Copilot uses AI. Check for mistakes.
Comment on lines +98 to +101
throw new RebalanceFailureException(
String.format(
"[%s] All alive tabletServers are excluded from replica moves.",
name()));
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
LOG.debug("Starting Optimizing for goal {}", name());
// Initialize pre-optimized stats.
ClusterModelStats statsBeforeOptimization = clusterModel.getClusterStats();
LOG.trace("[PRE - {}] {}", name(), statsBeforeOptimization);
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default toString(): ClusterModelStats inherits toString() from Object, and so is not suitable for printing.

Copilot uses AI. Check for mistakes.
}

ClusterModelStats statsAfterOptimization = clusterModel.getClusterStats();
LOG.trace("[POST - {}] {}", name(), statsAfterOptimization);
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default toString(): ClusterModelStats inherits toString() from Object, and so is not suitable for printing.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add rebalanceId to trace the rebalance task

1 participant