diff --git a/openeo/udf/run_code.py b/openeo/udf/run_code.py
index fc673dbed..1c4f9077e 100644
--- a/openeo/udf/run_code.py
+++ b/openeo/udf/run_code.py
@@ -9,7 +9,7 @@
 import math
 import pathlib
 import re
-from typing import Callable, List, Union
+from typing import Callable, List, Tuple, Union
 
 import numpy
 import pandas
@@ -28,6 +28,22 @@
 
 _log = logging.getLogger(__name__)
 
+# Supported UDF entry points: accepted function names and the parameters each of them must declare.
+UDF_CATEGORIES = {
+    "timeseries": {
+        "function_names": ["apply_timeseries"],
+        "required_params": ["series", "context"],
+    },
+    "datacube": {
+        "function_names": ["apply_datacube", "apply_hypercube"],
+        "required_params": ["cube", "context"],
+    },
+    "vectorcube": {
+        "function_names": ["apply_vectorcube"],
+        "required_params": ["geometries", "cube", "context"],
+    },
+}
+
 
 def _build_default_execution_context():
     # TODO: is it really necessary to "pre-load" these modules? Isn't user going to import them explicitly in their script anyway?
@@ -146,6 +162,98 @@ def apply_timeseries_generic(
     udf_data.set_datacube_list(datacubes)
     return udf_data
 
+
+def _build_expected_signatures_message() -> str:
+    """Build the "Expected signatures" section that is appended to UDF discovery error messages."""
+    lines = ["", "Expected signatures:"]
+    for cat_info in UDF_CATEGORIES.values():
+        params = ", ".join(cat_info["required_params"])
+        lines.extend(f"  - {func_name}({params})" for func_name in cat_info["function_names"])
+    lines.append("  - Or a single-parameter function for generic UDF")
+    return "\n".join(lines)
+
+
+def validate_udf_signature(func: Callable, category: str) -> List[str]:
+    """
+    Check that a function declares all parameters required for a UDF category.
+
+    :param func: candidate UDF function
+    :param category: key of ``UDF_CATEGORIES`` (e.g. ``"datacube"``)
+    :return: list of error messages; empty if the signature is valid
+    """
+    if category not in UDF_CATEGORIES:
+        return [f"Unknown UDF category: {category!r}"]
+    try:
+        params = inspect.signature(func).parameters
+    except (TypeError, ValueError):
+        # Raised by `inspect.signature` for objects it can not introspect.
+        return ["Could not read function signature."]
+    return [
+        f"Missing required parameter: '{param_name}'"
+        for param_name in UDF_CATEGORIES[category]["required_params"]
+        if param_name not in params
+    ]
+
+
+def _find_udf_in_module(module: dict) -> Tuple[Callable, str]:
+    """
+    Find the UDF entry point among the functions of already loaded UDF code.
+
+    Functions with a known UDF name (see ``UDF_CATEGORIES``) take precedence
+    over single-parameter functions, which are treated as generic UDFs.
+
+    :param module: name-to-object mapping as returned by ``load_module_from_string``
+    :return: tuple ``(function, category)``; category is a ``UDF_CATEGORIES`` key or ``"generic"``
+    :raises OpenEoUdfException: if no function qualifies
+    """
+    # Only plain Python functions are candidates: this skips classes, modules and builtins.
+    functions = {name: obj for name, obj in module.items() if inspect.isfunction(obj)}
+    expected_sigs = _build_expected_signatures_message()
+    if not functions:
+        raise OpenEoUdfException(
+            "No function (apply_datacube, apply_timeseries, apply_vectorcube) found in UDF code." + expected_sigs
+        )
+
+    categories = {
+        func_name: category
+        for category, cat_info in UDF_CATEGORIES.items()
+        for func_name in cat_info["function_names"]
+    }
+    signature_errors = []
+    for func_name, func in functions.items():
+        category = categories.get(func_name)
+        if category is None:
+            continue
+        errors = validate_udf_signature(func, category)
+        if not errors:
+            _log.info("Found UDF %r (category: %s)", func_name, category)
+            return func, category
+        signature_errors.append(f"Function '{func_name}' failed validation: {', '.join(errors)}")
+
+    # Fall back on a generic UDF: any (not specially named) function taking exactly one parameter.
+    for func_name, func in functions.items():
+        if func_name not in categories and len(inspect.signature(func).parameters) == 1:
+            _log.info("Found generic UDF %r", func_name)
+            return func, "generic"
+
+    lines = ["No valid UDF found."]
+    if signature_errors:
+        lines += ["", "Signature errors:"]
+        lines += [f"  - {error}" for error in signature_errors]
+    lines += ["", f"Found functions (none valid): {', '.join(functions)}"]
+    raise OpenEoUdfException("\n".join(lines) + "\n" + expected_sigs)
+
+
+def discover_udf(code: str) -> Tuple[Callable, str]:
+    """
+    Load UDF source code and find the function to use as UDF entry point.
+
+    :param code: UDF source code
+    :return: tuple ``(function, category)``; category is a ``UDF_CATEGORIES`` key or ``"generic"``
+    :raises OpenEoUdfException: if the code defines no valid UDF function
+    """
+    return _find_udf_in_module(load_module_from_string(code))
+
 
 def run_udf_code(code: str, data: UdfData) -> UdfData:
     # TODO: current implementation uses first match directly, first check for multiple matches?
@@ -235,6 +343,13 @@ def run_udf_code(code: str, data: UdfData) -> UdfData:
             func(data)
             return data
 
-    raise OpenEoUdfException("No UDF found.")
+    # Nothing matched: raise a descriptive error instead of a bare "No UDF found."
+    # (`_find_udf_in_module` raises itself when there is no candidate function at all).
+    func, category = _find_udf_in_module(module)
+    raise OpenEoUdfException(
+        f"No UDF found. Function '{func.__name__}' has the signature of a {category} UDF,"
+        " but was not accepted (possibly due to missing or unsupported type annotations)."
+        + _build_expected_signatures_message()
+    )
 
 
diff --git a/tests/udf/test_run_code.py b/tests/udf/test_run_code.py
index 2579b574e..4061feb4e 100644
--- a/tests/udf/test_run_code.py
+++ b/tests/udf/test_run_code.py
@@ -9,7 +9,7 @@
 import xarray
 
 from openeo import UDF
-from openeo.udf import UdfData, XarrayDataCube
+from openeo.udf import OpenEoUdfException, UdfData, XarrayDataCube
 from openeo.udf._compat import FlimsyTomlParser
 from openeo.udf.run_code import (
     _annotation_is_pandas_series,
@@ -19,6 +19,7 @@
     execute_local_udf,
     extract_udf_dependencies,
     run_udf_code,
+    discover_udf,
 )
 
 from .test_xarraydatacube import _build_xdc
@@ -336,6 +337,101 @@ def _is_package_available(name: str) -> bool:
     # TODO: move this to a more general test utility module.
     return importlib.util.find_spec(name) is not None
 
+
+class TestDiscoverUdf:
+    @pytest.mark.parametrize(
+        ["udf_code", "expected_name", "expected_category"],
+        [
+            ("def apply_timeseries(series, context):\n    return series\n", "apply_timeseries", "timeseries"),
+            ("def apply_datacube(cube, context):\n    return cube\n", "apply_datacube", "datacube"),
+            ("def apply_hypercube(cube, context):\n    return cube\n", "apply_hypercube", "datacube"),
+            (
+                "def apply_vectorcube(geometries, cube, context):\n    return geometries, cube\n",
+                "apply_vectorcube",
+                "vectorcube",
+            ),
+            ("def transform(data):\n    return data\n", "transform", "generic"),
+            # A named UDF takes precedence over single-parameter helper functions.
+            (
+                "def helper(x):\n    return x\n\n\ndef apply_datacube(cube, context):\n    return cube\n",
+                "apply_datacube",
+                "datacube",
+            ),
+        ],
+    )
+    def test_discover_udf_valid(self, udf_code, expected_name, expected_category):
+        """Valid UDF code: the entry point and its category are returned."""
+        func, category = discover_udf(udf_code)
+        assert func.__name__ == expected_name
+        assert category == expected_category
+
+    def test_discover_udf_empty_code(self):
+        """Code with no functions at all."""
+        udf_code = textwrap.dedent(
+            """
+            # Just comments
+            x = 42
+            y = "no functions here"
+            """
+        )
+
+        with pytest.raises(OpenEoUdfException) as exc_info:
+            discover_udf(udf_code)
+
+        error_msg = str(exc_info.value)
+        assert "No function (apply_datacube, apply_timeseries, apply_vectorcube) found" in error_msg
+        assert "Expected signatures:" in error_msg
+        # All expected signatures must be listed
+        assert "apply_timeseries(series, context)" in error_msg
+        assert "apply_datacube(cube, context)" in error_msg
+        assert "apply_hypercube(cube, context)" in error_msg
+        assert "apply_vectorcube(geometries, cube, context)" in error_msg
+
+    def test_discover_udf_wrong_function_name(self):
+        """Functions exist, but none has a supported name or a single parameter."""
+        udf_code = textwrap.dedent(
+            """
+            def process_data(cube, context):
+                return cube
+
+            def my_custom_func(series, context):
+                return series
+
+            def transform_vector(geometries, cube, context):
+                return geometries, cube
+            """
+        )
+
+        with pytest.raises(OpenEoUdfException) as exc_info:
+            discover_udf(udf_code)
+
+        error_msg = str(exc_info.value)
+        assert "No valid UDF found." in error_msg
+        assert "Found functions (none valid):" in error_msg
+        assert all(name in error_msg for name in ["process_data", "my_custom_func", "transform_vector"])
+        assert "Expected signatures:" in error_msg
+
+    @pytest.mark.parametrize(
+        ["udf_code", "func_name", "missing_param"],
+        [
+            (
+                "import pandas as pd\n\ndef apply_timeseries(context: dict) -> pd.Series:\n    return pd.Series()\n",
+                "apply_timeseries",
+                "series",
+            ),
+            ("def apply_datacube(cube):\n    return cube\n", "apply_datacube", "context"),
+            ("def apply_vectorcube(cube, context):\n    return cube\n", "apply_vectorcube", "geometries"),
+        ],
+    )
+    def test_discover_udf_missing_required_param(self, udf_code, func_name, missing_param):
+        """A function with a supported name lacks one of its required parameters."""
+        with pytest.raises(OpenEoUdfException) as exc_info:
+            discover_udf(udf_code)
+
+        error_msg = str(exc_info.value)
+        assert f"Function '{func_name}' failed validation" in error_msg
+        assert f"Missing required parameter: '{missing_param}'" in error_msg
+
 
 class TestExtractUdfDependencies:
 