diff --git a/crates/bin/ampctl/src/cmd/job.rs b/crates/bin/ampctl/src/cmd/job.rs index 60e6c7ed3..090686ca6 100644 --- a/crates/bin/ampctl/src/cmd/job.rs +++ b/crates/bin/ampctl/src/cmd/job.rs @@ -3,6 +3,7 @@ pub mod inspect; pub mod list; pub mod prune; +pub mod resume; pub mod rm; pub mod stop; @@ -23,6 +24,10 @@ pub enum Commands { #[command(after_help = include_str!("job/stop__after_help.md"))] Stop(stop::Args), + /// Resume a stopped job + #[command(after_help = include_str!("job/resume__after_help.md"))] + Resume(resume::Args), + /// Remove job(s) by identifier or status filter #[command(alias = "remove")] #[command(after_help = include_str!("job/rm__after_help.md"))] @@ -39,6 +44,7 @@ pub async fn run(command: Commands) -> anyhow::Result<()> { Commands::List(args) => list::run(args).await?, Commands::Inspect(args) => inspect::run(args).await?, Commands::Stop(args) => stop::run(args).await?, + Commands::Resume(args) => resume::run(args).await?, Commands::Rm(args) => rm::run(args).await?, Commands::Prune(args) => prune::run(args).await?, } diff --git a/crates/bin/ampctl/src/cmd/job/resume.rs b/crates/bin/ampctl/src/cmd/job/resume.rs new file mode 100644 index 000000000..67e6c05d3 --- /dev/null +++ b/crates/bin/ampctl/src/cmd/job/resume.rs @@ -0,0 +1,98 @@ +//! Job resume command. +//! +//! Resumes a stopped job through the admin API by: +//! 1. Creating a client for the admin API +//! 2. Using the client's job resume method +//! 3. Displaying success message +//! +//! # Configuration +//! +//! - Admin URL: `--admin-url` flag or `AMP_ADMIN_URL` env var (default: `http://localhost:1610`) +//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`) + +use monitoring::logging; +use worker::job::JobId; + +use crate::args::GlobalArgs; + +/// Command-line arguments for the `jobs resume` command. 
+#[derive(Debug, clap::Args)] +pub struct Args { + #[command(flatten)] + pub global: GlobalArgs, + + /// The job ID to resume + pub id: JobId, +} + +/// Result of a job resume operation. +#[derive(serde::Serialize)] +struct ResumeResult { + job_id: JobId, +} + +impl std::fmt::Display for ResumeResult { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!( + f, + "{} Job {} resume requested", + console::style("✓").green().bold(), + self.job_id + ) + } +} + +/// Resume a job by requesting it to resume via the admin API. +/// +/// # Errors +/// +/// Returns [`Error`] for API errors (400/404/409/500) or network failures. +#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, job_id = %id))] +pub async fn run(Args { global, id }: Args) -> Result<(), Error> { + let client = global.build_client().map_err(Error::ClientBuildError)?; + + tracing::debug!("Resuming job via admin API"); + + client.jobs().resume(&id).await.map_err(|err| { + tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to resume job"); + match err { + crate::client::jobs::ResumeError::NotFound(_) => Error::JobNotFound { id }, + _ => Error::ResumeJobError(err), + } + })?; + let result = ResumeResult { job_id: id }; + global.print(&result).map_err(Error::JsonSerialization)?; + + Ok(()) +} + +/// Errors for job resume operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to build client + #[error("failed to build admin API client")] + ClientBuildError(#[source] crate::args::BuildClientError), + + /// Job not found + /// + /// This occurs when the job ID is valid but no job + /// record exists with that ID in the metadata database. 
+ #[error("job not found: {id}")] + JobNotFound { id: JobId }, + + /// Error resuming job via admin API + /// + /// This occurs when the resume request fails due to: + /// - Invalid job ID format + /// - Network or connection errors + /// - Metadata database errors + /// + /// Note: The resume operation is idempotent - resuming a job that's already + /// in a Scheduled or Running state returns success. + #[error("failed to resume job")] + ResumeJobError(#[source] crate::client::jobs::ResumeError), + + /// Failed to serialize result to JSON + #[error("failed to serialize result to JSON")] + JsonSerialization(#[source] serde_json::Error), +} diff --git a/crates/bin/ampctl/src/cmd/job/resume__after_help.md b/crates/bin/ampctl/src/cmd/job/resume__after_help.md new file mode 100644 index 000000000..b9a03c2de --- /dev/null +++ b/crates/bin/ampctl/src/cmd/job/resume__after_help.md @@ -0,0 +1,13 @@ +## Examples + +Resume a job: + $ ampctl job resume 12345 + +With custom admin URL: + $ ampctl job resume 12345 --admin-url http://prod-server:1610 + +## Notes + +Resuming a job is a request, not an immediate start. The job is moved back to the +scheduled state and its worker node is notified to pick it up. Check job status with +`ampctl job inspect <id>` to confirm it resumed. \ No newline at end of file diff --git a/crates/clients/admin/src/jobs.rs b/crates/clients/admin/src/jobs.rs index e71cc2329..643ba7447 100644 --- a/crates/clients/admin/src/jobs.rs +++ b/crates/clients/admin/src/jobs.rs @@ -31,6 +31,13 @@ fn job_stop(id: &JobId) -> String { format!("jobs/{id}/stop") } +/// Build URL path for resuming a job. +/// +/// PUT `/jobs/{id}/resume` +fn job_resume(id: &JobId) -> String { + format!("jobs/{id}/resume") +} + /// Build URL path for deleting a job by ID. /// /// DELETE `/jobs/{id}` @@ -229,6 +236,88 @@ impl<'a> JobsClient<'a> { } } + /// Resume a stopped job by ID. + /// + /// PUTs to `/jobs/{id}/resume` endpoint. 
+ /// + /// This operation is idempotent - resuming a job that's already in a scheduled or running state returns success (200). + /// + /// # Errors + /// + /// Returns [`ResumeError`] for network errors, API errors (400/404/500), + /// or unexpected responses. + #[tracing::instrument(skip(self), fields(job_id = %id))] + pub async fn resume(&self, id: &JobId) -> Result<(), ResumeError> { + let url = self + .client + .base_url() + .join(&job_resume(id)) + .expect("valid URL"); + + tracing::debug!("Sending PUT request to resume job"); + + let response = self + .client + .http() + .put(url.as_str()) + .send() + .await + .map_err(|err| ResumeError::Network { + url: url.to_string(), + source: err, + })?; + + let status = response.status(); + tracing::debug!(status = %status, "Received API response"); + + match status.as_u16() { + 200 => { + tracing::debug!("Job resume request processed successfully"); + Ok(()) + } + 400 | 404 | 500 => { + let text = response.text().await.map_err(|err| { + tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to read error response"); + ResumeError::UnexpectedResponse { + status: status.as_u16(), + message: format!("Failed to read error response: {}", err), + } + })?; + + let error_response: ErrorResponse = serde_json::from_str(&text).map_err(|err| { + tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to parse error response"); + ResumeError::UnexpectedResponse { + status: status.as_u16(), + message: text.clone(), + } + })?; + + match error_response.error_code.as_str() { + "INVALID_JOB_ID" => Err(ResumeError::InvalidJobId(error_response.into())), + "JOB_NOT_FOUND" => Err(ResumeError::NotFound(error_response.into())), + "RESUME_JOB_ERROR" => Err(ResumeError::ResumeJobError(error_response.into())), + "UNEXPECTED_STATE_CONFLICT" => { + Err(ResumeError::UnexpectedStateConflict(error_response.into())) + } + _ => Err(ResumeError::UnexpectedResponse { + 
status: status.as_u16(), + message: text, + }), + } + } + _ => { + let text = response + .text() + .await + .unwrap_or_else(|_| String::from("Failed to read response body")); + Err(ResumeError::UnexpectedResponse { + status: status.as_u16(), + message: text, + }) + } + } + } + /// Delete a job by ID. /// /// DELETEs to `/jobs/{id}` endpoint. @@ -681,6 +770,47 @@ pub enum StopError { UnexpectedResponse { status: u16, message: String }, } +/// Errors that can occur when resuming a job. +#[derive(Debug, thiserror::Error)] +pub enum ResumeError { + /// The job ID in the URL path is invalid (400, INVALID_JOB_ID) + /// + /// This occurs when the ID cannot be parsed as a valid JobId. + #[error("invalid job ID")] + InvalidJobId(#[source] ApiError), + + /// Job not found (404, JOB_NOT_FOUND) + /// + /// This occurs when the job ID is valid but no job + /// record exists with that ID in the metadata database. + #[error("job not found")] + NotFound(#[source] ApiError), + + /// Database error during resume operation (500, RESUME_JOB_ERROR) + /// + /// This occurs when: + /// - Database connection fails or is lost during the transaction + /// - Transaction conflicts or deadlocks occur + /// - Database constraint violations are encountered + #[error("failed to resume job")] + ResumeJobError(#[source] ApiError), + + /// Unexpected state conflict during resume operation (500, UNEXPECTED_STATE_CONFLICT) + /// + /// This indicates an internal inconsistency in the state machine. + /// It should not occur under normal operation and indicates a bug. + #[error("unexpected state conflict")] + UnexpectedStateConflict(#[source] ApiError), + + /// Network or connection error + #[error("network error connecting to {url}")] + Network { url: String, source: reqwest::Error }, + + /// Unexpected response from API + #[error("unexpected response (status {status}): {message}")] + UnexpectedResponse { status: u16, message: String }, +} + /// Errors that can occur when deleting a job by ID. 
#[derive(Debug, thiserror::Error)] pub enum DeleteByIdError { diff --git a/crates/core/metadata-db/src/jobs.rs b/crates/core/metadata-db/src/jobs.rs index abe72930b..63906d4d0 100644 --- a/crates/core/metadata-db/src/jobs.rs +++ b/crates/core/metadata-db/src/jobs.rs @@ -79,6 +79,44 @@ where } } +/// Update job status to Scheduled +/// +/// This function will only update the job status if it's currently in a valid state +/// to be resumed (Stopped). If the job is already scheduled, this is +/// considered success (idempotent behavior). Any other state conflict (for example, a running job) is returned as an error. +/// +/// Returns an error if the job doesn't exist, is in a terminal state, or if there's a database error. +/// +/// **Note:** This function does not send notifications. The caller is responsible for +/// calling `send_job_notification` after successful status update if worker notification +/// is required. +#[tracing::instrument(skip(exe), err)] +pub async fn request_resume<'c, E>( + exe: E, + job_id: impl Into + std::fmt::Debug, +) -> Result<(), Error> +where + E: Executor<'c>, +{ + // Try to update job status + match sql::update_status_if_any_state( + exe, + job_id.into(), + &[JobStatus::Stopped], + JobStatus::Scheduled, + ) + .await + { + Ok(()) => Ok(()), + // Check if the job is already scheduled (idempotent behavior) + Err(JobStatusUpdateError::StateConflict { + actual: JobStatus::Scheduled, + .. 
+ }) => Ok(()), + Err(err) => Err(Error::JobStatusUpdate(err)), + } +} + /// List jobs with cursor-based pagination support, optionally filtered by status /// /// Uses cursor-based pagination where `last_job_id` is the ID of the last job diff --git a/crates/services/admin-api/src/handlers/jobs.rs b/crates/services/admin-api/src/handlers/jobs.rs index fa5ac25cc..e576a8b03 100644 --- a/crates/services/admin-api/src/handlers/jobs.rs +++ b/crates/services/admin-api/src/handlers/jobs.rs @@ -5,4 +5,5 @@ pub mod delete_by_id; pub mod get_all; pub mod get_by_id; pub mod job_info; +pub mod resume; pub mod stop; diff --git a/crates/services/admin-api/src/handlers/jobs/resume.rs b/crates/services/admin-api/src/handlers/jobs/resume.rs new file mode 100644 index 000000000..c3c161441 --- /dev/null +++ b/crates/services/admin-api/src/handlers/jobs/resume.rs @@ -0,0 +1,198 @@ +//! Jobs resume handler + +use axum::{ + extract::{Path, State, rejection::PathRejection}, + http::StatusCode, +}; +use monitoring::logging; +use worker::job::JobId; + +use crate::{ + ctx::Ctx, + handlers::error::{ErrorResponse, IntoErrorResponse}, + scheduler, +}; + +/// Handler for the `PUT /jobs/{id}/resume` endpoint +/// +/// Resumes a stopped job using the specified job ID. This is an idempotent +/// operation that handles job resume requests safely. 
+/// +/// ## Path Parameters +/// - `id`: The unique identifier of the job to resume (must be a valid JobId) +/// +/// ## Response +/// - **200 OK**: Job resume request processed successfully, or job already in scheduled or running state (idempotent) +/// - **400 Bad Request**: Invalid job ID format (not parseable as JobId) +/// - **404 Not Found**: Job with the given ID does not exist +/// - **500 Internal Server Error**: Database connection or scheduler error +/// +/// ## Error Codes +/// - `INVALID_JOB_ID`: The provided ID is not a valid job identifier +/// - `JOB_NOT_FOUND`: No job exists with the given ID +/// - `RESUME_JOB_ERROR`: Database error during resume operation execution +/// - `UNEXPECTED_STATE_CONFLICT`: Internal state machine error (indicates a bug) +/// +/// ## Idempotent Behavior +/// This handler is idempotent - resuming a job that's already in a scheduled or running state returns success (200). +/// This allows clients to safely retry resume requests without worrying about conflict errors. +/// +/// The desired outcome of a resume request is that the job is running. If the job is already +/// scheduled or running, this outcome is achieved, so we return success. 
+/// +/// ## Behavior +/// This handler provides idempotent job resuming with the following characteristics: +/// - Jobs already in scheduled or running states return success (idempotent) +/// - Only stopped jobs transition to scheduled state +/// - Job lookup and resume request are performed atomically within a single transaction +/// - Database layer validates state transitions and prevents race conditions +/// +/// ## State Transitions +/// Valid resume transitions: +/// - Stopped → Scheduled (200 OK) +/// +/// Already scheduled or running (idempotent - return success): +/// - Scheduled → no change (200 OK) +/// - Running → no change (200 OK) +/// +/// The handler: +/// - Validates and extracts the job ID from the URL path +/// - Delegates to scheduler for atomic resume operation (job lookup + resume + worker notification) +/// - Returns success if job is already in scheduled or running state (idempotent) +/// - Returns appropriate HTTP status codes and error messages +#[tracing::instrument(skip_all, err)] +#[cfg_attr( + feature = "utoipa", + utoipa::path( + put, + path = "/jobs/{id}/resume", + tag = "jobs", + operation_id = "jobs_resume", + params( + ("id" = i64, Path, description = "Job ID") + ), + responses( + (status = 200, description = "Job resume request processed successfully, or job already in scheduled or running state (idempotent)"), + (status = 400, description = "Invalid job ID", body = crate::handlers::error::ErrorResponse), + (status = 404, description = "Job not found", body = crate::handlers::error::ErrorResponse), + (status = 500, description = "Internal server error", body = crate::handlers::error::ErrorResponse) + ) + ) +)] +pub async fn handler( + State(ctx): State, + path: Result, PathRejection>, +) -> Result { + let id = match path { + Ok(Path(id)) => id, + Err(err) => { + tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid job ID in path"); + return Err(Error::InvalidId { err }.into()); + } + }; + + // 
Attempt to resume the job - this operation is atomic and includes job lookup + if let Err(err) = ctx.scheduler.resume_job(id).await { + return match err { + scheduler::ResumeJobError::JobNotFound => { + tracing::debug!(job_id=%id, "Job not found"); + Err(Error::NotFound { id }.into()) + } + scheduler::ResumeJobError::JobAlreadyRunning { status } => { + // Idempotent behavior: job is already in scheduled or running state + tracing::debug!(job_id=%id, status=%status, "Job already in scheduled or running state, returning success (idempotent)"); + Ok(StatusCode::OK) + } + scheduler::ResumeJobError::StateConflict { current_status } => { + // Unexpected state conflict - this shouldn't happen with current state machine + tracing::error!( + job_id=%id, + current_status=%current_status, + "Unexpected state conflict during resume operation" + ); + Err(Error::UnexpectedStateConflict { + id, + current_status: current_status.to_string(), + } + .into()) + } + scheduler::ResumeJobError::BeginTransaction(_) + | scheduler::ResumeJobError::GetJob(_) + | scheduler::ResumeJobError::UpdateJobStatus(_) + | scheduler::ResumeJobError::CommitTransaction(_) + | scheduler::ResumeJobError::SendNotification(_) => { + tracing::error!(error = %err, error_source = logging::error_source(&err), job_id=%id, "Database error during resume operation"); + Err(Error::ResumeJob { id, source: err }.into()) + } + }; + } + + tracing::info!(job_id=%id, "Job resume request processed successfully"); + Ok(StatusCode::OK) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The job ID in the URL path is invalid + /// + /// This occurs when the ID cannot be parsed as a valid JobId. + #[error("invalid job ID: {err}")] + InvalidId { err: PathRejection }, + + /// Job not found + /// + /// This occurs when the job ID is valid but no job + /// record exists with that ID in the metadata database. 
+ #[error("job '{id}' not found")] + NotFound { id: JobId }, + + /// Database error during resume operation + /// + /// This error occurs when the scheduler encounters a database error + /// while executing the atomic resume operation (job lookup, state transition, or worker notification). + /// + /// This occurs when: + /// - Database connection fails or is lost during the transaction + /// - Transaction conflicts or deadlocks occur + /// - Database constraint violations are encountered + #[error("failed to resume job '{id}'")] + ResumeJob { + id: JobId, + source: scheduler::ResumeJobError, + }, + + /// Unexpected state conflict during resume operation + /// + /// This error indicates an internal inconsistency in the state machine. + /// It should not occur under normal operation and indicates a bug that + /// requires investigation. + /// + /// This occurs when: + /// - State machine has invalid transition rules + /// - Concurrent state modifications occur without proper locking + /// - Database state corruption is detected + #[error("unexpected state conflict for job '{id}': current state is '{current_status}'")] + UnexpectedStateConflict { id: JobId, current_status: String }, +} + +impl IntoErrorResponse for Error { + fn error_code(&self) -> &'static str { + match self { + Error::InvalidId { .. } => "INVALID_JOB_ID", + Error::NotFound { .. } => "JOB_NOT_FOUND", + Error::ResumeJob { .. } => "RESUME_JOB_ERROR", + Error::UnexpectedStateConflict { .. } => "UNEXPECTED_STATE_CONFLICT", + } + } + + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidId { .. } => StatusCode::BAD_REQUEST, + Error::NotFound { .. } => StatusCode::NOT_FOUND, + Error::ResumeJob { .. } => StatusCode::INTERNAL_SERVER_ERROR, + + // Internal server error for unexpected state conflicts (indicates a bug) + Error::UnexpectedStateConflict { .. 
} => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index 523e0b378..36199fe45 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -59,6 +59,7 @@ pub fn router(ctx: Ctx) -> Router<()> { get(jobs::get_by_id::handler).delete(jobs::delete_by_id::handler), ) .route("/jobs/{id}/stop", put(jobs::stop::handler)) + .route("/jobs/{id}/resume", put(jobs::resume::handler)) .route( "/manifests", get(manifests::list_all::handler) diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index b2367306c..5f2701424 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -64,6 +64,9 @@ pub trait SchedulerJobs: Send + Sync { /// Stop a running job async fn stop_job(&self, job_id: JobId) -> Result<(), StopJobError>; + /// Resume a stopped job + async fn resume_job(&self, job_id: JobId) -> Result<(), ResumeJobError>; + /// Get a job by its ID async fn get_job(&self, job_id: JobId) -> Result, GetJobError>; @@ -243,6 +246,79 @@ pub enum StopJobError { SendNotification(#[source] metadata_db::Error), } +/// Errors that can occur when resuming a job +#[derive(Debug, thiserror::Error)] +pub enum ResumeJobError { + /// Job not found + /// + /// This occurs when: + /// - No job exists with the specified ID in the database + /// - The job was deleted after the resume request was initiated + #[error("job not found")] + JobNotFound, + + /// Job is already in a scheduled, running state + /// + /// This occurs when: + /// - Job was previously resumed by another request + /// - Job is already in a scheduled, running state + #[error("job is already in scheduled or running state: {status}")] + JobAlreadyRunning { status: JobStatus }, + + /// Job state conflict - cannot resume from current state + /// + /// This occurs when: + /// - Job is in an unexpected state that doesn't 
allow resuming + /// - Concurrent state modifications created an invalid transition + #[error("cannot resume job from current state: {current_status}")] + StateConflict { current_status: JobStatus }, + + /// Failed to begin transaction for resume operation + /// + /// This occurs when: + /// - Database connection pool is exhausted + /// - Database connection fails or is lost + /// - Transaction initialization encounters an error + #[error("failed to begin transaction")] + BeginTransaction(#[source] metadata_db::Error), + + /// Failed to retrieve job information during resume operation + /// + /// This occurs when: + /// - Database query fails during job lookup + /// - Connection is lost during the transaction + /// - Query execution encounters an error + #[error("failed to get job")] + GetJob(#[source] metadata_db::Error), + + /// Failed to update job status to scheduled + /// + /// This occurs when: + /// - Database update operation fails + /// - Transaction encounters a constraint violation + /// - Connection is lost during status update + #[error("failed to update job status")] + UpdateJobStatus(#[source] metadata_db::Error), + + /// Failed to commit transaction for resume operation + /// + /// This occurs when: + /// - Transaction commit fails due to conflicts + /// - Connection is lost before commit completes + /// - Database encounters an error during commit + #[error("failed to commit transaction")] + CommitTransaction(#[source] metadata_db::Error), + + /// Failed to send resume notification to worker + /// + /// This occurs when: + /// - Worker notification channel fails + /// - Database notification system encounters an error + /// - Connection is lost during notification + #[error("failed to send resume notification")] + SendNotification(#[source] metadata_db::Error), +} + /// Error when getting a job from the metadata database /// /// This occurs when: diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs 
index cf634d181..57760230c 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -28,8 +28,8 @@ use std::time::Duration; use admin_api::scheduler::{ DeleteJobError, DeleteJobsByStatusError, GetJobError, GetWorkerError, ListJobsByDatasetError, - ListJobsError, ListWorkersError, ScheduleJobError, SchedulerJobs, SchedulerWorkers, - StopJobError, + ListJobsError, ListWorkersError, ResumeJobError, ScheduleJobError, SchedulerJobs, + SchedulerWorkers, StopJobError, }; use async_trait::async_trait; use dataset_store::DatasetKind; @@ -196,6 +196,64 @@ impl Scheduler { Ok(()) } + /// Resume a stopped job with transactional guarantees + /// + /// Fetches the job, validates its state, updates it to scheduled, and notifies the + /// worker within a single database transaction for atomicity. + async fn resume_job_impl(&self, job_id: JobId) -> Result<(), ResumeJobError> { + // Begin a transaction to ensure atomicity + let mut tx = self + .metadata_db + .begin_txn() + .await + .map_err(ResumeJobError::BeginTransaction)?; + + // Fetch the job to get its node_id and validate it exists + let job = metadata_db::jobs::get_by_id(&mut tx, &job_id) + .await + .map_err(ResumeJobError::GetJob)? + .ok_or(ResumeJobError::JobNotFound)?; + + // Attempt to resume the job + metadata_db::jobs::request_resume(&mut tx, &job_id) + .await + .map_err(|err| match err { + MetadataDbError::JobStatusUpdate(JobStatusUpdateError::NotFound) => { + ResumeJobError::JobNotFound + } + MetadataDbError::JobStatusUpdate(JobStatusUpdateError::StateConflict { + actual, + .. 
+ }) => match actual.into() { + JobStatus::Scheduled | JobStatus::Running => { + ResumeJobError::JobAlreadyRunning { + status: actual.into(), + } + } + _ => ResumeJobError::StateConflict { + current_status: actual.into(), + }, + }, + other => ResumeJobError::UpdateJobStatus(other), + })?; + + // Notify the worker about the resume request (within the transaction) + metadata_db::workers::send_job_notif( + &mut tx, + job.node_id, + &JobNotification::resume(job_id), + ) + .await + .map_err(ResumeJobError::SendNotification)?; + + // Commit the transaction + tx.commit() + .await + .map_err(ResumeJobError::CommitTransaction)?; + + Ok(()) + } + /// Reconcile failed jobs by retrying them with exponential backoff /// /// This method: @@ -281,6 +339,10 @@ impl SchedulerJobs for Scheduler { self.stop_job_impl(job_id).await } + async fn resume_job(&self, job_id: JobId) -> Result<(), ResumeJobError> { + self.resume_job_impl(job_id).await + } + async fn get_job(&self, job_id: JobId) -> Result, GetJobError> { let job = metadata_db::jobs::get_by_id(&self.metadata_db, &job_id) .await diff --git a/crates/services/worker/src/job/notif.rs b/crates/services/worker/src/job/notif.rs index 485b9c2ed..0b18c236f 100644 --- a/crates/services/worker/src/job/notif.rs +++ b/crates/services/worker/src/job/notif.rs @@ -27,6 +27,15 @@ impl Notification { action: Action::Stop, } } + + /// Create a new resume action + #[must_use] + pub fn resume(job_id: JobId) -> Self { + Self { + job_id, + action: Action::Resume, + } + } } /// Job notification actions @@ -44,6 +53,11 @@ pub enum Action { /// Stop the job: mark the job as stopped and release the locations by deleting /// the row from the `jobs` table. 
Stop, + + /// Resume the job + /// + /// Resume the job: Fetch the job from the Metadata DB job queue and resume the job + Resume, } impl Action { @@ -53,6 +67,7 @@ impl Action { match self { Self::Start => "START", Self::Stop => "STOP", + Self::Resume => "RESUME", } } } @@ -70,6 +85,7 @@ impl std::str::FromStr for Action { match s { s if s.eq_ignore_ascii_case("START") => Ok(Self::Start), s if s.eq_ignore_ascii_case("STOP") => Ok(Self::Stop), + s if s.eq_ignore_ascii_case("RESUME") => Ok(Self::Resume), _ => Err(format!("Invalid action variant: {s}").into()), } } diff --git a/crates/services/worker/src/service.rs b/crates/services/worker/src/service.rs index 34cd8596b..a6fcc0e61 100644 --- a/crates/services/worker/src/service.rs +++ b/crates/services/worker/src/service.rs @@ -254,7 +254,7 @@ impl Worker { action: JobAction, ) -> Result<(), NotificationError> { match action { - JobAction::Start => { + JobAction::Start | JobAction::Resume => { // Load the job from the queue (retry on failure) let job = self.queue.get_job(job_id).await.map_err(|err| { NotificationError::StartActionFailed { diff --git a/tests/src/tests/it_admin_api_datasets_resume_job.rs b/tests/src/tests/it_admin_api_datasets_resume_job.rs new file mode 100644 index 000000000..dca80b2cf --- /dev/null +++ b/tests/src/tests/it_admin_api_datasets_resume_job.rs @@ -0,0 +1,405 @@ +//! Integration tests for the PUT /jobs/{id}/resume endpoint. +//! +//! These tests verify the job resume functionality including: +//! - Resuming a stopped job +//! - Idempotent behavior when resuming already scheduled/running jobs +//! - Error handling for nonexistent jobs +//! 
- Error handling for jobs in terminal states (completed, failed) + +use ampctl::client::{ + self, + jobs::{JobInfo, ResumeError}, +}; +use datasets_common::{ + fqn::FullyQualifiedName, name::Name, namespace::Namespace, reference::Reference, + revision::Revision, version::Version, +}; +use datasets_derived::Manifest as DerivedDatasetManifest; +use dump::EndBlock; +use serde_json::value::RawValue; +use worker::job::JobId; + +use crate::testlib::ctx::TestCtxBuilder; + +#[tokio::test] +async fn resume_nonexistent_job_returns_404() { + //* Given + let ctx = TestCtx::setup("test_resume_nonexistent").await; + let fake_job_id = JobId::try_from(999999i64).expect("valid job ID"); + + //* When + let result = ctx.resume_job(&fake_job_id).await; + + //* Then + assert!(result.is_err(), "resume should fail for nonexistent job"); + let err = result.unwrap_err(); + match err { + ResumeError::NotFound(api_err) => { + assert_eq!( + api_err.error_code, "JOB_NOT_FOUND", + "Expected JOB_NOT_FOUND error code, got: {}", + api_err.error_code + ); + } + _ => panic!("Expected NotFound error, got: {:?}", err), + } +} + +#[tokio::test] +async fn resume_stopped_job_succeeds() { + //* Given + let ctx = TestCtx::setup("test_resume_stopped").await; + + // Deploy dataset (schedules a job) + let job_id = ctx + .deploy_dataset(None) + .await + .expect("dataset deployment should succeed"); + + // Give job scheduler time to process + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Stop the job + ctx.stop_job(&job_id) + .await + .expect("stopping the job should succeed"); + + // Wait for job to transition to stopped state + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + //* When + let job_info_before = ctx + .inspect_job(&job_id) + .await + .expect("failed to inspect job"); + let result = ctx.resume_job(&job_id).await; + let job_info_after = ctx + .inspect_job(&job_id) + .await + .expect("failed to inspect job"); + + //* Then + assert_eq!( + 
job_info_before.status, "STOPPED",
+        "job should be in stopped state"
+    );
+    assert!(
+        result.is_ok(),
+        "resuming stopped job should succeed: {:?}",
+        result.err()
+    );
+    assert!(
+        job_info_after.status == "RUNNING" || job_info_after.status == "SCHEDULED",
+        "job should be in RUNNING or SCHEDULED state, got: {}",
+        job_info_after.status
+    );
+}
+
+/// Resuming a job that is still SCHEDULED or RUNNING must succeed and leave it in one of
+/// those two states.
+#[tokio::test]
+async fn resume_already_running_job_is_idempotent() {
+    //* Given
+    let ctx = TestCtx::setup("test_resume_scheduled").await;
+
+    // Deploy dataset (schedules a job - starts in Scheduled state)
+    let job_id = ctx
+        .deploy_dataset(None)
+        .await
+        .expect("dataset deployment should succeed");
+
+    //* When
+    let job_info_before = ctx
+        .inspect_job(&job_id)
+        .await
+        .expect("failed to inspect job");
+    let result = ctx.resume_job(&job_id).await;
+    let job_info_after = ctx
+        .inspect_job(&job_id)
+        .await
+        .expect("failed to inspect job");
+
+    //* Then
+    assert!(
+        job_info_before.status == "SCHEDULED" || job_info_before.status == "RUNNING",
+        "job should be in SCHEDULED or RUNNING state before resume, got: {}",
+        job_info_before.status
+    );
+    assert!(
+        result.is_ok(),
+        "resuming already scheduled job should succeed (idempotent): {:?}",
+        result.err()
+    );
+    assert!(
+        job_info_after.status == "SCHEDULED" || job_info_after.status == "RUNNING",
+        "job should remain in SCHEDULED or RUNNING state after resume, got: {}",
+        job_info_after.status
+    );
+}
+
+/// Resuming a job that has reached COMPLETED must be rejected with
+/// `ResumeError::UnexpectedStateConflict`.
+#[tokio::test]
+async fn resume_completed_job_should_fail() {
+    //* Given
+    let ctx = TestCtx::setup("test_resume_completed").await;
+
+    // Deploy dataset
+    let job_id = ctx
+        .deploy_dataset(Some(EndBlock::Absolute(15000001)))
+        .await
+        .expect("dataset deployment should succeed");
+
+    // Wait for job to complete
+    ctx.wait_for_status(&job_id, "COMPLETED").await;
+
+    //* When
+    let result = ctx.resume_job(&job_id).await;
+
+    //* Then
+    match result {
+        Ok(res) => {
+            panic!("resuming a completed job should fail, got: {:?}", res);
+        }
+        Err(ResumeError::UnexpectedStateConflict(api_err)) => {
+            assert_eq!(
+                api_err.error_code, "UNEXPECTED_STATE_CONFLICT",
+                "Expected UNEXPECTED_STATE_CONFLICT error code, got: {}",
+                api_err.error_code
+            );
+        }
+        Err(other) => panic!("Expected UnexpectedStateConflict, got: {:?}", other),
+    }
+}
+
+/// Resuming a job that has reached FAILED must be rejected with
+/// `ResumeError::UnexpectedStateConflict`.
+#[tokio::test]
+async fn resume_failed_job_should_fail() {
+    //* Given
+    let ctx = TestCtx::setup_with_derived("test_resume_failed").await;
+
+    // Deploy derived dataset (will fail due to invalid SQL)
+    let job_id = ctx
+        .deploy_derived_dataset()
+        .await
+        .expect("dataset deployment should succeed");
+
+    // Wait for job to fail (poll instead of a fixed sleep so slow runs do not race the worker)
+    ctx.wait_for_status(&job_id, "FAILED").await;
+
+    //* When
+    let job_info = ctx
+        .inspect_job(&job_id)
+        .await
+        .expect("failed to inspect job");
+    let result = ctx.resume_job(&job_id).await;
+
+    //* Then
+    assert_eq!(
+        job_info.status, "FAILED",
+        "job should be in FAILED state, got: {}",
+        job_info.status
+    );
+    match result {
+        Ok(res) => {
+            panic!(
+                "Job should be failed and resume should return error, got: {:?}",
+                res
+            );
+        }
+        Err(ResumeError::UnexpectedStateConflict(api_err)) => {
+            assert_eq!(
+                api_err.error_code, "UNEXPECTED_STATE_CONFLICT",
+                "Expected UNEXPECTED_STATE_CONFLICT error code, got: {}",
+                api_err.error_code
+            );
+        }
+        Err(other) => panic!("Expected UnexpectedStateConflict, got: {:?}", other),
+    }
+}
+
+/// Per-test fixture bundling the dataset reference to deploy and an ampctl client.
+///
+/// `_ctx` is never read in this file; presumably it is held so the test environment
+/// stays alive for the duration of the test.
+struct TestCtx {
+    _ctx: crate::testlib::ctx::TestCtx,
+    dataset_ref: Reference,
+    ampctl_client: client::Client,
+}
+
+impl TestCtx {
+    /// Builds a fixture around the raw `_/eth_rpc@0.0.0` dataset with the `rpc_eth_mainnet`
+    /// provider config and a client pointed at the daemon's admin API URL.
+    async fn setup(test_name: &str) -> Self {
+        // Use a raw dataset (like eth_rpc) instead of a derived manifest
+        let dataset_ref: Reference = "_/eth_rpc@0.0.0"
+            .parse()
+            .expect("Failed to parse dataset reference");
+
+        let ctx = TestCtxBuilder::new(test_name)
+            .with_dataset_manifest(dataset_ref.name().to_string())
+            .with_provider_config("rpc_eth_mainnet")
+            .build()
+            .await
+            .expect("failed to build test context");
+
+        let admin_api_url = ctx.daemon_controller().admin_api_url();
+        let base_url = admin_api_url
+            .parse()
+            .expect("failed to parse admin API URL");
+
+        let ampctl_client = client::Client::new(base_url);
+
+        Self {
+            _ctx: ctx,
+            dataset_ref,
+            ampctl_client,
+        }
+    }
+
+    /// Builds a fixture around `_/eth_firehose@0.0.1` with the `firehose_eth_mainnet`
+    /// provider config, used as the dependency of the derived dataset registered later.
+    async fn setup_with_derived(test_name: &str) -> Self {
+        // Use eth_firehose as the base dataset for derived manifests
+        let dataset_ref: Reference = "_/eth_firehose@0.0.1"
+            .parse()
+            .expect("Failed to parse dataset reference");
+
+        let ctx = TestCtxBuilder::new(test_name)
+            .with_dataset_manifest(("eth_firehose", "_/eth_firehose@0.0.1"))
+            .with_provider_config("firehose_eth_mainnet")
+            .build()
+            .await
+            .expect("failed to build test context");
+
+        let admin_api_url = ctx.daemon_controller().admin_api_url();
+        let base_url = admin_api_url
+            .parse()
+            .expect("failed to parse admin API URL");
+
+        let ampctl_client = client::Client::new(base_url);
+
+        Self {
+            _ctx: ctx,
+            dataset_ref,
+            ampctl_client,
+        }
+    }
+
+    /// Deploys the fixture's dataset through the ampctl client, optionally bounded by
+    /// `end_block`.
+    async fn deploy_dataset(
+        &self,
+        end_block: Option<EndBlock>,
+    ) -> Result {
+        self.ampctl_client
+            .datasets()
+            .deploy(&self.dataset_ref, end_block, 1, None)
+            .await
+    }
+
+    /// Registers the manifest from `create_failing_manifest` as
+    /// `_/failing_derived_test@1.0.0` and deploys it.
+    async fn deploy_derived_dataset(&self) -> Result {
+        let namespace = "_".parse::<Namespace>().expect("valid namespace");
+        let name = "failing_derived_test"
+            .parse::<Name>()
+            .expect("valid dataset name");
+        let version = "1.0.0".parse::<Version>().expect("valid version");
+
+        // Register a derived dataset with invalid SQL that will cause the job to fail
+        let manifest = create_failing_manifest();
+        let manifest_str =
+            serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON");
+
+        self.register_dataset(&namespace, &name, &version, &manifest_str)
+            .await
+            .expect("dataset registration should succeed");
+
+        // Wait for worker to be ready
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+        let reference = Reference::new(
+            namespace.clone(),
+            name.clone(),
+            Revision::Version(version.clone()),
+        );
+        self.ampctl_client
+            .datasets()
+            .deploy(&reference, None, 1, None)
+            .await
+    }
+
+    /// Registers `manifest` (a JSON string) under `namespace/name` at `version`.
+    async fn register_dataset(
+        &self,
+        namespace: &Namespace,
+        name: &Name,
+        version: &Version,
+        manifest: &str,
+    ) -> Result<(), client::datasets::RegisterError> {
+        let fqn = FullyQualifiedName::new(namespace.clone(), name.clone());
+        let manifest_json: Box =
+            serde_json::from_str(manifest).expect("failed to parse manifest JSON");
+        self.ampctl_client
+            .datasets()
+            .register(&fqn, Some(version), manifest_json)
+            .await
+    }
+
+    /// Requests a stop of `job_id` through the ampctl client.
+    async fn stop_job(&self, job_id: &JobId) -> Result<(), client::jobs::StopError> {
+        self.ampctl_client.jobs().stop(job_id).await
+    }
+
+    /// Requests a resume of `job_id` through the ampctl client.
+    async fn resume_job(&self, job_id: &JobId) -> Result<(), ResumeError> {
+        self.ampctl_client.jobs().resume(job_id).await
+    }
+
+    /// Fetches `job_id` through the ampctl client and unwraps the inner value that `get`
+    /// returns.
+    ///
+    /// # Panics
+    ///
+    /// Panics if that inner value is absent.
+    async fn inspect_job(&self, job_id: &JobId) -> Result {
+        self.ampctl_client
+            .jobs()
+            .get(job_id)
+            .await
+            .map(|job| job.expect("inspected job should be present"))
+    }
+
+    /// Polls `inspect_job` every 200ms until the job's status equals `expected`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if inspecting the job fails, or if the status is still different once the
+    /// 30-second timeout has elapsed.
+    async fn wait_for_status(&self, job_id: &JobId, expected: &str) {
+        let timeout = tokio::time::Duration::from_secs(30);
+        let poll_interval = tokio::time::Duration::from_millis(200);
+        let start = tokio::time::Instant::now();
+        loop {
+            let job_info = self
+                .inspect_job(job_id)
+                .await
+                .expect("failed to inspect job");
+            if job_info.status == expected {
+                break;
+            }
+            if start.elapsed() > timeout {
+                panic!(
+                    "Timeout waiting for job to reach {} state, current status: {}",
+                    expected, job_info.status
+                );
+            }
+            tokio::time::sleep(poll_interval).await;
+        }
+    }
+}
+
+/// Create a manifest with invalid SQL that will cause the job to fail
+fn create_failing_manifest() -> DerivedDatasetManifest {
+    let manifest_json = indoc::indoc! {r#"
+        {
+            "kind": "manifest",
+            "dependencies": {
+                "eth_firehose": "_/eth_firehose@0.0.1"
+            },
+            "tables": {
+                "failing_table": {
+                    "input": {
+                        "sql": "SELECT block_num FROM eth_firehose.blocks"
+                    },
+                    "schema": {
+                        "arrow": {
+                            "fields": [
+                                {
+                                    "name": "_block_num",
+                                    "type": "UInt64",
+                                    "nullable": false
+                                },
+                                {
+                                    "name": "block_num",
+                                    "type": "UInt64",
+                                    "nullable": false
+                                }
+                            ]
+                        }
+                    },
+                    "network": "mainnet"
+                }
+            },
+            "functions": {}
+        }
+    "#};
+
+    serde_json::from_str(manifest_json).expect("failed to parse manifest JSON")
+}
diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs
index 729688fb1..a68581e89 100644
--- a/tests/src/tests/mod.rs
+++ b/tests/src/tests/mod.rs
@@ -1,6 +1,7 @@
 mod it_admin_api_datasets_list_jobs;
 mod it_admin_api_datasets_manifest;
 mod it_admin_api_datasets_register;
+mod it_admin_api_datasets_resume_job;
 mod it_admin_api_schema;
 mod it_client;
 mod it_dependencies;