From 227dd3533551ac293e6642396f7fc2a403e1bb90 Mon Sep 17 00:00:00 2001
From: idelcano
Date: Thu, 20 Nov 2025 20:03:29 +0100
Subject: [PATCH 1/4] add python skeleton

---
 DHIS2/python_skeleton/.gitignore              |   3 +
 .../create_missing_values_use_case.py         | 241 ++++++++++++++++++
 DHIS2/python_skeleton/dhis_utils.py           |  33 +++
 DHIS2/python_skeleton/file_utils.py           | 207 +++++++++++++++
 DHIS2/python_skeleton/get_files_util.py       |  47 ++++
 DHIS2/python_skeleton/main_skeleton.py        |  64 +++++
 6 files changed, 595 insertions(+)
 create mode 100644 DHIS2/python_skeleton/.gitignore
 create mode 100644 DHIS2/python_skeleton/create_missing_values_use_case.py
 create mode 100644 DHIS2/python_skeleton/dhis_utils.py
 create mode 100644 DHIS2/python_skeleton/file_utils.py
 create mode 100644 DHIS2/python_skeleton/get_files_util.py
 create mode 100644 DHIS2/python_skeleton/main_skeleton.py

diff --git a/DHIS2/python_skeleton/.gitignore b/DHIS2/python_skeleton/.gitignore
new file mode 100644
index 00000000..66448356
--- /dev/null
+++ b/DHIS2/python_skeleton/.gitignore
@@ -0,0 +1,3 @@
+.env
+input/
+output/
\ No newline at end of file

diff --git a/DHIS2/python_skeleton/create_missing_values_use_case.py b/DHIS2/python_skeleton/create_missing_values_use_case.py
new file mode 100644
index 00000000..ef3da09b
--- /dev/null
+++ b/DHIS2/python_skeleton/create_missing_values_use_case.py
@@ -0,0 +1,241 @@
+# create_missing_values_use_case.py
+
+from typing import Optional, Tuple
+
+import requests
+
+from dhis_utils import dhis_get
+from file_utils import read_csv, escape_sql_literal, write_text
+
+
+# Numeric ID of the attribute in the trackedentityattributevalue table
+TRACKED_ENTITY_ATTRIBUTE_ID = 11364749
+# Optional: UID of the attribute, for documentation/reference
+TRACKED_ENTITY_ATTRIBUTE_UID = "Nf2VUgxqhmi"
+
+
+class CreateMissingValuesUseCase:
+    """
+    Use case to generate INSERT statements for missing attribute values
+    based on existing TEI metadata (created, lastUpdated, storedBy).
+    """
+
+    def __init__(
+        self,
+        base_url: str,
+        jsessionid: str,
+        input_path: str,
+        output_path: str,
+    ):
+        """
+        Args:
+            base_url: DHIS2 base URL (without trailing slash).
+            jsessionid: JSESSIONID cookie value.
+            input_path: CSV file name (relative to 'input/' folder).
+            output_path: Output SQL file name (relative to 'output/' folder).
+        """
+        self.base_url = base_url
+        self.jsessionid = jsessionid
+        self.input_path = input_path
+        self.output_path = output_path
+
+    @staticmethod
+    def normalize_timestamp(raw_timestamp: Optional[str]) -> Optional[str]:
+        """
+        Normalize a DHIS2 timestamp into a format that Postgres accepts.
+
+        Example:
+            '2025-07-18T13:48:12.502' -> '2025-07-18 13:48:12.502'
+        """
+        if not raw_timestamp:
+            return None
+
+        timestamp = raw_timestamp.rstrip("Z")
+        timestamp = timestamp.replace("T", " ")
+        return timestamp
+
+    @classmethod
+    def _get_attribute_template_from_tei_level(
+        cls,
+        tei_data: dict,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        """
+        Try to obtain (created, lastUpdated, storedBy) from the first
+        attribute at TEI level.
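+
+        Returns (None, None, None) when the TEI has no attributes at this level.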
+        """
+        tei_level_attributes = tei_data.get("attributes") or []
+        if not tei_level_attributes:
+            return None, None, None
+
+        first_attribute = tei_level_attributes[0]
+        created = cls.normalize_timestamp(first_attribute.get("created"))
+        last_updated = cls.normalize_timestamp(first_attribute.get("lastUpdated"))
+        stored_by = first_attribute.get("storedBy")
+
+        return created, last_updated, stored_by
+
+    @classmethod
+    def _get_attribute_template_from_first_enrollment(
+        cls,
+        tei_data: dict,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        """
+        Fallback: try to obtain (created, lastUpdated, storedBy) from the
+        first attribute of the first enrollment.
+        """
+        enrollments = tei_data.get("enrollments") or []
+        if not enrollments:
+            return None, None, None
+
+        first_enrollment = enrollments[0]
+        enrollment_attributes = first_enrollment.get("attributes") or []
+        if not enrollment_attributes:
+            return None, None, None
+
+        first_enrollment_attribute = enrollment_attributes[0]
+        created = cls.normalize_timestamp(first_enrollment_attribute.get("created"))
+        last_updated = cls.normalize_timestamp(first_enrollment_attribute.get("lastUpdated"))
+        stored_by = first_enrollment_attribute.get("storedBy")
+
+        return created, last_updated, stored_by
+
+    def _get_attribute_template(
+        self,
+        tei_uid: str,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        """
+        Get (created, lastUpdated, storedBy) to reuse as a template in the
+        new attribute value.
+
+        Strategy:
+        1) Try TEI-level attributes.
+        2) If none, try the first enrollment's attributes.
+        3) If nothing found, return (None, None, None).
+        """
+        tei_data = dhis_get(
+            path=f"/api/trackedEntityInstances/{tei_uid}",
+            base_url=self.base_url,
+            jsessionid=self.jsessionid,
+            params={"fields": "*"},
+        )
+
+        created, last_updated, stored_by = self._get_attribute_template_from_tei_level(tei_data)
+        if created and last_updated and stored_by:
+            return created, last_updated, stored_by
+
+        return self._get_attribute_template_from_first_enrollment(tei_data)
+
+    @staticmethod
+    def _build_insert_statement(
+        tracked_entity_id: str,
+        created_timestamp: str,
+        last_updated_timestamp: str,
+        full_name: str,
+        stored_by: str,
+    ) -> str:
+        """
+        Build an INSERT statement for trackedentityattributevalue.
+        """
+        full_name_sql = escape_sql_literal(full_name)
+        stored_by_sql = escape_sql_literal(stored_by)
+
+        return f"""
+INSERT INTO trackedentityattributevalue (
+    trackedentityid,
+    trackedentityattributeid,
+    created,
+    lastupdated,
+    value,
+    storedby
+)
+VALUES (
+    {tracked_entity_id},
+    {TRACKED_ENTITY_ATTRIBUTE_ID},
+    '{created_timestamp}'::timestamp,
+    '{last_updated_timestamp}'::timestamp,
+    '{full_name_sql}',
+    '{stored_by_sql}'
+);
+""".strip()
+
+    def execute(self):
+        """
+        Use case entry point.
+
+        It expects a CSV with at least:
+        - trackedentityid
+        - tei_uid
+        - full_name
+
+        For each row:
+        - Obtain a template for timestamps (created, lastUpdated, storedBy)
+          based on existing attributes of the TEI.
+        - Generate INSERT statements in trackedentityattributevalue to store full_name.
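+        - Wrap the collected INSERTs in a single BEGIN/COMMIT block and
+          write them to the output SQL file.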
+        """
+        inserts: list[str] = []
+        skipped: list[tuple[str, str]] = []
+
+        rows = read_csv(self.input_path)
+        print(f"Read {len(rows)} rows from input/{self.input_path}")
+
+        for row in rows:
+            tracked_entity_id = (row.get("trackedentityid") or "").strip()
+            tei_uid = (row.get("tei_uid") or "").strip()
+            full_name = (row.get("full_name") or "").strip()
+
+            if not tracked_entity_id or not tei_uid or not full_name:
+                print(f"[SKIP] Missing required data in CSV row: {row}")
+                skipped.append((tei_uid, "incomplete_csv_data"))
+                continue
+
+            print(f"Processing TEI {tei_uid} (trackedentityid={tracked_entity_id})...")
+
+            try:
+                created_timestamp, last_updated_timestamp, stored_by = self._get_attribute_template(
+                    tei_uid=tei_uid,
+                )
+            except requests.HTTPError as http_error:
+                status_code = (
+                    http_error.response.status_code
+                    if http_error.response is not None
+                    else "?"
+                )
+                print(f"[ERROR] TEI {tei_uid}: HTTP {status_code}")
+                skipped.append((tei_uid, f"http_{status_code}"))
+                continue
+            except Exception as unexpected_error:
+                print(f"[ERROR] TEI {tei_uid}: {unexpected_error}")
+                skipped.append((tei_uid, "unexpected_error"))
+                continue
+
+            if not created_timestamp or not last_updated_timestamp or not stored_by:
+                print(
+                    f"[WARN] TEI {tei_uid}: no template "
+                    "(created/lastUpdated/storedBy), skipping"
+                )
+                skipped.append((tei_uid, "no_attribute_template"))
+                continue
+
+            insert_sql = self._build_insert_statement(
+                tracked_entity_id=tracked_entity_id,
+                created_timestamp=created_timestamp,
+                last_updated_timestamp=last_updated_timestamp,
+                full_name=full_name,
+                stored_by=stored_by,
+            )
+
+            inserts.append(insert_sql)
+
+            print(insert_sql)
+
+        if not inserts:
+            print("No INSERT statements generated. Check CSV / connection.")
+            return
+
+        sql_script = "BEGIN;\n\n" + "\n\n".join(inserts) + "\n\nCOMMIT;\n"
+        final_path = write_text(self.output_path, sql_script)
+
+        print(f"\nSQL written to: {final_path}")
+        if skipped:
+            print("\nSkipped TEIs:")
+            for tei_uid, reason in skipped:
+                print(f"  - {tei_uid}: {reason}")

diff --git a/DHIS2/python_skeleton/dhis_utils.py b/DHIS2/python_skeleton/dhis_utils.py
new file mode 100644
index 00000000..1af03933
--- /dev/null
+++ b/DHIS2/python_skeleton/dhis_utils.py
@@ -0,0 +1,33 @@
+# dhis_utils.py
+
+import requests
+
+
+def dhis_get(
+    path: str,
+    base_url: str,
+    jsessionid: str,
+    params: dict | None = None,
+    timeout: int = 30,
+) -> dict:
+    """
+    Perform a GET request to a DHIS2 instance using a JSESSIONID cookie.
+
+    Args:
+        path: API path, e.g. "/api/system/info".
+        base_url: Base URL of the DHIS2 instance, e.g. "https://my-dhis2".
+        jsessionid: Value of the JSESSIONID cookie.
+        params: Optional query parameters.
+        timeout: Request timeout in seconds.
+
+    Returns:
+        Parsed JSON response as a Python dict.
+
+    Raises:
+        requests.HTTPError if the response status is not 2xx.
+    """
+    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
+    cookies = {"JSESSIONID": jsessionid}
+    response = requests.get(url, params=params or {}, cookies=cookies, timeout=timeout)
+    response.raise_for_status()
+    return response.json()

diff --git a/DHIS2/python_skeleton/file_utils.py b/DHIS2/python_skeleton/file_utils.py
new file mode 100644
index 00000000..1a27fe1a
--- /dev/null
+++ b/DHIS2/python_skeleton/file_utils.py
@@ -0,0 +1,207 @@
+# file_utils.py
+
+import json
+import csv
+from pathlib import Path
+from datetime import datetime
+
+
+def _to_input_path(path: str | Path) -> Path:
+    """
+    Map a relative path to the 'input' folder.
+
+    - If the path is absolute, it is returned as-is.
+    - If the path is relative, it is resolved as 'input/<path>'.
+    """
+    p = Path(path)
+    if p.is_absolute():
+        return p
+    return Path("input") / p
+
+
+def _to_output_path(path: str | Path) -> Path:
+    """
+    Map a relative path to the 'output' folder.
+
+    - If the path is absolute, it is returned as-is.
+    - If the path is relative, it is resolved as 'output/<path>'.
+    """
+    p = Path(path)
+    if p.is_absolute():
+        return p
+    return Path("output") / p
+
+
+def _with_timestamp_if_exists(path: Path) -> Path:
+    """
+    If the given path already exists, append a human-readable timestamp
+    to the filename (before the extension).
+
+    Example:
+        output/result.sql -> exists
+        output/result_2025-11-20_15-42-10.sql (new path)
+    """
+    if not path.exists():
+        return path
+
+    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+    return path.with_name(f"{path.stem}_{timestamp}{path.suffix}")
+
+
+def read_json(path: str | Path):
+    """
+    Read a JSON file from disk and return the parsed content.
+    The file is read from the 'input' folder unless an absolute path is provided.
+    """
+    path = _to_input_path(path)
+    with path.open(encoding="utf-8") as f:
+        return json.load(f)
+
+
+def write_json(path: str | Path, data, indent: int = 2) -> Path:
+    """
+    Write a Python object as JSON to disk.
+    The file is written to the 'output' folder unless an absolute path is provided.
+    If the target file already exists, a human-readable timestamp is appended
+    to the filename.
+
+    Returns:
+        The final Path used to write the file.
+    """
+    path = _to_output_path(path)
+    path = _with_timestamp_if_exists(path)
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", encoding="utf-8") as f:
+        json.dump(data, f, ensure_ascii=False, indent=indent)
+    return path
+
+
+def read_csv(path: str | Path) -> list[dict]:
+    """
+    Read a CSV file and return a list of dictionaries (one per row).
+    The file is read from the 'input' folder unless an absolute path is provided.
+    """
+    path = _to_input_path(path)
+    with path.open(newline="", encoding="utf-8") as f:
+        return list(csv.DictReader(f))
+
+
+def write_csv(path: str | Path, rows: list[dict], fieldnames: list[str]) -> Path:
+    """
+    Write a list of dictionaries to a CSV file.
+    The file is written to the 'output' folder unless an absolute path is provided.
+    If the target file already exists, a human-readable timestamp is appended
+    to the filename.
+
+    Returns:
+        The final Path used to write the file.
+    """
+    path = _to_output_path(path)
+    path = _with_timestamp_if_exists(path)
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        writer.writerows(rows)
+    return path
+
+
+def write_text(path: str | Path, content: str) -> Path:
+    """
+    Write plain text to a file.
+    The file is written to the 'output' folder unless an absolute path is provided.
+    If the target file already exists, a human-readable timestamp is appended
+    to the filename.
+
+    Returns:
+        The final Path used to write the file.
+    """
+    path = _to_output_path(path)
+    path = _with_timestamp_if_exists(path)
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_text(content, encoding="utf-8")
+    return path
+
+
+def escape_sql_literal(text: str) -> str:
+    """
+    Escape single quotes in a string so it can be safely used
+    as a SQL literal value.
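+
+    Note that only single quotes are escaped; the value is otherwise
+    inserted into the SQL literal verbatim.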
+    """
+    return text.replace("'", "''")
+
+
+def load_dhis_env_config(
+    default_base_url: str,
+    default_jsessionid: str,
+    env_path: str | Path | None = None,
+) -> tuple[str, str]:
+    """
+    Look for a .env file and try to read DHIS2 configuration from it.
+    If a valid BASE_URL and JSESSIONID are found, show them to the user
+    (masking the JSESSIONID) and ask for confirmation.
+
+    If the user presses ENTER, the values from .env are used.
+    If the user types 'n' or 'N' and presses ENTER, the defaults are kept.
+
+    Args:
+        default_base_url: Fallback base URL if .env is not used or not found.
+        default_jsessionid: Fallback JSESSIONID if .env is not used or not found.
+        env_path: Optional explicit path to the .env file. If None, "./.env" is used.
+
+    Returns:
+        (base_url, jsessionid) either from .env (if confirmed) or the defaults.
+    """
+    if env_path is None:
+        env_path = Path(".") / ".env"
+    else:
+        env_path = Path(env_path)
+
+    if not env_path.is_file():
+        return default_base_url, default_jsessionid
+
+    base_url_env: str | None = None
+    jsessionid_env: str | None = None
+
+    try:
+        with env_path.open(encoding="utf-8") as f:
+            for line in f:
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                if "=" not in line:
+                    continue
+                key, value = line.split("=", 1)
+                key = key.strip()
+                value = value.strip()
+
+                key_upper = key.upper()
+                if key_upper == "BASE_URL":
+                    base_url_env = value
+                elif key_upper == "JSESSIONID":
+                    jsessionid_env = value
+    except Exception as e:
+        print(f"[WARN] Failed to read .env file at {env_path}: {e}")
+        return default_base_url, default_jsessionid
+
+    if not base_url_env or not jsessionid_env:
+        return default_base_url, default_jsessionid
+
+    masked_jsessionid = (
+        jsessionid_env[:6] + "..." if len(jsessionid_env) > 6 else jsessionid_env
+    )
+
+    print("Found .env configuration:")
+    print(f"  Base URL   : {base_url_env}")
+    print(f"  JSESSIONID : {masked_jsessionid}")
+    print()
+    answer = input(
+        "Press ENTER to use this configuration, or type 'n' and press ENTER to ignore it: "
+    ).strip()
+
+    if answer.lower() == "n":
+        print("Using default configuration (ignoring .env).")
+        return default_base_url, default_jsessionid
+
+    print("Using configuration from .env.")
+    return base_url_env, jsessionid_env

diff --git a/DHIS2/python_skeleton/get_files_util.py b/DHIS2/python_skeleton/get_files_util.py
new file mode 100644
index 00000000..77024c4e
--- /dev/null
+++ b/DHIS2/python_skeleton/get_files_util.py
@@ -0,0 +1,47 @@
+import os
+from pathlib import Path
+
+# === CONFIGURATION ===
+# Used to dump all the files into a single output for review in ChatGPT
+BASE_DIR = Path(__file__).resolve().parent
+MAX_BYTES = 200_000  # maximum file size to read
+
+
+def main():
+    for root, dirs, files in os.walk(BASE_DIR):
+        for name in files:
+            path = os.path.join(root, name)
+
+            # Skip hidden files and folders (e.g. .git, .idea, .venv, etc.)
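+            # (a single hidden component anywhere in the path excludes the file)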
+            if any(part.startswith(".") for part in path.split(os.sep)):
+                continue
+
+            # Skip common binary / large formats
+            if any(
+                path.endswith(ext)
+                for ext in (".png", ".jpg", ".jpeg", ".gif", ".pdf", ".zip", ".pyc", ".sql", ".csv")
+            ):
+                continue
+
+            try:
+                # Skip very large files
+                if os.path.getsize(path) > MAX_BYTES:
+                    continue
+
+                with open(path, "r", encoding="utf-8") as f:
+                    content = f.read()
+            except (UnicodeDecodeError, OSError):
+                # Binary or unreadable files -> skip
+                continue
+
+            rel_path = os.path.relpath(path, BASE_DIR)
+
+            print("\n" + "=" * 80)
+            print(f"FILE: {rel_path}")
+            print("=" * 80 + "\n")
+            print(content)
+            print("\n")  # extra separation
+
+
+if __name__ == "__main__":
+    main()

diff --git a/DHIS2/python_skeleton/main_skeleton.py b/DHIS2/python_skeleton/main_skeleton.py
new file mode 100644
index 00000000..a93b01ba
--- /dev/null
+++ b/DHIS2/python_skeleton/main_skeleton.py
@@ -0,0 +1,64 @@
+# main.py
+
+import argparse
+
+from file_utils import load_dhis_env_config
+from create_missing_values_use_case import CreateMissingValuesUseCase
+
+
+# Default configuration (can be overridden by .env and CLI)
+DEFAULT_BASE_URL = ""
+DEFAULT_JSESSIONID = ""
+
+DEFAULT_INPUT_FILE = "teis_without_storedby.csv"  # read from input/
+DEFAULT_OUTPUT_FILE = "insert_attr_fullname.sql"  # write to output/
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Create missing attribute values SQL for DHIS2 tracked entities."
+    )
+    parser.add_argument(
+        "--base-url",
+        default=DEFAULT_BASE_URL,
+        help="DHIS2 base URL (default from code or .env).",
+    )
+    parser.add_argument(
+        "--jsessionid",
+        default=DEFAULT_JSESSIONID,
+        help="JSESSIONID cookie value (default from code or .env).",
+    )
+    parser.add_argument(
+        "--input-file",
+        default=DEFAULT_INPUT_FILE,
+        help="Input CSV file name (relative to 'input/' folder).",
+    )
+    parser.add_argument(
+        "--output-file",
+        default=DEFAULT_OUTPUT_FILE,
+        help="Output SQL file name (relative to 'output/' folder).",
+    )
+    return parser.parse_args()
+
+
+def main():
+    args = parse_args()
+
+    # Merge CLI defaults with .env (with confirmation)
+    base_url, jsessionid = load_dhis_env_config(
+        default_base_url=args.base_url,
+        default_jsessionid=args.jsessionid,
+    )
+
+    use_case = CreateMissingValuesUseCase(
+        base_url=base_url,
+        jsessionid=jsessionid,
+        input_path=args.input_file,
+        output_path=args.output_file,
+    )
+
+    use_case.execute()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file

From 4d459f1eea24dd818d87473e68164b3276e3f1c1 Mon Sep 17 00:00:00 2001
From: idelcano
Date: Fri, 21 Nov 2025 09:43:57 +0100
Subject: [PATCH 2/4] change field in use case, and add a connection test to
 avoid massive error calls when the token has expired

---
 .../create_missing_values_use_case.py   | 14 ++++++-
 DHIS2/python_skeleton/dhis_utils.py      | 42 +++++++++++++++++++
 DHIS2/python_skeleton/main_skeleton.py   |  4 +-
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/DHIS2/python_skeleton/create_missing_values_use_case.py b/DHIS2/python_skeleton/create_missing_values_use_case.py
index ef3da09b..7d987c6f 100644
--- a/DHIS2/python_skeleton/create_missing_values_use_case.py
+++ b/DHIS2/python_skeleton/create_missing_values_use_case.py
@@ -180,7 +180,19 @@ def execute(self):
         for row in rows:
             tracked_entity_id = (row.get("trackedentityid") or "").strip()
             tei_uid = (row.get("tei_uid") or "").strip()
-            full_name = (row.get("full_name") or "").strip()
+            firstname = (row.get("firstname") or "").strip()
+            surname = (row.get("surname") or "").strip()
"").strip() + + # Construimos el full_name como: + # firstname + " " + surname + if firstname and surname: + full_name = f"{firstname} {surname}" + elif firstname: + full_name = firstname + elif surname: + full_name = surname + else: + full_name = "" if not tracked_entity_id or not tei_uid or not full_name: print(f"[SKIP] Missing required data in CSV row: {row}") diff --git a/DHIS2/python_skeleton/dhis_utils.py b/DHIS2/python_skeleton/dhis_utils.py index 1af03933..a1bd7de8 100644 --- a/DHIS2/python_skeleton/dhis_utils.py +++ b/DHIS2/python_skeleton/dhis_utils.py @@ -1,5 +1,6 @@ # dhis_utils.py +import sys import requests @@ -31,3 +32,44 @@ def dhis_get( response = requests.get(url, params=params or {}, cookies=cookies, timeout=timeout) response.raise_for_status() return response.json() + + +def test_connection( + base_url: str, + jsessionid: str, + timeout: int = 10, +) -> dict: + """ + Test connection against /api/system/info. + If it fails, print an error and abort the script. + + Returns: + system/info JSON dict if everything is OK. + + Exits: + Calls sys.exit(1) on any error. + """ + try: + system_info = dhis_get( + path="/api/system/info", + base_url=base_url, + jsessionid=jsessionid, + params=None, + timeout=timeout, + ) + except requests.HTTPError as http_error: + status = http_error.response.status_code if http_error.response is not None else "?" + print(f"[FATAL] HTTP error {status} while calling /api/system/info at {base_url}") + sys.exit(1) + except requests.RequestException as req_error: + print(f"[FATAL] Could not connect to {base_url} (/api/system/info): {req_error}") + sys.exit(1) + except Exception as unexpected: + print(f"[FATAL] Unexpected error while testing connection to {base_url}: {unexpected}") + sys.exit(1) + + print( + f"[OK] Connected to DHIS2 at {base_url} " + f"(version={system_info.get('version', 'unknown')})" + ) + return system_info diff --git a/DHIS2/python_skeleton/main_skeleton.py b/DHIS2/python_skeleton/main_skeleton.py index a93b01ba..3a040484 100644 --- a/DHIS2/python_skeleton/main_skeleton.py +++ b/DHIS2/python_skeleton/main_skeleton.py @@ -4,7 +4,7 @@ from file_utils import load_dhis_env_config from create_missing_values_use_case import CreateMissingValuesUseCase - +from dhis_utils import test_connection # Default configuration (can be overridden by .env and CLI) DEFAULT_BASE_URL = "" @@ -50,6 +50,8 @@ def main(): default_jsessionid=args.jsessionid, ) + test_connection(base_url=base_url, jsessionid=jsessionid) + use_case = CreateMissingValuesUseCase( base_url=base_url, jsessionid=jsessionid, From e41db9ec35525d3e8d2037d3054b7d2958d6b731 Mon Sep 17 00:00:00 2001 From: idelcano Date: Tue, 25 Nov 2025 13:24:48 +0100 Subject: [PATCH 3/4] added usecase to update blueprint --- DHIS2/python_skeleton/file_utils.py | 20 ++ DHIS2/python_skeleton/main_skeleton.py | 80 +++++- .../update_blueprint_dataelements_use_case.py | 268 ++++++++++++++++++ 3 files changed, 360 insertions(+), 8 deletions(-) create mode 100644 DHIS2/python_skeleton/update_blueprint_dataelements_use_case.py diff --git a/DHIS2/python_skeleton/file_utils.py b/DHIS2/python_skeleton/file_utils.py index 1a27fe1a..88db8bf6 100644 --- a/DHIS2/python_skeleton/file_utils.py +++ b/DHIS2/python_skeleton/file_utils.py @@ -48,6 +48,26 @@ def _with_timestamp_if_exists(path: Path) -> Path: return path.with_name(f"{path.stem}_{timestamp}{path.suffix}") +def resolve_input_path(path: str | Path) -> Path: + """ + Public helper to map a relative path to the 'input' folder. 
+ """ + return _to_input_path(path) + + +def resolve_output_path(path: str | Path, with_timestamp: bool = True) -> Path: + """ + Public helper to map a relative path to the 'output' folder. + + If with_timestamp is True and the file exists, a timestamp is appended + to avoid overwriting the existing file. + """ + resolved = _to_output_path(path) + if with_timestamp: + return _with_timestamp_if_exists(resolved) + return resolved + + def read_json(path: str | Path): """ Read a JSON file from disk and return the parsed content. diff --git a/DHIS2/python_skeleton/main_skeleton.py b/DHIS2/python_skeleton/main_skeleton.py index 3a040484..d538ee89 100644 --- a/DHIS2/python_skeleton/main_skeleton.py +++ b/DHIS2/python_skeleton/main_skeleton.py @@ -4,6 +4,10 @@ from file_utils import load_dhis_env_config from create_missing_values_use_case import CreateMissingValuesUseCase +from update_blueprint_dataelements_use_case import ( + UpdateBlueprintDataElementsUseCase, + DEFAULT_SHEETS as DEFAULT_BLUEPRINT_SHEETS, +) from dhis_utils import test_connection # Default configuration (can be overridden by .env and CLI) @@ -13,10 +17,20 @@ DEFAULT_INPUT_FILE = ("teis_without_storedby.csv") # read from input/ DEFAULT_OUTPUT_FILE = "insert_attr_fullname.sql" # write to output/ +DEFAULT_BLUEPRINT_INPUT = "Blueprint_HWF.xlsx" # read from input/ +DEFAULT_BLUEPRINT_OUTPUT = "blueprint_apvd.xlsx" # write to output/ +DEFAULT_USE_CASE = "create-missing-values" + def parse_args(): parser = argparse.ArgumentParser( - description="Create missing attribute values SQL for DHIS2 tracked entities." + description="Utilities for DHIS2 blueprints and tracked entities." + ) + parser.add_argument( + "--use-case", + choices=["create-missing-values", "update-blueprint-dataelements"], + default=DEFAULT_USE_CASE, + help="Which workflow to run.", ) parser.add_argument( "--base-url", @@ -38,6 +52,43 @@ def parse_args(): default=DEFAULT_OUTPUT_FILE, help="Output SQL file name (relative to 'output/' folder).", ) + parser.add_argument( + "--xlsx-file", + default=DEFAULT_BLUEPRINT_INPUT, + help="Input XLSX file (relative to 'input/' folder) for blueprint updates.", + ) + parser.add_argument( + "--output-xlsx-file", + default=DEFAULT_BLUEPRINT_OUTPUT, + help="Output XLSX file (relative to 'output/' folder) for blueprint updates.", + ) + parser.add_argument( + "--sheets", + nargs="+", + default=list(DEFAULT_BLUEPRINT_SHEETS), + help="Sheet names to process when updating blueprint data elements.", + ) + parser.add_argument( + "--name-col", + default="3", + help="Column (index, letter, or header text) that holds the data element name.", + ) + parser.add_argument( + "--uid-col", + default="DE UID", + help="Column (index, letter, or header text) where the UID will be written.", + ) + parser.add_argument( + "--code-col", + default="DE Code", + help="Column (index, letter, or header text) where the code will be written.", + ) + parser.add_argument( + "--data-start-row", + type=int, + default=2, + help="Row to start reading data when headers are not used.", + ) return parser.parse_args() @@ -52,15 +103,28 @@ def main(): test_connection(base_url=base_url, jsessionid=jsessionid) - use_case = CreateMissingValuesUseCase( - base_url=base_url, - jsessionid=jsessionid, - input_path=args.input_file, - output_path=args.output_file, - ) + if args.use_case == "update-blueprint-dataelements": + use_case = UpdateBlueprintDataElementsUseCase( + base_url=base_url, + jsessionid=jsessionid, + xlsx_path=args.xlsx_file, + output_path=args.output_xlsx_file, + 
+            sheet_names=args.sheets,
+            name_column=args.name_col,
+            uid_column=args.uid_col,
+            code_column=args.code_col,
+            data_start_row=args.data_start_row,
+        )
+    else:
+        use_case = CreateMissingValuesUseCase(
+            base_url=base_url,
+            jsessionid=jsessionid,
+            input_path=args.input_file,
+            output_path=args.output_file,
+        )
 
     use_case.execute()
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()

diff --git a/DHIS2/python_skeleton/update_blueprint_dataelements_use_case.py b/DHIS2/python_skeleton/update_blueprint_dataelements_use_case.py
new file mode 100644
index 00000000..47b6c6a8
--- /dev/null
+++ b/DHIS2/python_skeleton/update_blueprint_dataelements_use_case.py
@@ -0,0 +1,268 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Optional
+
+import requests
+from openpyxl import load_workbook
+from openpyxl.utils.cell import column_index_from_string
+
+from dhis_utils import dhis_get
+from file_utils import resolve_input_path, resolve_output_path
+
+
+DEFAULT_SHEETS = ("Module 1 - APVD", "Module 2 - APVD")
+
+
+@dataclass
+class ColumnSelector:
+    """
+    Helper to resolve a column either by index (1-based), Excel letter, or header text.
+    """
+
+    raw: str
+    index: Optional[int]
+    header: Optional[str]
+
+    @classmethod
+    def parse(cls, raw: str | int) -> "ColumnSelector":
+        text = str(raw).strip()
+        if text.isdigit():
+            return cls(raw=text, index=int(text), header=None)
+
+        # Try Excel letters (A, B, AA...)
+        try:
+            numeric_index = column_index_from_string(text)
+            return cls(raw=text, index=numeric_index, header=None)
+        except ValueError:
+            pass
+
+        return cls(raw=text, index=None, header=text)
+
+
+class UpdateBlueprintDataElementsUseCase:
+    """
+    Update DHIS2 data element UIDs (and codes when available) inside an XLSX blueprint.
+
+    - Open a workbook.
+    - For each configured sheet, read the data element name from a column
+      (by header or position).
+    - Ensure the name has the '-APVD' suffix.
+    - Look up the data element in DHIS2; if no exact match, ask the user to
+      enter the UID manually.
+    - Write UID and code back into the configured columns.
+    """
+
+    def __init__(
+        self,
+        base_url: str,
+        jsessionid: str,
+        xlsx_path: str,
+        output_path: str,
+        sheet_names: list[str],
+        name_column: str | int,
+        uid_column: str | int,
+        code_column: str | int,
+        data_start_row: int = 2,
+        header_scan_rows: int = 5,
+    ):
+        self.base_url = base_url
+        self.jsessionid = jsessionid
+        self.xlsx_path = xlsx_path
+        self.output_path = output_path
+        self.sheet_names = sheet_names
+        self.name_selector = ColumnSelector.parse(name_column)
+        self.uid_selector = ColumnSelector.parse(uid_column)
+        self.code_selector = ColumnSelector.parse(code_column)
+        self.data_start_row = data_start_row
+        self.header_scan_rows = header_scan_rows
+
+    @staticmethod
+    def _ensure_apvd_suffix(name: str) -> str:
+        normalized = name.strip()
+        if normalized.endswith("-APVD"):
+            return normalized
+        return f"{normalized}-APVD"
+
+    @staticmethod
+    def _normalize_text(value) -> str:
+        if value is None:
+            return ""
+        return str(value).strip()
+
+    def _resolve_column(self, sheet, selector: ColumnSelector) -> tuple[int, Optional[int]]:
+        """
+        Return (column_index, header_row_used).
+        If the selector uses a header, search for it in the first header_scan_rows rows.
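+        Raises ValueError if no matching header is found.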
+ """ + if selector.index is not None: + return selector.index, None + + header_lower = selector.header.lower() + for row in sheet.iter_rows(min_row=1, max_row=self.header_scan_rows): + for cell in row: + value = self._normalize_text(cell.value) + if value.lower() == header_lower: + return cell.column, cell.row + + raise ValueError( + f"No column with header '{selector.header}' found in first {self.header_scan_rows} rows of sheet '{sheet.title}'" + ) + + def _resolve_columns(self, sheet) -> tuple[dict[str, int], int]: + """ + Resolve configured columns and compute the first data row (skip header rows if found). + """ + header_rows: list[int] = [] + columns = {} + + for key, selector in ( + ("name", self.name_selector), + ("uid", self.uid_selector), + ("code", self.code_selector), + ): + col_index, header_row = self._resolve_column(sheet, selector) + columns[key] = col_index + if header_row is not None: + header_rows.append(header_row) + + data_row_start = self.data_start_row + if header_rows: + data_row_start = max(data_row_start, max(header_rows) + 1) + + print( + f"Sheet '{sheet.title}': name_col={columns['name']}, uid_col={columns['uid']}, " + f"code_col={columns['code']}, start_row={data_row_start}" + ) + return columns, data_row_start + + def _prompt_manual_entry(self, sheet_name: str, row_idx: int, target_name: str) -> dict: + print( + f"[INPUT REQUIRED] '{target_name}' (sheet '{sheet_name}', row {row_idx}) " + "could not be matched automatically." + ) + manual_uid = input("Enter the UID (required, press ENTER to abort): ").strip() + if not manual_uid: + raise SystemExit("Aborted by user (no UID provided).") + manual_code = input("Enter the code (optional, press ENTER to skip): ").strip() + return {"id": manual_uid, "name": target_name, "code": manual_code or None} + + def _find_data_element(self, target_name: str) -> Optional[dict]: + """ + Look for a data element whose name matches target_name exactly. + Returns None if no safe match is found. + """ + response = dhis_get( + path="/api/dataElements", + base_url=self.base_url, + jsessionid=self.jsessionid, + params={ + "filter": f"name:like:{target_name}", + "fields": "id,name,code", + }, + ) + + data_elements = response.get("dataElements") or [] + exact_matches = [ + de for de in data_elements if self._normalize_text(de.get("name")) == target_name + ] + + if len(exact_matches) == 1: + return exact_matches[0] + + if len(exact_matches) > 1: + print( + f"[WARN] Multiple exact matches for '{target_name}': " + + ", ".join(de.get("id", "?") for de in exact_matches) + ) + return None + + if data_elements: + print( + f"[WARN] No exact match for '{target_name}'. Candidates: " + + ", ".join(self._normalize_text(de.get("name")) for de in data_elements) + ) + else: + print(f"[WARN] No data elements returned for '{target_name}'.") + + return None + + def _process_row( + self, + sheet, + row_idx: int, + columns: dict[str, int], + ) -> bool: + """ + Process a single row. Returns True if a UID was written. + """ + name_cell = sheet.cell(row=row_idx, column=columns["name"]) + raw_name = self._normalize_text(name_cell.value) + if not raw_name: + return False + + target_name = self._ensure_apvd_suffix(raw_name) + if target_name != name_cell.value: + name_cell.value = target_name + + try: + data_element = self._find_data_element(target_name) + except requests.HTTPError as http_error: + status_code = ( + http_error.response.status_code if http_error.response is not None else "?" 
+ ) + print( + f"[ERROR] HTTP {status_code} while searching '{target_name}' " + f"(sheet '{sheet.title}', row {row_idx})" + ) + data_element = None + except Exception as unexpected: + print( + f"[ERROR] Unexpected error while searching '{target_name}' " + f"(sheet '{sheet.title}', row {row_idx}): {unexpected}" + ) + data_element = None + + if data_element is None or self._normalize_text(data_element.get("name")) != target_name: + data_element = self._prompt_manual_entry(sheet.title, row_idx, target_name) + + uid_value = data_element.get("id") + if not uid_value: + data_element = self._prompt_manual_entry(sheet.title, row_idx, target_name) + uid_value = data_element.get("id") + + sheet.cell(row=row_idx, column=columns["uid"]).value = uid_value + + code_value = data_element.get("code") + if code_value: + sheet.cell(row=row_idx, column=columns["code"]).value = code_value + + print( + f"[OK] Row {row_idx} in '{sheet.title}': " + f"name='{target_name}', uid='{uid_value}', code='{code_value or 'N/A'}'" + ) + return True + + def execute(self): + workbook_path = resolve_input_path(self.xlsx_path) + if not workbook_path.is_file(): + raise FileNotFoundError(f"Workbook not found at {workbook_path}") + + wb = load_workbook(workbook_path) + print(f"Opened workbook: {workbook_path}") + + total_updated = 0 + for sheet_name in self.sheet_names: + if sheet_name not in wb.sheetnames: + print(f"[WARN] Sheet '{sheet_name}' not found, skipping.") + continue + + sheet = wb[sheet_name] + columns, start_row = self._resolve_columns(sheet) + + for row_idx in range(start_row, sheet.max_row + 1): + if self._process_row(sheet, row_idx, columns): + total_updated += 1 + + output_path = resolve_output_path(self.output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + wb.save(output_path) + print(f"Workbook saved to: {output_path} ({total_updated} rows updated)") From 0a722cc900fbf119d6d359901fb15b5a99553867 Mon Sep 17 00:00:00 2001 From: idelcano Date: Tue, 25 Nov 2025 13:28:57 +0100 Subject: [PATCH 4/4] added readme and update gitignore --- DHIS2/python_skeleton/.gitignore | 4 ++- DHIS2/python_skeleton/README.md | 55 ++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 DHIS2/python_skeleton/README.md diff --git a/DHIS2/python_skeleton/.gitignore b/DHIS2/python_skeleton/.gitignore index 66448356..23539666 100644 --- a/DHIS2/python_skeleton/.gitignore +++ b/DHIS2/python_skeleton/.gitignore @@ -1,3 +1,5 @@ .env +.venv/ +__pycache__/ input/ -output/ \ No newline at end of file +output/ diff --git a/DHIS2/python_skeleton/README.md b/DHIS2/python_skeleton/README.md new file mode 100644 index 00000000..7b263a9a --- /dev/null +++ b/DHIS2/python_skeleton/README.md @@ -0,0 +1,55 @@ +# DHIS2 Python Skeleton + +Small set of DHIS2 utilities (“use cases”): +- Generate SQL for missing attribute values (`create-missing-values`). +- Update data element UIDs and codes inside a blueprint XLSX (`update-blueprint-dataelements`). + +## Installation (recommended: virtualenv) +```bash +python3 -m venv .venv +source .venv/bin/activate # Linux / macOS +# .venv\Scripts\activate # Windows + +pip install openpyxl +pip install requests +``` + +## Quick start +Run commands from the project root. + +### Update data element UIDs in a blueprint +Reads an XLSX from `input/` and writes the result to `output/` (adds a timestamp if the file already exists). 
+```bash
+python3 main_skeleton.py \
+  --use-case update-blueprint-dataelements \
+  --base-url https://server \
+  --jsessionid token \
+  --xlsx-file Blueprint_HWF.xlsx \
+  --output-xlsx-file blueprint_apvd.xlsx
+```
+Key parameters:
+- `--sheets`: sheet names to process (default `Module 1 - APVD` and `Module 2 - APVD`).
+- `--name-col`: column with the data element name (index, letter, or header; default `3`).
+- `--uid-col`: column to write the UID (default `DE UID`; accepts index, letter, or header).
+- `--code-col`: column to write the code (default `DE Code`; accepts index, letter, or header).
+- `--data-start-row`: start row when no headers are present (default `2`; auto-adjusts if headers are detected).
+
+What it does:
+- Ensures names end with `-APVD`.
+- Searches DHIS2 with `name:like`; if exactly one exact match is found, writes the UID (and the code when present).
+- If there is no exact match, shows the candidates and prompts for the UID manually in the console.
+
+### Create SQL for missing attributes
+Generates INSERT statements for missing tracked-entity attributes (reads a CSV from `input/`, writes SQL to `output/`):
+```bash
+python3 main_skeleton.py \
+  --use-case create-missing-values \
+  --base-url https://server \
+  --jsessionid token \
+  --input-file teis_without_storedby.csv \
+  --output-file insert_attr_fullname.sql
+```
+
+## Notes
+- Credentials can also come from `.env` (`BASE_URL` and `JSESSIONID`); the script will ask for confirmation before using them.
+- For more options, run `python3 main_skeleton.py --help`.
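+
+## Example inputs
+These sketches only illustrate the expected shape; all values below are made up.
+
+`input/teis_without_storedby.csv` for `create-missing-values` (`firstname` and `surname` are joined into the stored full name):
+```csv
+trackedentityid,tei_uid,firstname,surname
+1234567,AbCdEfGhIjK,Jane,Doe
+```
+
+`.env` (keys read by `load_dhis_env_config`):
+```
+BASE_URL=https://server
+JSESSIONID=token
+```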