diff --git a/src/xpk/main.py b/src/xpk/main.py index 2ff998d15..299f99337 100644 --- a/src/xpk/main.py +++ b/src/xpk/main.py @@ -33,6 +33,7 @@ import argparse import argcomplete +import contextlib import sys from .parser.core import set_parser @@ -42,6 +43,7 @@ from .core.telemetry import MetricsCollector, send_clearcut_payload, should_send_telemetry from .utils.console import xpk_print, exit_code_to_int from .utils.execution_context import set_context +from .utils.kubectl import sandbox_kubeconfig ################### Compatibility Check ################### # Check that the user runs the below version or greater. @@ -85,7 +87,10 @@ def main() -> None: flags=retrieve_flags(main_args), ) print_xpk_hello() - main_args.func(main_args) + is_sandbox = main_args.sandbox_kubeconfig + opt_sandbox = sandbox_kubeconfig if is_sandbox else contextlib.nullcontext + with opt_sandbox(): + main_args.func(main_args) xpk_print('XPK Done.', flush=True) MetricsCollector.log_complete(0) except SystemExit as e: diff --git a/src/xpk/parser/common.py b/src/xpk/parser/common.py index 6619b11a4..815099a09 100644 --- a/src/xpk/parser/common.py +++ b/src/xpk/parser/common.py @@ -138,6 +138,13 @@ def add_shared_arguments( help='Disables prompting before unintended destructive actions.', required=required, ) + custom_parser_or_group.add_argument( + '--sandbox-kubeconfig', + action=argparse.BooleanOptionalAction, + default=False, + help='Whether to sandbox k8s config. (Experimental)', + required=required, + ) def add_cluster_arguments( diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index c48e093e5..24c992684 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -14,6 +14,11 @@ limitations under the License. """ +import contextlib +import os +import tempfile +from typing import Iterator + from kubernetes.client.exceptions import ApiException from kubernetes.dynamic import DynamicClient @@ -58,3 +63,36 @@ def apply_kubectl_manifest(client, manifest) -> int: xpk_print(f'Error applying {kind}: {e}') status_code = 1 return status_code + + +@contextlib.contextmanager +def _set_env(key: str, value: str) -> Iterator[None]: + environ = os.environ + + backup = environ.get(key) + environ[key] = value + try: + yield + finally: + if backup is None: + del environ[key] + else: + environ[key] = backup + + +@contextlib.contextmanager +def sandbox_kubeconfig() -> Iterator[None]: + """Context manager to use a temporary k8s config file. + + This ensures that xpk operations do not interfere with the user's default + k8s config file by limiting all operation into a temporary file for the + duration of the context. + + We use KUBECONFIG environment so it's process wide and not thread safe. + """ + + with ( + tempfile.TemporaryDirectory(prefix='xpk-kube-') as dir_name, + _set_env('KUBECONFIG', os.path.join(dir_name, 'config')), + ): + yield