-
Notifications
You must be signed in to change notification settings - Fork 31
feat: Add idempotency key support with TTL #136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d84893a
3b0ea72
202a520
60e2149
48faf47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,6 +14,7 @@ import { | |||||||||||||||||||||||||||||||||||||||||||
| CreateWorkflowRunParams, | ||||||||||||||||||||||||||||||||||||||||||||
| GetStepAttemptParams, | ||||||||||||||||||||||||||||||||||||||||||||
| GetWorkflowRunParams, | ||||||||||||||||||||||||||||||||||||||||||||
| GetWorkflowRunByIdempotencyKeyParams, | ||||||||||||||||||||||||||||||||||||||||||||
| ExtendWorkflowRunLeaseParams, | ||||||||||||||||||||||||||||||||||||||||||||
| ListStepAttemptsParams, | ||||||||||||||||||||||||||||||||||||||||||||
| ListWorkflowRunsParams, | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -83,6 +84,18 @@ export class BackendPostgres implements Backend { | |||||||||||||||||||||||||||||||||||||||||||
| async createWorkflowRun( | ||||||||||||||||||||||||||||||||||||||||||||
| params: CreateWorkflowRunParams, | ||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<WorkflowRun> { | ||||||||||||||||||||||||||||||||||||||||||||
| // Check for existing run with same idempotency key (within TTL window) | ||||||||||||||||||||||||||||||||||||||||||||
| if (params.idempotencyKey) { | ||||||||||||||||||||||||||||||||||||||||||||
| const existing = await this.getWorkflowRunByIdempotencyKey({ | ||||||||||||||||||||||||||||||||||||||||||||
| workflowName: params.workflowName, | ||||||||||||||||||||||||||||||||||||||||||||
| idempotencyKey: params.idempotencyKey, | ||||||||||||||||||||||||||||||||||||||||||||
| createdAfter: params.idempotencyKeyCreatedAfter, | ||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||
| if (existing) { | ||||||||||||||||||||||||||||||||||||||||||||
| return existing; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| const [workflowRun] = await this.pg<WorkflowRun[]>` | ||||||||||||||||||||||||||||||||||||||||||||
| INSERT INTO "openworkflow"."workflow_runs" ( | ||||||||||||||||||||||||||||||||||||||||||||
| "namespace_id", | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -138,6 +151,26 @@ export class BackendPostgres implements Backend { | |||||||||||||||||||||||||||||||||||||||||||
| return workflowRun ?? null; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| async getWorkflowRunByIdempotencyKey( | ||||||||||||||||||||||||||||||||||||||||||||
| params: GetWorkflowRunByIdempotencyKeyParams, | ||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<WorkflowRun | null> { | ||||||||||||||||||||||||||||||||||||||||||||
| const createdAfterClause = params.createdAfter | ||||||||||||||||||||||||||||||||||||||||||||
| ? this.pg`AND "created_at" > ${params.createdAfter}` | ||||||||||||||||||||||||||||||||||||||||||||
| : this.pg``; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| const [workflowRun] = await this.pg<WorkflowRun[]>` | ||||||||||||||||||||||||||||||||||||||||||||
| SELECT * | ||||||||||||||||||||||||||||||||||||||||||||
| FROM "openworkflow"."workflow_runs" | ||||||||||||||||||||||||||||||||||||||||||||
| WHERE "namespace_id" = ${this.namespaceId} | ||||||||||||||||||||||||||||||||||||||||||||
| AND "workflow_name" = ${params.workflowName} | ||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+159
to
+165
|
||||||||||||||||||||||||||||||||||||||||||||
| : this.pg``; | |
| const [workflowRun] = await this.pg<WorkflowRun[]>` | |
| SELECT * | |
| FROM "openworkflow"."workflow_runs" | |
| WHERE "namespace_id" = ${this.namespaceId} | |
| AND "workflow_name" = ${params.workflowName} | |
| : this.pg``; | |
| const workflowVersionClause = | |
| // Only scope by workflow version when explicitly provided to preserve | |
| // existing behavior for callers that do not pass a version. | |
| (params as any).workflowVersion | |
| ? this.pg`AND "workflow_version" = ${(params as any).workflowVersion}` | |
| : this.pg``; | |
| const [workflowRun] = await this.pg<WorkflowRun[]>` | |
| SELECT * | |
| FROM "openworkflow"."workflow_runs" | |
| WHERE "namespace_id" = ${this.namespaceId} | |
| AND "workflow_name" = ${params.workflowName} | |
| ${workflowVersionClause} |
Copilot
AI
Dec 31, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query lacks an ORDER BY clause. If multiple workflow runs exist with the same workflow name and idempotency key (e.g., created before and after the TTL cutoff), the query may return a non-deterministic result. Consider adding ORDER BY created_at DESC to ensure the most recent run is returned consistently.
| ${createdAfterClause} | |
| ${createdAfterClause} | |
| ORDER BY "created_at" DESC |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,6 +19,7 @@ import { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CreateWorkflowRunParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| GetStepAttemptParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| GetWorkflowRunParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| GetWorkflowRunByIdempotencyKeyParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ExtendWorkflowRunLeaseParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ListStepAttemptsParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ListWorkflowRunsParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -85,6 +86,18 @@ export class BackendSqlite implements Backend { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async createWorkflowRun( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params: CreateWorkflowRunParams, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<WorkflowRun> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Check for existing run with same idempotency key (within TTL window) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (params.idempotencyKey) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const existing = await this.getWorkflowRunByIdempotencyKey({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| workflowName: params.workflowName, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| idempotencyKey: params.idempotencyKey, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| createdAfter: params.idempotencyKeyCreatedAfter, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (existing) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return existing; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+89
to
+100
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Check for existing run with same idempotency key (within TTL window) | |
| if (params.idempotencyKey) { | |
| const existing = await this.getWorkflowRunByIdempotencyKey({ | |
| workflowName: params.workflowName, | |
| idempotencyKey: params.idempotencyKey, | |
| createdAfter: params.idempotencyKeyCreatedAfter, | |
| }); | |
| if (existing) { | |
| return existing; | |
| } | |
| } | |
| // When an idempotency key is provided, the check-and-insert must be atomic | |
| // to avoid creating duplicate workflow runs under concurrent requests. | |
| if (params.idempotencyKey) { | |
| this.db.exec("BEGIN IMMEDIATE TRANSACTION"); | |
| try { | |
| // Re-check within the transaction to ensure we see any concurrently | |
| // created workflow run with the same idempotency key. | |
| const existing = await this.getWorkflowRunByIdempotencyKey({ | |
| workflowName: params.workflowName, | |
| idempotencyKey: params.idempotencyKey, | |
| createdAfter: params.idempotencyKeyCreatedAfter, | |
| }); | |
| if (existing) { | |
| this.db.exec("COMMIT"); | |
| return existing; | |
| } | |
| const id = generateUUID(); | |
| const currentTime = now(); | |
| const availableAt = params.availableAt | |
| ? toISO(params.availableAt) | |
| : currentTime; | |
| const stmt = this.db.prepare(` | |
| INSERT INTO "workflow_runs" ( | |
| "namespace_id", | |
| "id", | |
| "workflow_name", | |
| "version", | |
| "status", | |
| "idempotency_key", | |
| "config", | |
| "context", | |
| "input", | |
| "attempts", | |
| "available_at", | |
| "deadline_at", | |
| "created_at", | |
| "updated_at" | |
| ) | |
| VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?) | |
| `); | |
| stmt.run( | |
| this.namespaceId, | |
| id, | |
| params.workflowName, | |
| params.version, | |
| params.idempotencyKey, | |
| toJSON(params.config), | |
| toJSON(params.context), | |
| toJSON(params.input), | |
| availableAt, | |
| toISO(params.deadlineAt), | |
| currentTime, | |
| currentTime, | |
| ); | |
| this.db.exec("COMMIT"); | |
| const workflowRun = await this.getWorkflowRun({ workflowRunId: id }); | |
| if (!workflowRun) throw new Error("Failed to create workflow run"); | |
| return workflowRun; | |
| } catch (error) { | |
| this.db.exec("ROLLBACK"); | |
| throw error; | |
| } | |
| } | |
| // For non-idempotent workflow runs, we preserve the existing behavior. |
Copilot
AI
Dec 31, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query lacks an ORDER BY clause. If multiple workflow runs exist with the same workflow name and idempotency key (e.g., created before and after the TTL cutoff), the query may return a non-deterministic result. Consider adding ORDER BY created_at DESC to ensure the most recent run is returned consistently.
| ${createdAfterClause} | |
| ${createdAfterClause} | |
| ORDER BY "created_at" DESC |
Copilot
AI
Dec 31, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idempotency check does not consider the workflow version. If the same idempotency key is used for different versions of the same workflow (e.g., "v1" and "v2"), it will return the existing run instead of creating a new one for the different version. Consider whether version should be part of the idempotency key lookup to allow the same key to be reused across different workflow versions, or document this limitation clearly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential race condition: Two concurrent requests with the same idempotency key could both find no existing run and create duplicate workflow runs. The check-then-insert pattern is not atomic. Consider using a database-level constraint (UNIQUE on workflow_name + idempotency_key) with INSERT ... ON CONFLICT to handle this atomically, or implement a proper distributed lock.