44 changes: 29 additions & 15 deletions ami/jobs/admin.py
@@ -4,7 +4,7 @@

from ami.main.admin import AdminBase

from .models import Job, get_job_type_by_inferred_key
from .models import Job


@admin.register(Job)
@@ -13,14 +13,14 @@ class JobAdmin(AdminBase):

list_display = (
"name",
"job_type_key",
"status",
"task_id",
"project",
"scheduled_at",
"started_at",
"finished_at",
"duration",
"job_type_key",
"inferred_job_type",
)

@admin.action()
@@ -29,22 +29,28 @@ def enqueue_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None:
job.enqueue()
self.message_user(request, f"Queued {queryset.count()} job(s).")

@admin.display(description="Inferred Job Type")
def inferred_job_type(self, obj: Job) -> str:
"""
@TODO Remove this after running migration 0011_job_job_type_key.py and troubleshooting.
"""
job_type = get_job_type_by_inferred_key(obj)
return job_type.name if job_type else "Could not infer"
@admin.action()
def retry_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None:
for job in queryset:
job.retry(async_task=True)
self.message_user(request, f"Retried {queryset.count()} job(s).")

# return obj.job_type().name
@admin.action()
def cancel_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None:
for job in queryset:
job.cancel()
self.message_user(request, f"Cancelled {queryset.count()} job(s).")

actions = [enqueue_jobs]
actions = [enqueue_jobs, retry_jobs]
Copilot AI Nov 1, 2025

The cancel_jobs action is defined at line 39 but not included in the actions list. Either remove the unused action method or add it to the actions list: actions = [enqueue_jobs, retry_jobs, cancel_jobs]

Suggested change
actions = [enqueue_jobs, retry_jobs]
actions = [enqueue_jobs, retry_jobs, cancel_jobs]

Comment on lines +44 to 45
Contributor


⚠️ Potential issue | 🟠 Major

Add cancel_jobs to the actions list

cancel_jobs is defined but never registered in actions, so the new admin action will not appear in the UI. Please include it to make the cancellation action usable.

-    actions = [enqueue_jobs, retry_jobs]
+    actions = [enqueue_jobs, retry_jobs, cancel_jobs]
🧰 Tools: 🪛 Ruff (0.14.2)

44-44: Mutable class attributes should be annotated with typing.ClassVar (RUF012)

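Side note on the Ruff RUF012 hint above: a minimal sketch (not part of this diff) of how the actions attribute could be annotated while also registering all three actions, assuming the action methods are defined on JobAdmin exactly as shown in this file:

from typing import ClassVar

class JobAdmin(AdminBase):
    ...
    # ClassVar marks the list as a class-level attribute, which satisfies RUF012,
    # and includes cancel_jobs as suggested in the review comments above.
    actions: ClassVar[list] = [enqueue_jobs, retry_jobs, cancel_jobs]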

exclude = (
# This takes too long to load in the admin panel
autocomplete_fields = (
"source_image_collection",
"source_image_single",
# These are read-only fields
"pipeline",
"project",
)

readonly_fields = (
"task_id",
"scheduled_at",
"started_at",
@@ -54,3 +60,11 @@ def inferred_job_type(self, obj: Job) -> str:
"progress",
"result",
)

list_filter = (
"status",
"job_type_key",
"project",
"source_image_collection",
"pipeline",
)
19 changes: 19 additions & 0 deletions ami/jobs/migrations/0019_job_last_checked_at.py
@@ -0,0 +1,19 @@
# Generated by Django 4.2.10 on 2025-10-30 19:54

from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [
        ("jobs", "0018_alter_job_job_type_key"),
    ]

    operations = [
        migrations.AddField(
            model_name="job",
            name="last_checked_at",
            field=models.DateTimeField(
                blank=True, help_text="Last time job status was checked by periodic task", null=True
            ),
        ),
    ]
@@ -0,0 +1,47 @@
# Generated manually for periodic job status checking

from django.db import migrations


def create_periodic_task(apps, schema_editor):
    """Create periodic task to check unfinished jobs every 3 minutes."""
    try:
        from django_celery_beat.models import IntervalSchedule, PeriodicTask

        interval_schedule, _ = IntervalSchedule.objects.get_or_create(
            every=3,
            period=IntervalSchedule.MINUTES,
        )

        PeriodicTask.objects.get_or_create(
            name="jobs.check_incomplete_jobs",
            defaults={
                "task": "ami.jobs.tasks.check_incomplete_jobs",
                "interval": interval_schedule,
                "enabled": True,
                "description": "Check status of unfinished jobs and update if tasks disappeared",
            },
        )
    except Exception as e:
        print(f"Warning: Could not create periodic task: {e}")
        print("You may need to create it manually in the Django admin or via shell.")


def delete_periodic_task(apps, schema_editor):
    """Delete the periodic task if rolling back."""
    try:
        from django_celery_beat.models import PeriodicTask

        PeriodicTask.objects.filter(name="jobs.check_incomplete_jobs").delete()
    except Exception as e:
        print(f"Warning: Could not delete periodic task: {e}")


class Migration(migrations.Migration):
    dependencies = [
        ("jobs", "0019_job_last_checked_at"),
    ]

    operations = [
        migrations.RunPython(create_periodic_task, delete_periodic_task),
    ]
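The periodic task above points at ami.jobs.tasks.check_incomplete_jobs, which is not included in this excerpt. Purely for orientation, a hypothetical sketch of such a task, assuming a Celery shared_task that leans on the Job.check_status() helper and finished_at field from this PR (the internal names are otherwise illustrative):

from celery import shared_task

from ami.jobs.models import Job


@shared_task(name="ami.jobs.tasks.check_incomplete_jobs")
def check_incomplete_jobs() -> int:
    """Hypothetical sketch: reconcile the status of jobs that have not finished yet."""
    changed = 0
    # finished_at is null for jobs that are still pending or running
    for job in Job.objects.filter(finished_at__isnull=True):
        # check_status() delegates to ami.jobs.status.check_job_status (added in this PR)
        if job.check_status(save=True):
            changed += 1
    return changed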
41 changes: 37 additions & 4 deletions ami/jobs/models.py
@@ -722,11 +722,22 @@ class Job(BaseModel):
# Hide old failed jobs after 3 days
FAILED_CUTOFF_HOURS = 24 * 3

# Job status checking configuration constants
NO_TASK_ID_TIMEOUT_SECONDS = 300 # 5 minutes
DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes
MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60 # 7 days
STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes

Copilot AI Nov 1, 2025

Missing required constants STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS and PENDING_LOG_INTERVAL_SECONDS, which are referenced in ami/jobs/status.py at lines 281, 284, 286, and 287. These constants should be defined on the Job model alongside the other timeout constants.

Suggested change
STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 600 # 10 minutes
PENDING_LOG_INTERVAL_SECONDS = 60 # 1 minute

name = models.CharField(max_length=255)
queue = models.CharField(max_length=255, default="default")
scheduled_at = models.DateTimeField(null=True, blank=True)
started_at = models.DateTimeField(null=True, blank=True)
finished_at = models.DateTimeField(null=True, blank=True)
last_checked_at = models.DateTimeField(
null=True,
blank=True,
help_text="Last time job status was checked by periodic task",
)
# @TODO can we use an Enum or Pydantic model for status?
status = models.CharField(max_length=255, default=JobState.CREATED.name, choices=JobState.choices())
progress: JobProgress = SchemaField(JobProgress, default=default_job_progress)
@@ -813,7 +824,7 @@ def send_task():
self.started_at = None
self.finished_at = None
self.scheduled_at = datetime.datetime.now()
self.status = AsyncResult(task_id).status
self.update_status(AsyncResult(task_id).status, save=False)
self.update_progress(save=False)
self.save()

@@ -862,7 +873,7 @@ def retry(self, async_task=True):
self.logger.info(f"Re-running job {self}")
self.finished_at = None
self.progress.reset()
self.status = JobState.RETRY
self.update_status(JobState.RETRY, save=False)
self.save()
if async_task:
self.enqueue()
@@ -873,15 +884,15 @@ def cancel(self):
"""
Terminate the celery task.
"""
self.status = JobState.CANCELING
self.update_status(JobState.CANCELING, save=False)
self.save()
if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
self.save()
else:
self.status = JobState.REVOKED
self.update_status(JobState.REVOKED, save=False)
self.save()

def update_status(self, status=None, save=True):
@@ -909,6 +920,7 @@ def update_status(self, status=None, save=True):
def update_progress(self, save=True):
"""
Update the total aggregate progress from the progress of each stage.
Also ensure the displayed progress.summary.status is in sync with job.status.
"""
if not len(self.progress.stages):
# Need at least one stage to calculate progress
@@ -927,6 +939,10 @@ def update_progress(self, save=True):
total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)

self.progress.summary.progress = total_progress
try:
self.progress.summary.status = JobState(self.status)
except ValueError:
self.progress.summary.status = JobState.UNKNOWN

if save:
self.save(update_progress=False)
@@ -949,6 +965,23 @@ def save(self, update_progress=True, *args, **kwargs):
if self.progress.summary.status != self.status:
logger.warning(f"Job {self} status mismatches progress: {self.progress.summary.status} != {self.status}")

def check_status(self, force: bool = False, save: bool = True) -> bool:
"""
Check if the job's Celery task still exists and update status accordingly.

This delegates to the status_checker module to avoid bloating this file.

Args:
force: Skip the recent check time limit
save: Save the job if status changes

Returns:
bool: True if job status was changed, False otherwise
"""
from ami.jobs.status import check_job_status

return check_job_status(self, force=force, save=save)

def check_custom_permission(self, user, action: str) -> bool:
job_type = self.job_type_key.lower()
if self.source_image_single:
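The models.py diff above is truncated mid-file. Separately from the diff, a quick usage sketch of the new status check, assuming a Django shell on this branch:

from ami.jobs.models import Job

job = Job.objects.filter(finished_at__isnull=True).first()
if job is not None:
    # force=True skips the "recent check" time limit described in the check_status docstring;
    # the method returns True if the job's status was changed.
    changed = job.check_status(force=True)
    print(job.status, changed)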