From a78e2cfbb4ae74405f1c683f00611d133ad6b900 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Fri, 9 Jan 2026 13:12:31 -0800 Subject: [PATCH 1/3] Minor fixes and improvements for progress tracking - Refactor apply_worker_get_progress() to return only the timestamp needed. Return value instead of pointer to entire struct. - Lock sooner in spock_group_resource_dump() because of getting the number of entries - Properly handle progress updates after COPY operations in replication set data sync and avoid potential memory leak - Address compiler warnings --- include/spock_group.h | 2 +- src/spock_apply.c | 4 ++-- src/spock_executor.c | 1 - src/spock_group.c | 28 ++++++++++++++++------------ src/spock_sync.c | 11 ++++++++++- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/include/spock_group.h b/include/spock_group.h index 8732fad9..655fc1e2 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -156,7 +156,7 @@ extern void spock_group_detach(void); extern bool spock_group_progress_update(const SpockApplyProgress *sap); extern void spock_group_progress_update_ptr(SpockGroupEntry *entry, const SpockApplyProgress *sap); -extern SpockApplyProgress *apply_worker_get_progress(void); +extern TimestampTz apply_worker_get_prev_remote_ts(void); extern void spock_group_resource_dump(void); extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags); diff --git a/src/spock_apply.c b/src/spock_apply.c index d6d02851..0b6e7151 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -242,7 +242,7 @@ wait_for_previous_transaction(void) * loop and process this transaction. Otherwise, wait for the * predecessor to commit. */ - if (apply_worker_get_progress()->prev_remote_ts == required_commit_ts || + if (apply_worker_get_prev_remote_ts() == required_commit_ts || required_commit_ts == 0) { break; @@ -263,7 +263,7 @@ wait_for_previous_transaction(void) elog(DEBUG1, "SPOCK: slot-group '%s' WAIT for ts [current proccessed" ", required] [" INT64_FORMAT ", " INT64_FORMAT "]", MySubscription->slot_name, - apply_worker_get_progress()->prev_remote_ts, + apply_worker_get_prev_remote_ts(), required_commit_ts); /* Latch */ diff --git a/src/spock_executor.c b/src/spock_executor.c index 0cc291a1..b5d4f51d 100644 --- a/src/spock_executor.c +++ b/src/spock_executor.c @@ -247,7 +247,6 @@ spock_object_access(ObjectAccessType access, if (access == OAT_DROP) { ObjectAccessDrop *drop_arg = (ObjectAccessDrop *) arg; - ObjectAddress object; DropBehavior behavior; /* No need to check for internal deletions. */ diff --git a/src/spock_group.c b/src/spock_group.c index e0eff255..9d26e80e 100644 --- a/src/spock_group.c +++ b/src/spock_group.c @@ -304,7 +304,7 @@ progress_update_struct(SpockApplyProgress *dest, const SpockApplyProgress *src) * Value of the received_lsn potentially can exceed remote_insert_lsn * because it is reported more frequently (by keepalive messages). */ - Assert(!(dest->remote_commit_ts == 0 ^ dest->last_updated_ts == 0)); + Assert(!((dest->remote_commit_ts == 0) ^ (dest->last_updated_ts == 0))); Assert(dest->remote_commit_ts >= 0 && dest->last_updated_ts >= 0); } @@ -401,14 +401,15 @@ spock_group_progress_update_ptr(SpockGroupEntry *e, } /* - * apply_worker_get_progress + * apply_worker_get_prev_remote_ts + * + * Get the previous remote timestamp for our apply worker * - * Return a pointer to the snapshot of the current apply worker's progress. */ -SpockApplyProgress * -apply_worker_get_progress(void) +TimestampTz +apply_worker_get_prev_remote_ts(void) { - static SpockApplyProgress sap; + TimestampTz prev_remote_ts; Assert(MyApplyWorker != NULL); Assert(MyApplyWorker->apply_group != NULL); @@ -416,7 +417,7 @@ apply_worker_get_progress(void) if (MyApplyWorker && MyApplyWorker->apply_group) { LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); - sap = MyApplyWorker->apply_group->progress; + prev_remote_ts = MyApplyWorker->apply_group->progress.prev_remote_ts; LWLockRelease(SpockCtx->apply_group_master_lock); } else @@ -426,7 +427,7 @@ apply_worker_get_progress(void) */ elog(ERROR, "apply worker has not been fully initialised yet"); - return &sap; + return prev_remote_ts; } /* Iterate all groups */ @@ -516,12 +517,14 @@ spock_group_resource_dump(void) hdr.version = SPOCK_RES_VERSION; hdr.system_identifier = GetSystemIdentifier(); hdr.flags = 0; + + /* Acquire lock before reading hash table to ensure consistency */ + LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); + hdr.entry_count = hash_get_num_entries(SpockGroupHash); write_buf(fd, &hdr, sizeof(hdr), SPOCK_RES_DUMPFILE "(header)"); - LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); - dctx.fd = fd; dctx.count = 0; @@ -671,10 +674,11 @@ spock_group_progress_update_list(List *lst) elog(LOG, "SPOCK: adjust spock.progress %d->%d to " "remote_commit_ts='%s' " - "remote_commit_lsn=%llX remote_insert_lsn=%llX", + "remote_commit_lsn=%X/%X remote_insert_lsn=%X/%X", sap->key.remote_node_id, MySubscription->target->id, timestamptz_to_str(sap->remote_commit_ts), - sap->remote_commit_lsn, sap->remote_insert_lsn); + LSN_FORMAT_ARGS(sap->remote_commit_lsn), + LSN_FORMAT_ARGS(sap->remote_insert_lsn)); } /* diff --git a/src/spock_sync.c b/src/spock_sync.c index f5c7cbfe..aa4711de 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -958,6 +958,7 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, PGconn *origin_conn; PGconn *target_conn; List *tables; + List *progress_entries_list = NIL; ListCell *lc; /* Connect to origin node. */ @@ -1021,12 +1022,19 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, CHECK_FOR_INTERRUPTS(); } - adjust_progress_info(origin_conn); + progress_entries_list = adjust_progress_info(origin_conn); /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); finish_copy_target_tx(target_conn); + /* + * Match handling in copy_tables_data(). + * Update replication progress. We must do it after commit of the COPY. + * Call below will free progress_entries_list + */ + spock_group_progress_update_list(progress_entries_list); + return tables; } @@ -1177,6 +1185,7 @@ spock_sync_subscription(SpockSubscription *sub) origin_conn_repl = spock_connect_replica(sub->origin_if->dsn, sub->name, "snap"); + adjust_progress_info(origin_conn); snapshot = ensure_replication_slot_snapshot(origin_conn, origin_conn_repl, sub->slot_name, From 7a36d8bea890e47604f5d87856912193d89838aa Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Wed, 4 Feb 2026 18:28:04 -0800 Subject: [PATCH 2/3] Remove one adjust_progress_info call --- src/spock_sync.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/spock_sync.c b/src/spock_sync.c index aa4711de..27141bd6 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -1185,7 +1185,6 @@ spock_sync_subscription(SpockSubscription *sub) origin_conn_repl = spock_connect_replica(sub->origin_if->dsn, sub->name, "snap"); - adjust_progress_info(origin_conn); snapshot = ensure_replication_slot_snapshot(origin_conn, origin_conn_repl, sub->slot_name, From d5d7c1a6b3ed9d7c0562fc2d1467674971764d42 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Thu, 5 Feb 2026 10:44:15 +0100 Subject: [PATCH 3/3] Move replication progress adjustment out of copy_replication_sets_data Previously, adjust_progress_info() and spock_group_progress_update_list() were called inside copy_replication_sets_data(), between the COPY transaction finish and function return. This was incorrect because the progress update must happen after the COPY transaction is committed on both origin and target, and the snapshot used for progress info must be consistent with the replication slot snapshot. Move adjust_progress_info() to spock_sync_subscription() right before ensure_replication_slot_snapshot(), so it captures progress using the same connection and transaction context. Move spock_group_progress_update_list() to after copy_replication_sets_data() returns, ensuring replication progress is updated only after all copied data has been committed. --- src/spock_sync.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/spock_sync.c b/src/spock_sync.c index 27141bd6..c6e41b5a 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -958,7 +958,6 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, PGconn *origin_conn; PGconn *target_conn; List *tables; - List *progress_entries_list = NIL; ListCell *lc; /* Connect to origin node. */ @@ -1022,19 +1021,10 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, CHECK_FOR_INTERRUPTS(); } - progress_entries_list = adjust_progress_info(origin_conn); - /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); finish_copy_target_tx(target_conn); - /* - * Match handling in copy_tables_data(). - * Update replication progress. We must do it after commit of the COPY. - * Call below will free progress_entries_list - */ - spock_group_progress_update_list(progress_entries_list); - return tables; } @@ -1170,6 +1160,7 @@ spock_sync_subscription(SpockSubscription *sub) PGconn *origin_conn_repl; char *snapshot; bool use_failover_slot; + List *progress_entries_list = NIL; elog(INFO, "initializing subscriber %s", sub->name); @@ -1185,6 +1176,7 @@ spock_sync_subscription(SpockSubscription *sub) origin_conn_repl = spock_connect_replica(sub->origin_if->dsn, sub->name, "snap"); + progress_entries_list = adjust_progress_info(origin_conn); snapshot = ensure_replication_slot_snapshot(origin_conn, origin_conn_repl, sub->slot_name, @@ -1255,6 +1247,12 @@ spock_sync_subscription(SpockSubscription *sub) sub->replication_sets, sub->slot_name); + /* + * Arrange replication status according to the just copied + * data. + */ + spock_group_progress_update_list(progress_entries_list); + /* Store info about all the synchronized tables. */ StartTransactionCommand(); foreach(lc, tables)