Skip to content
Merged
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p

## [Unreleased]

TODO: add at least one Added, Changed, Deprecated, Removed, Fixed or Security section
### Added

- TestWorkflowContext class to testing module for workflow context testing scenarios
- GraphParameterType: add QueryCatalog class to default list

### Changed

- TestExecutionContext now includes system and workflow contexts to match ExecutionContext specification


## [4.12.1] 2025-06-24 - shipped with DI v25.2.0
Expand Down
3 changes: 2 additions & 1 deletion cmem_plugin_base/dataintegration/parameter/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
:param show_di_graphs: show DI project graphs
:param show_system_graphs: show system graphs such as shape and query catalogs
:param classes: allowed classes of the shown graphs
- if None -> defaults to di:Dataset and void:Dataset
- if None -> defaults to di:Dataset, void:Dataset and shui:QueryCatalog
:param allow_only_autocompleted_values: allow entering new graph URLs
"""
self.show_di_graphs = show_di_graphs
Expand All @@ -44,6 +44,7 @@ def __init__(
self.classes = {
"https://vocab.eccenca.com/di/Dataset",
"http://rdfs.org/ns/void#Dataset",
"https://vocab.eccenca.com/shui/QueryCatalog",
}

def autocomplete(
Expand Down
170 changes: 170 additions & 0 deletions cmem_plugin_base/dataintegration/typed_entities/file.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""File entities"""

import gzip
import io
import zipfile
from abc import abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from io import BytesIO
from pathlib import Path
from typing import IO
Expand All @@ -15,6 +19,83 @@
)


def _is_gzip(stream: io.BufferedReader) -> bool:
"""Check if a stream contains gzip-compressed data."""
head = stream.read(2)
stream.seek(0)
return head == b"\x1f\x8b"


def _prepare_stream_for_processing(
input_stream: IO[bytes],
) -> tuple[io.TextIOWrapper | IO[bytes], bool]:
"""Prepare a file stream for processing.

This utility function:
1. Detects if the stream is gzip compressed
2. Decompresses if needed
3. Detects if the content is text or binary
4. Returns appropriate stream wrapper

Args:
input_stream: The input stream to process (should be in binary mode)

Returns:
A tuple containing:
- The processed stream (TextIOWrapper for text, original stream for binary)
- Boolean indicating if the content is text (True) or binary (False)

"""
buffered = io.BufferedReader(input_stream) # type: ignore[type-var]

decompressed_stream = gzip.GzipFile(fileobj=buffered) if _is_gzip(buffered) else buffered # type: ignore[arg-type]

sample = decompressed_stream.read(1024)
decompressed_stream.seek(0)

try:
sample.decode("utf-8")
is_text = True
stream_for_processing = io.TextIOWrapper(decompressed_stream, encoding="utf-8")
except UnicodeDecodeError:
is_text = False
stream_for_processing = decompressed_stream # type: ignore[assignment]

return stream_for_processing, is_text


class _TextToBytesWrapper:
"""Helper class to wrap a text stream and provide a bytes interface."""

def __init__(self, text_stream: io.TextIOWrapper) -> None:
self._text_stream = text_stream

def read(self, size: int = -1) -> bytes:
"""Read and encode text as bytes."""
text_content = self._text_stream.read(size)
return text_content.encode("utf-8") if text_content else b""

def readline(self, size: int = -1) -> bytes:
"""Read a line and encode as bytes."""
text_line = self._text_stream.readline(size)
return text_line.encode("utf-8") if text_line else b""

def __iter__(self) -> Iterator[bytes]:
"""Iterate over lines as bytes."""
for line in self._text_stream:
yield line.encode("utf-8")

def close(self) -> None:
"""Close the underlying text stream."""
self._text_stream.close()

def __enter__(self) -> "_TextToBytesWrapper":
return self

def __exit__(self, *args: object) -> None:
self.close()


class File:
"""A file entity that can be held in a FileEntitySchema.

Expand All @@ -38,6 +119,95 @@ def read_stream(self, project_id: str) -> IO[bytes]:
Caller is responsible for closing the stream.
"""

def is_text(self, project_id: str) -> bool:
"""Check if the file contains text data.

Returns True if the file content can be decoded as UTF-8 text, False otherwise.
This method automatically handles gzip decompression if needed.
"""
with self.read_stream(project_id) as stream:
_, is_text = _prepare_stream_for_processing(stream)
return is_text

def is_bytes(self, project_id: str) -> bool:
"""Check if the file contains binary data.

Returns True if the file content is binary (cannot be decoded as UTF-8), False otherwise.
This method automatically handles gzip decompression if needed.
"""
return not self.is_text(project_id)

