From 7ea62d88aa5352c7ca0c3c4ec737cb22d29d2f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 04:09:33 +0000 Subject: [PATCH 01/31] pass dgxc to ft_launcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/packaging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_run/run/torchx_backend/packaging.py b/nemo_run/run/torchx_backend/packaging.py index 84b9dc4c..8a850de4 100644 --- a/nemo_run/run/torchx_backend/packaging.py +++ b/nemo_run/run/torchx_backend/packaging.py @@ -203,6 +203,7 @@ def _get_details_from_script(fn_or_script: Script, serialize_configs: bool): log_level=launcher.log_level, max_retries=executor.retries, max_restarts=launcher.max_restarts, + dgxc=isinstance(executor, DGXCloudExecutor), use_env=use_env, ) else: From 02efa9fc013b2049ffa4c64f5df346882c5fc7cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:13:51 +0000 Subject: [PATCH 02/31] feat: Add FT to DGXC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 56 ++++++++++++++- nemo_run/core/execution/templates/dgxc.sh.j2 | 47 +++++++++++++ .../execution/templates/ft_launcher_dgxc.j2 | 69 +++++++++++++++++++ .../{ft_launcher.j2 => ft_launcher_slurm.j2} | 0 nemo_run/core/execution/templates/slurm.sh.j2 | 2 +- .../run/torchx_backend/schedulers/dgxcloud.py | 23 ++++++- 6 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 nemo_run/core/execution/templates/dgxc.sh.j2 create mode 100644 nemo_run/core/execution/templates/ft_launcher_dgxc.j2 rename nemo_run/core/execution/templates/{ft_launcher.j2 => ft_launcher_slurm.j2} (100%) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 13596ebf..d22c0b20 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -21,7 +21,7 @@ import subprocess import tempfile import time -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional @@ -31,6 +31,8 @@ from nemo_run.config import get_nemorun_home from nemo_run.core.execution.base import Executor, ExecutorMacros +from nemo_run.core.execution.launcher import FaultTolerance, Launcher +from nemo_run.core.execution.utils import fill_template from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager @@ -556,3 +558,55 @@ def _default_headers(self, token: Optional[str] = None) -> dict: if token: headers["Authorization"] = f"Bearer {token}" return headers + + +@dataclass(kw_only=True) +class DGXCloudRequest: + launch_cmd: list[str] + jobs: list[str] + executor: DGXCloudExecutor + max_retries: int + extra_env: dict[str, str] + launcher: Optional[Launcher] = None + + def materialize(self) -> str: + """Creates the content of a DGXC entrypoint script.""" + + # 1. Environment Variables + # Combine executor defaults with extra envs + env_vars = [] + full_env_vars = self.executor.env_vars | self.extra_env + for key, value in full_env_vars.items(): + env_vars.append(f"export {key.upper()}={value}") + + # 3. 
Prepare Template Variables + vars_to_fill = { + "max_retries": self.max_retries, + "env_vars": env_vars, + "training_command": " ".join(self.launch_cmd), + "ft_enabled": self.launcher and isinstance(self.launcher, FaultTolerance), + } + + # 4. Fault Tolerance Injection + if self.launcher and isinstance(self.launcher, FaultTolerance): + assert ( + self.launcher.cfg_path + and self.launcher.finished_flag_file + and self.launcher.job_results_file + ), "Fault Tolerance requires cfg_path, finished_flag_file, and job_results_file" + + vars_to_fill["fault_tol_cfg_path"] = self.launcher.cfg_path + vars_to_fill["fault_tol_finished_flag_file"] = self.launcher.finished_flag_file + vars_to_fill["fault_tol_job_results_file"] = self.launcher.job_results_file + + # Render the template + entrypoint_script = fill_template("dgxc.sh.j2", vars_to_fill) + return entrypoint_script + + def __repr__(self) -> str: + return f"""# DGXC Entrypoint Script Request +# Executor: {self.executor.__class__.__name__} +# Jobs: {self.jobs} +# --------------------------------------------------- +{self.materialize()} +""" diff --git a/nemo_run/core/execution/templates/dgxc.sh.j2 b/nemo_run/core/execution/templates/dgxc.sh.j2 new file mode 100644 index 00000000..be5dee05 --- /dev/null +++ b/nemo_run/core/execution/templates/dgxc.sh.j2 @@ -0,0 +1,47 @@ +{%- import "ft_launcher_k8s.j2" as fault_tolerance -%} +#!/bin/bash +# +# Generated by NeMo Run for Kubernetes (PyTorchJob) +# + +# 1. Basic Shell Setup +set -evx # Print commands, but DO NOT exit immediately on error (we handle that below) +export PYTHONUNBUFFERED=1 +export TORCHX_MAX_RETRIES={{max_retries}} + +# 2. Environment Variables +# These are strictly user-defined vars (e.g. HYDRA_FULL_ERROR). +# Note: MASTER_ADDR, WORLD_SIZE, RANK are injected automatically by the PyTorchJob operator. +{%- for env_var in env_vars %} +{{env_var}} +{%- endfor %} + +# 3. Fault Tolerance: SETUP (Check-in) +# Checks if we are resuming or if we are already finished. +{%- if ft_enabled %} +{{ fault_tolerance.ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) }} +{%- endif %} + +# 4. Main Execution +# In PyTorchJob, we usually have exactly one main command (torchrun). +# We assume the variable 'training_command' contains the full torchrun string. + +echo "Starting training command..." +set +e # Turn off auto-exit so we can capture the code +# --------------------------------------------------------- +{{ training_command }} +# --------------------------------------------------------- +exitcode=$? +set -e + +echo "Main command exited with code $exitcode" + +# 5. Fault Tolerance: TEARDOWN (Check-out) +# Decides if we should exit 0 (complete) or exit 1 (retry via K8s backoffLimit). +{%- if ft_enabled %} +{{ fault_tolerance.ft_launcher_teardown() }} +{%- else %} +# If FT is disabled, simply pass the exit code through. +# K8s will restart if exitcode != 0 and backoffLimit > 0. 
+exit $exitcode +{%- endif %} diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 new file mode 100644 index 00000000..2b1546f8 --- /dev/null +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -0,0 +1,69 @@ +{% macro ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) -%} +# ------------------------------------------------------------------------- +# K8s Fault Tolerance Setup (The "Check-In" Desk) +# ------------------------------------------------------------------------- + +# 1. Export Paths +# IMPORTANT: These paths must reside on a ReadWriteMany (RWX) Persistent Volume +# mounted to all Pods so state is preserved across pod restarts/rescheduling. +export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" +export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" +export FAULT_TOL_JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" + +# 2. Define Helper Functions +is_training_finished() { + test -f "$FAULT_TOL_FINISHED_FLAG_FILE" +} + +# 3. Check for Previous Success +# In K8s, a Pod might be restarted due to node maintenance even if the job +# logic was done. If the flag file exists, we exit immediately with 0. +if is_training_finished ; then + echo "[FT-Setup] Found finished flag at $FAULT_TOL_FINISHED_FLAG_FILE." + echo "[FT-Setup] Training is already complete. Exiting successfully." + exit 0 +fi + +# 4. Logging Start +# We use HOSTNAME (usually pod-name) as the identifier since SLURM_JOB_ID is gone. +# We append 'X' (Running/Unknown) to the log. +echo "[FT-Setup] Starting training on $(hostname)..." +# Optional: Log attempt to shared file (Using X for Running) +# Note: In high-scale K8s, writing to a single file from 1000 pods can cause lock contention. +# If scale is small, this is fine. +if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then + echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" +fi + +{%- endmacro %} + +{% macro ft_launcher_teardown() -%} +# ------------------------------------------------------------------------- +# K8s Fault Tolerance Teardown (The "Check-Out" Desk) +# ------------------------------------------------------------------------- + +# 1. Analyze Exit Code from the Main Command +# 'exitcode' is captured in the main script before calling this macro. +if [ "$exitcode" -eq "0" ]; then + RESULT_STATUS="S" # Success +else + RESULT_STATUS="F" # Failure +fi + +# 2. Update Log (Optional but helpful for debugging) +if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then + # We update the specific entry for this host from X to S or F + # Note: 'sed -i' on a shared PVC can be risky with concurrency. + # Appending a new status line is safer in K8s. + echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" +fi + +# 3. The Requeue Decision Logic +if [ "$exitcode" -eq "0" ]; then + # Case A: Script exited successfully. + # Verification: Did it actually finish (create the flag file)? + if is_training_finished; then + echo "[FT-Teardown] Job finished successfully and flag file exists." 
+ exit 0 + else + # Edge Case: The python script exited 0, but didn't write the flag diff --git a/nemo_run/core/execution/templates/ft_launcher.j2 b/nemo_run/core/execution/templates/ft_launcher_slurm.j2 similarity index 100% rename from nemo_run/core/execution/templates/ft_launcher.j2 rename to nemo_run/core/execution/templates/ft_launcher_slurm.j2 diff --git a/nemo_run/core/execution/templates/slurm.sh.j2 b/nemo_run/core/execution/templates/slurm.sh.j2 index 26f756fa..dc2c93fa 100644 --- a/nemo_run/core/execution/templates/slurm.sh.j2 +++ b/nemo_run/core/execution/templates/slurm.sh.j2 @@ -1,4 +1,4 @@ -{%- import "ft_launcher.j2" as fault_tolerance -%} +{%- import "ft_launcher_slurm.j2" as fault_tolerance -%} #!/bin/bash # # Generated by NeMo Run diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index 4377ec71..b8baeed6 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -37,7 +37,7 @@ from nemo_run.config import get_nemorun_home from nemo_run.core.execution.base import Executor -from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState +from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudRequest, DGXCloudState from nemo_run.core.serialization.zlib_json import ZlibJSONSerializer from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin @@ -109,6 +109,23 @@ def _submit_dryrun( # type: ignore role = values.apply(role) cmd = [role.entrypoint] + role.args + + req = DGXCloudRequest( + launch_cmd=cmd, + jobs=[role.name], + executor=executor, + max_retries=role.max_retries, + extra_env=role.env, + launcher=executor.get_launcher(), + ) + + # Write and copy sbatch script + path = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + script = req.materialize() + + with open(path, "w") as f: + f.write(script) + return AppDryRunInfo( DGXRequest(app=app, executor=executor, cmd=cmd, name=role.name), # Minimal function to show the config, if any @@ -128,7 +145,9 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str: # The DGXExecutor's launch call typically returns (job_id, handle). # We'll call it without additional parameters here. 
- job_id, status = executor.launch(name=req.name, cmd=req.cmd) + cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + req.launch_cmd = ["bash", cmd] + job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd) if not job_id: raise RuntimeError("Failed scheduling run on DGX: no job_id returned") From 0be1693fd5015ed93e8bf219e07ccc632bbd2fa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:16:37 +0000 Subject: [PATCH 03/31] torchrun_job MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/schedulers/dgxcloud.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index b8baeed6..b786d3c0 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -120,7 +120,7 @@ def _submit_dryrun( # type: ignore ) # Write and copy sbatch script - path = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + path = os.path.join(executor.experiment_dir, "torchrun_job.sh") script = req.materialize() with open(path, "w") as f: @@ -145,7 +145,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str: # The DGXExecutor's launch call typically returns (job_id, handle). # We'll call it without additional parameters here. - cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + cmd = os.path.join(executor.experiment_dir, "torchrun_job.sh") req.launch_cmd = ["bash", cmd] job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd) if not job_id: From 232cef796e65f2b9ac04501c9a2f21d2a106c769 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:19:25 +0000 Subject: [PATCH 04/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/dgxc.sh.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/core/execution/templates/dgxc.sh.j2 b/nemo_run/core/execution/templates/dgxc.sh.j2 index be5dee05..1a8c000a 100644 --- a/nemo_run/core/execution/templates/dgxc.sh.j2 +++ b/nemo_run/core/execution/templates/dgxc.sh.j2 @@ -1,4 +1,4 @@ -{%- import "ft_launcher_k8s.j2" as fault_tolerance -%} +{%- import "ft_launcher_dgxc.j2" as fault_tolerance -%} #!/bin/bash # # Generated by NeMo Run for Kubernetes (PyTorchJob) From cc1a276ade11a5be70699081dbc8bc7dc1237cc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:20:05 +0000 Subject: [PATCH 05/31] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index d22c0b20..8e2b4474 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -21,7 +21,7 @@ import subprocess import tempfile import time -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional From f25924817da95e95f63c8a9229b6091a1329965d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 
18:22:53 +0000 Subject: [PATCH 06/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 2b1546f8..8c3f392f 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -66,4 +66,19 @@ if [ "$exitcode" -eq "0" ]; then echo "[FT-Teardown] Job finished successfully and flag file exists." exit 0 else - # Edge Case: The python script exited 0, but didn't write the flag + # Edge Case: The python script exited 0, but didn't write the flag file. + # This usually means a silent crash or partial run. We must force a retry. + echo "[FT-Teardown] WARNING: Process exited 0 but finished flag is MISSING." + echo "[FT-Teardown] Forcing exit 1 to trigger Kubernetes restart." + exit 1 + fi +else + # Case B: Script crashed (exitcode != 0). + echo "[FT-Teardown] Job failed with exit code $exitcode." + + # We exit with the error code. + # The K8s 'backoffLimit' (in PyTorchJob spec) will determine if we restart. + # We do NOT calculate retry counts manually here. + exit $exitcode +fi +{%- endmacro %} From 4e15269a4e567de1e4b377c34cccc5c07355ced6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:36:42 +0000 Subject: [PATCH 07/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8e2b4474..1a706088 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -16,7 +16,6 @@ import base64 import glob import json -import logging import os import subprocess import tempfile @@ -30,14 +29,14 @@ from invoke.context import Context from nemo_run.config import get_nemorun_home +from nemo_run.core.console import CONSOLE +from nemo_run.core.constants import RUNDIR_NAME from nemo_run.core.execution.base import Executor, ExecutorMacros -from nemo_run.core.execution.launcher import FaultTolerance, Launcher +from nemo_run.core.execution.launcher import FaultTolerance, Launcher, Torchrun from nemo_run.core.execution.utils import fill_template from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager -logger = logging.getLogger(__name__) - class DGXCloudState(Enum): CREATING = "Creating" @@ -463,6 +462,24 @@ def cancel(self, job_id: str): response.text, ) + def _setup_launcher(self): + super()._setup_launcher() + launcher = self.launcher + if launcher and isinstance(launcher, (FaultTolerance, Torchrun)): + self.torchrun_nproc_per_node = self.nprocs_per_node + self.ntasks_per_node = 1 + CONSOLE.log( + f"Detected {launcher.__class__.__name__} launcher, setting ntasks_per_node=1 and torchrun_nproc_per_node={self.torchrun_nproc_per_node}" + ) + + if launcher and isinstance(launcher, FaultTolerance): + base_dir = os.path.join(self.job_dir, Path(self.job_dir).name) + launcher.cfg_path = os.path.join(base_dir, f"{self.job_name}_ft_cfg.yml") + launcher.finished_flag_file = os.path.join( + "/", RUNDIR_NAME, f"{self.job_name}_finished_flag" + ) + 
launcher.job_results_file = os.path.join(base_dir, f"{self.job_name}_job_results") + def cleanup(self, handle: str): ... def assign( From 0875083bdc507eed69fb92cc504d14685bc030da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:45:11 +0000 Subject: [PATCH 08/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 1a706088..8058757f 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -28,12 +28,11 @@ import requests from invoke.context import Context -from nemo_run.config import get_nemorun_home -from nemo_run.core.console import CONSOLE -from nemo_run.core.constants import RUNDIR_NAME +from nemo_run.config import RUNDIR_NAME, get_nemorun_home from nemo_run.core.execution.base import Executor, ExecutorMacros from nemo_run.core.execution.launcher import FaultTolerance, Launcher, Torchrun from nemo_run.core.execution.utils import fill_template +from nemo_run.core.frontend.console.api import CONSOLE from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager From db84ae07bc9e19563791be1e005039e45790824e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:51:26 +0000 Subject: [PATCH 09/31] revert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8058757f..8440eb64 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -24,7 +24,7 @@ from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional - +import logging import requests from invoke.context import Context @@ -37,6 +37,9 @@ from nemo_run.core.packaging.git import GitArchivePackager +logger = logging.getLogger(__name__) + + class DGXCloudState(Enum): CREATING = "Creating" INITIALIZING = "Initializing" @@ -626,3 +629,4 @@ def __repr__(self) -> str: # --------------------------------------------------- {self.materialize()} """ +""" From d19794339fc892be54b8b12bc6161aec4e3346a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:56:35 +0000 Subject: [PATCH 10/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8440eb64..1d17afdb 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -16,6 +16,7 @@ import base64 import glob import json +import logging import os import subprocess import tempfile @@ -24,7 +25,7 @@ from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional -import logging + import requests from invoke.context import Context @@ -36,7 +37,6 @@ from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager - logger = logging.getLogger(__name__) @@ -629,4 +629,3 @@ def __repr__(self) 
-> str: # --------------------------------------------------- {self.materialize()} """ -""" From c507ddad378ea8431840fcb603a991f1444023c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Dec 2025 00:03:19 +0000 Subject: [PATCH 11/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 8c3f392f..87c23a27 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -55,6 +55,8 @@ if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then # We update the specific entry for this host from X to S or F # Note: 'sed -i' on a shared PVC can be risky with concurrency. # Appending a new status line is safer in K8s. + mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" + echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" fi From 620e0737f1eee97679fc4bd94d7b6aa1bbe2e367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Dec 2025 00:20:59 +0000 Subject: [PATCH 12/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 87c23a27..d62fa400 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -32,6 +32,7 @@ echo "[FT-Setup] Starting training on $(hostname)..." # Note: In high-scale K8s, writing to a single file from 1000 pods can cause lock contention. # If scale is small, this is fine. if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then + mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" fi From eb98cb51cc8b646253b64d7d63e4b713504bf347 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Dec 2025 00:43:38 +0000 Subject: [PATCH 13/31] cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/dgxc.sh.j2 | 22 ++--------- .../execution/templates/ft_launcher_dgxc.j2 | 37 ------------------- 2 files changed, 3 insertions(+), 56 deletions(-) diff --git a/nemo_run/core/execution/templates/dgxc.sh.j2 b/nemo_run/core/execution/templates/dgxc.sh.j2 index 1a8c000a..75bdede2 100644 --- a/nemo_run/core/execution/templates/dgxc.sh.j2 +++ b/nemo_run/core/execution/templates/dgxc.sh.j2 @@ -1,47 +1,31 @@ {%- import "ft_launcher_dgxc.j2" as fault_tolerance -%} #!/bin/bash -# -# Generated by NeMo Run for Kubernetes (PyTorchJob) -# -# 1. Basic Shell Setup set -evx # Print commands, but DO NOT exit immediately on error (we handle that below) export PYTHONUNBUFFERED=1 export TORCHX_MAX_RETRIES={{max_retries}} -# 2. Environment Variables -# These are strictly user-defined vars (e.g. HYDRA_FULL_ERROR). -# Note: MASTER_ADDR, WORLD_SIZE, RANK are injected automatically by the PyTorchJob operator. {%- for env_var in env_vars %} {{env_var}} {%- endfor %} -# 3. Fault Tolerance: SETUP (Check-in) -# Checks if we are resuming or if we are already finished. 
{%- if ft_enabled %} {{ fault_tolerance.ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) }} {%- endif %} -# 4. Main Execution -# In PyTorchJob, we usually have exactly one main command (torchrun). -# We assume the variable 'training_command' contains the full torchrun string. - echo "Starting training command..." set +e # Turn off auto-exit so we can capture the code -# --------------------------------------------------------- + {{ training_command }} -# --------------------------------------------------------- + exitcode=$? set -e echo "Main command exited with code $exitcode" -# 5. Fault Tolerance: TEARDOWN (Check-out) -# Decides if we should exit 0 (complete) or exit 1 (retry via K8s backoffLimit). {%- if ft_enabled %} {{ fault_tolerance.ft_launcher_teardown() }} {%- else %} -# If FT is disabled, simply pass the exit code through. -# K8s will restart if exitcode != 0 and backoffLimit > 0. + exit $exitcode {%- endif %} diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index d62fa400..f568879c 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -1,36 +1,20 @@ {% macro ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) -%} -# ------------------------------------------------------------------------- -# K8s Fault Tolerance Setup (The "Check-In" Desk) -# ------------------------------------------------------------------------- -# 1. Export Paths -# IMPORTANT: These paths must reside on a ReadWriteMany (RWX) Persistent Volume -# mounted to all Pods so state is preserved across pod restarts/rescheduling. export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" export FAULT_TOL_JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" -# 2. Define Helper Functions is_training_finished() { test -f "$FAULT_TOL_FINISHED_FLAG_FILE" } -# 3. Check for Previous Success -# In K8s, a Pod might be restarted due to node maintenance even if the job -# logic was done. If the flag file exists, we exit immediately with 0. if is_training_finished ; then echo "[FT-Setup] Found finished flag at $FAULT_TOL_FINISHED_FLAG_FILE." echo "[FT-Setup] Training is already complete. Exiting successfully." exit 0 fi -# 4. Logging Start -# We use HOSTNAME (usually pod-name) as the identifier since SLURM_JOB_ID is gone. -# We append 'X' (Running/Unknown) to the log. echo "[FT-Setup] Starting training on $(hostname)..." -# Optional: Log attempt to shared file (Using X for Running) -# Note: In high-scale K8s, writing to a single file from 1000 pods can cause lock contention. -# If scale is small, this is fine. if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" @@ -39,49 +23,28 @@ fi {%- endmacro %} {% macro ft_launcher_teardown() -%} -# ------------------------------------------------------------------------- -# K8s Fault Tolerance Teardown (The "Check-Out" Desk) -# ------------------------------------------------------------------------- - -# 1. Analyze Exit Code from the Main Command -# 'exitcode' is captured in the main script before calling this macro. if [ "$exitcode" -eq "0" ]; then RESULT_STATUS="S" # Success else RESULT_STATUS="F" # Failure fi -# 2. 
Update Log (Optional but helpful for debugging) if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then - # We update the specific entry for this host from X to S or F - # Note: 'sed -i' on a shared PVC can be risky with concurrency. - # Appending a new status line is safer in K8s. mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" - echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" fi -# 3. The Requeue Decision Logic if [ "$exitcode" -eq "0" ]; then - # Case A: Script exited successfully. - # Verification: Did it actually finish (create the flag file)? if is_training_finished; then echo "[FT-Teardown] Job finished successfully and flag file exists." exit 0 else - # Edge Case: The python script exited 0, but didn't write the flag file. - # This usually means a silent crash or partial run. We must force a retry. echo "[FT-Teardown] WARNING: Process exited 0 but finished flag is MISSING." echo "[FT-Teardown] Forcing exit 1 to trigger Kubernetes restart." exit 1 fi else - # Case B: Script crashed (exitcode != 0). echo "[FT-Teardown] Job failed with exit code $exitcode." - - # We exit with the error code. - # The K8s 'backoffLimit' (in PyTorchJob spec) will determine if we restart. - # We do NOT calculate retry counts manually here. exit $exitcode fi {%- endmacro %} From 096e0afe3c685852c57a463264de915d529a9c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 13:48:22 +0000 Subject: [PATCH 14/31] change template MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 62 +++++++++---------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index f568879c..b1549570 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -1,50 +1,46 @@ {% macro ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) -%} - +# This script uses experimental fault tolerance launcher +# Fault tolerance related items export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" -export FAULT_TOL_JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" +ANY_JOB_STEP_FAILED=0 +# Automatic job resubmission related items +JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" +is_job_failures_limit_reached() { + if [ $TORCHX_MAX_RETRIES -eq 0 ]; then + true + else + tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" | \ + awk "/^[[:alnum:]]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" + fi +} is_training_finished() { - test -f "$FAULT_TOL_FINISHED_FLAG_FILE" + test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" } - -if is_training_finished ; then - echo "[FT-Setup] Found finished flag at $FAULT_TOL_FINISHED_FLAG_FILE." - echo "[FT-Setup] Training is already complete. Exiting successfully." - exit 0 -fi - -echo "[FT-Setup] Starting training on $(hostname)..." 
-if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then - mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" - echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" +# Exit immediately if finished flag file exists and this job is a continuation +if [ -v RETRY_COUNT ] && [ "$RETRY_COUNT" -gt 0 ] ; then + if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi + if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi +else + rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" fi +# Write unknown job status to the job log, we will fix it at the end +echo "$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} -if [ "$exitcode" -eq "0" ]; then - RESULT_STATUS="S" # Success +if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi +# Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result +if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then + sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} S/" "$JOB_RESULTS_FILE" else - RESULT_STATUS="F" # Failure -fi - -if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then - mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" - echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" + sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} F/" "$JOB_RESULTS_FILE" fi -if [ "$exitcode" -eq "0" ]; then - if is_training_finished; then - echo "[FT-Teardown] Job finished successfully and flag file exists." - exit 0 - else - echo "[FT-Teardown] WARNING: Process exited 0 but finished flag is MISSING." - echo "[FT-Teardown] Forcing exit 1 to trigger Kubernetes restart." - exit 1 - fi -else - echo "[FT-Teardown] Job failed with exit code $exitcode." +if ! 
(is_training_finished || is_job_failures_limit_reached); then + scontrol requeue "$TORCHX_REPLICA_ID" exit $exitcode fi {%- endmacro %} From c7ab84368a67500017a9758a1ea8d3ffa0ac2b58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:01:23 +0000 Subject: [PATCH 15/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index b1549570..096db033 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -19,12 +19,8 @@ is_training_finished() { test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" } # Exit immediately if finished flag file exists and this job is a continuation -if [ -v RETRY_COUNT ] && [ "$RETRY_COUNT" -gt 0 ] ; then - if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi - if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi -else - rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" -fi +if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi +if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi # Write unknown job status to the job log, we will fix it at the end echo "$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X" >> "$JOB_RESULTS_FILE" From bd000a407e7455e9da22bd8448bfaf08267b76b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:13:11 +0000 Subject: [PATCH 16/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 096db033..c08ba8e8 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -7,36 +7,49 @@ ANY_JOB_STEP_FAILED=0 # Automatic job resubmission related items JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" +# For k8s, we use pod restart count or a custom retry counter +RETRY_COUNT=${RETRY_COUNT:-0} +# Use a unique identifier for this job/pod +JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} + is_job_failures_limit_reached() { if [ $TORCHX_MAX_RETRIES -eq 0 ]; then true else - tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" | \ - awk "/^[[:alnum:]]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" + tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" 2>/dev/null | \ + awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" fi } is_training_finished() { test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" } # Exit immediately if finished flag file exists and this job is a continuation -if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi -if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi +if [ "$RETRY_COUNT" -gt 0 ] ; then + if is_training_finished ; then echo 
"Training is finished" ; exit 0 ; fi + if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi +else + rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" +fi # Write unknown job status to the job log, we will fix it at the end -echo "$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X" >> "$JOB_RESULTS_FILE" +echo "$JOB_ID $RETRY_COUNT X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi # Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result +JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} +RETRY_COUNT=${RETRY_COUNT:-0} + if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then - sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} S/" "$JOB_RESULTS_FILE" + sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT S/" "$JOB_RESULTS_FILE" else - sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} F/" "$JOB_RESULTS_FILE" + sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT F/" "$JOB_RESULTS_FILE" fi +# On k8s, we exit with the appropriate code and let the retry policy handle resubmission +# Rather than explicitly requeueing like SLURM if ! (is_training_finished || is_job_failures_limit_reached); then - scontrol requeue "$TORCHX_REPLICA_ID" exit $exitcode fi {%- endmacro %} From 5f2fb8a0b3d9138f9385ace2e0745ec30f02326d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:20:24 +0000 Subject: [PATCH 17/31] test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 52 ++++++++++++++----- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index c08ba8e8..0717e64e 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -13,30 +13,48 @@ RETRY_COUNT=${RETRY_COUNT:-0} JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} is_job_failures_limit_reached() { - if [ $TORCHX_MAX_RETRIES -eq 0 ]; then - true + if [ ! 
-f "$JOB_RESULTS_FILE" ]; then + return 1 # File doesn't exist, limit not reached + fi + if [ "${TORCHX_MAX_RETRIES:-0}" -eq 0 ]; then + return 0 # 0 retries means limit is always reached else - tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" 2>/dev/null | \ - awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" + tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ + awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" fi } + is_training_finished() { - test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" + test -f "$(dirname "$JOB_RESULTS_FILE")/$(basename "$FAULT_TOL_FINISHED_FLAG_FILE")" } -# Exit immediately if finished flag file exists and this job is a continuation -if [ "$RETRY_COUNT" -gt 0 ] ; then - if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi - if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi -else + +# Check if training is already finished +if is_training_finished ; then + echo "Training is finished" + exit 0 +fi + +# Check if we've hit the failure limit +if is_job_failures_limit_reached ; then + echo "Job failures limit reached (${TORCHX_MAX_RETRIES:-0})" + exit 1 +fi + +# Only clean up job results on the very first run +if [ "$RETRY_COUNT" -eq 0 ] ; then rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" fi +# Ensure directory exists +mkdir -p "$(dirname "$JOB_RESULTS_FILE")" + # Write unknown job status to the job log, we will fix it at the end echo "$JOB_ID $RETRY_COUNT X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi + # Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} RETRY_COUNT=${RETRY_COUNT:-0} @@ -47,9 +65,17 @@ else sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT F/" "$JOB_RESULTS_FILE" fi -# On k8s, we exit with the appropriate code and let the retry policy handle resubmission -# Rather than explicitly requeueing like SLURM -if ! (is_training_finished || is_job_failures_limit_reached); then +# Check final state +if is_training_finished ; then + echo "Training completed successfully" + exit 0 +elif is_job_failures_limit_reached ; then + echo "Job failures limit reached, giving up" + exit 1 +else + # Training not finished and we haven't hit retry limit + # Exit with failure code to trigger pod restart + echo "Training incomplete, exiting with code $exitcode to trigger retry" exit $exitcode fi {%- endmacro %} From c9b1755ac62e7421705910736df944393cd70a79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:34:07 +0000 Subject: [PATCH 18/31] retries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 0717e64e..ab0ce885 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -13,15 +13,19 @@ RETRY_COUNT=${RETRY_COUNT:-0} JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} is_job_failures_limit_reached() { - if [ ! 
-f "$JOB_RESULTS_FILE" ]; then - return 1 # File doesn't exist, limit not reached - fi + # If TORCHX_MAX_RETRIES is 0 or unset, never reach the limit (infinite retries) if [ "${TORCHX_MAX_RETRIES:-0}" -eq 0 ]; then - return 0 # 0 retries means limit is always reached - else - tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ - awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" + return 1 # Limit not reached, allow retries + fi + + # If job results file doesn't exist yet, limit not reached + if [ ! -f "$JOB_RESULTS_FILE" ]; then + return 1 fi + + # Check if we have TORCHX_MAX_RETRIES failures in the log + tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ + awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" } is_training_finished() { From 6d3c34f945dd049e821b3cf2e05ba91b7ba909eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 17:45:11 +0000 Subject: [PATCH 19/31] TORCHX_MAX_RETRIES MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index ab0ce885..c6993d17 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -4,6 +4,7 @@ export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" ANY_JOB_STEP_FAILED=0 +export TORCHX_MAX_RETRIES=3 # Automatic job resubmission related items JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" From cdffd23d152896a377f9b58a2a90a82918375a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 17 Dec 2025 17:59:08 +0000 Subject: [PATCH 20/31] cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 78 +------------------ 1 file changed, 1 insertion(+), 77 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index c6993d17..e16ebf5d 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -3,84 +3,8 @@ # Fault tolerance related items export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" -ANY_JOB_STEP_FAILED=0 -export TORCHX_MAX_RETRIES=3 - -# Automatic job resubmission related items -JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" -# For k8s, we use pod restart count or a custom retry counter -RETRY_COUNT=${RETRY_COUNT:-0} -# Use a unique identifier for this job/pod -JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} - -is_job_failures_limit_reached() { - # If TORCHX_MAX_RETRIES is 0 or unset, never reach the limit (infinite retries) - if [ "${TORCHX_MAX_RETRIES:-0}" -eq 0 ]; then - return 1 # Limit not reached, allow retries - fi - - # If job results file doesn't exist yet, limit not reached - if [ ! 
-f "$JOB_RESULTS_FILE" ]; then - return 1 - fi - - # Check if we have TORCHX_MAX_RETRIES failures in the log - tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ - awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" -} - -is_training_finished() { - test -f "$(dirname "$JOB_RESULTS_FILE")/$(basename "$FAULT_TOL_FINISHED_FLAG_FILE")" -} - -# Check if training is already finished -if is_training_finished ; then - echo "Training is finished" - exit 0 -fi - -# Check if we've hit the failure limit -if is_job_failures_limit_reached ; then - echo "Job failures limit reached (${TORCHX_MAX_RETRIES:-0})" - exit 1 -fi - -# Only clean up job results on the very first run -if [ "$RETRY_COUNT" -eq 0 ] ; then - rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" -fi - -# Ensure directory exists -mkdir -p "$(dirname "$JOB_RESULTS_FILE")" - -# Write unknown job status to the job log, we will fix it at the end -echo "$JOB_ID $RETRY_COUNT X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} -if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi - -# Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result -JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} -RETRY_COUNT=${RETRY_COUNT:-0} - -if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then - sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT S/" "$JOB_RESULTS_FILE" -else - sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT F/" "$JOB_RESULTS_FILE" -fi - -# Check final state -if is_training_finished ; then - echo "Training completed successfully" - exit 0 -elif is_job_failures_limit_reached ; then - echo "Job failures limit reached, giving up" - exit 1 -else - # Training not finished and we haven't hit retry limit - # Exit with failure code to trigger pod restart - echo "Training incomplete, exiting with code $exitcode to trigger retry" - exit $exitcode -fi +exit $exitcode {%- endmacro %} From b5f0a1ab3380000837c232104c81e197eabcbe4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 17 Dec 2025 18:30:13 +0000 Subject: [PATCH 21/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../core/execution/templates/ft_launcher_dgxc.j2 | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index e16ebf5d..150d8b0c 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -3,6 +3,20 @@ # Fault tolerance related items export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" + +JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" + +is_training_finished() { + test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" +} + +if is_training_finished ; then + echo "Training is finished"; + exit 0; +else + rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" +fi + {%- endmacro %} {% macro ft_launcher_teardown() -%} From 9fc020c914cebd1f3b96c084948f831e43ac3419 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 20 Dec 2025 12:24:03 +0000 Subject: [PATCH 22/31] bump FT interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 10 
+++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py b/nemo_run/run/torchx_backend/components/ft_launcher.py index 3920041f..23dc9de6 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -92,27 +92,27 @@ def ft_launcher( ): if workload_check_interval: ft_args += [ - "--ft-param-workload_check_interval", + "--ft-workload_check_interval", str(workload_check_interval), ] if initial_rank_heartbeat_timeout: ft_args += [ - "--ft-param-initial_rank_heartbeat_timeout", + "--ft-initial_rank_heartbeat_timeout", str(initial_rank_heartbeat_timeout), ] if rank_heartbeat_timeout: ft_args += [ - "--ft-param-rank_heartbeat_timeout", + "--ft-rank_heartbeat_timeout", str(rank_heartbeat_timeout), ] if rank_termination_signal: - ft_args += ["--ft-param-rank_termination_signal", rank_termination_signal] + ft_args += ["--ft-rank_termination_signal", rank_termination_signal] if log_level: - ft_args += ["--ft-param-log_level", log_level] + ft_args += ["--ft-log_level", log_level] if max_restarts: ft_args += ["--max-restarts", str(max_restarts)] From 8cd9ca3d50c99e1bc5c366ea4f7b64dbeaf0644d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 20 Dec 2025 13:02:54 +0000 Subject: [PATCH 23/31] --ft-use-infra-group-rank=False MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py b/nemo_run/run/torchx_backend/components/ft_launcher.py index 23dc9de6..fe125b93 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -117,6 +117,9 @@ def ft_launcher( if max_restarts: ft_args += ["--max-restarts", str(max_restarts)] + if dgxc is True: + ft_args = +["--ft-use-infra-group-rank", "False"] + else: ft_args = ["--ignore-missing-fault-tol-cfg"] From 1d5a101af4accf66e0b04ef398f78338b36071dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 20 Dec 2025 13:10:30 +0000 Subject: [PATCH 24/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py b/nemo_run/run/torchx_backend/components/ft_launcher.py index fe125b93..2395465f 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -118,7 +118,7 @@ def ft_launcher( ft_args += ["--max-restarts", str(max_restarts)] if dgxc is True: - ft_args = +["--ft-use-infra-group-rank", "False"] + ft_args += ["--ft-use-infra-group-rank", "False"] else: ft_args = ["--ignore-missing-fault-tol-cfg"] From fd8531757c138bef3be3a4511b8419c794eee203 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 23 Jan 2026 15:50:10 +0000 Subject: [PATCH 25/31] add warning for max_restarts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py 
b/nemo_run/run/torchx_backend/components/ft_launcher.py index 2395465f..006c8321 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import shlex from typing import Optional @@ -22,6 +23,8 @@ from nemo_run.run.torchx_backend.components import torchrun +logger = logging.getLogger(__name__) + # Adapted from torchrun component def ft_launcher( @@ -115,7 +118,10 @@ def ft_launcher( ft_args += ["--ft-log_level", log_level] if max_restarts: - ft_args += ["--max-restarts", str(max_restarts)] + if dgxc is True: + logger.warning("max_restarts is ignored for DGXCloudExecutor") + else: + ft_args += ["--max-restarts", str(max_restarts)] if dgxc is True: ft_args += ["--ft-use-infra-group-rank", "False"] From e965507055ba950967000e0760699892f98f5b3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 01:42:37 +0000 Subject: [PATCH 26/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/run/torchx_backend/test_packaging.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/test/run/torchx_backend/test_packaging.py b/test/run/torchx_backend/test_packaging.py index 9343c637..a908730e 100644 --- a/test/run/torchx_backend/test_packaging.py +++ b/test/run/torchx_backend/test_packaging.py @@ -23,10 +23,7 @@ from nemo_run.core.execution.launcher import FaultTolerance, Torchrun from nemo_run.core.execution.local import LocalExecutor from nemo_run.core.packaging.base import Packager -from nemo_run.run.torchx_backend.packaging import ( - merge_executables, - package, -) +from nemo_run.run.torchx_backend.packaging import merge_executables, package @dataclass(kw_only=True) @@ -265,15 +262,15 @@ def test_package_fault_tolerance(mock_executor): assert role.entrypoint == "ft_launcher" assert role.args == [ - "--ft-param-workload_check_interval", + "--ft-workload_check_interval", "10", - "--ft-param-initial_rank_heartbeat_timeout", + "--ft-initial_rank_heartbeat_timeout", "5", - "--ft-param-rank_heartbeat_timeout", + "--ft-rank_heartbeat_timeout", "5", - "--ft-param-rank_termination_signal", + "--ft-rank_termination_signal", "SIGINT", - "--ft-param-log_level", + "--ft-log_level", "INFO", "--rdzv-backend", "etcd", From 73429130f493584754d09d5ae6f53ae062acc955 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 01:47:25 +0000 Subject: [PATCH 27/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/core/execution/artifacts/ft_het_slurm.sh | 2 +- test/core/execution/artifacts/ft_slurm.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/core/execution/artifacts/ft_het_slurm.sh b/test/core/execution/artifacts/ft_het_slurm.sh index a0a51456..df3ecb5b 100644 --- a/test/core/execution/artifacts/ft_het_slurm.sh +++ b/test/core/execution/artifacts/ft_het_slurm.sh @@ -77,7 +77,7 @@ echo "$SLURM_JOB_ID ${SLURM_RESTART_COUNT:-0} X" >> "$JOB_RESULTS_FILE" export CUSTOM_ENV_1=some_value_1 -srun --het-group=0 --output /root/experiment/sample_job/log-account-account.sample_job-0_%j_${SLURM_RESTART_COUNT:-0}.out --container-image image_1 --container-mounts /root/experiment/sample_job:/nemo_run --container-workdir /nemo_run/code --wait=60 
--kill-on-bad-exit=1 ft_launcher --ft-param-workload_check_interval 10 --ft-param-rank_heartbeat_timeout 10 --rdzv-backend c10d --rdzv-endpoint localhost:0 --rdzv-id 1 --nnodes 1 --nproc-per-node 1 --node-rank 0 --tee 3 --no-python test_ft.sh & pids[0]=$! +srun --het-group=0 --output /root/experiment/sample_job/log-account-account.sample_job-0_%j_${SLURM_RESTART_COUNT:-0}.out --container-image image_1 --container-mounts /root/experiment/sample_job:/nemo_run --container-workdir /nemo_run/code --wait=60 --kill-on-bad-exit=1 ft_launcher --ft-workload_check_interval 10 --ft-rank_heartbeat_timeout 10 --rdzv-backend c10d --rdzv-endpoint localhost:0 --rdzv-id 1 --nnodes 1 --nproc-per-node 1 --node-rank 0 --tee 3 --no-python test_ft.sh & pids[0]=$! sleep 30 diff --git a/test/core/execution/artifacts/ft_slurm.sh b/test/core/execution/artifacts/ft_slurm.sh index 59b15123..4421b3c4 100644 --- a/test/core/execution/artifacts/ft_slurm.sh +++ b/test/core/execution/artifacts/ft_slurm.sh @@ -62,7 +62,7 @@ echo "$SLURM_JOB_ID ${SLURM_RESTART_COUNT:-0} X" >> "$JOB_RESULTS_FILE" # Command 1 -srun --output /root/sample_job/log-account-account.sample_job_%j_${SLURM_RESTART_COUNT:-0}.out --container-mounts /root/sample_job:/nemo_run --container-workdir /nemo_run/code --wait=60 --kill-on-bad-exit=1 ft_launcher --ft-param-workload_check_interval 10 --ft-param-rank_heartbeat_timeout 10 --rdzv-backend c10d --rdzv-endpoint localhost:0 --rdzv-id 7680 --nnodes 1 --nproc-per-node 1 --node-rank 0 --tee 3 --no-python test_ft.sh +srun --output /root/sample_job/log-account-account.sample_job_%j_${SLURM_RESTART_COUNT:-0}.out --container-mounts /root/sample_job:/nemo_run --container-workdir /nemo_run/code --wait=60 --kill-on-bad-exit=1 ft_launcher --ft-workload_check_interval 10 --ft-rank_heartbeat_timeout 10 --rdzv-backend c10d --rdzv-endpoint localhost:0 --rdzv-id 7680 --nnodes 1 --nproc-per-node 1 --node-rank 0 --tee 3 --no-python test_ft.sh exitcode=$? 
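Taken together, the ft_launcher patches above change how the fault-tolerance flags are assembled: the `--ft-param-*` options become `--ft-*`, DGX Cloud runs additionally pass `--ft-use-infra-group-rank False`, and `--max-restarts` is skipped with a warning when the executor is a DGXCloudExecutor. The sketch below condenses that flag-assembly logic for illustration only; `build_ft_args` is a hypothetical helper, and the real component in nemo_run/run/torchx_backend/components/ft_launcher.py also handles the rendezvous, torchrun, and `--ignore-missing-fault-tol-cfg` arguments that are omitted here.

import logging
from typing import Optional

logger = logging.getLogger(__name__)


def build_ft_args(
    workload_check_interval: Optional[float] = None,
    rank_heartbeat_timeout: Optional[float] = None,
    max_restarts: Optional[int] = None,
    dgxc: bool = False,
) -> list[str]:
    """Simplified reconstruction of the ft_launcher flag assembly after these patches."""
    ft_args: list[str] = []
    if workload_check_interval:
        # Renamed from --ft-param-workload_check_interval.
        ft_args += ["--ft-workload_check_interval", str(workload_check_interval)]
    if rank_heartbeat_timeout:
        # Renamed from --ft-param-rank_heartbeat_timeout.
        ft_args += ["--ft-rank_heartbeat_timeout", str(rank_heartbeat_timeout)]
    if max_restarts:
        if dgxc:
            # --max-restarts is not forwarded on DGX Cloud; only a warning is logged.
            logger.warning("max_restarts is ignored for DGXCloudExecutor")
        else:
            ft_args += ["--max-restarts", str(max_restarts)]
    if dgxc:
        # DGX Cloud runs explicitly disable the infra-derived group rank.
        ft_args += ["--ft-use-infra-group-rank", "False"]
    return ft_args


if __name__ == "__main__":
    # Prints: ['--ft-workload_check_interval', '10', '--ft-rank_heartbeat_timeout', '10',
    #          '--ft-use-infra-group-rank', 'False']
    print(build_ft_args(workload_check_interval=10, rank_heartbeat_timeout=10,
                        max_restarts=3, dgxc=True))

With `dgxc=False` and the same heartbeat settings, the helper would emit only the renamed `--ft-*` flags plus `--max-restarts`, which is consistent with the updated ft_slurm.sh and ft_het_slurm.sh artifacts above.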
From dbded700beaa9b0095a84e4a85877f3e6eabc598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:08:04 +0000 Subject: [PATCH 28/31] add test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/core/execution/test_dgxcloud.py | 322 ++++++++++++++++++++++++++- 1 file changed, 321 insertions(+), 1 deletion(-) diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index 9098b5c5..fbc719a1 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -1143,4 +1143,324 @@ def test_default_headers_with_token(self): assert headers["Content-Type"] == "application/json" assert "Authorization" in headers assert headers["Authorization"] == "Bearer test_token" - assert headers["Authorization"] == "Bearer test_token" + + +class TestDGXCloudRequest: + """Test DGXCloudRequest dataclass and its methods.""" + + @pytest.fixture + def basic_executor(self): + """Create a basic DGXCloudExecutor for testing.""" + return DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + + @pytest.fixture + def executor_with_env_vars(self): + """Create a DGXCloudExecutor with environment variables.""" + return DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + env_vars={"EXECUTOR_VAR": "executor_value", "SHARED_VAR": "from_executor"}, + ) + + def test_dgxcloud_request_init(self, basic_executor): + """Test basic initialization of DGXCloudRequest.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1", "job2"], + executor=basic_executor, + max_retries=3, + extra_env={"EXTRA_VAR": "extra_value"}, + ) + + assert request.launch_cmd == ["python", "train.py"] + assert request.jobs == ["job1", "job2"] + assert request.executor == basic_executor + assert request.max_retries == 3 + assert request.extra_env == {"EXTRA_VAR": "extra_value"} + assert request.launcher is None + + def test_dgxcloud_request_with_launcher(self, basic_executor): + """Test DGXCloudRequest with a launcher.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + from nemo_run.core.execution.launcher import Torchrun + + launcher = Torchrun() + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=5, + extra_env={}, + launcher=launcher, + ) + + assert request.launcher == launcher + assert isinstance(request.launcher, Torchrun) + + def test_materialize_basic(self, basic_executor): + """Test materialization of a basic request without fault tolerance.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=["python", "train.py", "--epochs", "10"], + jobs=["job1"], + executor=basic_executor, + max_retries=3, + extra_env={"MY_VAR": "my_value"}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "#!/bin/bash\necho 'test script'" + script = 
request.materialize() + + # Verify fill_template was called + mock_fill.assert_called_once() + args, kwargs = mock_fill.call_args + assert args[0] == "dgxc.sh.j2" + + template_vars = args[1] + assert template_vars["max_retries"] == 3 + assert template_vars["training_command"] == "python train.py --epochs 10" + assert template_vars["ft_enabled"] is False + assert "export MY_VAR=my_value" in template_vars["env_vars"] + + assert script == "#!/bin/bash\necho 'test script'" + + def test_materialize_with_env_vars(self, executor_with_env_vars): + """Test that environment variables from executor and extra_env are merged.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=executor_with_env_vars, + max_retries=1, + extra_env={"EXTRA_VAR": "extra_value", "SHARED_VAR": "from_extra"}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "mock_script" + request.materialize() + + template_vars = mock_fill.call_args[0][1] + env_vars = template_vars["env_vars"] + + # Check that variables are present (order may vary due to dict merge) + assert "export EXECUTOR_VAR=executor_value" in env_vars + assert "export EXTRA_VAR=extra_value" in env_vars + # extra_env should override executor.env_vars for SHARED_VAR + assert "export SHARED_VAR=from_extra" in env_vars + assert "export SHARED_VAR=from_executor" not in env_vars + + def test_materialize_with_fault_tolerance(self, basic_executor): + """Test materialization with fault tolerance enabled.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + from nemo_run.core.execution.launcher import FaultTolerance + + ft_launcher = FaultTolerance( + cfg_path="/workspace/ft_config.yaml", + finished_flag_file="/workspace/.ft_finished", + job_results_file="/workspace/ft_results.json", + ) + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=5, + extra_env={}, + launcher=ft_launcher, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "ft_script" + _ = request.materialize() + + template_vars = mock_fill.call_args[0][1] + assert template_vars["ft_enabled"] is True + assert template_vars["fault_tol_cfg_path"] == "/workspace/ft_config.yaml" + assert template_vars["fault_tol_finished_flag_file"] == "/workspace/.ft_finished" + assert template_vars["fault_tol_job_results_file"] == "/workspace/ft_results.json" + + def test_materialize_fault_tolerance_missing_fields(self, basic_executor): + """Test that fault tolerance with missing required fields raises an error.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + from nemo_run.core.execution.launcher import FaultTolerance + + # Create FaultTolerance with missing required fields + ft_launcher = FaultTolerance( + cfg_path="/workspace/ft_config.yaml", + # Missing finished_flag_file and job_results_file + ) + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=5, + extra_env={}, + launcher=ft_launcher, + ) + + with pytest.raises(AssertionError) as exc_info: + with patch("nemo_run.core.execution.dgxcloud.fill_template"): + request.materialize() + + assert "Fault Tolerance requires" in str(exc_info.value) + + def test_materialize_with_non_fault_tolerance_launcher(self, basic_executor): + """Test materialization with a non-FaultTolerance launcher 
(e.g., Torchrun).""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + from nemo_run.core.execution.launcher import Torchrun + + launcher = Torchrun() + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=2, + extra_env={}, + launcher=launcher, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "torchrun_script" + _ = request.materialize() + + template_vars = mock_fill.call_args[0][1] + # FT should be disabled for non-FaultTolerance launchers + assert template_vars["ft_enabled"] is False + # FT-specific fields should not be in template_vars + assert "fault_tol_cfg_path" not in template_vars + assert "fault_tol_finished_flag_file" not in template_vars + assert "fault_tol_job_results_file" not in template_vars + + def test_materialize_empty_extra_env(self, basic_executor): + """Test materialization with empty extra_env.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=1, + extra_env={}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "script" + request.materialize() + + template_vars = mock_fill.call_args[0][1] + assert template_vars["env_vars"] == [] + + def test_materialize_uppercase_env_vars(self, basic_executor): + """Test that environment variable keys are uppercased.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=1, + extra_env={"lowercase_var": "value", "MixedCase": "value2"}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "script" + request.materialize() + + template_vars = mock_fill.call_args[0][1] + env_vars = template_vars["env_vars"] + + # Keys should be uppercased + assert "export LOWERCASE_VAR=value" in env_vars + assert "export MIXEDCASE=value2" in env_vars + + def test_repr(self, basic_executor): + """Test the __repr__ method.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1", "job2"], + executor=basic_executor, + max_retries=3, + extra_env={}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "#!/bin/bash\necho 'script content'" + repr_str = repr(request) + + assert "# DGXC Entrypoint Script Request" in repr_str + assert "# Executor: DGXCloudExecutor" in repr_str + assert "# Jobs: ['job1', 'job2']" in repr_str + assert "#!/bin/bash" in repr_str + assert "echo 'script content'" in repr_str + + def test_complex_launch_command(self, basic_executor): + """Test materialization with a complex multi-argument launch command.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + request = DGXCloudRequest( + launch_cmd=[ + "torchrun", + "--nproc_per_node=8", + "--nnodes=2", + "train.py", + "--batch-size", + "32", + "--lr", + "0.001", + ], + jobs=["job1"], + executor=basic_executor, + max_retries=1, + extra_env={}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "script" + request.materialize() + + template_vars = mock_fill.call_args[0][1] + expected_cmd = ( + "torchrun --nproc_per_node=8 --nnodes=2 
train.py --batch-size 32 --lr 0.001" + ) + assert template_vars["training_command"] == expected_cmd + + def test_max_retries_values(self, basic_executor): + """Test different max_retries values.""" + from nemo_run.core.execution.dgxcloud import DGXCloudRequest + + for retries in [0, 1, 10, 100]: + request = DGXCloudRequest( + launch_cmd=["python", "train.py"], + jobs=["job1"], + executor=basic_executor, + max_retries=retries, + extra_env={}, + ) + + with patch("nemo_run.core.execution.dgxcloud.fill_template") as mock_fill: + mock_fill.return_value = "script" + request.materialize() + + template_vars = mock_fill.call_args[0][1] + assert template_vars["max_retries"] == retries From 9ace74faad917ccb4ef90ca20d37b7b1677f8168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:10:05 +0000 Subject: [PATCH 29/31] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 1d17afdb..b6d5e855 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -603,7 +603,7 @@ def materialize(self) -> str: "max_retries": self.max_retries, "env_vars": env_vars, "training_command": " ".join(self.launch_cmd), - "ft_enabled": self.launcher and isinstance(self.launcher, FaultTolerance), + "ft_enabled": bool(self.launcher and isinstance(self.launcher, FaultTolerance)), } # 4. Fault Tolerance Injection From 7e6fdc397935b8ebabb03cd3dcf34d8bda9afca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:15:36 +0000 Subject: [PATCH 30/31] tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/core/execution/test_dgxcloud.py | 177 +++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index fbc719a1..ca4a5cf9 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -1144,6 +1144,183 @@ def test_default_headers_with_token(self): assert "Authorization" in headers assert headers["Authorization"] == "Bearer test_token" + def test_setup_launcher_no_launcher(self): + """Test _setup_launcher when no launcher is set.""" + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + nprocs_per_node=8, + ) + + # Set up job details required by _setup_launcher + executor.job_name = "test_job" + executor.job_dir = "/workspace/test_job" + + with patch("nemo_run.core.execution.dgxcloud.CONSOLE"): + executor._setup_launcher() + + # When no launcher, ntasks_per_node should remain as nprocs_per_node + assert executor.ntasks_per_node == 8 + assert ( + not hasattr(executor, "torchrun_nproc_per_node") + or executor.torchrun_nproc_per_node is None + ) + + def test_setup_launcher_with_torchrun(self): + """Test _setup_launcher with Torchrun launcher.""" + from nemo_run.core.execution.launcher import Torchrun + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + 
app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + nprocs_per_node=8, + launcher=Torchrun(), + ) + + executor.job_name = "test_job" + executor.job_dir = "/workspace/test_job" + + with patch("nemo_run.core.execution.dgxcloud.CONSOLE") as mock_console: + executor._setup_launcher() + + # With Torchrun, ntasks_per_node should be 1 and torchrun_nproc_per_node should be nprocs_per_node + assert executor.ntasks_per_node == 1 + assert executor.torchrun_nproc_per_node == 8 + mock_console.log.assert_called_once() + assert "Torchrun" in mock_console.log.call_args[0][0] + + def test_setup_launcher_with_fault_tolerance(self): + """Test _setup_launcher with FaultTolerance launcher.""" + from nemo_run.core.execution.launcher import FaultTolerance + + ft_launcher = FaultTolerance() + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + nprocs_per_node=4, + launcher=ft_launcher, + ) + + executor.job_name = "my_ft_job" + executor.job_dir = "/workspace/jobs/my_ft_job" + + with patch("nemo_run.core.execution.dgxcloud.CONSOLE") as mock_console: + with patch("nemo_run.config.RUNDIR_NAME", "nemo_run"): + executor._setup_launcher() + + # Verify Torchrun settings + assert executor.ntasks_per_node == 1 + assert executor.torchrun_nproc_per_node == 4 + + # Verify FaultTolerance paths are set + assert ft_launcher.cfg_path == "/workspace/jobs/my_ft_job/my_ft_job/my_ft_job_ft_cfg.yml" + assert ft_launcher.finished_flag_file == "/nemo_run/my_ft_job_finished_flag" + assert ( + ft_launcher.job_results_file + == "/workspace/jobs/my_ft_job/my_ft_job/my_ft_job_job_results" + ) + + # Verify console log was called + mock_console.log.assert_called_once() + assert "FaultTolerance" in mock_console.log.call_args[0][0] + + def test_setup_launcher_fault_tolerance_paths(self): + """Test that FaultTolerance paths are correctly constructed.""" + from nemo_run.core.execution.launcher import FaultTolerance + + ft_launcher = FaultTolerance() + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + launcher=ft_launcher, + ) + + executor.job_name = "test_training" + executor.job_dir = "/mnt/workspace/test_training" + + with patch("nemo_run.core.execution.dgxcloud.CONSOLE"): + with patch("nemo_run.config.RUNDIR_NAME", "custom_rundir"): + executor._setup_launcher() + + # Check path construction + base_dir = "/mnt/workspace/test_training/test_training" + assert ft_launcher.cfg_path == f"{base_dir}/test_training_ft_cfg.yml" + assert ft_launcher.finished_flag_file == "/custom_rundir/test_training_finished_flag" + assert ft_launcher.job_results_file == f"{base_dir}/test_training_job_results" + + def test_setup_launcher_with_different_nprocs(self): + """Test _setup_launcher with different nprocs_per_node values.""" + from nemo_run.core.execution.launcher import Torchrun + + for nprocs in [1, 2, 4, 8, 16]: + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", 
+ project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + nprocs_per_node=nprocs, + launcher=Torchrun(), + ) + + executor.job_name = "test_job" + executor.job_dir = "/workspace/test_job" + + with patch("nemo_run.core.execution.dgxcloud.CONSOLE"): + executor._setup_launcher() + + assert executor.torchrun_nproc_per_node == nprocs + assert executor.ntasks_per_node == 1 + + def test_setup_launcher_super_called(self): + """Test that _setup_launcher calls super()._setup_launcher().""" + from nemo_run.core.execution.launcher import Torchrun + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + launcher=Torchrun(), + ) + + executor.job_name = "test_job" + executor.job_dir = "/workspace/test_job" + + with patch("nemo_run.core.execution.dgxcloud.CONSOLE"): + with patch.object( + executor.__class__.__bases__[0], "_setup_launcher" + ) as mock_super_setup: + executor._setup_launcher() + + # Verify super() was called + mock_super_setup.assert_called_once() + class TestDGXCloudRequest: """Test DGXCloudRequest dataclass and its methods.""" From 46ebce688ae4cfeec4eea4b22395adee347f7dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:17:56 +0000 Subject: [PATCH 31/31] fix test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/core/execution/test_dgxcloud.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index ca4a5cf9..49505c48 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -1164,8 +1164,8 @@ def test_setup_launcher_no_launcher(self): with patch("nemo_run.core.execution.dgxcloud.CONSOLE"): executor._setup_launcher() - # When no launcher, ntasks_per_node should remain as nprocs_per_node - assert executor.ntasks_per_node == 8 + # When no launcher, torchrun_nproc_per_node and ntasks_per_node should not be modified + # ntasks_per_node is only set when launcher is Torchrun or FaultTolerance assert ( not hasattr(executor, "torchrun_nproc_per_node") or executor.torchrun_nproc_per_node is None @@ -1259,7 +1259,7 @@ def test_setup_launcher_fault_tolerance_paths(self): executor.job_dir = "/mnt/workspace/test_training" with patch("nemo_run.core.execution.dgxcloud.CONSOLE"): - with patch("nemo_run.config.RUNDIR_NAME", "custom_rundir"): + with patch("nemo_run.core.execution.dgxcloud.RUNDIR_NAME", "custom_rundir"): executor._setup_launcher() # Check path construction