13 changes: 12 additions & 1 deletion src/dve/core_engine/exceptions.py
@@ -14,7 +14,7 @@ def __init__(
self, error_message: str, *args: object, messages: Messages, entities: SparkEntities
) -> None:
super().__init__(error_message, *args)
self.error_messsage = error_message
self.error_message = error_message
"""The error message explaining the critical processing error."""
self.messages = messages
"""The messages gathered at the time the error was emitted."""
@@ -26,6 +26,17 @@ def critical_messages(self) -> Iterator[FeedbackMessage]:
"""Critical messages which caused the processing error."""
yield from filter(lambda message: message.is_critical, self.messages)

def to_feedback_message(self) -> FeedbackMessage:
"Convert to feedback message to write to json file"
return FeedbackMessage(
entity=None,
record=None,
failure_type="integrity",
error_type="processing",
error_location="Whole File",
error_message=self.error_message,
)


class EntityTypeMismatch(TypeError):
"""An exception emitted if entity type outputs from two collaborative objects are different."""
4 changes: 2 additions & 2 deletions src/dve/pipeline/duckdb_pipeline.py
@@ -24,23 +24,23 @@ class DDBDVEPipeline(BaseDVEPipeline):
def __init__(
self,
audit_tables: DDBAuditingManager,
job_run_id: int,
connection: DuckDBPyConnection,
rules_path: Optional[URI],
processed_files_path: Optional[URI],
submitted_files_path: Optional[URI],
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
job_run_id: Optional[int] = None,
):
self._connection = connection
super().__init__(
audit_tables,
job_run_id,
DuckDBDataContract(connection=self._connection),
DuckDBStepImplementations.register_udfs(connection=self._connection),
rules_path,
processed_files_path,
submitted_files_path,
reference_data_loader,
job_run_id,
)

# pylint: disable=arguments-differ
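Because `job_run_id` moves from the second positional parameter to a trailing optional keyword (the same reordering is applied to `SparkDVEPipeline` further down), positional call sites need updating. A minimal sketch of the new call shape, assuming the audit manager and connection are built elsewhere:

```python
from typing import Optional

from dve.pipeline.duckdb_pipeline import DDBDVEPipeline


def build_ddb_pipeline(
    audit_tables: "DDBAuditingManager",      # assumed to be constructed by the caller
    connection: "DuckDBPyConnection",
    rules_uri: Optional[str],
    processed_uri: Optional[str],
    submitted_uri: Optional[str],
    job_run_id: Optional[int] = None,
) -> DDBDVEPipeline:
    # job_run_id now trails the path arguments and may be omitted entirely.
    return DDBDVEPipeline(
        audit_tables,
        connection=connection,
        rules_path=rules_uri,
        processed_files_path=processed_uri,
        submitted_files_path=submitted_uri,
        job_run_id=job_run_id,
    )
```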
94 changes: 94 additions & 0 deletions src/dve/pipeline/foundry_ddb_pipeline.py
@@ -0,0 +1,94 @@
"""A duckdb pipeline for running on Foundry platform"""

from typing import Optional
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
from dve.core_engine.backends.utilities import dump_errors
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, Failed
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
from dve.pipeline.utils import SubmissionStatus
from dve.parser import file_handling as fh

class FoundryDDBPipeline(DDBDVEPipeline):
"""DuckDB pipeline for running on Foundry Platform.
Polymorphed to allow for exception handling when processing
single files sequentially through services."""

def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
"""Write out key audit relations to parquet for persisting to datasets"""
write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
self.write_parquet(
self._audit_tables._processing_status.get_relation(),
write_to + "processing_status.parquet",
)
self.write_parquet(
self._audit_tables._submission_statistics.get_relation(),
write_to + "submission_statistics.parquet",
)
return write_to

def file_transformation(
self, submission_info: SubmissionInfo
) -> SubmissionInfo | dict[str, str]:
try:
return super().file_transformation(submission_info)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"File transformation raised exception: {exc}")
self._logger.exception(exc)
return submission_info.dict()

    def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, bool]:
try:
return super().apply_data_contract(submission_info)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply data contract raised exception: {exc}")
self._logger.exception(exc)
return submission_info, True

def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
try:
return super().apply_business_rules(submission_info, failed)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply business rules raised exception: {exc}")
self._logger.exception(exc)
return submission_info, SubmissionStatus(failed=True)

def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
"""Sequential single submission pipeline runner"""
try:
sub_id: str = submission_info.submission_id
self._audit_tables.add_new_submissions(submissions=[submission_info])
self._audit_tables.mark_transform(submission_ids=[sub_id])
sub_info = self.file_transformation(submission_info=submission_info)
if isinstance(sub_info, SubmissionInfo):
self._audit_tables.mark_data_contract(submission_ids=[sub_id])
sub_info, failed = self.apply_data_contract(submission_info=submission_info)
self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
sub_info, sub_status = self.apply_business_rules(
submission_info=submission_info, failed=failed
)
else:
sub_status = SubmissionStatus(failed=True)
self._audit_tables.mark_error_report(
submissions=[(sub_id, sub_status.submission_result)]
)
sub_info, sub_status, sub_stats, report_uri = self.error_report(
submission_info=submission_info, status=sub_status
)
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
except Exception as err: # pylint: disable=W0718
self._logger.error(
f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
)
self._audit_tables.mark_failed(submissions=[sub_id])
finally:
audit_files_uri = self.persist_audit_records(submission_info=submission_info)
return (
(
None
if sub_status.failed
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
),
report_uri,
audit_files_uri,
)
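
A hedged sketch of driving the new sequential runner; the pipeline and `SubmissionInfo` objects are assumed to be built elsewhere, and the logging is illustrative:

```python
from typing import Optional

from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI
from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline


def process_one(pipeline: FoundryDDBPipeline, submission: SubmissionInfo) -> Optional[URI]:
    # run_pipeline returns (business_rules_uri_or_None, report_uri, audit_files_uri);
    # the first element is None when the submission failed at any stage.
    business_rules_uri, report_uri, audit_uri = pipeline.run_pipeline(submission)
    if business_rules_uri is None:
        print(f"Submission {submission.submission_id} failed; see report at {report_uri}")
    else:
        print(f"Validated output written to {business_rules_uri}")
    print(f"Audit parquet files persisted to {audit_uri}")
    return business_rules_uri
```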
52 changes: 36 additions & 16 deletions src/dve/pipeline/pipeline.py
@@ -13,13 +13,19 @@
import polars as pl
from pydantic import validate_arguments

from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.message import FeedbackMessage
import dve.reporting.excel_report as er
from dve.core_engine.backends.base.auditing import BaseAuditingManager
from dve.core_engine.backends.base.contract import BaseDataContract
from dve.core_engine.backends.base.core import EntityManager
from dve.core_engine.backends.base.reference_data import BaseRefDataLoader
from dve.core_engine.backends.base.rules import BaseStepImplementations
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.exceptions import (
BackendError,
MessageBearingError,
ReaderLacksEntityTypeSupport,
)
from dve.core_engine.backends.readers import BaseFileReader
from dve.core_engine.backends.types import EntityType
from dve.core_engine.backends.utilities import dump_errors, stringify_model
@@ -44,13 +50,13 @@ class BaseDVEPipeline:
def __init__(
self,
audit_tables: BaseAuditingManager,
job_run_id: int,
data_contract: BaseDataContract,
step_implementations: Optional[BaseStepImplementations[EntityType]],
rules_path: Optional[URI],
processed_files_path: Optional[URI],
submitted_files_path: Optional[URI],
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
job_run_id: Optional[int] = None,
):
self._submitted_files_path = submitted_files_path
self._processed_files_path = processed_files_path
@@ -265,30 +271,41 @@ def file_transformation(
if not self.processed_files_path:
raise AttributeError("processed files path not provided")

errors: list[FeedbackMessage] = []
submission_file_uri: URI = fh.joinuri(
self.processed_files_path,
submission_info.submission_id,
submission_info.file_name_with_ext,
)
try:
errors = self.write_file_to_parquet(
errors.extend(self.write_file_to_parquet(
submission_file_uri, submission_info, self.processed_files_path
)
if errors:
dump_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"file_transformation",
errors,
)
return submission_info.dict()
return submission_info
except ValueError as exc:
self._logger.error(f"File transformation write_file_to_parquet raised error: {exc}")
return submission_info.dict()
except Exception as exc: # pylint: disable=broad-except
))

except MessageBearingError as exc:
self._logger.error(f"Unexpected file transformation error: {exc}")
self._logger.exception(exc)
errors.extend(exc.messages)

except BackendError as exc: # pylint: disable=broad-except
self._logger.error(f"Unexpected file transformation error: {exc}")
self._logger.exception(exc)
errors.extend([
CriticalProcessingError(
entities=None,
error_message=repr(exc),
messages=[],
).to_feedback_message()
])

if errors:
dump_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"file_transformation",
errors,
)
return submission_info.dict()
return submission_info

def file_transformation_step(
self, pool: Executor, submissions_to_process: list[SubmissionInfo]
@@ -321,6 +338,7 @@ def file_transformation_step(
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"File transformation raised exception: {exc}")
self._logger.exception(exc)
# TODO: write errors to file here (maybe processing errors - not to be seen by end user)
failed_processing.append(sub_info)
continue

@@ -423,6 +441,7 @@ def data_contract_step(
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"Data Contract raised exception: {exc}")
self._logger.exception(exc)
# TODO: write errors to file here (maybe processing errors - not to be seen by end user)
failed_processing.append(sub_info)
continue

@@ -562,6 +581,7 @@ def business_rule_step(
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"Business Rules raised exception: {exc}")
self._logger.exception(exc)
# TODO: write errors to file here (maybe processing errors - not to be seen by end user)
failed_processing.append(sub_info)
continue

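The rewritten `file_transformation` above now accumulates every `FeedbackMessage` it can recover (messages carried by a `MessageBearingError`, or a single whole-file message built from any other `BackendError`) and only then decides whether the submission failed. A condensed, hedged restatement of that shape, not the literal method body; `write` stands in for `write_file_to_parquet` and the output directory is a placeholder:

```python
from typing import Callable

from dve.core_engine.backends.exceptions import BackendError, MessageBearingError
from dve.core_engine.backends.utilities import dump_errors
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI


def transform_with_error_capture(
    write: Callable[[], list[FeedbackMessage]],
    submission_info: SubmissionInfo,
    output_dir: URI,
) -> SubmissionInfo | dict:
    errors: list[FeedbackMessage] = []
    try:
        errors.extend(write())
    except MessageBearingError as exc:
        # Structured messages travel with the exception and are kept as-is.
        errors.extend(exc.messages)
    except BackendError as exc:
        # Any other backend failure becomes one whole-file feedback message.
        errors.append(
            CriticalProcessingError(
                entities=None, error_message=repr(exc), messages=[]
            ).to_feedback_message()
        )
    if errors:
        dump_errors(output_dir, "file_transformation", errors)
        return submission_info.dict()  # a dict return marks the submission as failed
    return submission_info
```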
4 changes: 2 additions & 2 deletions src/dve/pipeline/spark_pipeline.py
@@ -26,23 +26,23 @@ class SparkDVEPipeline(BaseDVEPipeline):
def __init__(
self,
audit_tables: SparkAuditingManager,
job_run_id: int,
rules_path: Optional[URI],
processed_files_path: Optional[URI],
submitted_files_path: Optional[URI],
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
spark: Optional[SparkSession] = None,
job_run_id: Optional[int] = None,
):
self._spark = spark if spark else SparkSession.builder.getOrCreate()
super().__init__(
audit_tables,
job_run_id,
SparkDataContract(spark_session=self._spark),
SparkStepImplementations.register_udfs(self._spark),
rules_path,
processed_files_path,
submitted_files_path,
reference_data_loader,
job_run_id,
)

# pylint: disable=arguments-differ
1 change: 1 addition & 0 deletions tests/fixtures.py
@@ -120,4 +120,5 @@ def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]:
with tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp:
db_file = Path(tmp, db + ".duckdb")
conn = connect(database=db_file, read_only=False)

yield db_file, conn
@@ -457,7 +457,7 @@ def test_one_to_one_join_multi_matches_raises(
new_columns={"satellites.name": "satellite"},
)
entities = EntityManager({"planets": planets_rel, "satellites": satellites_rel})
with pytest.raises(ValueError, match="Multiple matches for some records.+"):
with pytest.raises(ValueError, match="Multiple matches for some records.*"):
DUCKDB_STEP_BACKEND.one_to_one_join(entities, config=join)


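For the `match=` change above: `pytest.raises(..., match=pattern)` applies `re.search` to the string of the raised exception, so the old trailing `.+` required at least one character after "records" and fails when the message ends there, while `.*` also accepts an empty tail. A standalone illustration (the sample string is illustrative, not necessarily the backend's exact message):

```python
import re

sample = "Multiple matches for some records"  # a message ending right at the phrase
assert re.search(r"Multiple matches for some records.*", sample) is not None  # new pattern matches
assert re.search(r"Multiple matches for some records.+", sample) is None      # old pattern needs a trailing char
```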
7 changes: 7 additions & 0 deletions tests/test_pipeline/pipeline_helpers.py
@@ -66,6 +66,13 @@ def planet_test_files() -> Iterator[str]:
shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets"))
yield tdir + "/planets"

@pytest.fixture(scope="function")
def movies_test_files() -> Iterator[str]:
clear_config_cache()
with tempfile.TemporaryDirectory() as tdir:
shutil.copytree(get_test_file_path("movies/"), Path(tdir, "movies"))
yield tdir + "/movies"


@pytest.fixture(scope="function")
def planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionInfo, str]]:
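A hedged sketch of how a test might consume the new `movies_test_files` fixture; the test name and assertion are assumptions:

```python
def test_movies_files_are_staged(movies_test_files: str) -> None:
    # pytest injects the temporary "<tmpdir>/movies" path yielded by the fixture.
    assert movies_test_files.endswith("/movies")
```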
1 change: 1 addition & 0 deletions tests/test_pipeline/test_duckdb_pipeline.py
@@ -4,6 +4,7 @@

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import shutil
from typing import Dict, Tuple
from uuid import uuid4

Expand Down