def read_text(self, project_id: str) -> str:
"""Read the file content as text.

Returns the file content as a string. Automatically handles gzip decompression if needed.
Raises UnicodeDecodeError if the file content is not valid UTF-8 text.
"""
with self.read_stream(project_id) as stream:
processed_stream, is_text = _prepare_stream_for_processing(stream)
if not is_text:
raise UnicodeDecodeError("utf-8", b"", 0, 0, "File content is not valid UTF-8 text")
return processed_stream.read() # type: ignore[return-value]

def read_bytes(self, project_id: str) -> bytes:
"""Read the file content as bytes.

Returns the file content as bytes. Automatically handles gzip decompression if needed.
"""
with self.read_stream(project_id) as stream:
processed_stream, is_text = _prepare_stream_for_processing(stream)
if is_text:
content = processed_stream.read() # type: ignore[attr-defined]
return content.encode("utf-8") if isinstance(content, str) else content
return processed_stream.read() # type: ignore[return-value]

@contextmanager
def text_stream(self, project_id: str) -> Iterator[io.TextIOWrapper]:
"""Get a text stream for memory-efficient processing.

Returns a context manager that yields a text stream for reading file content.
Automatically handles gzip decompression if needed.
Raises UnicodeDecodeError if the file content is not valid UTF-8 text.

Example:
```python
with file.text_stream(project_id) as stream:
for line in stream:
process_line(line)
```

"""
with self.read_stream(project_id) as raw_stream:
processed_stream, is_text = _prepare_stream_for_processing(raw_stream)
if not is_text:
raise UnicodeDecodeError("utf-8", b"", 0, 0, "File content is not valid UTF-8 text")
yield processed_stream # type: ignore[misc]

@contextmanager
def bytes_stream(self, project_id: str) -> Iterator[IO[bytes]]:
"""Get a binary stream for memory-efficient processing.

Returns a context manager that yields a binary stream for reading file content.
Automatically handles gzip decompression if needed.

Example:
```python
with file.bytes_stream(project_id) as stream:
while chunk := stream.read(8192):
process_chunk(chunk)
```

"""
with self.read_stream(project_id) as raw_stream:
processed_stream, is_text = _prepare_stream_for_processing(raw_stream)
if is_text:
# Convert text stream back to bytes for consistent API
text_stream = processed_stream # type: ignore[assignment]
# Create a bytes stream by encoding the text stream
yield _TextToBytesWrapper(text_stream) # type: ignore[arg-type,misc]
else:
yield processed_stream # type: ignore[misc]


class LocalFile(File):
"""A file that's located on the local file system."""
Expand Down
35 changes: 33 additions & 2 deletions cmem_plugin_base/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
- TestTaskContext: testing task context
- TestExecutionContext: testing execution context with task and user linkage
- TestSystemContext: testing system context with encryption/decryption placeholders
- TestWorkflowContext: testing workflow context with workflow ID and status

These classes are intended for use in unit tests and other testing scenarios where real
context objects are unavailable or unnecessary.
"""

from typing import ClassVar
from typing import ClassVar, Literal

from cmem.cmempy.api import get_token
from cmem.cmempy.config import get_oauth_default_credentials
Expand All @@ -23,6 +24,7 @@
SystemContext,
TaskContext,
UserContext,
WorkflowContext,
)


Expand Down Expand Up @@ -77,10 +79,17 @@ class TestExecutionContext(ExecutionContext):

__test__ = False

def __init__(self, project_id: str = "TestProject", task_id: str = "TestTask"):
def __init__(
self,
project_id: str = "TestProject",
task_id: str = "TestTask",
workflow_id: str = "TestWorkflow",
):
self.system = TestSystemContext()
self.report = ReportContext()
self.task = TestTaskContext(project_id=project_id, task_id=task_id)
self.user = TestUserContext()
self.workflow = TestWorkflowContext(workflow_id=workflow_id)


class TestSystemContext(SystemContext):
Expand All @@ -102,3 +111,25 @@ def encrypt(self, value: str) -> str:
def decrypt(self, value: str) -> str:
"""Decrypt a value."""
return value


class TestWorkflowContext(WorkflowContext):
"""Testing workflow context"""

__test__ = False

def __init__(
self,
workflow_id: str = "TestWorkflow",
status: Literal["Idle", "Waiting", "Running", "Canceling", "Finished"] = "Running",
):
self._workflow_id = workflow_id
self._status = status

def workflow_id(self) -> str:
"""Get the workflow ID."""
return self._workflow_id

def status(self) -> Literal["Idle", "Waiting", "Running", "Canceling", "Finished"]:
"""Get the workflow status."""
return self._status
Loading