diff --git a/nemo_run/run/ray/job.py b/nemo_run/run/ray/job.py
index 295a7c1e..8c608a3f 100644
--- a/nemo_run/run/ray/job.py
+++ b/nemo_run/run/ray/job.py
@@ -94,7 +94,10 @@ def start(
         )
 
     def stop(self, wait: bool = False) -> None:
-        self.backend.stop(wait=wait)  # type: ignore[attr-defined]
+        if isinstance(self.backend, KubeRayJob):
+            self.backend.stop()  # type: ignore[attr-defined]
+        else:
+            self.backend.stop(wait=wait)  # type: ignore[attr-defined]
 
     def status(self, display: bool = True):
         return self.backend.status(display=display)  # type: ignore[attr-defined]
diff --git a/nemo_run/run/ray/kuberay.py b/nemo_run/run/ray/kuberay.py
index 45ab1e94..e6efe7e4 100644
--- a/nemo_run/run/ray/kuberay.py
+++ b/nemo_run/run/ray/kuberay.py
@@ -60,7 +60,20 @@ class KubeRayCluster:
     def __post_init__(self) -> None:  # noqa: D401 – simple verb is fine
         """Initialise Kubernetes API clients once the instance is created."""
         # Load local kube-config once; the function returns *None* so we don't store it.
-        config.load_kube_config()
+        try:
+            config.load_kube_config()
+        except Exception as kube_config_error:
+            logger.error(
+                "Error loading kube-config: %s, trying with incluster config", kube_config_error
+            )
+            try:
+                config.load_incluster_config()
+            except Exception as incluster_config_error:
+                logger.error(
+                    "Error loading incluster config: %s, raising original error",
+                    incluster_config_error,
+                )
+                raise kube_config_error from incluster_config_error
 
         # The dedicated clients are what we interact with throughout the class
         # – separating CoreV1 for pods/services from CustomObjects for CRDs.
@@ -732,7 +745,20 @@ class KubeRayJob:
     executor: KubeRayExecutor
 
     def __post_init__(self):
-        config.load_kube_config()
+        try:
+            config.load_kube_config()
+        except Exception as kube_config_error:
+            logger.error(
+                "Error loading kube-config: %s, trying with incluster config", kube_config_error
+            )
+            try:
+                config.load_incluster_config()
+            except Exception as incluster_config_error:
+                logger.error(
+                    "Error loading incluster config: %s, raising original error",
+                    incluster_config_error,
+                )
+                raise kube_config_error from incluster_config_error
 
         # Lazily create K8s API clients if not supplied
         self.api = client.CustomObjectsApi()
diff --git a/test/run/ray/test_kuberay.py b/test/run/ray/test_kuberay.py
index 685451ed..8ad66d6e 100644
--- a/test/run/ray/test_kuberay.py
+++ b/test/run/ray/test_kuberay.py
@@ -2074,3 +2074,134 @@ def test_cluster_create_without_lifecycle_kwargs(self, mock_k8s_clients):
         # Should create lifecycle_kwargs and succeed
         assert hasattr(executor, "lifecycle_kwargs")
         assert mock_api.create_namespaced_custom_object.called
+
+
+class TestKubeConfigLoadingFallback:
+    """Test kube config loading with fallback to incluster config."""
+
+    def test_kuberay_cluster_kube_config_success(self):
+        """Test KubeRayCluster when kube config loads successfully."""
+        with patch("nemo_run.run.ray.kuberay.config.load_kube_config") as mock_load_kube:
+            with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
+                with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
+                    with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
+                        with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                            executor = KubeRayExecutor(namespace="test-namespace")
+                            # Create cluster to trigger __post_init__ which loads config
+                            _ = KubeRayCluster(name="test-cluster", executor=executor)
+
+        # Verify kube config was loaded and incluster was NOT called
+        assert mock_load_kube.call_count >= 1
+        # incluster should not be called when kube config succeeds
+        mock_incluster.assert_not_called()
+
+    def test_kuberay_cluster_fallback_to_incluster(self):
+        """Test KubeRayCluster falls back to incluster config when kube config fails."""
+        kube_error = Exception("Kube config file not found")
+
+        with patch(
+            "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error
+        ) as mock_load_kube:
+            with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
+                with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
+                    with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
+                        with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                            executor = KubeRayExecutor(namespace="test-namespace")
+                            # Create cluster to trigger __post_init__ which loads config
+                            _ = KubeRayCluster(name="test-cluster", executor=executor)
+
+        # Verify both were called
+        assert mock_load_kube.call_count >= 1
+        assert mock_incluster.call_count >= 1
+
+    def test_kuberay_cluster_both_configs_fail(self):
+        """Test KubeRayCluster raises original error when both configs fail."""
+        kube_error = Exception("Kube config file not found")
+        incluster_error = Exception("Not running inside a cluster")
+
+        with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error):
+            with patch(
+                "nemo_run.run.ray.kuberay.config.load_incluster_config",
+                side_effect=incluster_error,
+            ):
+                with pytest.raises(Exception) as exc_info:
+                    with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                        executor = KubeRayExecutor(namespace="test-namespace")
+                        KubeRayCluster(name="test-cluster", executor=executor)
+
+        # Should raise the original kube config error (not the incluster error)
+        assert exc_info.value == kube_error
+        assert "Kube config file not found" in str(exc_info.value)
+
+    def test_kuberay_job_kube_config_success(self):
+        """Test KubeRayJob when kube config loads successfully."""
+        with patch("nemo_run.run.ray.kuberay.config.load_kube_config") as mock_load_kube:
+            with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
+                with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
+                    with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
+                        with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                            executor = KubeRayExecutor(namespace="test-namespace")
+                            # Create job to trigger __post_init__ which loads config
+                            _ = KubeRayJob(name="test-job", executor=executor)
+
+        # Verify kube config was loaded
+        assert mock_load_kube.call_count >= 1
+        # incluster should not be called when kube config succeeds
+        mock_incluster.assert_not_called()
+
+    def test_kuberay_job_fallback_to_incluster(self):
+        """Test KubeRayJob falls back to incluster config when kube config fails."""
+        kube_error = Exception("Kube config file not found")
+
+        with patch(
+            "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error
+        ) as mock_load_kube:
+            with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
+                with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
+                    with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
+                        with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                            executor = KubeRayExecutor(namespace="test-namespace")
+                            # Create job to trigger __post_init__ which loads config
+                            _ = KubeRayJob(name="test-job", executor=executor)
+
+        # Verify both were called
+        assert mock_load_kube.call_count >= 1
+        assert mock_incluster.call_count >= 1
+
+    def test_kuberay_job_both_configs_fail(self):
+        """Test KubeRayJob raises original error when both configs fail."""
+        kube_error = Exception("Kube config file not found")
+        incluster_error = Exception("Not running inside a cluster")
+
+        with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error):
+            with patch(
+                "nemo_run.run.ray.kuberay.config.load_incluster_config",
+                side_effect=incluster_error,
+            ):
+                with pytest.raises(Exception) as exc_info:
+                    with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                        executor = KubeRayExecutor(namespace="test-namespace")
+                        KubeRayJob(name="test-job", executor=executor)
+
+        # Should raise the original kube config error (not the incluster error)
+        assert exc_info.value == kube_error
+        assert "Kube config file not found" in str(exc_info.value)
+
+    def test_error_chaining_preserved(self):
+        """Test that error chaining is preserved (raise X from Y)."""
+        kube_error = Exception("Kube config file not found")
+        incluster_error = Exception("Not running inside a cluster")
+
+        with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error):
+            with patch(
+                "nemo_run.run.ray.kuberay.config.load_incluster_config",
+                side_effect=incluster_error,
+            ):
+                with pytest.raises(Exception) as exc_info:
+                    with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
+                        executor = KubeRayExecutor(namespace="test-namespace")
+                        KubeRayJob(name="test-job", executor=executor)
+
+        # Verify error chaining (raise kube_error from incluster_error)
+        assert exc_info.value == kube_error
+        assert exc_info.value.__cause__ == incluster_error
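
Illustrative note (not part of the patch): both `__post_init__` changes above apply the same kube-config-then-in-cluster fallback. A minimal standalone sketch of that pattern, assuming the official `kubernetes` Python client and a module-level `logger`; the helper name `load_k8s_api` is hypothetical:

    import logging

    from kubernetes import client, config

    logger = logging.getLogger(__name__)


    def load_k8s_api() -> client.CustomObjectsApi:
        """Prefer a local kube-config; fall back to the in-cluster service account."""
        try:
            # Works on a workstation where ~/.kube/config (or $KUBECONFIG) is available.
            config.load_kube_config()
        except Exception as kube_config_error:
            logger.error(
                "Error loading kube-config: %s, trying with incluster config", kube_config_error
            )
            try:
                # Works inside a pod with a mounted service-account token.
                config.load_incluster_config()
            except Exception as incluster_config_error:
                # Re-raise the original kube-config error, chained to the in-cluster one.
                raise kube_config_error from incluster_config_error
        return client.CustomObjectsApi()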