From ea59fa2e40a2fde370b99dae0b86a7e2292578cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 23 Jan 2026 15:58:27 +0000 Subject: [PATCH 1/6] fix: Search for incluster config if no kubeconfig is given MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/ray/kuberay.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/nemo_run/run/ray/kuberay.py b/nemo_run/run/ray/kuberay.py index 45ab1e94..bc0e8f86 100644 --- a/nemo_run/run/ray/kuberay.py +++ b/nemo_run/run/ray/kuberay.py @@ -60,7 +60,20 @@ class KubeRayCluster: def __post_init__(self) -> None: # noqa: D401 – simple verb is fine """Initialise Kubernetes API clients once the instance is created.""" # Load local kube-config once; the function returns *None* so we don't store it. - config.load_kube_config() + try: + config.load_kube_config() + except Exception as kube_config_error: + logger.error( + "Error loading kube-config: %s, trying with incluster config", kube_config_error + ) + try: + config.load_incluster_config() + except Exception as incluster_config_error: + logger.error( + "Error loading incluster config: %s, raising original error", + incluster_config_error, + ) + raise kube_config_error from incluster_config_error # The dedicated clients are what we interact with throughout the class # – separating CoreV1 for pods/services from CustomObjects for CRDs. From a0ffe7b03076df7a31aabb16062a139fb46ab64c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 23 Jan 2026 17:07:21 +0000 Subject: [PATCH 2/6] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/ray/kuberay.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/nemo_run/run/ray/kuberay.py b/nemo_run/run/ray/kuberay.py index bc0e8f86..e6efe7e4 100644 --- a/nemo_run/run/ray/kuberay.py +++ b/nemo_run/run/ray/kuberay.py @@ -745,7 +745,20 @@ class KubeRayJob: executor: KubeRayExecutor def __post_init__(self): - config.load_kube_config() + try: + config.load_kube_config() + except Exception as kube_config_error: + logger.error( + "Error loading kube-config: %s, trying with incluster config", kube_config_error + ) + try: + config.load_incluster_config() + except Exception as incluster_config_error: + logger.error( + "Error loading incluster config: %s, raising original error", + incluster_config_error, + ) + raise kube_config_error from incluster_config_error # Lazily create K8s API clients if not supplied self.api = client.CustomObjectsApi() From 8cf5f42e9b9b06f6aaa17eb00f70a9bf5351f367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 23 Jan 2026 17:57:34 +0000 Subject: [PATCH 3/6] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/ray/job.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nemo_run/run/ray/job.py b/nemo_run/run/ray/job.py index 295a7c1e..8c608a3f 100644 --- a/nemo_run/run/ray/job.py +++ b/nemo_run/run/ray/job.py @@ -94,7 +94,10 @@ def start( ) def stop(self, wait: bool = False) -> None: - self.backend.stop(wait=wait) # type: ignore[attr-defined] + if isinstance(self.backend, KubeRayJob): + self.backend.stop() # type: ignore[attr-defined] + else: + self.backend.stop(wait=wait) # type: ignore[attr-defined] def status(self, display: bool = True): return self.backend.status(display=display) # type: ignore[attr-defined] From 98faac1f7823a3b2444387c936cbec5771c6c565 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:00:56 +0000 Subject: [PATCH 4/6] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/run/ray/test_kuberay.py | 194 +++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/test/run/ray/test_kuberay.py b/test/run/ray/test_kuberay.py index 685451ed..5e568f4a 100644 --- a/test/run/ray/test_kuberay.py +++ b/test/run/ray/test_kuberay.py @@ -2074,3 +2074,197 @@ def test_cluster_create_without_lifecycle_kwargs(self, mock_k8s_clients): # Should create lifecycle_kwargs and succeed assert hasattr(executor, "lifecycle_kwargs") assert mock_api.create_namespaced_custom_object.called + + +class TestKubeConfigLoadingFallback: + """Test kube config loading with fallback to incluster config.""" + + def test_kuberay_cluster_kube_config_success(self): + """Test KubeRayCluster when kube config loads successfully.""" + with patch("nemo_run.run.ray.kuberay.config.load_kube_config") as mock_load_kube: + with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster: + with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): + with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"): + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + # Create cluster to trigger __post_init__ which loads config + _ = KubeRayCluster(name="test-cluster", executor=executor) + + # Verify kube config was loaded and incluster was NOT called + assert mock_load_kube.call_count >= 1 + # incluster should not be called when kube config succeeds + mock_incluster.assert_not_called() + + def test_kuberay_cluster_fallback_to_incluster(self): + """Test KubeRayCluster falls back to incluster config when kube config fails.""" + kube_error = Exception("Kube config file not found") + + with patch( + "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error + ) as mock_load_kube: + with patch( + "nemo_run.run.ray.kuberay.config.load_incluster_config" + ) as mock_incluster: + with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): + with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"): + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + # Create cluster to trigger __post_init__ which loads config + _ = KubeRayCluster(name="test-cluster", executor=executor) + + # Verify both were called + assert mock_load_kube.call_count >= 1 + assert mock_incluster.call_count >= 1 + + def test_kuberay_cluster_both_configs_fail(self): + """Test KubeRayCluster raises original error when both configs fail.""" + kube_error = Exception("Kube config file not found") + incluster_error = Exception("Not running inside a cluster") + + with patch( + "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error + ): + with patch( + "nemo_run.run.ray.kuberay.config.load_incluster_config", + side_effect=incluster_error, + ): + with pytest.raises(Exception) as exc_info: + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + KubeRayCluster(name="test-cluster", executor=executor) + + # Should raise the original kube config error (not the incluster error) + assert exc_info.value == kube_error + assert "Kube config file not found" in str(exc_info.value) + + def test_kuberay_job_kube_config_success(self): + """Test KubeRayJob when kube config loads successfully.""" + with patch("nemo_run.run.ray.kuberay.config.load_kube_config") as mock_load_kube: + with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster: + with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): + with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"): + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + # Create job to trigger __post_init__ which loads config + _ = KubeRayJob(name="test-job", executor=executor) + + # Verify kube config was loaded + assert mock_load_kube.call_count >= 1 + # incluster should not be called when kube config succeeds + mock_incluster.assert_not_called() + + def test_kuberay_job_fallback_to_incluster(self): + """Test KubeRayJob falls back to incluster config when kube config fails.""" + kube_error = Exception("Kube config file not found") + + with patch( + "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error + ) as mock_load_kube: + with patch( + "nemo_run.run.ray.kuberay.config.load_incluster_config" + ) as mock_incluster: + with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): + with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"): + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + # Create job to trigger __post_init__ which loads config + _ = KubeRayJob(name="test-job", executor=executor) + + # Verify both were called + assert mock_load_kube.call_count >= 1 + assert mock_incluster.call_count >= 1 + + def test_kuberay_job_both_configs_fail(self): + """Test KubeRayJob raises original error when both configs fail.""" + kube_error = Exception("Kube config file not found") + incluster_error = Exception("Not running inside a cluster") + + with patch( + "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error + ): + with patch( + "nemo_run.run.ray.kuberay.config.load_incluster_config", + side_effect=incluster_error, + ): + with pytest.raises(Exception) as exc_info: + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + KubeRayJob(name="test-job", executor=executor) + + # Should raise the original kube config error (not the incluster error) + assert exc_info.value == kube_error + assert "Kube config file not found" in str(exc_info.value) + + def test_executor_kube_config_success(self): + """Test KubeRayExecutor when kube config loads successfully.""" + with patch( + "nemo_run.core.execution.kuberay.config.load_kube_config" + ) as mock_load_kube: + with patch( + "nemo_run.core.execution.kuberay.config.load_incluster_config" + ) as mock_incluster: + with patch("nemo_run.core.execution.kuberay.client.CustomObjectsApi"): + with patch("nemo_run.core.execution.kuberay.client.CoreV1Api"): + # Create executor to trigger __post_init__ which loads config + _ = KubeRayExecutor(namespace="test-namespace") + + # Verify kube config was loaded and incluster was NOT called + mock_load_kube.assert_called_once() + mock_incluster.assert_not_called() + + def test_executor_fallback_to_incluster(self): + """Test KubeRayExecutor falls back to incluster config when kube config fails.""" + kube_error = Exception("Kube config file not found") + + with patch( + "nemo_run.core.execution.kuberay.config.load_kube_config", side_effect=kube_error + ) as mock_load_kube: + with patch( + "nemo_run.core.execution.kuberay.config.load_incluster_config" + ) as mock_incluster: + with patch("nemo_run.core.execution.kuberay.client.CustomObjectsApi"): + with patch("nemo_run.core.execution.kuberay.client.CoreV1Api"): + # Create executor to trigger __post_init__ which loads config + _ = KubeRayExecutor(namespace="test-namespace") + + # Verify fallback occurred + mock_load_kube.assert_called_once() + mock_incluster.assert_called_once() + + def test_executor_both_configs_fail(self): + """Test KubeRayExecutor raises original error when both configs fail.""" + kube_error = Exception("Kube config file not found") + incluster_error = Exception("Not running inside a cluster") + + with patch( + "nemo_run.core.execution.kuberay.config.load_kube_config", side_effect=kube_error + ): + with patch( + "nemo_run.core.execution.kuberay.config.load_incluster_config", + side_effect=incluster_error, + ): + with pytest.raises(Exception) as exc_info: + KubeRayExecutor(namespace="test-namespace") + + # Should raise the original kube config error (not the incluster error) + assert exc_info.value == kube_error + assert "Kube config file not found" in str(exc_info.value) + + def test_error_chaining_preserved(self): + """Test that error chaining is preserved (raise X from Y).""" + kube_error = Exception("Kube config file not found") + incluster_error = Exception("Not running inside a cluster") + + with patch( + "nemo_run.core.execution.kuberay.config.load_kube_config", side_effect=kube_error + ): + with patch( + "nemo_run.core.execution.kuberay.config.load_incluster_config", + side_effect=incluster_error, + ): + with pytest.raises(Exception) as exc_info: + KubeRayExecutor(namespace="test-namespace") + + # Verify error chaining (raise kube_error from incluster_error) + assert exc_info.value == kube_error + assert exc_info.value.__cause__ == incluster_error From d5038b6d7e882b4c606952d82678573717ebb8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:01:52 +0000 Subject: [PATCH 5/6] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/run/ray/test_kuberay.py | 63 +++--------------------------------- 1 file changed, 5 insertions(+), 58 deletions(-) diff --git a/test/run/ray/test_kuberay.py b/test/run/ray/test_kuberay.py index 5e568f4a..01a47f85 100644 --- a/test/run/ray/test_kuberay.py +++ b/test/run/ray/test_kuberay.py @@ -2195,75 +2195,22 @@ def test_kuberay_job_both_configs_fail(self): assert exc_info.value == kube_error assert "Kube config file not found" in str(exc_info.value) - def test_executor_kube_config_success(self): - """Test KubeRayExecutor when kube config loads successfully.""" - with patch( - "nemo_run.core.execution.kuberay.config.load_kube_config" - ) as mock_load_kube: - with patch( - "nemo_run.core.execution.kuberay.config.load_incluster_config" - ) as mock_incluster: - with patch("nemo_run.core.execution.kuberay.client.CustomObjectsApi"): - with patch("nemo_run.core.execution.kuberay.client.CoreV1Api"): - # Create executor to trigger __post_init__ which loads config - _ = KubeRayExecutor(namespace="test-namespace") - - # Verify kube config was loaded and incluster was NOT called - mock_load_kube.assert_called_once() - mock_incluster.assert_not_called() - - def test_executor_fallback_to_incluster(self): - """Test KubeRayExecutor falls back to incluster config when kube config fails.""" - kube_error = Exception("Kube config file not found") - - with patch( - "nemo_run.core.execution.kuberay.config.load_kube_config", side_effect=kube_error - ) as mock_load_kube: - with patch( - "nemo_run.core.execution.kuberay.config.load_incluster_config" - ) as mock_incluster: - with patch("nemo_run.core.execution.kuberay.client.CustomObjectsApi"): - with patch("nemo_run.core.execution.kuberay.client.CoreV1Api"): - # Create executor to trigger __post_init__ which loads config - _ = KubeRayExecutor(namespace="test-namespace") - - # Verify fallback occurred - mock_load_kube.assert_called_once() - mock_incluster.assert_called_once() - - def test_executor_both_configs_fail(self): - """Test KubeRayExecutor raises original error when both configs fail.""" - kube_error = Exception("Kube config file not found") - incluster_error = Exception("Not running inside a cluster") - - with patch( - "nemo_run.core.execution.kuberay.config.load_kube_config", side_effect=kube_error - ): - with patch( - "nemo_run.core.execution.kuberay.config.load_incluster_config", - side_effect=incluster_error, - ): - with pytest.raises(Exception) as exc_info: - KubeRayExecutor(namespace="test-namespace") - - # Should raise the original kube config error (not the incluster error) - assert exc_info.value == kube_error - assert "Kube config file not found" in str(exc_info.value) - def test_error_chaining_preserved(self): """Test that error chaining is preserved (raise X from Y).""" kube_error = Exception("Kube config file not found") incluster_error = Exception("Not running inside a cluster") with patch( - "nemo_run.core.execution.kuberay.config.load_kube_config", side_effect=kube_error + "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error ): with patch( - "nemo_run.core.execution.kuberay.config.load_incluster_config", + "nemo_run.run.ray.kuberay.config.load_incluster_config", side_effect=incluster_error, ): with pytest.raises(Exception) as exc_info: - KubeRayExecutor(namespace="test-namespace") + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + executor = KubeRayExecutor(namespace="test-namespace") + KubeRayJob(name="test-job", executor=executor) # Verify error chaining (raise kube_error from incluster_error) assert exc_info.value == kube_error From f6858cd2fdd15952f5d33294e13ca61b09769e19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 24 Jan 2026 02:02:49 +0000 Subject: [PATCH 6/6] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- test/run/ray/test_kuberay.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/test/run/ray/test_kuberay.py b/test/run/ray/test_kuberay.py index 01a47f85..8ad66d6e 100644 --- a/test/run/ray/test_kuberay.py +++ b/test/run/ray/test_kuberay.py @@ -2102,9 +2102,7 @@ def test_kuberay_cluster_fallback_to_incluster(self): with patch( "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error ) as mock_load_kube: - with patch( - "nemo_run.run.ray.kuberay.config.load_incluster_config" - ) as mock_incluster: + with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster: with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"): with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): @@ -2121,9 +2119,7 @@ def test_kuberay_cluster_both_configs_fail(self): kube_error = Exception("Kube config file not found") incluster_error = Exception("Not running inside a cluster") - with patch( - "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error - ): + with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error): with patch( "nemo_run.run.ray.kuberay.config.load_incluster_config", side_effect=incluster_error, @@ -2160,9 +2156,7 @@ def test_kuberay_job_fallback_to_incluster(self): with patch( "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error ) as mock_load_kube: - with patch( - "nemo_run.run.ray.kuberay.config.load_incluster_config" - ) as mock_incluster: + with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster: with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"): with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): @@ -2179,9 +2173,7 @@ def test_kuberay_job_both_configs_fail(self): kube_error = Exception("Kube config file not found") incluster_error = Exception("Not running inside a cluster") - with patch( - "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error - ): + with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error): with patch( "nemo_run.run.ray.kuberay.config.load_incluster_config", side_effect=incluster_error, @@ -2200,9 +2192,7 @@ def test_error_chaining_preserved(self): kube_error = Exception("Kube config file not found") incluster_error = Exception("Not running inside a cluster") - with patch( - "nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error - ): + with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error): with patch( "nemo_run.run.ray.kuberay.config.load_incluster_config", side_effect=incluster_error,