Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/spock_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ extern void spock_group_detach(void);
extern bool spock_group_progress_update(const SpockApplyProgress *sap);
extern void spock_group_progress_update_ptr(SpockGroupEntry *entry,
const SpockApplyProgress *sap);
extern SpockApplyProgress *apply_worker_get_progress(void);
extern TimestampTz apply_worker_get_prev_remote_ts(void);

extern void spock_group_resource_dump(void);
extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags);
Expand Down
4 changes: 2 additions & 2 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ wait_for_previous_transaction(void)
* loop and process this transaction. Otherwise, wait for the
* predecessor to commit.
*/
if (apply_worker_get_progress()->prev_remote_ts == required_commit_ts ||
if (apply_worker_get_prev_remote_ts() == required_commit_ts ||
required_commit_ts == 0)
{
break;
Expand All @@ -263,7 +263,7 @@ wait_for_previous_transaction(void)
elog(DEBUG1, "SPOCK: slot-group '%s' WAIT for ts [current proccessed"
", required] [" INT64_FORMAT ", " INT64_FORMAT "]",
MySubscription->slot_name,
apply_worker_get_progress()->prev_remote_ts,
apply_worker_get_prev_remote_ts(),
required_commit_ts);

/* Latch */
Expand Down
1 change: 0 additions & 1 deletion src/spock_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ spock_object_access(ObjectAccessType access,
if (access == OAT_DROP)
{
ObjectAccessDrop *drop_arg = (ObjectAccessDrop *) arg;
ObjectAddress object;
DropBehavior behavior;

/* No need to check for internal deletions. */
Expand Down
28 changes: 16 additions & 12 deletions src/spock_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ progress_update_struct(SpockApplyProgress *dest, const SpockApplyProgress *src)
* Value of the received_lsn potentially can exceed remote_insert_lsn
* because it is reported more frequently (by keepalive messages).
*/
Assert(!(dest->remote_commit_ts == 0 ^ dest->last_updated_ts == 0));
Assert(!((dest->remote_commit_ts == 0) ^ (dest->last_updated_ts == 0)));
Assert(dest->remote_commit_ts >= 0 && dest->last_updated_ts >= 0);
}

Expand Down Expand Up @@ -401,22 +401,23 @@ spock_group_progress_update_ptr(SpockGroupEntry *e,
}

/*
* apply_worker_get_progress
* apply_worker_get_prev_remote_ts
*
* Get the previous remote timestamp for our apply worker
*
* Return a pointer to the snapshot of the current apply worker's progress.
*/
SpockApplyProgress *
apply_worker_get_progress(void)
TimestampTz
apply_worker_get_prev_remote_ts(void)
{
static SpockApplyProgress sap;
TimestampTz prev_remote_ts;

Assert(MyApplyWorker != NULL);
Assert(MyApplyWorker->apply_group != NULL);

if (MyApplyWorker && MyApplyWorker->apply_group)
{
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);
sap = MyApplyWorker->apply_group->progress;
prev_remote_ts = MyApplyWorker->apply_group->progress.prev_remote_ts;
LWLockRelease(SpockCtx->apply_group_master_lock);
}
else
Expand All @@ -426,7 +427,7 @@ apply_worker_get_progress(void)
*/
elog(ERROR, "apply worker has not been fully initialised yet");

return &sap;
return prev_remote_ts;
}

/* Iterate all groups */
Expand Down Expand Up @@ -516,12 +517,14 @@ spock_group_resource_dump(void)
hdr.version = SPOCK_RES_VERSION;
hdr.system_identifier = GetSystemIdentifier();
hdr.flags = 0;

/* Acquire lock before reading hash table to ensure consistency */
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);

hdr.entry_count = hash_get_num_entries(SpockGroupHash);

write_buf(fd, &hdr, sizeof(hdr), SPOCK_RES_DUMPFILE "(header)");

LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);

dctx.fd = fd;
dctx.count = 0;

Expand Down Expand Up @@ -671,10 +674,11 @@ spock_group_progress_update_list(List *lst)

elog(LOG, "SPOCK: adjust spock.progress %d->%d to "
"remote_commit_ts='%s' "
"remote_commit_lsn=%llX remote_insert_lsn=%llX",
"remote_commit_lsn=%X/%X remote_insert_lsn=%X/%X",
sap->key.remote_node_id, MySubscription->target->id,
timestamptz_to_str(sap->remote_commit_ts),
sap->remote_commit_lsn, sap->remote_insert_lsn);
LSN_FORMAT_ARGS(sap->remote_commit_lsn),
LSN_FORMAT_ARGS(sap->remote_insert_lsn));
}

/*
Expand Down
10 changes: 8 additions & 2 deletions src/spock_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -1021,8 +1021,6 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn,
CHECK_FOR_INTERRUPTS();
}

adjust_progress_info(origin_conn);

/* Finish the transactions and disconnect. */
finish_copy_origin_tx(origin_conn);
finish_copy_target_tx(target_conn);
Expand Down Expand Up @@ -1162,6 +1160,7 @@ spock_sync_subscription(SpockSubscription *sub)
PGconn *origin_conn_repl;
char *snapshot;
bool use_failover_slot;
List *progress_entries_list = NIL;

elog(INFO, "initializing subscriber %s", sub->name);

Expand All @@ -1177,6 +1176,7 @@ spock_sync_subscription(SpockSubscription *sub)
origin_conn_repl = spock_connect_replica(sub->origin_if->dsn,
sub->name, "snap");

progress_entries_list = adjust_progress_info(origin_conn);
snapshot = ensure_replication_slot_snapshot(origin_conn,
origin_conn_repl,
sub->slot_name,
Expand Down Expand Up @@ -1247,6 +1247,12 @@ spock_sync_subscription(SpockSubscription *sub)
sub->replication_sets,
sub->slot_name);

/*
* Arrange replication status according to the just copied
* data.
*/
spock_group_progress_update_list(progress_entries_list);

/* Store info about all the synchronized tables. */
StartTransactionCommand();
foreach(lc, tables)
Expand Down