diff --git a/server/internal/workflows/backend/etcd/etcd.go b/server/internal/workflows/backend/etcd/etcd.go index 0e6cf5ea..cc077251 100644 --- a/server/internal/workflows/backend/etcd/etcd.go +++ b/server/internal/workflows/backend/etcd/etcd.go @@ -1,6 +1,7 @@ package etcd import ( + "cmp" "context" "errors" "fmt" @@ -67,11 +68,11 @@ func (b *Backend) CreateWorkflowInstance(ctx context.Context, instance *workflow Metadata: attrs.Metadata, State: core.WorkflowInstanceStateActive, }), - b.store.PendingEvent.Put(&pending_event.Value{ - WorkflowInstanceID: instance.InstanceID, - WorkflowExecutionID: instance.ExecutionID, - Event: event, - }), + b.store.PendingEvent.Put(pending_event.NewValue( + instance.InstanceID, + instance.ExecutionID, + event, + )), b.store.WorkflowQueueItem.Put(&workflow_queue_item.Value{ WorkflowInstance: instance, CreatedAt: time.Now(), @@ -103,11 +104,11 @@ func (b *Backend) CancelWorkflowInstance(ctx context.Context, instance *workflow } err = b.store.PendingEvent. - Create(&pending_event.Value{ - WorkflowInstanceID: instance.InstanceID, - WorkflowExecutionID: instance.ExecutionID, - Event: cancelEvent, - }). + Create(pending_event.NewValue( + instance.InstanceID, + instance.ExecutionID, + cancelEvent, + )). Exec(ctx) if err != nil { return fmt.Errorf("failed to create pending event: %w", err) @@ -219,11 +220,11 @@ func (b *Backend) SignalWorkflow(ctx context.Context, instanceID string, event * } executionID := instance.WorkflowInstance.ExecutionID err = b.store.PendingEvent. - Create(&pending_event.Value{ - WorkflowInstanceID: instanceID, - WorkflowExecutionID: executionID, - Event: event, - }). + Create(pending_event.NewValue( + instanceID, + executionID, + event, + )). Exec(ctx) if err != nil { return fmt.Errorf("failed to signal workflow: %w", err) @@ -461,11 +462,11 @@ func (b *Backend) CompleteWorkflowTask( } for _, event := range timerEvents { ops = append(ops, - b.store.PendingEvent.Create(&pending_event.Value{ - WorkflowInstanceID: instanceID, - WorkflowExecutionID: executionID, - Event: event, - }), + b.store.PendingEvent.Create(pending_event.NewValue( + instanceID, + executionID, + event, + )), ) } groupedEvents := history.EventsByWorkflowInstance(workflowEvents) @@ -487,10 +488,10 @@ func (b *Backend) CompleteWorkflowTask( if exists { ops = append(ops, b.store.PendingEvent. - Create(&pending_event.Value{ - WorkflowInstanceID: instanceID, - WorkflowExecutionID: executionID, - Event: history.NewPendingEvent( + Create(pending_event.NewValue( + instanceID, + executionID, + history.NewPendingEvent( time.Now(), history.EventType_SubWorkflowFailed, map[string]any{ @@ -498,7 +499,7 @@ func (b *Backend) CompleteWorkflowTask( }, history.ScheduleEventID(first.WorkflowInstance.ParentEventID), ), - }), + )), ) continue } @@ -524,11 +525,11 @@ func (b *Backend) CompleteWorkflowTask( for _, event := range events { ops = append(ops, - b.store.PendingEvent.Create(&pending_event.Value{ - WorkflowInstanceID: targetInstance.InstanceID, - WorkflowExecutionID: targetInstance.ExecutionID, - Event: event.HistoryEvent, - }), + b.store.PendingEvent.Create(pending_event.NewValue( + targetInstance.InstanceID, + targetInstance.ExecutionID, + event.HistoryEvent, + )), ) } } @@ -699,11 +700,11 @@ func (b *Backend) CompleteActivityTask(ctx context.Context, task *backend.Activi task.WorkflowInstance.InstanceID, task.Event.ID, ), - b.store.PendingEvent.Create(&pending_event.Value{ - WorkflowInstanceID: task.WorkflowInstance.InstanceID, - WorkflowExecutionID: task.WorkflowInstance.ExecutionID, - Event: result, - }), + b.store.PendingEvent.Create(pending_event.NewValue( + task.WorkflowInstance.InstanceID, + task.WorkflowInstance.ExecutionID, + result, + )), ).Commit(ctx) if err != nil { return fmt.Errorf("failed to persist activity completion: %w", err) @@ -796,13 +797,11 @@ func (b *Backend) FeatureSupported(feature backend.Feature) bool { func sortPendingEvents(events []*pending_event.Value) { slices.SortStableFunc(events, func(a *pending_event.Value, b *pending_event.Value) int { - switch { - case a.Event.Timestamp.Before(b.Event.Timestamp): - return -1 - case b.Event.Timestamp.Before(a.Event.Timestamp): - return 1 - default: - return 0 + if a.CreatedAt > 0 && b.CreatedAt > 0 { + return cmp.Compare(a.CreatedAt, b.CreatedAt) } + + // Fallback to using the event timestamp for backwards compatibility. + return a.Event.Timestamp.Compare(b.Event.Timestamp) }) } diff --git a/server/internal/workflows/backend/etcd/pending_event/store.go b/server/internal/workflows/backend/etcd/pending_event/store.go index 671afe53..b01955e4 100644 --- a/server/internal/workflows/backend/etcd/pending_event/store.go +++ b/server/internal/workflows/backend/etcd/pending_event/store.go @@ -1,6 +1,8 @@ package pending_event import ( + "time" + "github.com/cschleiden/go-workflows/backend/history" clientv3 "go.etcd.io/etcd/client/v3" @@ -12,6 +14,20 @@ type Value struct { WorkflowInstanceID string `json:"workflow_instance_id"` WorkflowExecutionID string `json:"workflow_execution_id"` Event *history.Event `json:"event"` + CreatedAt int64 `json:"created_at"` +} + +func NewValue( + workflowInstanceID string, + workflowExecutionID string, + event *history.Event, +) *Value { + return &Value{ + WorkflowInstanceID: workflowInstanceID, + WorkflowExecutionID: workflowExecutionID, + Event: event, + CreatedAt: time.Now().UnixMicro(), + } } type Store struct {