diff --git a/changes/unreleased/Added-20260122-185129.yaml b/changes/unreleased/Added-20260122-185129.yaml new file mode 100644 index 00000000..93295749 --- /dev/null +++ b/changes/unreleased/Added-20260122-185129.yaml @@ -0,0 +1,3 @@ +kind: Added +body: Improved etcd resilience by updating peer URLs when host IPs or hostnames change. +time: 2026-01-22T18:51:29.295792+05:30 diff --git a/server/internal/ds/set.go b/server/internal/ds/set.go index 4e414fa9..ef829c77 100644 --- a/server/internal/ds/set.go +++ b/server/internal/ds/set.go @@ -125,3 +125,9 @@ func (s Set[T]) ToSortedSlice(cmp func(a T, b T) int) []T { func SetDifference[T comparable](a, b []T) Set[T] { return NewSet(a...).Difference(NewSet(b...)) } + +// SetSymmetricDifference (a-b)∪(b-a) returns the set of elements that are in +// either a or b, but not both. +func SetSymmetricDifference[T comparable](a, b []T) Set[T] { + return NewSet(a...).SymmetricDifference(NewSet(b...)) +} diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index bf49606b..5b01e583 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -171,6 +171,13 @@ func (e *EmbeddedEtcd) start(ctx context.Context) error { return fmt.Errorf("failed to get internal etcd client: %w", err) } + urlsChanged, updateErr := UpdateMemberPeerURLs(ctx, client, appCfg.HostID, e.PeerEndpoints()) + if updateErr != nil { + e.logger.Warn().Err(updateErr).Msg("failed to update peer URLs after IP/hostname change") + } else if urlsChanged { + e.logger.Info().Msg("peer URLs updated due to IP/hostname change") + } + e.certSvc, err = certificateService(ctx, e.cfg.Config(), client) if err != nil { return err @@ -237,7 +244,10 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { // This server will have an empty member name in the members list, so we // add this server to the list separately. name := e.cfg.Config().HostID - peers := []string{fmt.Sprintf("%s=%s", name, e.PeerEndpoints()[0])} + var peers []string + for _, peerURL := range e.PeerEndpoints() { + peers = append(peers, fmt.Sprintf("%s=%s", name, peerURL)) + } for _, m := range members.Members { // Empty name indicates a member that hasn't started, including this // server. @@ -247,7 +257,9 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { if len(m.PeerURLs) < 1 { return fmt.Errorf("member %q has no peer URLs", m.Name) } - peers = append(peers, fmt.Sprintf("%s=%s", m.Name, m.PeerURLs[0])) + for _, peerURL := range m.PeerURLs { + peers = append(peers, fmt.Sprintf("%s=%s", m.Name, peerURL)) + } } etcdCfg.InitialCluster = strings.Join(peers, ",") etcdCfg.ClusterState = embed.ClusterStateFlagExisting @@ -307,15 +319,19 @@ func (e *EmbeddedEtcd) Error() <-chan error { func (e *EmbeddedEtcd) ClientEndpoints() []string { appCfg := e.cfg.Config() + clientPort := appCfg.EtcdServer.ClientPort return []string{ - fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.ClientPort), + fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, clientPort), + fmt.Sprintf("https://%s:%d", appCfg.Hostname, clientPort), } } func (e *EmbeddedEtcd) PeerEndpoints() []string { appCfg := e.cfg.Config() + peerPort := appCfg.EtcdServer.PeerPort return []string{ - fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.PeerPort), + fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, peerPort), + fmt.Sprintf("https://%s:%d", appCfg.Hostname, peerPort), } } @@ -637,13 +653,17 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error } c.AdvertiseClientUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, clientPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, clientPort)}, } + c.ListenPeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", peerPort)}, } c.AdvertisePeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, peerPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, peerPort)}, } + // This will get overridden when joining an existing cluster c.InitialCluster = fmt.Sprintf( "%s=http://%s:%d", diff --git a/server/internal/etcd/embedded_test.go b/server/internal/etcd/embedded_test.go index 2a7801ba..67d0d3c8 100644 --- a/server/internal/etcd/embedded_test.go +++ b/server/internal/etcd/embedded_test.go @@ -367,6 +367,60 @@ func TestEmbeddedEtcd(t *testing.T) { err = serverA.RemoveHost(ctx, cfgB.HostID) require.ErrorIs(t, err, etcd.ErrMinimumClusterSize) }) + + t.Run("endpoints include both IP and hostname URLs", func(t *testing.T) { + ctx := t.Context() + + cfg := config.Config{ + HostID: uuid.NewString(), + DataDir: t.TempDir(), + EtcdMode: config.EtcdModeServer, + IPv4Address: "127.0.0.1", + Hostname: "localhost", + EtcdClient: config.EtcdClient{ + LogLevel: "debug", + }, + EtcdServer: config.EtcdServer{ + LogLevel: "debug", + ClientPort: storagetest.GetFreePort(t), + PeerPort: storagetest.GetFreePort(t), + }, + } + + server := etcd.NewEmbeddedEtcd(cfgMgr(t, cfg), testutils.Logger(t)) + require.NotNil(t, server) + + err := server.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + server.Shutdown() + }) + + // Verify PeerEndpoints includes both IP and hostname + peerEndpoints := server.PeerEndpoints() + assert.Len(t, peerEndpoints, 2, "PeerEndpoints should include both IP and hostname URLs") + assert.Contains(t, peerEndpoints[0], "127.0.0.1", "First peer endpoint should contain IP") + assert.Contains(t, peerEndpoints[1], "localhost", "Second peer endpoint should contain hostname") + + // Verify ClientEndpoints includes both IP and hostname + clientEndpoints := server.ClientEndpoints() + assert.Len(t, clientEndpoints, 2, "ClientEndpoints should include both IP and hostname URLs") + assert.Contains(t, clientEndpoints[0], "127.0.0.1", "First client endpoint should contain IP") + assert.Contains(t, clientEndpoints[1], "localhost", "Second client endpoint should contain hostname") + + // Verify the etcd member list contains both peer URLs + client, err := server.GetClient() + require.NoError(t, err) + + members, err := client.MemberList(ctx) + require.NoError(t, err) + require.Len(t, members.Members, 1) + + member := members.Members[0] + assert.Len(t, member.PeerURLs, 2, "Member should have both IP and hostname peer URLs") + assert.Len(t, member.ClientURLs, 2, "Member should have both IP and hostname client URLs") + }) + } func cfgMgr(t testing.TB, cfg config.Config) *config.Manager { diff --git a/server/internal/etcd/membership.go b/server/internal/etcd/membership.go index 32570893..8c18f057 100644 --- a/server/internal/etcd/membership.go +++ b/server/internal/etcd/membership.go @@ -10,6 +10,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/ds" ) var ( @@ -105,3 +106,26 @@ func getLeaderMember(ctx context.Context, client *clientv3.Client) (*etcdserverp return nil, errors.New("cluster has no leader") } + +func UpdateMemberPeerURLs(ctx context.Context, client *clientv3.Client, memberName string, newPeerURLs []string) (bool, error) { + resp, err := client.MemberList(ctx) + if err != nil { + return false, fmt.Errorf("failed to list members: %w", err) + } + + member := findMember(resp.Members, memberName) + if member == nil { + return false, fmt.Errorf("member %s not found", memberName) + } + + if ds.SetSymmetricDifference(member.PeerURLs, newPeerURLs).Size() == 0 { + return false, nil // No change needed + } + + _, err = client.MemberUpdate(ctx, member.ID, newPeerURLs) + if err != nil { + return false, fmt.Errorf("failed to update member peer URLs: %w", err) + } + + return true, nil +} diff --git a/server/internal/etcd/membership_test.go b/server/internal/etcd/membership_test.go new file mode 100644 index 00000000..563f1339 --- /dev/null +++ b/server/internal/etcd/membership_test.go @@ -0,0 +1,77 @@ +//go:build etcd_lifecycle_test + +package etcd_test + +import ( + "testing" + + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/stretchr/testify/assert" +) + +func TestUrlsEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + a []string + b []string + expected bool + }{ + { + name: "same URLs same order", + a: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + b: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + expected: true, + }, + { + name: "same URLs different order", + a: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + b: []string{"https://host-1:2380", "https://192.168.1.2:2380"}, + expected: true, + }, + { + name: "different URLs", + a: []string{"https://192.168.1.2:2380"}, + b: []string{"https://192.168.1.3:2380"}, + expected: false, + }, + { + name: "different lengths", + a: []string{"https://192.168.1.2:2380"}, + b: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + expected: false, + }, + { + name: "both empty", + a: []string{}, + b: []string{}, + expected: true, + }, + { + name: "one empty", + a: []string{"https://192.168.1.2:2380"}, + b: []string{}, + expected: false, + }, + { + name: "both nil", + a: nil, + b: nil, + expected: true, + }, + { + name: "one nil one empty", + a: nil, + b: []string{}, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ds.SetSymmetricDifference(tt.a, tt.b).Size() == 0 + assert.Equal(t, tt.expected, result) + }) + } +}