Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 132 additions & 4 deletions openeo/udf/run_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
Note: this module was initially developed under the ``openeo-udf`` project (https://github.com/Open-EO/openeo-udf)
"""

#%%

import functools
import inspect
import logging
import math
import pathlib
import re
from typing import Callable, List, Union
from typing import Callable, List, Union, Tuple

import numpy
import pandas
Expand All @@ -28,6 +30,22 @@

_log = logging.getLogger(__name__)

# UDF specifications
UDF_CATEGORIES = {
"timeseries": {
"function_names": ["apply_timeseries"],
"required_params": ["series", "context"],
},
"datacube": {
"function_names": ["apply_datacube", "apply_hypercube"],
"required_params": ["cube", "context"],
},
"vectorcube": {
"function_names": ["apply_vectorcube"],
"required_params": ["geometries", "cube", "context"],
}
}


def _build_default_execution_context():
# TODO: is it really necessary to "pre-load" these modules? Isn't user going to import them explicitly in their script anyway?
Expand Down Expand Up @@ -146,9 +164,118 @@ def apply_timeseries_generic(
udf_data.set_datacube_list(datacubes)
return udf_data

def _build_expected_signatures_message() -> str:
"""Build the 'expected signatures' part of error messages."""
message = "\nExpected signatures:\n"
for cat_name, cat_info in UDF_CATEGORIES.items():
for func_name in cat_info["function_names"]:
params = ", ".join(cat_info["required_params"])
message += f" - {func_name}({params})\n"
message += " - Or a single-parameter function for generic UDF"
return message


def validate_udf_signature(func: Callable, category: str) -> List[str]:
"""
Returns a list of error messages. If empty, the signature is valid.
"""
errors = []
try:
sig = inspect.signature(func)
params = sig.parameters
except Exception:
return ["Could not read function signature."]

# Check required parameters
required_params = UDF_CATEGORIES[category]["required_params"]
for param_name in required_params:
if param_name not in params:
errors.append(f"Missing required parameter: '{param_name}'")

return errors


def discover_udf(code: str) -> Tuple[Callable, str]:
"""
Analyzes code and provides specific error feedback.
Returns (function, category)
"""
module = load_module_from_string(code)
signature_errors = []

# Collect valid UDF candidate functions
functions = {}
for name, obj in module.items():
if not callable(obj):
continue
# Exclude classes (e.g., XarrayDataCube)
if inspect.isclass(obj):
continue
# Exclude built-in functions, keep only user-defined functions
if not inspect.isfunction(obj):
continue

functions[name] = obj

# Helper for building error messages
expected_sigs = _build_expected_signatures_message()

if not functions:
# No functions found at all
error_msg = "No function (apply_datacube, apply_timeseries, apply_vectorcube) found in UDF code."
error_msg += expected_sigs
raise OpenEoUdfException(error_msg)

# Check each function
for func_name, func in functions.items():
# Determine category based on function name
category = None
for cat_name, cat_info in UDF_CATEGORIES.items():
if func_name in cat_info["function_names"]:
category = cat_name
break

if category:
# Validate signature for specific UDF type
errors = validate_udf_signature(func, category)
if not errors:
_log.info(f"Found UDF '{func_name}' (category: {category})")
return func, category
else:
signature_errors.append(f"Function '{func_name}' failed validation: {', '.join(errors)}")

# Check for generic UDF (single parameter)
elif len(inspect.signature(func).parameters) == 1:
_log.info(f"Found generic UDF '{func_name}'")
return func, "generic"

# Functions were found, but none were valid
error_msg = "No valid UDF found.\n"

if signature_errors:
error_msg += "\nSignature errors:\n" + "\n".join(f" - {e}" for e in signature_errors)
error_msg += "\n"

# Optional: List the functions that were found but rejected
found_names = list(functions.keys())
if found_names:
error_msg += f"\nFound functions (none valid): {', '.join(found_names)}\n"

error_msg += expected_sigs

raise OpenEoUdfException(error_msg)


def run_udf_code(code: str, data: UdfData) -> UdfData:
# TODO: current implementation uses first match directly, first check for multiple matches?
try:
# This will give us better error messages if no UDF is found
discover_udf(code)
except OpenEoUdfException as e:
# Re-raise with the better error message
raise e

# SECOND: Run the original logic unchanged
module = load_module_from_string(code)
functions = ((k, v) for (k, v) in module.items() if callable(v))

Expand Down Expand Up @@ -179,7 +306,6 @@ def run_udf_code(code: str, data: UdfData) -> UdfData:
raise ValueError("The provided UDF expects exactly one datacube, but {c} were provided.".format(
c=len(data.get_datacube_list())
))
# TODO: also support calls without user context?
result_cube = func(cube=data.get_datacube_list()[0], context=data.user_context)
data.set_datacube_list([result_cube])
return data
Expand All @@ -194,7 +320,6 @@ def run_udf_code(code: str, data: UdfData) -> UdfData:
raise ValueError("The provided UDF expects exactly one datacube, but {c} were provided.".format(
c=len(data.get_datacube_list())
))
# TODO: also support calls without user context?
result_cube: xarray.DataArray = func(cube=data.get_datacube_list()[0].get_array(), context=data.user_context)
data.set_datacube_list([XarrayDataCube(result_cube)])
return data
Expand Down Expand Up @@ -223,7 +348,6 @@ def run_udf_code(code: str, data: UdfData) -> UdfData:
c=len(data.get_datacube_list())
)
)
# TODO: geopandas is optional dependency.
input_geoms = data.get_feature_collection_list()[0].data
input_cube = data.get_datacube_list()[0].get_array()
result_geoms, result_cube = func(geometries=input_geoms, cube=input_cube, context=data.user_context)
Expand All @@ -235,6 +359,7 @@ def run_udf_code(code: str, data: UdfData) -> UdfData:
func(data)
return data

# This should rarely be reached since discover_udf should have caught it
raise OpenEoUdfException("No UDF found.")


Expand Down Expand Up @@ -331,3 +456,6 @@ def apply_datacube(cube: xarray.DataArray, context: dict) -> xarray.DataArray:
)

return tomllib.loads(content).get("dependencies")



96 changes: 95 additions & 1 deletion tests/udf/test_run_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import xarray

from openeo import UDF
from openeo.udf import UdfData, XarrayDataCube

from openeo.udf import UdfData, XarrayDataCube, OpenEoUdfException
from openeo.udf._compat import FlimsyTomlParser
from openeo.udf.run_code import (
_annotation_is_pandas_series,
Expand All @@ -19,6 +20,7 @@
execute_local_udf,
extract_udf_dependencies,
run_udf_code,
discover_udf,
)

from .test_xarraydatacube import _build_xdc
Expand Down Expand Up @@ -336,6 +338,98 @@ def _is_package_available(name: str) -> bool:
# TODO: move this to a more general test utility module.
return importlib.util.find_spec(name) is not None

class TestsDiscoverUdf:

def test_discover_udf_empty_code(self):
"""Test: Code with no functions at all."""
udf_code = textwrap.dedent("""
# Just comments
x = 42
y = "no functions here"
""")

with pytest.raises(OpenEoUdfException) as exc_info:
discover_udf(udf_code)

error_msg = str(exc_info.value)
assert "No function (apply_datacube, apply_timeseries, apply_vectorcube) found" in error_msg
assert "Expected signatures:" in error_msg
# Verify it lists all expected signatures
assert "apply_timeseries(series, context)" in error_msg
assert "apply_datacube(cube, context)" in error_msg
assert "apply_hypercube(cube, context)" in error_msg
assert "apply_vectorcube(geometries, cube, context)" in error_msg


def test_discover_udf_wrong_function_name(self):
"""Test: Functions exist but with wrong names."""
udf_code = textwrap.dedent("""
def process_data(cube, context):
return cube

def my_custom_func(series, context):
return series

def transform_vector(geometries, cube, context):
return geometries, cube
""")

with pytest.raises(OpenEoUdfException) as exc_info:
discover_udf(udf_code)

error_msg = str(exc_info.value)
# Should show what we expected vs what we got
assert "Expected signatures:" in error_msg



def test_discover_udf_missing_required_param_timeseries(self):
"""Test: apply_timeseries missing 'series' parameter."""
udf_code = textwrap.dedent("""
import pandas as pd

def apply_timeseries(context: dict) -> pd.Series: # Missing 'series'
return pd.Series()
""")

with pytest.raises(OpenEoUdfException) as exc_info:
discover_udf(udf_code)

error_msg = str(exc_info.value)
assert "Function 'apply_timeseries' failed validation" in error_msg
assert "Missing required parameter: 'series'" in error_msg


def test_discover_udf_missing_required_param_datacube(self):
"""Test: apply_datacube missing 'context' parameter."""
udf_code = textwrap.dedent("""
def apply_datacube(cube): # Missing 'context'
return cube
""")

with pytest.raises(OpenEoUdfException) as exc_info:
discover_udf(udf_code)

error_msg = str(exc_info.value)
assert "Function 'apply_datacube' failed validation" in error_msg
assert "Missing required parameter: 'context'" in error_msg


def test_discover_udf_missing_required_param_vectorcube(self):
"""Test: apply_vectorcube missing 'geometries' parameter."""
udf_code = textwrap.dedent("""
def apply_vectorcube(cube, context): # Missing 'geometries'
return cube
""")

with pytest.raises(OpenEoUdfException) as exc_info:
discover_udf(udf_code)

error_msg = str(exc_info.value)
assert "Function 'apply_vectorcube' failed validation" in error_msg
assert "Missing required parameter: 'geometries'" in error_msg



class TestExtractUdfDependencies:

Expand Down