diff --git a/include/spock_group.h b/include/spock_group.h index 8732fad9..655fc1e2 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -156,7 +156,7 @@ extern void spock_group_detach(void); extern bool spock_group_progress_update(const SpockApplyProgress *sap); extern void spock_group_progress_update_ptr(SpockGroupEntry *entry, const SpockApplyProgress *sap); -extern SpockApplyProgress *apply_worker_get_progress(void); +extern TimestampTz apply_worker_get_prev_remote_ts(void); extern void spock_group_resource_dump(void); extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags); diff --git a/src/spock_apply.c b/src/spock_apply.c index d6d02851..0b6e7151 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -242,7 +242,7 @@ wait_for_previous_transaction(void) * loop and process this transaction. Otherwise, wait for the * predecessor to commit. */ - if (apply_worker_get_progress()->prev_remote_ts == required_commit_ts || + if (apply_worker_get_prev_remote_ts() == required_commit_ts || required_commit_ts == 0) { break; @@ -263,7 +263,7 @@ wait_for_previous_transaction(void) elog(DEBUG1, "SPOCK: slot-group '%s' WAIT for ts [current proccessed" ", required] [" INT64_FORMAT ", " INT64_FORMAT "]", MySubscription->slot_name, - apply_worker_get_progress()->prev_remote_ts, + apply_worker_get_prev_remote_ts(), required_commit_ts); /* Latch */ diff --git a/src/spock_executor.c b/src/spock_executor.c index 0cc291a1..b5d4f51d 100644 --- a/src/spock_executor.c +++ b/src/spock_executor.c @@ -247,7 +247,6 @@ spock_object_access(ObjectAccessType access, if (access == OAT_DROP) { ObjectAccessDrop *drop_arg = (ObjectAccessDrop *) arg; - ObjectAddress object; DropBehavior behavior; /* No need to check for internal deletions. */ diff --git a/src/spock_group.c b/src/spock_group.c index e0eff255..9d26e80e 100644 --- a/src/spock_group.c +++ b/src/spock_group.c @@ -304,7 +304,7 @@ progress_update_struct(SpockApplyProgress *dest, const SpockApplyProgress *src) * Value of the received_lsn potentially can exceed remote_insert_lsn * because it is reported more frequently (by keepalive messages). */ - Assert(!(dest->remote_commit_ts == 0 ^ dest->last_updated_ts == 0)); + Assert(!((dest->remote_commit_ts == 0) ^ (dest->last_updated_ts == 0))); Assert(dest->remote_commit_ts >= 0 && dest->last_updated_ts >= 0); } @@ -401,14 +401,15 @@ spock_group_progress_update_ptr(SpockGroupEntry *e, } /* - * apply_worker_get_progress + * apply_worker_get_prev_remote_ts + * + * Get the previous remote timestamp for our apply worker * - * Return a pointer to the snapshot of the current apply worker's progress. */ -SpockApplyProgress * -apply_worker_get_progress(void) +TimestampTz +apply_worker_get_prev_remote_ts(void) { - static SpockApplyProgress sap; + TimestampTz prev_remote_ts; Assert(MyApplyWorker != NULL); Assert(MyApplyWorker->apply_group != NULL); @@ -416,7 +417,7 @@ apply_worker_get_progress(void) if (MyApplyWorker && MyApplyWorker->apply_group) { LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); - sap = MyApplyWorker->apply_group->progress; + prev_remote_ts = MyApplyWorker->apply_group->progress.prev_remote_ts; LWLockRelease(SpockCtx->apply_group_master_lock); } else @@ -426,7 +427,7 @@ apply_worker_get_progress(void) */ elog(ERROR, "apply worker has not been fully initialised yet"); - return &sap; + return prev_remote_ts; } /* Iterate all groups */ @@ -516,12 +517,14 @@ spock_group_resource_dump(void) hdr.version = SPOCK_RES_VERSION; hdr.system_identifier = GetSystemIdentifier(); hdr.flags = 0; + + /* Acquire lock before reading hash table to ensure consistency */ + LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); + hdr.entry_count = hash_get_num_entries(SpockGroupHash); write_buf(fd, &hdr, sizeof(hdr), SPOCK_RES_DUMPFILE "(header)"); - LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); - dctx.fd = fd; dctx.count = 0; @@ -671,10 +674,11 @@ spock_group_progress_update_list(List *lst) elog(LOG, "SPOCK: adjust spock.progress %d->%d to " "remote_commit_ts='%s' " - "remote_commit_lsn=%llX remote_insert_lsn=%llX", + "remote_commit_lsn=%X/%X remote_insert_lsn=%X/%X", sap->key.remote_node_id, MySubscription->target->id, timestamptz_to_str(sap->remote_commit_ts), - sap->remote_commit_lsn, sap->remote_insert_lsn); + LSN_FORMAT_ARGS(sap->remote_commit_lsn), + LSN_FORMAT_ARGS(sap->remote_insert_lsn)); } /* diff --git a/src/spock_sync.c b/src/spock_sync.c index f5c7cbfe..c6e41b5a 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -1021,8 +1021,6 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, CHECK_FOR_INTERRUPTS(); } - adjust_progress_info(origin_conn); - /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); finish_copy_target_tx(target_conn); @@ -1162,6 +1160,7 @@ spock_sync_subscription(SpockSubscription *sub) PGconn *origin_conn_repl; char *snapshot; bool use_failover_slot; + List *progress_entries_list = NIL; elog(INFO, "initializing subscriber %s", sub->name); @@ -1177,6 +1176,7 @@ spock_sync_subscription(SpockSubscription *sub) origin_conn_repl = spock_connect_replica(sub->origin_if->dsn, sub->name, "snap"); + progress_entries_list = adjust_progress_info(origin_conn); snapshot = ensure_replication_slot_snapshot(origin_conn, origin_conn_repl, sub->slot_name, @@ -1247,6 +1247,12 @@ spock_sync_subscription(SpockSubscription *sub) sub->replication_sets, sub->slot_name); + /* + * Arrange replication status according to the just copied + * data. + */ + spock_group_progress_update_list(progress_entries_list); + /* Store info about all the synchronized tables. */ StartTransactionCommand(); foreach(lc, tables)