diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 503788dbd..f0d1657f6 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -7,7 +7,7 @@ from collections import OrderedDict from functools import partial from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast import numpy as np import pandas as pd @@ -54,6 +54,236 @@ ERROR_CODE = 512 +def _validate_flow_and_task_inputs( + flow: OpenMLFlow | OpenMLTask, + task: OpenMLTask | OpenMLFlow, + flow_tags: list[str] | None, +) -> tuple[OpenMLFlow, OpenMLTask]: + """Validate and normalize inputs for flow and task execution. + + Parameters + ---------- + flow : OpenMLFlow or OpenMLTask + The flow object (may be swapped with task for backward compatibility). + task : OpenMLTask or OpenMLFlow + The task object (may be swapped with flow for backward compatibility). + flow_tags : List[str] or None + A list of tags that the flow should have at creation. + + Returns + ------- + Tuple[OpenMLFlow, OpenMLTask] + The validated flow and task. + + Raises + ------ + ValueError + If flow_tags is not a list or task is not published. + """ + if flow_tags is not None and not isinstance(flow_tags, list): + raise ValueError("flow_tags should be a list") + + # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). + # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). + if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): + # We want to allow either order of argument (to avoid confusion). + warnings.warn( + "run_flow_on_task: the old argument order (task, flow) is deprecated and " + "will not be supported in the future. Please use the " + "order (flow, task).", + DeprecationWarning, + stacklevel=3, + ) + task, flow = flow, task + + if not isinstance(flow, OpenMLFlow): + raise TypeError( + f"run_flow_on_task: expected argument 'flow' to be OpenMLFlow, " + f"got {type(flow).__name__}", + ) + + if not isinstance(task, OpenMLTask): + raise TypeError( + f"run_flow_on_task: expected argument 'task' to be OpenMLTask, " + f"got {type(task).__name__}", + ) + + if task.task_id is None: + raise ValueError( + "run_flow_on_task: argument 'task.task_id' is None; task must be published on OpenML" + ) + + return flow, task + + +def _sync_flow_with_server( + flow: OpenMLFlow, + task: OpenMLTask, + *, + upload_flow: bool, + avoid_duplicate_runs: bool, +) -> int | None: + """Synchronize flow with server and check if setup/task combination is already present. + + Parameters + ---------- + flow : OpenMLFlow + The flow to synchronize. + task : OpenMLTask + The task to check for duplicate runs. + upload_flow : bool + Whether to upload the flow if it doesn't exist. + avoid_duplicate_runs : bool + Whether to check for duplicate runs. + + Returns + ------- + int or None + The flow_id if synced with server, None otherwise. + + Raises + ------ + PyOpenMLError + If flow_id mismatch or flow doesn't exist when expected. + OpenMLRunsExistError + If duplicate runs exist and avoid_duplicate_runs is True. + """ + # We only need to sync with the server right now if we want to upload the flow, + # or ensure no duplicate runs exist. Otherwise it can be synced at upload time. 
+ flow_id = None + if not upload_flow and not avoid_duplicate_runs: + return flow_id + + flow_id = flow_exists(flow.name, flow.external_version) + if isinstance(flow.flow_id, int) and flow_id != flow.flow_id: + if flow_id is not False: + raise PyOpenMLError( + f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'", + ) + raise PyOpenMLError("Flow does not exist on the server, but 'flow.flow_id' is not None.") + + if upload_flow and flow_id is False: + flow.publish() + return flow.flow_id + + if flow_id: + flow_from_server = get_flow(flow_id) + _copy_server_fields(flow_from_server, flow) + if avoid_duplicate_runs: + flow_from_server.model = flow.model + setup_id = setup_exists(flow_from_server) + ids = run_exists(cast("int", task.task_id), setup_id) + if ids: + error_message = "One or more runs of this setup were already performed on the task." + raise OpenMLRunsExistError(ids, error_message) + return flow_id + + # Flow does not exist on server and we do not want to upload it. + # No sync with the server happens. + return None + + +def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]: + """Prepare run environment information and tags. + + Parameters + ---------- + flow : OpenMLFlow + The flow to get version information from. + + Returns + ------- + Tuple[List[str], List[str]] + A tuple of (tags, run_environment). + """ + run_environment = flow.extension.get_version_information() + tags = ["openml-python", run_environment[1]] + return tags, run_environment + + +def _create_run_from_results( # noqa: PLR0913 + task: OpenMLTask, + flow: OpenMLFlow, + flow_id: int | None, + data_content: list[list], + trace: OpenMLRunTrace | None, + fold_evaluations: OrderedDict[str, OrderedDict], + sample_evaluations: OrderedDict[str, OrderedDict], + tags: list[str], + run_environment: list[str], + upload_flow: bool, + avoid_duplicate_runs: bool, +) -> OpenMLRun: + """Create an OpenMLRun object from execution results. + + Parameters + ---------- + task : OpenMLTask + The task that was executed. + flow : OpenMLFlow + The flow that was executed. + flow_id : int or None + The flow ID if synced with server. + data_content : List[List] + The prediction data content. + trace : OpenMLRunTrace or None + The execution trace if available. + fold_evaluations : OrderedDict + The fold-based evaluation measures. + sample_evaluations : OrderedDict + The sample-based evaluation measures. + tags : List[str] + Tags to attach to the run. + run_environment : List[str] + Environment information. + upload_flow : bool + Whether the flow was uploaded. + avoid_duplicate_runs : bool + Whether duplicate runs were checked. + + Returns + ------- + OpenMLRun + The created run object. + """ + dataset = task.get_dataset() + task_id = cast("int", task.task_id) + dataset_id = dataset.dataset_id + model = flow.model + flow_name = flow.name + setup_string = flow.extension.create_setup_string(flow.model) + fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"] + generated_description = "\n".join(fields) + + run = OpenMLRun( + task_id=task_id, + flow_id=flow_id, + dataset_id=dataset_id, + model=model, + flow_name=flow_name, + tags=tags, + trace=trace, + data_content=data_content, + flow=flow, + setup_string=setup_string, + description_text=generated_description, + ) + + if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None: + # We only extract the parameter settings if a sync happened with the server. + # I.e. 
when the flow was uploaded or we found it in the avoid_duplicate check. + # Otherwise, we will do this at upload time. + run.parameter_settings = flow.extension.obtain_parameter_values(flow) + + # now we need to attach the detailed evaluations + if task.task_type_id == TaskType.LEARNING_CURVE: + run.sample_evaluations = sample_evaluations + else: + run.fold_evaluations = fold_evaluations + + return run + + # TODO(eddiebergman): Could potentially overload this but # it seems very big to do so def run_model_on_task( # noqa: PLR0913 @@ -175,7 +405,7 @@ def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask: return run -def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 +def run_flow_on_task( # noqa: PLR0913 flow: OpenMLFlow, task: OpenMLTask, avoid_duplicate_runs: bool | None = None, @@ -222,71 +452,29 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 run : OpenMLRun Result of the run. """ - if flow_tags is not None and not isinstance(flow_tags, list): - raise ValueError("flow_tags should be a list") - if avoid_duplicate_runs is None: avoid_duplicate_runs = openml.config.avoid_duplicate_runs - # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). - # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). - if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): - # We want to allow either order of argument (to avoid confusion). - warnings.warn( - "The old argument order (Flow, model) is deprecated and " - "will not be supported in the future. Please use the " - "order (model, Flow).", - DeprecationWarning, - stacklevel=2, - ) - task, flow = flow, task - - if task.task_id is None: - raise ValueError("The task should be published at OpenML") + # 1. Validate inputs + flow, task = _validate_flow_and_task_inputs(flow, task, flow_tags) + # 2. Prepare the model if flow.model is None: flow.model = flow.extension.flow_to_model(flow) - flow.model = flow.extension.seed_model(flow.model, seed=seed) - # We only need to sync with the server right now if we want to upload the flow, - # or ensure no duplicate runs exist. Otherwise it can be synced at upload time. - flow_id = None - if upload_flow or avoid_duplicate_runs: - flow_id = flow_exists(flow.name, flow.external_version) - if isinstance(flow.flow_id, int) and flow_id != flow.flow_id: - if flow_id is not False: - raise PyOpenMLError( - f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'", - ) - raise PyOpenMLError( - "Flow does not exist on the server, but 'flow.flow_id' is not None." - ) - if upload_flow and flow_id is False: - flow.publish() - flow_id = flow.flow_id - elif flow_id: - flow_from_server = get_flow(flow_id) - _copy_server_fields(flow_from_server, flow) - if avoid_duplicate_runs: - flow_from_server.model = flow.model - setup_id = setup_exists(flow_from_server) - ids = run_exists(task.task_id, setup_id) - if ids: - error_message = ( - "One or more runs of this setup were already performed on the task." - ) - raise OpenMLRunsExistError(ids, error_message) - else: - # Flow does not exist on server and we do not want to upload it. - # No sync with the server happens. - flow_id = None - - dataset = task.get_dataset() + # 3. 
Sync with server and check for duplicates + flow_id = _sync_flow_with_server( + flow, + task, + upload_flow=upload_flow, + avoid_duplicate_runs=avoid_duplicate_runs, + ) - run_environment = flow.extension.get_version_information() - tags = ["openml-python", run_environment[1]] + # 4. Prepare run environment + tags, run_environment = _prepare_run_environment(flow) + # 5. Check if model is already fitted if flow.extension.check_if_model_fitted(flow.model): warnings.warn( "The model is already fitted! This might cause inconsistency in comparison of results.", @@ -294,8 +482,8 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 stacklevel=2, ) - # execute the run - res = _run_task_get_arffcontent( + # 6. Execute the run (parallel processing happens here) + data_content, trace, fold_evaluations, sample_evaluations = _run_task_get_arffcontent( model=flow.model, task=task, extension=flow.extension, @@ -303,35 +491,22 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 n_jobs=n_jobs, ) - data_content, trace, fold_evaluations, sample_evaluations = res - fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"] - generated_description = "\n".join(fields) - run = OpenMLRun( - task_id=task.task_id, + # 7. Create run from results + run = _create_run_from_results( + task=task, + flow=flow, flow_id=flow_id, - dataset_id=dataset.dataset_id, - model=flow.model, - flow_name=flow.name, - tags=tags, - trace=trace, data_content=data_content, - flow=flow, - setup_string=flow.extension.create_setup_string(flow.model), - description_text=generated_description, + trace=trace, + fold_evaluations=fold_evaluations, + sample_evaluations=sample_evaluations, + tags=tags, + run_environment=run_environment, + upload_flow=upload_flow, + avoid_duplicate_runs=avoid_duplicate_runs, ) - if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None: - # We only extract the parameter settings if a sync happened with the server. - # I.e. when the flow was uploaded or we found it in the avoid_duplicate check. - # Otherwise, we will do this at upload time. - run.parameter_settings = flow.extension.obtain_parameter_values(flow) - - # now we need to attach the detailed evaluations - if task.task_type_id == TaskType.LEARNING_CURVE: - run.sample_evaluations = sample_evaluations - else: - run.fold_evaluations = fold_evaluations - + # 8. 
Log completion message if flow_id: message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}" else: diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index 8f2c505b7..705b32315 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -7,6 +7,7 @@ import time import unittest import warnings +from collections import OrderedDict from openml_sklearn import SklearnExtension, cat, cont from packaging.version import Version @@ -42,7 +43,9 @@ ) #from openml.extensions.sklearn import cat, cont from openml.runs.functions import ( + _create_run_from_results, _run_task_get_arffcontent, + _sync_flow_with_server, delete_run, format_prediction, run_exists, @@ -111,6 +114,61 @@ def setUp(self): super().setUp() self.extension = SklearnExtension() + def test_sync_flow_with_server_returns_early_without_sync_flags(self): + flow = mock.MagicMock() + flow.name = "dummy-flow" + flow.external_version = "1" + flow.flow_id = None + task = mock.MagicMock() + + with mock.patch("openml.runs.functions.flow_exists") as flow_exists_mock: + flow_id = _sync_flow_with_server( + flow=flow, + task=task, + upload_flow=False, + avoid_duplicate_runs=False, + ) + + assert flow_id is None + flow_exists_mock.assert_not_called() + + def test_create_run_from_results_keeps_required_constructor_fields(self): + model = object() + flow = mock.MagicMock() + flow.model = model + flow.name = "dummy-flow" + flow.flow_id = None + flow.extension.create_setup_string.return_value = "dummy-setup-string" + + task = mock.MagicMock() + task.task_id = 123 + task.task_type_id = TaskType.SUPERVISED_CLASSIFICATION + dataset = mock.MagicMock() + dataset.dataset_id = 456 + task.get_dataset.return_value = dataset + + run = _create_run_from_results( + task=task, + flow=flow, + flow_id=None, + data_content=[[0, 0, 0, "prediction", "truth"]], + trace=None, + fold_evaluations=OrderedDict(), + sample_evaluations=OrderedDict(), + tags=["openml-python", "python"], + run_environment=["python", "3.11"], + upload_flow=False, + avoid_duplicate_runs=False, + ) + + assert run.task_id == 123 + assert run.dataset_id == 456 + assert run.model is model + assert run.flow_name == "dummy-flow" + assert run.setup_string == "dummy-setup-string" + assert run.description_text is not None + assert "Created by run_flow_on_task" in run.description_text + def _wait_for_processed_run(self, run_id, max_waiting_time_seconds): # it can take a while for a run to be processed on the OpenML (test) # server however, sometimes it is good to wait (a bit) for this, to @@ -1096,6 +1154,57 @@ def test_local_run_metric_score(self): self._test_local_evaluations(run) + @pytest.mark.sklearn() + @pytest.mark.uses_test_server() + def test_run_flow_on_task_basic(self): + """Test that run_flow_on_task executes successfully with basic flow and task.""" + clf = Pipeline( + steps=[ + ("imputer", SimpleImputer(strategy="most_frequent")), + ("encoder", OneHotEncoder(handle_unknown="ignore")), + ("estimator", RandomForestClassifier(n_estimators=5, random_state=42)), + ], + ) + flow = self.extension.model_to_flow(clf) + task = openml.tasks.get_task(119) # diabetes; holdout + + run = openml.runs.run_flow_on_task( + flow=flow, + task=task, + upload_flow=False, + ) + + assert run.task_id == task.task_id + assert run.flow_name == flow.name + assert run.dataset_id == task.dataset_id + assert run.data_content is not None + assert len(run.data_content) > 0 + + TestBase._mark_entity_for_removal("run", run.run_id) + 
TestBase.logger.info(f"collected from test_run_flow_on_task_basic: {run.run_id}") + + @pytest.mark.sklearn() + @pytest.mark.uses_test_server() + def test_run_flow_on_task_with_flow_tags(self): + """Test run_flow_on_task with custom flow tags (for the flow, not the run).""" + clf = RandomForestClassifier(n_estimators=5, random_state=42) + flow = self.extension.model_to_flow(clf) + task = openml.tasks.get_task(119) + + run = openml.runs.run_flow_on_task( + flow=flow, + task=task, + flow_tags=["test_flow_tag_1", "test_flow_tag_2"], + upload_flow=False, + ) + + assert run.task_id == task.task_id + assert run.flow_name == flow.name + assert run.data_content is not None + + TestBase._mark_entity_for_removal("run", run.run_id) + TestBase.logger.info(f"collected from test_run_flow_on_task_with_flow_tags: {run.run_id}") + @pytest.mark.production() def test_online_run_metric_score(self): self.use_production_server()