[server] Add rebalance id to trace rebalance task #2286
Conversation
fbd1e6a to
1782631
Compare
Pull request overview
This PR adds a rebalance ID field to trace rebalance tasks throughout the system. The implementation includes adding the rebalanceId field to RebalancePlan, updating serialization/deserialization logic, implementing comprehensive rebalance management functionality, and adding corresponding client APIs and Flink procedures.
Key Changes
- Added `rebalanceId` field to `RebalancePlan` for task tracing
- Implemented `RebalanceManager` for coordinating rebalance operations
- Refactored replica leader election from enum-based to class-based strategy pattern
- Added Flink procedures for rebalance operations (rebalance, cancel, list progress)
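As a rough illustration of the enum-to-class refactoring mentioned above, the sketch below shows why a class-based strategy can be convenient: each strategy instance can carry its own state. All type and method names here are hypothetical and are not taken from the PR's `ReplicaLeaderElection`.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical base strategy; the real ReplicaLeaderElection may be shaped differently. */
abstract class LeaderElectionSketch {
    /** Picks a leader from the live replicas, or returns empty if none qualifies. */
    abstract Optional<Integer> electLeader(List<Integer> assignment, List<Integer> liveReplicas);
}

/** Picks the first assigned replica that is still alive. */
class DefaultElectionSketch extends LeaderElectionSketch {
    @Override
    Optional<Integer> electLeader(List<Integer> assignment, List<Integer> liveReplicas) {
        return assignment.stream().filter(liveReplicas::contains).findFirst();
    }
}

/** Carries per-instance state (the target replicas), which a shared enum constant cannot. */
class ReassignmentElectionSketch extends LeaderElectionSketch {
    private final List<Integer> targetReplicas;

    ReassignmentElectionSketch(List<Integer> targetReplicas) {
        this.targetReplicas = targetReplicas;
    }

    @Override
    Optional<Integer> electLeader(List<Integer> assignment, List<Integer> liveReplicas) {
        // Prefer replicas from the reassignment target rather than the current assignment.
        return targetReplicas.stream().filter(liveReplicas::contains).findFirst();
    }
}
```

With an enum, every caller shares the same constant, so per-call data such as the target replicas would have to be passed through extra parameters instead.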
Reviewed changes
Copilot reviewed 72 out of 72 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| RebalancePlan.java | Added rebalanceId field with getter and updated equals/hashCode |
| RebalancePlanJsonSerde.java | Updated JSON serialization to include rebalance_id field |
| ZooKeeperClient.java | Added updatePartitionAssignment and deleteRebalancePlan methods |
| RebalanceManager.java | New manager class for coordinating rebalance operations |
| CoordinatorEventProcessor.java | Added rebalance task execution logic and replica reassignment |
| ReplicaLeaderElection.java | Refactored from enum to class-based strategy pattern |
| Multiple test files | Added comprehensive test coverage for rebalance functionality |
| Flink procedures | Added procedures for rebalance, cancel, and list progress operations |
| Config files | Added TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT config option |
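The `RebalancePlan.java` row above says the new field participates in `equals`/`hashCode`. A minimal sketch of what that could look like follows; `PlanSketch` and its `planByBucket` map are hypothetical simplifications, not the PR's actual types.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified plan holder that includes the rebalance id in its identity. */
final class PlanSketch {
    private final String rebalanceId;
    private final Map<Integer, String> planByBucket; // stand-in for the per-bucket plan

    PlanSketch(String rebalanceId, Map<Integer, String> planByBucket) {
        this.rebalanceId = rebalanceId;
        this.planByBucket = planByBucket;
    }

    String getRebalanceId() {
        return rebalanceId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PlanSketch)) {
            return false;
        }
        PlanSketch that = (PlanSketch) o;
        // Two plans with identical moves but different ids are different tasks.
        return Objects.equals(rebalanceId, that.rebalanceId)
                && Objects.equals(planByBucket, that.planByBucket);
    }

    @Override
    public int hashCode() {
        return Objects.hash(rebalanceId, planByBucket);
    }
}
```

Including the id in both methods keeps them consistent, so plans used as map keys or compared in tests behave predictably.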
| public void registerRebalance( | ||
| String rebalanceId, Map<TableBucket, RebalancePlanForBucket> rebalancePlan) { | ||
| checkNotClosed(); | ||
| registerTime = System.currentTimeMillis(); |
Copilot
AI
Jan 4, 2026
This field access (publicly accessible via this expression) is not protected by any monitor, but the class is annotated as @ThreadSafe.
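One common way to address this kind of finding is to guard every read and write of the mutable fields with the same lock. The sketch below assumes a simplified, hypothetical `RebalanceRegistrySketch` class with a `String`-keyed plan; it is not the PR's `RebalanceManager`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the class under review; all state is guarded by one lock. */
class RebalanceRegistrySketch {
    private final Object lock = new Object();

    // All three fields below are guarded by "lock".
    private long registerTime;
    private String rebalanceId;
    private final Map<String, String> plan = new HashMap<>();

    void registerRebalance(String id, Map<String, String> newPlan) {
        synchronized (lock) {
            // The timestamp write happens under the same monitor as the other updates.
            registerTime = System.currentTimeMillis();
            rebalanceId = id;
            plan.clear();
            plan.putAll(newPlan);
        }
    }

    long getRegisterTime() {
        synchronized (lock) {
            return registerTime;
        }
    }

    String getRebalanceId() {
        synchronized (lock) {
            return rebalanceId;
        }
    }
}
```

If the timestamp is the only field read from other threads, declaring it `volatile` could be a lighter alternative, though that would not keep it consistent with the id and the plan.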
| /** Test for {@link GoalOptimizer}. */ | ||
| public class GoalOptimizerTest { | ||
|
|
||
| private SortedSet<ServerModel> servers; |
Copilot
AI
Jan 4, 2026
The contents of this container are never accessed.
| } catch (Exception e) { | ||
| LOG.error( | ||
| "Failed to get rebalance plan from zookeeper, it will be treated as no" | ||
| + "rebalance tasks.", |
Copilot
AI
Jan 4, 2026
This string appears to be missing a space after 'no'.
| + "rebalance tasks.", | |
| + " rebalance tasks.", |
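The effect of the suggested change can be checked in isolation. The class name `LogMessageSketch` is hypothetical; the string literals are the ones from the snippet above.

```java
/** Hypothetical holder that contrasts the original and the suggested concatenation. */
class LogMessageSketch {
    // Original: the two literals join without a separator.
    static final String ORIGINAL =
            "Failed to get rebalance plan from zookeeper, it will be treated as no"
                    + "rebalance tasks.";

    // Suggested: a leading space on the second literal restores the word boundary.
    static final String SUGGESTED =
            "Failed to get rebalance plan from zookeeper, it will be treated as no"
                    + " rebalance tasks.";
}
```

The original message ends in "norebalance tasks.", while the suggested one ends in "no rebalance tasks.".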
| rebalancePlan = | ||
| rebalanceManager.generateRebalancePlan(rebalanceEvent.getGoalsByPriority()); | ||
| } catch (Exception e) { | ||
| throw new RebalanceFailureException("Failed to generate rebalance plan.", e); |
Copilot
AI
Jan 4, 2026
Access of element annotated with VisibleForTesting found in production code.
| throw new RebalanceFailureException( | ||
| "Rebalance task already exists. Please wait for it to finish or cancel it first."); |
Copilot
AI
Jan 4, 2026
Access of element annotated with VisibleForTesting found in production code.
| throw new RebalanceFailureException( | ||
| "Rebalance task already exists. Please wait for it to finish or cancel it first."); |
Copilot
AI
Jan 4, 2026
Access of element annotated with VisibleForTesting found in production code.
| } catch (Exception e) { | ||
| LOG.error("Error when register rebalance plan to zookeeper.", e); | ||
| throw new RebalanceFailureException( | ||
| "Error when register rebalance plan to zookeeper.", e); |
Copilot
AI
Jan 4, 2026
Access of element annotated with VisibleForTesting found in production code.
| "Error when register rebalance plan to zookeeper.", e); | |
| "Error when register rebalance plan to zookeeper."); |
| throw new RebalanceFailureException( | ||
| String.format( | ||
| "[%s] All alive tabletServers are excluded from replica moves.", | ||
| name())); |
Copilot
AI
Jan 4, 2026
Access of element annotated with VisibleForTesting found in production code.
| LOG.debug("Starting Optimizing for goal {}", name()); | ||
| // Initialize pre-optimized stats. | ||
| ClusterModelStats statsBeforeOptimization = clusterModel.getClusterStats(); | ||
| LOG.trace("[PRE - {}] {}", name(), statsBeforeOptimization); |
Copilot
AI
Jan 4, 2026
Default toString(): ClusterModelStats inherits toString() from Object, and so is not suitable for printing.
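A typical fix for this finding is to override `toString()` on the logged type. The sketch below uses a hypothetical `StatsSketch` class with made-up fields, since the real fields of `ClusterModelStats` are not shown here.

```java
/** Hypothetical stats holder; the field names are illustrative assumptions only. */
final class StatsSketch {
    private final int numServers;
    private final int numReplicas;

    StatsSketch(int numServers, int numReplicas) {
        this.numServers = numServers;
        this.numReplicas = numReplicas;
    }

    @Override
    public String toString() {
        // Without this override, logging prints ClassName@hexHashCode from Object.
        return "StatsSketch{numServers=" + numServers + ", numReplicas=" + numReplicas + "}";
    }
}
```

With the override in place, a call such as `LOG.trace("[PRE - {}] {}", name(), stats)` would log the field values instead of an identity hash.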
| } | ||
|
|
||
| ClusterModelStats statsAfterOptimization = clusterModel.getClusterStats(); | ||
| LOG.trace("[POST - {}] {}", name(), statsAfterOptimization); |
Copilot
AI
Jan 4, 2026
Default toString(): ClusterModelStats inherits toString() from Object, and so is not suitable for printing.
6396d5b to
f2133e8
Compare
f2133e8 to
654239f
Compare
Purpose
[server] Add rebalance id to trace rebalance task
Linked issue: close #2285
Brief change log
Tests
API and Format
Documentation