From b68607ad9356bb14f419e5989ef4aa1ce636af82 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Tue, 18 Nov 2025 23:42:44 -0500 Subject: [PATCH 1/2] Fix race condition causing stale PIDs in remote lookups sync_register/sync_join messages from multicast_loop can arrive before ack_sync from gen_server since they're different senders (no ordering guarantee). When this happens, the message was dropped because the remote node wasn't in nodes_map yet, leaving stale data from ack_sync. Fix: Include RemoteScopePid in broadcasts to allow inline discovery when sync arrives before ack_sync. Old message format still supported for rolling upgrades. --- src/syn_pg.erl | 25 ++++++++++++++++++++++--- src/syn_registry.erl | 27 +++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/syn_pg.erl b/src/syn_pg.erl index 66b5a169..57375441 100644 --- a/src/syn_pg.erl +++ b/src/syn_pg.erl @@ -419,13 +419,32 @@ handle_call(Request, From, #state{scope = Scope} = State) -> {noreply, #state{}} | {noreply, #state{}, timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), #state{}}. +%% New format with RemoteScopePid - allows inline discovery +handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) -> + RemoteNode = node(Pid), + case maps:is_key(RemoteNode, NodesMap) of + true -> + handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State), + {noreply, State}; + + false -> + %% Node not in nodes_map yet - sync arrived before ack_sync due to + %% different sender processes (multicast_loop vs gen_server). + %% Inline the discovery: set up monitor and add to nodes_map. + _MRef = monitor(process, RemoteScopePid), + NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid}, + handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}), + {noreply, State#state{nodes_map = NodesMap1}} + end; + +%% Old format for backwards compatibility (rolling upgrades) handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) -> case maps:is_key(node(Pid), NodesMap) of true -> handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State); false -> - %% ignore, race condition + %% ignore, cannot inline discover without RemoteScopePid ok end, {noreply, State}; @@ -573,8 +592,8 @@ do_join_on_node(GroupName, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode, %% callback syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]), syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, PreviousMeta, Meta, normal]), - %% broadcast - syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State), + %% broadcast (include self() so receiver can set up monitor if needed) + syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, self()}, [RequesterNode], State), %% return {reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}. diff --git a/src/syn_registry.erl b/src/syn_registry.erl index 774e750a..b090d25f 100755 --- a/src/syn_registry.erl +++ b/src/syn_registry.erl @@ -314,13 +314,32 @@ handle_call(Request, From, #state{scope = Scope} = State) -> {noreply, #state{}} | {noreply, #state{}, timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), #state{}}. +%% New format with RemoteScopePid - allows inline discovery +handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) -> + RemoteNode = node(Pid), + case maps:is_key(RemoteNode, NodesMap) of + true -> + handle_registry_sync(Name, Pid, Meta, Time, Reason, State), + {noreply, State}; + + false -> + %% Node not in nodes_map yet - sync arrived before ack_sync due to + %% different sender processes (multicast_loop vs gen_server). + %% Inline the discovery: set up monitor and add to nodes_map. + _MRef = monitor(process, RemoteScopePid), + NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid}, + handle_registry_sync(Name, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}), + {noreply, State#state{nodes_map = NodesMap1}} + end; + +%% Old format for backwards compatibility (rolling upgrades) handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) -> case maps:is_key(node(Pid), NodesMap) of true -> handle_registry_sync(Name, Pid, Meta, Time, Reason, State); false -> - %% ignore, race condition + %% ignore, cannot inline discover without RemoteScopePid ok end, {noreply, State}; @@ -467,8 +486,8 @@ do_register_on_node(Name, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode, %% callback syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, Reason]), syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, PreviousMeta, Meta, normal]), - %% broadcast - syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, [RequesterNode], State), + %% broadcast (include self() so receiver can set up monitor if needed) + syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason, self()}, [RequesterNode], State), %% return {reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}. @@ -690,7 +709,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime ResolveTime = erlang:system_time(), add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid), %% broadcast to all (including remote node to update the time) - syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution}, State); + syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution, self()}, State); Invalid -> error_logger:info_msg("SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> none chosen (got: ~p)", From ceb62a83be33e861f38d4e538ad9cfa3177212bb Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Sun, 11 Jan 2026 16:22:48 +0000 Subject: [PATCH 2/2] Fix race condition: route ack_sync through multicast_loop The previous fix attempted to handle sync_register arriving before ack_sync by including RemoteScopePid for inline discovery. However, the same race exists for sync_unregister vs ack_sync: 1. Node A sends ack_sync (direct from gen_server) 2. Process dies on Node A, broadcasts sync_unregister (via multicast_loop) 3. At Node B: sync_unregister arrives first (ignored - not in table yet) 4. ack_sync arrives second, adds the now-dead process 5. Stale entry persists forever Root cause: ack_sync and broadcasts use different senders (gen_server vs multicast_loop), so FIFO ordering is not guaranteed. Fix: Route ack_sync through multicast_loop via new send_to_node_ordered/3. All messages to remote nodes now flow through the same sender, guaranteeing FIFO delivery. This fixes the root cause rather than patching symptoms. The previous inline discovery mechanism is removed as it's no longer needed. --- src/syn_gen_scope.erl | 19 ++++++++++++++----- src/syn_pg.erl | 25 +++---------------------- src/syn_registry.erl | 27 ++++----------------------- 3 files changed, 21 insertions(+), 50 deletions(-) diff --git a/src/syn_gen_scope.erl b/src/syn_gen_scope.erl index 7c194a93..2a9b941e 100644 --- a/src/syn_gen_scope.erl +++ b/src/syn_gen_scope.erl @@ -36,7 +36,8 @@ -export([ broadcast/2, broadcast/3, - send_to_node/3 + send_to_node/3, + send_to_node_ordered/3 ]). %% gen_server callbacks @@ -127,6 +128,10 @@ broadcast(Message, ExcludedNodes, #state{multicast_pid = MulticastPid} = State) send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) -> {ProcessName, RemoteNode} ! Message. +-spec send_to_node_ordered(RemoteNode :: node(), Message :: term(), #state{}) -> any(). +send_to_node_ordered(RemoteNode, Message, #state{multicast_pid = MulticastPid, process_name = ProcessName}) -> + MulticastPid ! {send_single, RemoteNode, Message, ProcessName}. + %% =================================================================== %% Callbacks %% =================================================================== @@ -208,9 +213,9 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{ error_logger:info_msg("SYN[~s|~s<~s>] Received DISCOVER request from node ~s", [node(), HandlerLogName, Scope, RemoteScopeNode] ), - %% send local data to remote + %% send local data to remote (ordered to maintain FIFO with broadcasts) {ok, LocalData} = Handler:get_local_data(State), - send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), + send_to_node_ordered(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), %% is this a new node? case maps:is_key(RemoteScopeNode, NodesMap) of true -> @@ -244,9 +249,9 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{ false -> %% monitor _MRef = monitor(process, RemoteScopePid), - %% send local to remote + %% send local to remote (ordered to maintain FIFO with broadcasts) {ok, LocalData} = Handler:get_local_data(State), - send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), + send_to_node_ordered(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), %% return {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}} end; @@ -339,6 +344,10 @@ multicast_loop() -> end, maps:keys(NodesMap) -- ExcludedNodes), multicast_loop(); + {send_single, RemoteNode, Message, ProcessName} -> + {ProcessName, RemoteNode} ! Message, + multicast_loop(); + terminate -> terminated end. diff --git a/src/syn_pg.erl b/src/syn_pg.erl index 57375441..195b87e1 100644 --- a/src/syn_pg.erl +++ b/src/syn_pg.erl @@ -419,32 +419,13 @@ handle_call(Request, From, #state{scope = Scope} = State) -> {noreply, #state{}} | {noreply, #state{}, timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), #state{}}. -%% New format with RemoteScopePid - allows inline discovery -handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) -> - RemoteNode = node(Pid), - case maps:is_key(RemoteNode, NodesMap) of - true -> - handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State), - {noreply, State}; - - false -> - %% Node not in nodes_map yet - sync arrived before ack_sync due to - %% different sender processes (multicast_loop vs gen_server). - %% Inline the discovery: set up monitor and add to nodes_map. - _MRef = monitor(process, RemoteScopePid), - NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid}, - handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}), - {noreply, State#state{nodes_map = NodesMap1}} - end; - -%% Old format for backwards compatibility (rolling upgrades) handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) -> case maps:is_key(node(Pid), NodesMap) of true -> handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State); false -> - %% ignore, cannot inline discover without RemoteScopePid + %% ignore, node not yet discovered (ack_sync not yet received) ok end, {noreply, State}; @@ -592,8 +573,8 @@ do_join_on_node(GroupName, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode, %% callback syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, Meta, Reason]), syn_event_handler:call_event_handler(CallbackMethod, [Scope, GroupName, Pid, PreviousMeta, Meta, normal]), - %% broadcast (include self() so receiver can set up monitor if needed) - syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason, self()}, [RequesterNode], State), + %% broadcast + syn_gen_scope:broadcast({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, [RequesterNode], State), %% return {reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}. diff --git a/src/syn_registry.erl b/src/syn_registry.erl index b090d25f..7f22713c 100755 --- a/src/syn_registry.erl +++ b/src/syn_registry.erl @@ -314,32 +314,13 @@ handle_call(Request, From, #state{scope = Scope} = State) -> {noreply, #state{}} | {noreply, #state{}, timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), #state{}}. -%% New format with RemoteScopePid - allows inline discovery -handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason, RemoteScopePid}, #state{nodes_map = NodesMap} = State) -> - RemoteNode = node(Pid), - case maps:is_key(RemoteNode, NodesMap) of - true -> - handle_registry_sync(Name, Pid, Meta, Time, Reason, State), - {noreply, State}; - - false -> - %% Node not in nodes_map yet - sync arrived before ack_sync due to - %% different sender processes (multicast_loop vs gen_server). - %% Inline the discovery: set up monitor and add to nodes_map. - _MRef = monitor(process, RemoteScopePid), - NodesMap1 = NodesMap#{RemoteNode => RemoteScopePid}, - handle_registry_sync(Name, Pid, Meta, Time, Reason, State#state{nodes_map = NodesMap1}), - {noreply, State#state{nodes_map = NodesMap1}} - end; - -%% Old format for backwards compatibility (rolling upgrades) handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_map = NodesMap} = State) -> case maps:is_key(node(Pid), NodesMap) of true -> handle_registry_sync(Name, Pid, Meta, Time, Reason, State); false -> - %% ignore, cannot inline discover without RemoteScopePid + %% ignore, node not yet discovered (ack_sync not yet received) ok end, {noreply, State}; @@ -486,8 +467,8 @@ do_register_on_node(Name, Pid, PreviousMeta, Meta, MRef, Reason, RequesterNode, %% callback syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, Meta, Reason]), syn_event_handler:call_event_handler(CallbackMethod, [Scope, Name, Pid, PreviousMeta, Meta, normal]), - %% broadcast (include self() so receiver can set up monitor if needed) - syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason, self()}, [RequesterNode], State), + %% broadcast + syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, [RequesterNode], State), %% return {reply, {ok, {CallbackMethod, PreviousMeta, Meta, Time, TableByName, TableByPid}}, State}. @@ -709,7 +690,7 @@ resolve_conflict(Scope, Name, {Pid, Meta, Time}, {TablePid, TableMeta, TableTime ResolveTime = erlang:system_time(), add_to_local_table(Name, TablePid, TableMeta, ResolveTime, TableMRef, TableByName, TableByPid), %% broadcast to all (including remote node to update the time) - syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution, self()}, State); + syn_gen_scope:broadcast({'3.0', sync_register, Name, TablePid, TableMeta, ResolveTime, syn_conflict_resolution}, State); Invalid -> error_logger:info_msg("SYN[~s|~s<~s>] Registry CONFLICT for name ~p: ~p vs ~p -> none chosen (got: ~p)",