[CELEBORN-2230] SparkUtils#shouldReportShuffleFetchFailure method should retrieve the number of task failures from TaskSetManager #3556
Conversation
Pull request overview
This PR improves the accuracy of task failure counting in the shouldReportShuffleFetchFailure method by retrieving failure counts directly from Spark's TaskSetManager instead of manually counting failed task attempts. This addresses an issue where the previous implementation incorrectly counted failures for tasks in the "FAILED" state, which Spark may not always count depending on the failure reason (e.g., container preemption).
Key changes:
- Added a `getTaskFailureCount` method to retrieve failure counts from TaskSetManager's internal `numFailures` array
- Removed the manual failure-counting logic that iterated through task attempts in the "FAILED" and "UNKNOWN" states
- Updated logging messages to clearly distinguish the previous failure count from the total failure count
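The key-change list above describes a helper that reads TaskSetManager's private `numFailures` bookkeeping. A minimal sketch of that pattern, using a stand-in class rather than Spark's real `TaskSetManager` (the field name and reflective access here are assumptions for illustration, not the PR's actual implementation):

```java
import java.lang.reflect.Field;

class FailureCountSketch {
  // Stand-in for Spark's TaskSetManager; the real class keeps per-task
  // failure counts internally (an int[] indexed by task index, per the PR text).
  static class FakeTaskSetManager {
    private final int[] numFailures = {0, 2, 1};
  }

  // Hypothetical helper mirroring the PR's getTaskFailureCount: read the
  // internal failure array reflectively and return -1 if anything goes wrong,
  // so the caller can treat a negative value as "count unknown".
  static int getTaskFailureCount(Object taskSetManager, int index) {
    try {
      Field f = taskSetManager.getClass().getDeclaredField("numFailures");
      f.setAccessible(true);
      int[] failures = (int[]) f.get(taskSetManager);
      return failures[index];
    } catch (ReflectiveOperationException | ArrayIndexOutOfBoundsException e) {
      return -1; // fail-safe sentinel
    }
  }

  public static void main(String[] args) {
    FakeTaskSetManager tsm = new FakeTaskSetManager();
    System.out.println(getTaskFailureCount(tsm, 1));  // 2
    System.out.println(getTaskFailureCount(tsm, 99)); // -1
  }
}
```

The -1 sentinel is what makes the `previousFailureCount < 0` fail-safe branch discussed below possible.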
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Added getTaskFailureCount helper method and refactored shouldReportShuffleFetchFailure to use TaskSetManager's failure count |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Same changes as spark-3 version, maintaining consistency across Spark versions |
```java
if (previousFailureCount < 0) {
  return true;
}
if (previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt) {
```
Copilot AI commented on Dec 6, 2025:
[nitpick] The comment mentions comparing with (maxTaskFails - 1) for equivalence, but the actual code uses previousFailureCount + 1 >= maxTaskFails. While mathematically equivalent, the implementation differs from the explanation. Consider either updating the comment to match the code or adjusting the code to match the comment explanation for consistency.
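The equivalence the nitpick refers to can be checked mechanically. A small sketch comparing the form used in the code against the form named in the comment (both method names are illustrative, not from the PR):

```java
class ThresholdEquivalence {
  // Form used in the code: this failure would exhaust the retry budget.
  static boolean asWritten(int previousFailureCount, int maxTaskFails) {
    return previousFailureCount + 1 >= maxTaskFails;
  }

  // Form named in the code comment: compare against (maxTaskFails - 1).
  static boolean asCommented(int previousFailureCount, int maxTaskFails) {
    return previousFailureCount >= maxTaskFails - 1;
  }

  public static void main(String[] args) {
    // Exhaustively check a small range; the two predicates never disagree.
    for (int prev = 0; prev <= 10; prev++) {
      for (int max = 1; max <= 10; max++) {
        if (asWritten(prev, max) != asCommented(prev, max)) {
          throw new AssertionError(prev + "," + max);
        }
      }
    }
    System.out.println("equivalent");
  }
}
```

So the mismatch is purely cosmetic, which is why the comment suggests aligning the wording rather than changing behavior.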
```java
// so we compare with (maxTaskFails - 1) which is equivalent to
// (previousFailureCount + 1) >= maxTaskFails
int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
if (previousFailureCount < 0) {
```
Copilot AI commented on Dec 6, 2025:
The error handling for previousFailureCount < 0 returns true to trigger FetchFailed, but lacks a comment explaining this fail-safe behavior. Consider adding a brief comment explaining that returning true when failure count cannot be determined is a conservative safety measure to trigger FetchFailed and prevent silent failures.
Suggested change:
```java
if (previousFailureCount < 0) {
  // Fail-safe: If the previous failure count cannot be determined, conservatively trigger FetchFailed
  // to prevent silent failures and ensure the error is handled.
```
```java
 * @return the number of previous failed attempts, or -1 if an error occurs
 */
@VisibleForTesting
protected static int getTaskFailureCount(TaskSetManager taskSetManager, int index) {
```
I see it is marked `@VisibleForTesting`.
Could you cover it in a unit test?
This PR is stale because it has been open 20 days with no activity. Remove the stale label or comment, or this will be closed in 10 days.
What changes were proposed in this pull request?
Retrieve the number of task failures from `TaskSetManager` in the `SparkUtils#shouldReportShuffleFetchFailure` method.
Why are the changes needed?
https://github.com/apache/celeborn/blob/main/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java#L484 The current code records a failure for every task attempt in the "UNKNOWN" or "FAILED" state, but Spark itself may not count a FAILED attempt toward the task's failure budget, depending on the failure reason. This happens frequently in our production environment when task attempts fail due to container preemption: Spark does not count those failures, but the existing logic does, so stage rerun is triggered prematurely. Obtaining the failure count from the TaskSetManager is therefore more accurate.
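The overcounting described above can be sketched in isolation. This toy model (the `Attempt` record, its fields, and both counting methods are illustrative assumptions, not Celeborn or Spark code) contrasts counting every FAILED attempt against deferring to Spark's own bookkeeping, as `TaskSetManager.numFailures` would:

```java
import java.util.List;

class CountingComparison {
  enum State { RUNNING, FAILED, UNKNOWN, SUCCESS }

  // Hypothetical attempt record: its state, plus whether Spark itself counted
  // the failure toward the retry budget (preempted containers do not count).
  record Attempt(State state, boolean countedBySpark) {}

  // Old approach (sketch): count every FAILED/UNKNOWN attempt manually.
  static int manualCount(List<Attempt> attempts) {
    return (int) attempts.stream()
        .filter(a -> a.state() == State.FAILED || a.state() == State.UNKNOWN)
        .count();
  }

  // New approach (sketch): trust only failures Spark counted.
  static int sparkCount(List<Attempt> attempts) {
    return (int) attempts.stream().filter(Attempt::countedBySpark).count();
  }

  public static void main(String[] args) {
    // Two attempts killed by container preemption: manual counting sees 2
    // failures and edges toward a premature stage rerun; Spark's own
    // bookkeeping sees 0.
    List<Attempt> attempts = List.of(
        new Attempt(State.FAILED, false),
        new Attempt(State.FAILED, false));
    System.out.println(manualCount(attempts)); // 2
    System.out.println(sparkCount(attempts));  // 0
  }
}
```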
Does this PR resolve a correctness bug?
No.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.