Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Added-20260122-185129.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: Improved etcd resilience by updating peer URLs when host IPs or hostnames change.
time: 2026-01-22T18:51:29.295792+05:30
6 changes: 6 additions & 0 deletions server/internal/ds/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,9 @@ func (s Set[T]) ToSortedSlice(cmp func(a T, b T) int) []T {
func SetDifference[T comparable](a, b []T) Set[T] {
return NewSet(a...).Difference(NewSet(b...))
}

// SetSymmetricDifference (a-b)∪(b-a) returns the set of elements that are in
// either a or b, but not both.
func SetSymmetricDifference[T comparable](a, b []T) Set[T] {
return NewSet(a...).SymmetricDifference(NewSet(b...))
}
28 changes: 24 additions & 4 deletions server/internal/etcd/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ func (e *EmbeddedEtcd) start(ctx context.Context) error {
return fmt.Errorf("failed to get internal etcd client: %w", err)
}

urlsChanged, updateErr := UpdateMemberPeerURLs(ctx, client, appCfg.HostID, e.PeerEndpoints())
if updateErr != nil {
e.logger.Warn().Err(updateErr).Msg("failed to update peer URLs after IP/hostname change")
} else if urlsChanged {
e.logger.Info().Msg("peer URLs updated due to IP/hostname change")
}

e.certSvc, err = certificateService(ctx, e.cfg.Config(), client)
if err != nil {
return err
Expand Down Expand Up @@ -237,7 +244,10 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error {
// This server will have an empty member name in the members list, so we
// add this server to the list separately.
name := e.cfg.Config().HostID
peers := []string{fmt.Sprintf("%s=%s", name, e.PeerEndpoints()[0])}
var peers []string
for _, peerURL := range e.PeerEndpoints() {
peers = append(peers, fmt.Sprintf("%s=%s", name, peerURL))
}
for _, m := range members.Members {
// Empty name indicates a member that hasn't started, including this
// server.
Expand All @@ -247,7 +257,9 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error {
if len(m.PeerURLs) < 1 {
return fmt.Errorf("member %q has no peer URLs", m.Name)
}
peers = append(peers, fmt.Sprintf("%s=%s", m.Name, m.PeerURLs[0]))
for _, peerURL := range m.PeerURLs {
peers = append(peers, fmt.Sprintf("%s=%s", m.Name, peerURL))
}
}
etcdCfg.InitialCluster = strings.Join(peers, ",")
etcdCfg.ClusterState = embed.ClusterStateFlagExisting
Expand Down Expand Up @@ -307,15 +319,19 @@ func (e *EmbeddedEtcd) Error() <-chan error {

func (e *EmbeddedEtcd) ClientEndpoints() []string {
appCfg := e.cfg.Config()
clientPort := appCfg.EtcdServer.ClientPort
return []string{
fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.ClientPort),
fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, clientPort),
fmt.Sprintf("https://%s:%d", appCfg.Hostname, clientPort),
}
}

func (e *EmbeddedEtcd) PeerEndpoints() []string {
appCfg := e.cfg.Config()
peerPort := appCfg.EtcdServer.PeerPort
return []string{
fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.PeerPort),
fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, peerPort),
fmt.Sprintf("https://%s:%d", appCfg.Hostname, peerPort),
}
}

Expand Down Expand Up @@ -637,13 +653,17 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error
}
c.AdvertiseClientUrls = []url.URL{
{Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, clientPort)},
{Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, clientPort)},
}

c.ListenPeerUrls = []url.URL{
{Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", peerPort)},
}
c.AdvertisePeerUrls = []url.URL{
{Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, peerPort)},
{Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, peerPort)},
}

// This will get overridden when joining an existing cluster
c.InitialCluster = fmt.Sprintf(
"%s=http://%s:%d",
Expand Down
54 changes: 54 additions & 0 deletions server/internal/etcd/embedded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,60 @@ func TestEmbeddedEtcd(t *testing.T) {
err = serverA.RemoveHost(ctx, cfgB.HostID)
require.ErrorIs(t, err, etcd.ErrMinimumClusterSize)
})

t.Run("endpoints include both IP and hostname URLs", func(t *testing.T) {
ctx := t.Context()

cfg := config.Config{
HostID: uuid.NewString(),
DataDir: t.TempDir(),
EtcdMode: config.EtcdModeServer,
IPv4Address: "127.0.0.1",
Hostname: "localhost",
EtcdClient: config.EtcdClient{
LogLevel: "debug",
},
EtcdServer: config.EtcdServer{
LogLevel: "debug",
ClientPort: storagetest.GetFreePort(t),
PeerPort: storagetest.GetFreePort(t),
},
}

server := etcd.NewEmbeddedEtcd(cfgMgr(t, cfg), testutils.Logger(t))
require.NotNil(t, server)

err := server.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
server.Shutdown()
})

