Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/syn_gen_scope.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
-export([
broadcast/2,
broadcast/3,
send_to_node/3
send_to_node/3,
send_to_node_ordered/3
]).

%% gen_server callbacks
Expand Down Expand Up @@ -127,6 +128,10 @@ broadcast(Message, ExcludedNodes, #state{multicast_pid = MulticastPid} = State)
send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
{ProcessName, RemoteNode} ! Message.

-spec send_to_node_ordered(RemoteNode :: node(), Message :: term(), #state{}) -> any().
send_to_node_ordered(RemoteNode, Message, #state{multicast_pid = MulticastPid, process_name = ProcessName}) ->
MulticastPid ! {send_single, RemoteNode, Message, ProcessName}.

%% ===================================================================
%% Callbacks
%% ===================================================================
Expand Down Expand Up @@ -208,9 +213,9 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{
error_logger:info_msg("SYN[~s|~s<~s>] Received DISCOVER request from node ~s",
[node(), HandlerLogName, Scope, RemoteScopeNode]
),
%% send local data to remote
%% send local data to remote (ordered to maintain FIFO with broadcasts)
{ok, LocalData} = Handler:get_local_data(State),
send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
send_to_node_ordered(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
%% is this a new node?
case maps:is_key(RemoteScopeNode, NodesMap) of
true ->
Expand Down Expand Up @@ -244,9 +249,9 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
false ->
%% monitor
_MRef = monitor(process, RemoteScopePid),
%% send local to remote
%% send local to remote (ordered to maintain FIFO with broadcasts)
{ok, LocalData} = Handler:get_local_data(State),
send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
send_to_node_ordered(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
%% return
{noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
end;
Expand Down Expand Up @@ -339,6 +344,10 @@ multicast_loop() ->
end, maps:keys(NodesMap) -- ExcludedNodes),
multicast_loop();

{send_single, RemoteNode, Message, ProcessName} ->
{ProcessName, RemoteNode} ! Message,
multicast_loop();

terminate ->
terminated
end.
2 changes: 1 addition & 1 deletion src/syn_pg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes
handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State);

false ->
%% ignore, race condition
%% ignore, node not yet discovered (ack_sync not yet received)
ok
end,
{noreply, State};
Expand Down
2 changes: 1 addition & 1 deletion src/syn_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_
handle_registry_sync(Name, Pid, Meta, Time, Reason, State);

false ->
%% ignore, race condition
%% ignore, node not yet discovered (ack_sync not yet received)
ok
end,
{noreply, State};
Expand Down