Conversation

@leixm
Contributor

@leixm leixm commented Dec 4, 2025

What changes were proposed in this pull request?

Retrieve the number of task failures from TaskSetManager in the SparkUtils#shouldReportShuffleFetchFailure method.

Why are the changes needed?

At https://github.com/apache/celeborn/blob/main/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java#L484 we count task attempts in the "UNKNOWN" and "FAILED" states as failures, but Spark might not count an attempt in the FAILED state toward the task's failure count. In our production environment, task attempts frequently fail because of container preemption, and those failures should not be counted. Because the existing logic counts them anyway, stageRerun can be triggered prematurely. Obtaining the failure count for a task from the TaskSetManager is therefore more accurate.
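
To make the difference concrete, here is a minimal sketch contrasting the two ways of counting. The `Attempt` class and its `countsTowardFailures` flag are hypothetical stand-ins invented for this illustration, not Spark or Celeborn types, and the assumption that preempted attempts are flagged as not counting is taken from the description above.

```java
import java.util.Arrays;
import java.util.List;

final class FailureCountingSketch {
    /** Hypothetical, simplified view of one task attempt; not a Spark or Celeborn type. */
    static final class Attempt {
        final String status;                // e.g. "FAILED", "UNKNOWN", "RUNNING"
        final boolean countsTowardFailures; // assumption: false for preemption-style losses

        Attempt(String status, boolean countsTowardFailures) {
            this.status = status;
            this.countsTowardFailures = countsTowardFailures;
        }
    }

    /** Status-based counting: every FAILED or UNKNOWN attempt is treated as a failure. */
    static int countByStatus(List<Attempt> attempts) {
        int count = 0;
        for (Attempt a : attempts) {
            if ("FAILED".equals(a.status) || "UNKNOWN".equals(a.status)) {
                count++;
            }
        }
        return count;
    }

    /** Scheduler-style counting: only failures flagged as counting toward the limit. */
    static int countLikeScheduler(List<Attempt> attempts) {
        int count = 0;
        for (Attempt a : attempts) {
            if ("FAILED".equals(a.status) && a.countsTowardFailures) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Attempt> attempts = Arrays.asList(
            new Attempt("FAILED", false),  // lost to container preemption
            new Attempt("FAILED", false),  // lost to container preemption
            new Attempt("FAILED", true));  // genuine task failure
        System.out.println("status-based count:   " + countByStatus(attempts));
        System.out.println("scheduler-like count: " + countLikeScheduler(attempts));
    }
}
```

With a limit of 4 failures per task, for instance, the status-based count of 3 would already put the next failure at the limit, while the scheduler-style count of 1 would not.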

Does this PR resolve a correctness bug?

No.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

… number of task failures from TaskSetManager
@leixm
Contributor Author

leixm commented Dec 4, 2025

cc @AngersZhuuuu @turboFei @RexXiong

@turboFei turboFei requested a review from Copilot December 6, 2025 07:14

Copilot AI left a comment

Pull request overview

This PR improves the accuracy of task failure counting in the shouldReportShuffleFetchFailure method by retrieving failure counts directly from Spark's TaskSetManager instead of manually counting failed task attempts. This addresses an issue where the previous implementation incorrectly counted failures for tasks in the "FAILED" state, which Spark may not always count depending on the failure reason (e.g., container preemption).

Key changes:

  • Added getTaskFailureCount method to retrieve failure counts from TaskSetManager's internal numFailures array
  • Removed manual failure counting logic that iterated through task attempts in "FAILED" and "UNKNOWN" states
  • Updated logging messages to clearly distinguish between previous failure count and total failure count
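
The PR's implementation of getTaskFailureCount is not shown in this conversation, so the following is only a sketch of one way such a helper could work. It assumes the count is read reflectively from a field named `numFailures`, and it uses a hypothetical `FakeTaskSetManager` class in place of Spark's TaskSetManager so that it runs without Spark on the classpath. The "-1 on error" contract follows the Javadoc fragment quoted in the review.

```java
import java.lang.reflect.Field;

final class FailureCountLookupSketch {
    /** Hypothetical stand-in for Spark's TaskSetManager; only the field name comes from the review summary. */
    static final class FakeTaskSetManager {
        private final int[] numFailures;

        FakeTaskSetManager(int[] numFailures) {
            this.numFailures = numFailures;
        }
    }

    /**
     * Reads the per-task failure count from the manager's numFailures array.
     * Returns -1 if the field is missing, has an unexpected type, or the index is out of range.
     */
    static int getTaskFailureCount(Object taskSetManager, int index) {
        try {
            Field field = taskSetManager.getClass().getDeclaredField("numFailures");
            field.setAccessible(true);
            int[] counts = (int[]) field.get(taskSetManager);
            return counts[index];
        } catch (Exception e) {
            // Any lookup problem is collapsed into the -1 sentinel; the caller decides what to do.
            return -1;
        }
    }

    public static void main(String[] args) {
        FakeTaskSetManager manager = new FakeTaskSetManager(new int[] {0, 2, 1});
        System.out.println(getTaskFailureCount(manager, 1));      // count for task index 1
        System.out.println(getTaskFailureCount(manager, 5));      // index out of range
        System.out.println(getTaskFailureCount(new Object(), 0)); // no such field
    }
}
```

Returning a sentinel instead of throwing keeps the caller's control flow simple, at the cost of hiding the underlying cause unless it is logged.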

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Added getTaskFailureCount helper method and refactored shouldReportShuffleFetchFailure to use TaskSetManager's failure count |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Same changes as spark-3 version, maintaining consistency across Spark versions |

if (previousFailureCount < 0) {
return true;
}
if (previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt) {

Copilot AI Dec 6, 2025

[nitpick] The comment mentions comparing with (maxTaskFails - 1) for equivalence, but the actual code uses previousFailureCount + 1 >= maxTaskFails. While mathematically equivalent, the implementation differs from the explanation. Consider either updating the comment to match the code or adjusting the code to match the comment explanation for consistency.
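
The equivalence of the two forms can be checked with a small sketch. The class and method names here are hypothetical; the check covers small non-negative values only, since either expression could overflow at the extremes of `int`.

```java
final class ThresholdEquivalenceSketch {
    /** The form used in the code under review. */
    static boolean codeForm(int previousFailureCount, int maxTaskFails) {
        return previousFailureCount + 1 >= maxTaskFails;
    }

    /** The form described in the code comment. */
    static boolean commentForm(int previousFailureCount, int maxTaskFails) {
        return previousFailureCount >= maxTaskFails - 1;
    }

    /** Returns true if both forms agree for all counts in [0, limit] and limits in [1, limit]. */
    static boolean agreeOnRange(int limit) {
        for (int previous = 0; previous <= limit; previous++) {
            for (int max = 1; max <= limit; max++) {
                if (codeForm(previous, max) != commentForm(previous, max)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("forms agree on [0, 100]: " + agreeOnRange(100));
    }
}
```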

// so we compare with (maxTaskFails - 1) which is equivalent to
// (previousFailureCount + 1) >= maxTaskFails
int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
if (previousFailureCount < 0) {

Copilot AI Dec 6, 2025

The error handling for previousFailureCount < 0 returns true to trigger FetchFailed, but lacks a comment explaining this fail-safe behavior. Consider adding a brief comment explaining that returning true when failure count cannot be determined is a conservative safety measure to trigger FetchFailed and prevent silent failures.

Suggested change
if (previousFailureCount < 0) {
if (previousFailureCount < 0) {
// Fail-safe: If the previous failure count cannot be determined, conservatively trigger FetchFailed
// to prevent silent failures and ensure the error is handled.
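
For context, the decision flow around this check can be sketched as a pure function. This is a simplified illustration and not the PR's code: the class and method names are hypothetical, and because the body of the second branch is not shown in the review, the sketch assumes it reports the failure and that everything else does not.

```java
final class ReportDecisionSketch {
    /**
     * Simplified decision: should a shuffle fetch failure be reported?
     * previousFailureCount is -1 when the count could not be determined.
     */
    static boolean shouldReport(int previousFailureCount, int maxTaskFails, boolean hasRunningAttempt) {
        if (previousFailureCount < 0) {
            // Fail-safe: the count is unknown, so report rather than risk a silent failure.
            return true;
        }
        // Assumption: report when this failure reaches the limit, or when no other
        // attempt of the task is still running; otherwise do not report.
        return previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt;
    }

    public static void main(String[] args) {
        System.out.println(shouldReport(-1, 4, true));  // unknown count
        System.out.println(shouldReport(3, 4, true));   // limit reached
        System.out.println(shouldReport(1, 4, true));   // below limit, another attempt running
        System.out.println(shouldReport(1, 4, false));  // below limit, no running attempt
    }
}
```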

* @return the number of previous failed attempts, or -1 if an error occurs
*/
@VisibleForTesting
protected static int getTaskFailureCount(TaskSetManager taskSetManager, int index) {
Member

I see it is annotated @VisibleForTesting.

Could you add a unit test for it?

@github-actions

github-actions bot commented Jan 6, 2026

This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the stale label Jan 6, 2026