diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go index 8474d34b..b1187e3c 100644 --- a/server/internal/database/wait_for_sync_event_resource.go +++ b/server/internal/database/wait_for_sync_event_resource.go @@ -73,12 +73,20 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con } // TODO: Set wait limit - err = postgres.WaitForSyncEvent(r.ProviderNode, syncEvent.SyncEventLsn, 100).Exec(ctx, subscriberConn) + synced, err := postgres.WaitForSyncEvent(r.ProviderNode, syncEvent.SyncEventLsn, 100).Scalar(ctx, subscriberConn) if errors.Is(err, pgx.ErrNoRows) { return resource.ErrNotFound } else if err != nil { return fmt.Errorf("failed to wait for sync event on subscriber: %w", err) } + if !synced { + return fmt.Errorf("replication sync not confirmed: provider=%s subscriber=%s lsn=%s timeout_seconds=%d", + r.ProviderNode, + r.SubscriberNode, + syncEvent.SyncEventLsn, + 100, + ) + } return nil } diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index 62746036..9b48f75c 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -250,8 +250,8 @@ func SyncEvent() Query[string] { } } -func WaitForSyncEvent(originNode, lsn string, timeoutSeconds int) Statement { - return Statement{ +func WaitForSyncEvent(originNode, lsn string, timeoutSeconds int) Query[bool] { + return Query[bool]{ SQL: "CALL spock.wait_for_sync_event(true, @origin_node, @lsn, @timeout);", Args: pgx.NamedArgs{ "origin_node": originNode,