// Verify PeerEndpoints includes both IP and hostname
peerEndpoints := server.PeerEndpoints()
assert.Len(t, peerEndpoints, 2, "PeerEndpoints should include both IP and hostname URLs")
assert.Contains(t, peerEndpoints[0], "127.0.0.1", "First peer endpoint should contain IP")
assert.Contains(t, peerEndpoints[1], "localhost", "Second peer endpoint should contain hostname")

// Verify ClientEndpoints includes both IP and hostname
clientEndpoints := server.ClientEndpoints()
assert.Len(t, clientEndpoints, 2, "ClientEndpoints should include both IP and hostname URLs")
assert.Contains(t, clientEndpoints[0], "127.0.0.1", "First client endpoint should contain IP")
assert.Contains(t, clientEndpoints[1], "localhost", "Second client endpoint should contain hostname")

// Verify the etcd member list contains both peer URLs
client, err := server.GetClient()
require.NoError(t, err)

members, err := client.MemberList(ctx)
require.NoError(t, err)
require.Len(t, members.Members, 1)

member := members.Members[0]
assert.Len(t, member.PeerURLs, 2, "Member should have both IP and hostname peer URLs")
assert.Len(t, member.ClientURLs, 2, "Member should have both IP and hostname client URLs")
})

}

func cfgMgr(t testing.TB, cfg config.Config) *config.Manager {
Expand Down
24 changes: 24 additions & 0 deletions server/internal/etcd/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pgEdge/control-plane/server/internal/certificates"
"github.com/pgEdge/control-plane/server/internal/ds"
)

var (
Expand Down Expand Up @@ -105,3 +106,26 @@ func getLeaderMember(ctx context.Context, client *clientv3.Client) (*etcdserverp

return nil, errors.New("cluster has no leader")
}

func UpdateMemberPeerURLs(ctx context.Context, client *clientv3.Client, memberName string, newPeerURLs []string) (bool, error) {
resp, err := client.MemberList(ctx)
if err != nil {
return false, fmt.Errorf("failed to list members: %w", err)
}

member := findMember(resp.Members, memberName)
if member == nil {
return false, fmt.Errorf("member %s not found", memberName)
}

if ds.SetSymmetricDifference(member.PeerURLs, newPeerURLs).Size() == 0 {
return false, nil // No change needed
}

_, err = client.MemberUpdate(ctx, member.ID, newPeerURLs)
if err != nil {
return false, fmt.Errorf("failed to update member peer URLs: %w", err)
}

return true, nil
}
77 changes: 77 additions & 0 deletions server/internal/etcd/membership_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//go:build etcd_lifecycle_test

package etcd_test

import (
"testing"

"github.com/pgEdge/control-plane/server/internal/ds"
"github.com/stretchr/testify/assert"
)

func TestUrlsEqual(t *testing.T) {
t.Parallel()

tests := []struct {
name string
a []string
b []string
expected bool
}{
{
name: "same URLs same order",
a: []string{"https://192.168.1.2:2380", "https://host-1:2380"},
b: []string{"https://192.168.1.2:2380", "https://host-1:2380"},
expected: true,
},
{
name: "same URLs different order",
a: []string{"https://192.168.1.2:2380", "https://host-1:2380"},
b: []string{"https://host-1:2380", "https://192.168.1.2:2380"},
expected: true,
},
{
name: "different URLs",
a: []string{"https://192.168.1.2:2380"},
b: []string{"https://192.168.1.3:2380"},
expected: false,
},
{
name: "different lengths",
a: []string{"https://192.168.1.2:2380"},
b: []string{"https://192.168.1.2:2380", "https://host-1:2380"},
expected: false,
},
{
name: "both empty",
a: []string{},
b: []string{},
expected: true,
},
{
name: "one empty",
a: []string{"https://192.168.1.2:2380"},
b: []string{},
expected: false,
},
{
name: "both nil",
a: nil,
b: nil,
expected: true,
},
{
name: "one nil one empty",
a: nil,
b: []string{},
expected: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ds.SetSymmetricDifference(tt.a, tt.b).Size() == 0
assert.Equal(t, tt.expected, result)
})
}
}