Skip to content

feat: disable unhealthy flow schedules#975

Draft
Winzen wants to merge 2 commits intostagingfrom
feat/disable_unhealthy_flow
Draft

feat: disable unhealthy flow schedules#975
Winzen wants to merge 2 commits intostagingfrom
feat/disable_unhealthy_flow

Conversation

@Winzen
Copy link
Contributor

@Winzen Winzen commented Feb 4, 2026

Nova funcionalidade: Disable Unhealthy Flow Schedules

Contexto

Estamos adicionando o comando disable_unhealthy_flow_schedules, que:

  • Desativa automaticamente flows com problemas recentes no Prefect
  • Avalia se um flow deve ser desativado com base na última execução da task run_dbt
  • Permite rodar em modo dry-run, logando quais flows seriam desativados sem alterá-los

Organização de pastas

A lógica do comando foi organizada em módulos por feature, dentro de _disable_unhealthy_flow_schedules, para separar responsabilidades e facilitar manutenção e testes:

Organização de pastas e fluxo

management/
└── commands/
    ├── disable_unhealthy_flow_schedules.py   # comando Django mínimo
    └── _disable_unhealthy_flow_schedules/   # código auxiliar do comando
        ├── __init__.py
        ├── constants.py       # Constantes e queries GraphQL
        ├── datetime_utils.py  # Helpers para parsing de datas e cálculo de datas relativas
        ├── models.py          # Task, TaskRun, FlowRun, FlowDisable
        └── service.py         # MakeClient, FlowService e integrações externas

Diagrama visual do fluxo de execução

disable_unhealthy_flow_schedules.py (Command Django)
        │
        │  (chama)
        ▼
FlowService.disable_unhealthy_flow_schedules(dry_run)
        │
        │  (usa)
        ▼
   ┌─────────────┐
   │  FlowService │
   └─────────────┘
        │
        │  (consulta GraphQL)
        ▼
   MakeClient.query.execute(...)
        │
        │  (retorna dados)
        ▼
   FlowDisable.get_runs()
        │
        │  (instancia FlowRun e TaskRun)
        ▼
   FlowDisable.validate()
        │
        │  (decide se flow deve ser desativado)
        ▼
   FlowService.set_flow_schedule()  (se não dry-run)
        │
        ▼
   ✅ Flow desativado ou apenas logado

Notas visuais

  • ➡️ indica chamada de função / fluxo de execução
  • FlowService é o núcleo que orquestra a lógica
  • FlowDisable encapsula regra de negócio / validação
  • MakeClient é responsável por comunicação externa com GraphQL

Benefícios desta primeira versão

  • Modular e testável: cada parte (models, service, utils, constants) isolada
  • Dry-run disponível: verifica sem alterar estado
  • Parsing de datas confiável: comparações precisas com datetime
  • Organização clara por feature: facilita manutenção e expansão futura

Estrutura do código

1️⃣ Command Django

from django.core.management.base import BaseCommand
from ._disable_unhealthy_flow_schedules.service import FlowService

class Command(BaseCommand):
    help = "Disable unhealthy flow schedules"

    def add_arguments(self, parser):
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Log flows that would be disabled without disabling them"
        )

    def handle(self, *args, **options):
        FlowService().disable_unhealthy_flow_schedules(dry_run=options["dry_run"])
  • Mantido mínimo, apenas chama o serviço
  • --dry-run permite verificar sem alterar o estado

2️⃣ constants.py

from enum import Enum

class Querys(Enum):
    FLOWS_FAILED_LAST_WEEK = "..."  # GraphQL
    LAST_COMPLETED_RUNS_TASKS = "..."

RUN_DBT_TASK_NAME = "run_dbt"
FLOW_SUCCESS_STATE = "Success"
FLOW_FAILED_STATE = "Failed"

3️⃣ datetime_utils.py

from datetime import datetime, timedelta
from django.utils import timezone

ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

def parse_datetime(value: str) -> datetime:
    return datetime.strptime(value, ISO_FORMAT)

def one_week_ago() -> str:
    return (timezone.now() - timedelta(days=7)).strftime(ISO_FORMAT)

4️⃣ models.py

  • Classes de domínio: Task, TaskRun, FlowRun, FlowDisable
  • FlowDisable.validate() encapsula a regra de negócio
class FlowDisable:
    def __init__(self, id: str, created: str, service: FlowService):
        self.id = id
        self.created = parse_datetime(created)
        self.service = service
        self.runs = self.get_runs()

    def get_runs(self):
        response = self.service.last_completed_runs_tasks(self.id)
        return [FlowRun(**run) for run in response["flow_run"]]

    def validate(self) -> bool:
        for run in self.runs:
            if (run.task_runs.task.name == RUN_DBT_TASK_NAME
                    and run.start_time > self.created
                    and run.state == FLOW_FAILED_STATE):
                return True
            if run.state == FLOW_SUCCESS_STATE:
                return False
        return True

5️⃣ service.py

  • MakeClient → inicializa GraphQL client
  • FlowService → funções de integração, queries e desativação de flows
  • Função principal: disable_unhealthy_flow_schedules(dry_run=False)
def disable_unhealthy_flow_schedules(self, dry_run: bool = False):
    flows_data = self.flows_failed_last_week()
    flows = [FlowDisable(**flow, service=self) for flow in flows_data]
    flows_to_disable = [flow for flow in flows if flow.validate()]

    logger.info("Flows para ficar em alerta:")
    for flow in flows:
        logger.info(f"- {flow.id} | {flow.runs[0].name}")

    if not dry_run:
        logger.info("Flows desativados:")
        for flow in flows_to_disable:
            self.set_flow_schedule(flow_id=flow.id, active=False)
            logger.info(f"- {flow.id} | {flow.runs[0].name}")

6️⃣ Benefícios dessa primeira versão

  • Código modular e testável
  • Separação clara entre CLI, domínio, constantes e integração externa
  • Parsing de datas confiável, comparações precisas
  • Dry-run disponível para testes sem efeito colateral

@Winzen Winzen self-assigned this Feb 4, 2026
@Winzen Winzen added the staging Indica que o Pull Request está com a branch de destino (base) apontando para staging label Feb 4, 2026
@Winzen Winzen force-pushed the feat/disable_unhealthy_flow branch from a475d99 to 98f3fba Compare February 6, 2026 02:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

staging Indica que o Pull Request está com a branch de destino (base) apontando para staging

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants