Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions packages/backend-postgres/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
CreateWorkflowRunParams,
GetStepAttemptParams,
GetWorkflowRunParams,
GetWorkflowRunByIdempotencyKeyParams,
ExtendWorkflowRunLeaseParams,
ListStepAttemptsParams,
ListWorkflowRunsParams,
Expand Down Expand Up @@ -83,6 +84,18 @@ export class BackendPostgres implements Backend {
async createWorkflowRun(
params: CreateWorkflowRunParams,
): Promise<WorkflowRun> {
// Check for existing run with same idempotency key (within TTL window)
if (params.idempotencyKey) {
const existing = await this.getWorkflowRunByIdempotencyKey({
workflowName: params.workflowName,
idempotencyKey: params.idempotencyKey,
createdAfter: params.idempotencyKeyCreatedAfter,
});
if (existing) {
return existing;
}
}
Comment on lines +87 to +97
Copy link

Copilot AI Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential race condition: Two concurrent requests with the same idempotency key could both find no existing run and create duplicate workflow runs. The check-then-insert pattern is not atomic. Consider using a database-level constraint (UNIQUE on workflow_name + idempotency_key) with INSERT ... ON CONFLICT to handle this atomically, or implement a proper distributed lock.

Copilot uses AI. Check for mistakes.

const [workflowRun] = await this.pg<WorkflowRun[]>`
INSERT INTO "openworkflow"."workflow_runs" (
"namespace_id",
Expand Down Expand Up @@ -138,6 +151,26 @@ export class BackendPostgres implements Backend {
return workflowRun ?? null;
}

async getWorkflowRunByIdempotencyKey(
params: GetWorkflowRunByIdempotencyKeyParams,
): Promise<WorkflowRun | null> {
const createdAfterClause = params.createdAfter
? this.pg`AND "created_at" > ${params.createdAfter}`
: this.pg``;

const [workflowRun] = await this.pg<WorkflowRun[]>`
SELECT *
FROM "openworkflow"."workflow_runs"
WHERE "namespace_id" = ${this.namespaceId}
AND "workflow_name" = ${params.workflowName}
Comment on lines +159 to +165
Copy link

Copilot AI Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idempotency check does not consider the workflow version. If the same idempotency key is used for different versions of the same workflow (e.g., "v1" and "v2"), it will return the existing run instead of creating a new one for the different version. Consider whether version should be part of the idempotency key lookup to allow the same key to be reused across different workflow versions, or document this limitation clearly.

Suggested change
: this.pg``;
const [workflowRun] = await this.pg<WorkflowRun[]>`
SELECT *
FROM "openworkflow"."workflow_runs"
WHERE "namespace_id" = ${this.namespaceId}
AND "workflow_name" = ${params.workflowName}
: this.pg``;
const workflowVersionClause =
// Only scope by workflow version when explicitly provided to preserve
// existing behavior for callers that do not pass a version.
(params as any).workflowVersion
? this.pg`AND "workflow_version" = ${(params as any).workflowVersion}`
: this.pg``;
const [workflowRun] = await this.pg<WorkflowRun[]>`
SELECT *
FROM "openworkflow"."workflow_runs"
WHERE "namespace_id" = ${this.namespaceId}
AND "workflow_name" = ${params.workflowName}
${workflowVersionClause}

Copilot uses AI. Check for mistakes.
AND "idempotency_key" = ${params.idempotencyKey}
${createdAfterClause}
Copy link

Copilot AI Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query lacks an ORDER BY clause. If multiple workflow runs exist with the same workflow name and idempotency key (e.g., created before and after the TTL cutoff), the query may return a non-deterministic result. Consider adding ORDER BY created_at DESC to ensure the most recent run is returned consistently.

Suggested change
${createdAfterClause}
${createdAfterClause}
ORDER BY "created_at" DESC

Copilot uses AI. Check for mistakes.
LIMIT 1
`;

return workflowRun ?? null;
}

async listWorkflowRuns(
params: ListWorkflowRunsParams,
): Promise<PaginatedResponse<WorkflowRun>> {
Expand Down
44 changes: 44 additions & 0 deletions packages/backend-sqlite/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
CreateWorkflowRunParams,
GetStepAttemptParams,
GetWorkflowRunParams,
GetWorkflowRunByIdempotencyKeyParams,
ExtendWorkflowRunLeaseParams,
ListStepAttemptsParams,
ListWorkflowRunsParams,
Expand Down Expand Up @@ -85,6 +86,18 @@ export class BackendSqlite implements Backend {
async createWorkflowRun(
params: CreateWorkflowRunParams,
): Promise<WorkflowRun> {
// Check for existing run with same idempotency key (within TTL window)
if (params.idempotencyKey) {
const existing = await this.getWorkflowRunByIdempotencyKey({
workflowName: params.workflowName,
idempotencyKey: params.idempotencyKey,
createdAfter: params.idempotencyKeyCreatedAfter,
});
if (existing) {
return existing;
}
}

Comment on lines +89 to +100
Copy link

Copilot AI Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential race condition: Two concurrent requests with the same idempotency key could both find no existing run and create duplicate workflow runs. The check-then-insert pattern is not atomic. Consider using a database-level constraint (UNIQUE on workflow_name + idempotency_key) with INSERT ... ON CONFLICT OR IGNORE to handle this atomically, or wrap this in a transaction with proper isolation level.

Suggested change
// Check for existing run with same idempotency key (within TTL window)
if (params.idempotencyKey) {
const existing = await this.getWorkflowRunByIdempotencyKey({
workflowName: params.workflowName,
idempotencyKey: params.idempotencyKey,
createdAfter: params.idempotencyKeyCreatedAfter,
});
if (existing) {
return existing;
}
}
// When an idempotency key is provided, the check-and-insert must be atomic
// to avoid creating duplicate workflow runs under concurrent requests.
if (params.idempotencyKey) {
this.db.exec("BEGIN IMMEDIATE TRANSACTION");
try {
// Re-check within the transaction to ensure we see any concurrently
// created workflow run with the same idempotency key.
const existing = await this.getWorkflowRunByIdempotencyKey({
workflowName: params.workflowName,
idempotencyKey: params.idempotencyKey,
createdAfter: params.idempotencyKeyCreatedAfter,
});
if (existing) {
this.db.exec("COMMIT");
return existing;
}
const id = generateUUID();
const currentTime = now();
const availableAt = params.availableAt
? toISO(params.availableAt)
: currentTime;
const stmt = this.db.prepare(`
INSERT INTO "workflow_runs" (
"namespace_id",
"id",
"workflow_name",
"version",
"status",
"idempotency_key",
"config",
"context",
"input",
"attempts",
"available_at",
"deadline_at",
"created_at",
"updated_at"
)
VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?)
`);
stmt.run(
this.namespaceId,
id,
params.workflowName,
params.version,
params.idempotencyKey,
toJSON(params.config),
toJSON(params.context),
toJSON(params.input),
availableAt,
toISO(params.deadlineAt),
currentTime,
currentTime,
);
this.db.exec("COMMIT");
const workflowRun = await this.getWorkflowRun({ workflowRunId: id });
if (!workflowRun) throw new Error("Failed to create workflow run");
return workflowRun;
} catch (error) {
this.db.exec("ROLLBACK");
throw error;
}
}
// For non-idempotent workflow runs, we preserve the existing behavior.

Copilot uses AI. Check for mistakes.
const id = generateUUID();
const currentTime = now();
const availableAt = params.availableAt
Expand Down Expand Up @@ -147,6 +160,37 @@ export class BackendSqlite implements Backend {
return Promise.resolve(row ? rowToWorkflowRun(row) : null);
}

getWorkflowRunByIdempotencyKey(
params: GetWorkflowRunByIdempotencyKeyParams,
): Promise<WorkflowRun | null> {
const createdAfterClause = params.createdAfter
? `AND "created_at" > ?`
: "";

const stmt = this.db.prepare(`
SELECT *
FROM "workflow_runs"
WHERE "namespace_id" = ?
AND "workflow_name" = ?
AND "idempotency_key" = ?
${createdAfterClause}
Copy link

Copilot AI Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query lacks an ORDER BY clause. If multiple workflow runs exist with the same workflow name and idempotency key (e.g., created before and after the TTL cutoff), the query may return a non-deterministic result. Consider adding ORDER BY created_at DESC to ensure the most recent run is returned consistently.

Suggested change
${createdAfterClause}
${createdAfterClause}
ORDER BY "created_at" DESC

Copilot uses AI. Check for mistakes.
Comment on lines +163 to +176
Copy link

Copilot AI Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idempotency check does not consider the workflow version. If the same idempotency key is used for different versions of the same workflow (e.g., "v1" and "v2"), it will return the existing run instead of creating a new one for the different version. Consider whether version should be part of the idempotency key lookup to allow the same key to be reused across different workflow versions, or document this limitation clearly.

Copilot uses AI. Check for mistakes.
LIMIT 1
`);

const queryParams: (string | null)[] = [
this.namespaceId,
params.workflowName,
params.idempotencyKey,
];
if (params.createdAfter) {
queryParams.push(toISO(params.createdAfter));
}

const row = stmt.get(...queryParams) as WorkflowRunRow | undefined;

return Promise.resolve(row ? rowToWorkflowRun(row) : null);
}

async claimWorkflowRun(
params: ClaimWorkflowRunParams,
): Promise<WorkflowRun | null> {
Expand Down
Loading
Loading