From 92c89e6905f0173a1a6ede632bc628ee6fd5ca2e Mon Sep 17 00:00:00 2001 From: Moiz Ibrar Date: Mon, 19 Jan 2026 19:32:56 +0500 Subject: [PATCH 1/2] Fail add-node when sync event not confirmed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Check spock.wait_for_sync_event return value instead of ignoring it - Change WaitForSyncEvent to return Query[bool] and use Scalar() - Fail add-node when sync isn’t confirmed (ErrReplicationSyncNotConfirmed) --- server/internal/database/service.go | 1 + .../database/wait_for_sync_event_resource.go | 12 +++++++++++- server/internal/postgres/create_db.go | 4 ++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/server/internal/database/service.go b/server/internal/database/service.go index ab8caab5..e3e16fe5 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -20,6 +20,7 @@ var ( ErrInstanceStopped = errors.New("instance stopped") ErrInvalidDatabaseUpdate = errors.New("invalid database update") ErrInvalidSourceNode = errors.New("invalid source node") + ErrReplicationSyncNotConfirmed = errors.New("replication sync not confirmed") ) type Service struct { diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go index 8474d34b..b44d38f4 100644 --- a/server/internal/database/wait_for_sync_event_resource.go +++ b/server/internal/database/wait_for_sync_event_resource.go @@ -73,12 +73,22 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con } // TODO: Set wait limit - err = postgres.WaitForSyncEvent(r.ProviderNode, syncEvent.SyncEventLsn, 100).Exec(ctx, subscriberConn) + synced, err := postgres.WaitForSyncEvent(r.ProviderNode, syncEvent.SyncEventLsn, 100).Scalar(ctx, subscriberConn) if errors.Is(err, pgx.ErrNoRows) { return resource.ErrNotFound } else if err != nil { return fmt.Errorf("failed to wait for sync event on subscriber: %w", err) } + if !synced { + return fmt.Errorf( + "%w: provider=%s subscriber=%s lsn=%s timeout_seconds=%d", + ErrReplicationSyncNotConfirmed, + r.ProviderNode, + r.SubscriberNode, + syncEvent.SyncEventLsn, + 100, + ) + } return nil } diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index 62746036..9b48f75c 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -250,8 +250,8 @@ func SyncEvent() Query[string] { } } -func WaitForSyncEvent(originNode, lsn string, timeoutSeconds int) Statement { - return Statement{ +func WaitForSyncEvent(originNode, lsn string, timeoutSeconds int) Query[bool] { + return Query[bool]{ SQL: "CALL spock.wait_for_sync_event(true, @origin_node, @lsn, @timeout);", Args: pgx.NamedArgs{ "origin_node": originNode, From 49eced4662b75a373f112befeab75e01fae9fce2 Mon Sep 17 00:00:00 2001 From: Moiz Ibrar Date: Tue, 20 Jan 2026 19:24:17 +0500 Subject: [PATCH 2/2] chore: inline replication sync error message Remove unused sentinel error and return a hardcoded message when sync isn't confirmed. --- server/internal/database/service.go | 1 - server/internal/database/wait_for_sync_event_resource.go | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/server/internal/database/service.go b/server/internal/database/service.go index e3e16fe5..ab8caab5 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -20,7 +20,6 @@ var ( ErrInstanceStopped = errors.New("instance stopped") ErrInvalidDatabaseUpdate = errors.New("invalid database update") ErrInvalidSourceNode = errors.New("invalid source node") - ErrReplicationSyncNotConfirmed = errors.New("replication sync not confirmed") ) type Service struct { diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go index b44d38f4..b1187e3c 100644 --- a/server/internal/database/wait_for_sync_event_resource.go +++ b/server/internal/database/wait_for_sync_event_resource.go @@ -80,9 +80,7 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con return fmt.Errorf("failed to wait for sync event on subscriber: %w", err) } if !synced { - return fmt.Errorf( - "%w: provider=%s subscriber=%s lsn=%s timeout_seconds=%d", - ErrReplicationSyncNotConfirmed, + return fmt.Errorf("replication sync not confirmed: provider=%s subscriber=%s lsn=%s timeout_seconds=%d", r.ProviderNode, r.SubscriberNode, syncEvent.SyncEventLsn,