From b37af8f31d5e8fb53430dbf178d164addaebdb0d Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 20 Jan 2026 18:57:28 +0530 Subject: [PATCH 1/4] feat(etcd): enable hostname-based connectivity when IP is unreachable --- changes/unreleased/Added-20260120-185700.yaml | 3 + clustertest/host_test.go | 38 +++- clustertest/ip_hostname_test.go | 195 ++++++++++++++++++ server/internal/etcd/embedded.go | 85 ++++++-- 4 files changed, 296 insertions(+), 25 deletions(-) create mode 100644 changes/unreleased/Added-20260120-185700.yaml create mode 100644 clustertest/ip_hostname_test.go diff --git a/changes/unreleased/Added-20260120-185700.yaml b/changes/unreleased/Added-20260120-185700.yaml new file mode 100644 index 00000000..3b15f5a9 --- /dev/null +++ b/changes/unreleased/Added-20260120-185700.yaml @@ -0,0 +1,3 @@ +kind: Added +body: Enabled hostname-based connectivity when IP is unreachable +time: 2026-01-20T18:57:00.979755+05:30 diff --git a/clustertest/host_test.go b/clustertest/host_test.go index c559f31f..856a8193 100644 --- a/clustertest/host_test.go +++ b/clustertest/host_test.go @@ -44,6 +44,10 @@ type Host struct { container testcontainers.Container } +type RecreateConfig struct { + EtcdMode EtcdMode + ExtraEnv map[string]string +} func NewHost(t testing.TB, config HostConfig) *Host { t.Helper() @@ -204,13 +208,18 @@ func (h *Host) GetEtcdMode(t testing.TB, cli client.Client) string { return "" } -// RecreateWithMode stops the current container and recreates it with a new etcd mode. -// This simulates changing the PGEDGE_ETCD_MODE environment variable and restarting. +// Recreate stops the current container and recreates it with the given configuration. +// This is useful for testing configuration changes that require a restart. // The new container will be cleaned up by the original cleanup registered in NewHost. -func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { +func (h *Host) Recreate(t testing.TB, cfg RecreateConfig) { t.Helper() - tLogf(t, "recreating host %s with etcd mode %s", h.id, newMode) + etcdMode := cfg.EtcdMode + if etcdMode == "" { + etcdMode = EtcdModeServer + } + + tLogf(t, "recreating host %s with etcd mode %s, extra env: %v", h.id, etcdMode, cfg.ExtraEnv) // Stop the current container ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -230,7 +239,7 @@ func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { var ports []int - switch newMode { + switch etcdMode { case EtcdModeClient: ports = allocatePorts(t, 1) env["PGEDGE_ETCD_MODE"] = "client" @@ -240,11 +249,14 @@ func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { env["PGEDGE_ETCD_SERVER__PEER_PORT"] = strconv.Itoa(ports[1]) env["PGEDGE_ETCD_SERVER__CLIENT_PORT"] = strconv.Itoa(ports[2]) default: - t.Fatalf("unrecognized etcd mode '%s'", newMode) + t.Fatalf("unrecognized etcd mode '%s'", etcdMode) } env["PGEDGE_HTTP__PORT"] = strconv.Itoa(ports[0]) + // Apply extra env overrides last + maps.Copy(env, cfg.ExtraEnv) + req := testcontainers.ContainerRequest{ AlwaysPullImage: true, Image: testConfig.imageTag, @@ -270,7 +282,7 @@ func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { WithStartupTimeout(60 * time.Second), } - tLogf(t, "starting host %s with new mode %s", h.id, newMode) + tLogf(t, "starting host %s", h.id) newContainer, err := testcontainers.GenericContainer( t.Context(), @@ -288,6 +300,18 @@ func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { h.port = ports[0] } +// RecreateWithMode is a convenience wrapper for Recreate that only changes the etcd mode. +func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { + t.Helper() + h.Recreate(t, RecreateConfig{EtcdMode: newMode}) +} + +// RecreateWithEnv is a convenience wrapper for Recreate that only adds extra environment variables. +func (h *Host) RecreateWithEnv(t testing.TB, extraEnv map[string]string) { + t.Helper() + h.Recreate(t, RecreateConfig{ExtraEnv: extraEnv}) +} + func containerLogs(ctx context.Context, t testing.TB, container testcontainers.Container) (string, error) { t.Helper() diff --git a/clustertest/ip_hostname_test.go b/clustertest/ip_hostname_test.go new file mode 100644 index 00000000..957066dc --- /dev/null +++ b/clustertest/ip_hostname_test.go @@ -0,0 +1,195 @@ +//go:build cluster_test + +package clustertest + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUseHostnameWhenIPUnreachable(t *testing.T) { + tLog(t, "initializing cluster with 3 etcd servers") + + cluster := NewCluster(t, ClusterConfig{ + Hosts: []HostConfig{ + {ID: "host-1"}, + {ID: "host-2"}, + {ID: "host-3"}, + }, + }) + cluster.Init(t) + cluster.AssertHealthy(t) + + tLog(t, "verifying initial cluster has 3 healthy hosts") + initialHostCount := countHealthyHosts(t, cluster) + require.Equal(t, 3, initialHostCount, "should have 3 healthy hosts initially") + + resp, err := cluster.Client().ListHosts(t.Context()) + require.NoError(t, err) + for _, host := range resp.Hosts { + tLogf(t, "host %s: ipv4_address=%s, hostname=%s", host.ID, host.Ipv4Address, host.Hostname) + } + + tLog(t, "test passed: cluster initialized successfully with automatic IP detection") +} + +func TestRestartedHostUsesHostnameIfIPUnreachable(t *testing.T) { + tLog(t, "initializing cluster with 3 etcd servers") + + cluster := NewCluster(t, ClusterConfig{ + Hosts: []HostConfig{ + {ID: "host-1"}, + {ID: "host-2"}, + {ID: "host-3"}, + }, + }) + cluster.Init(t) + cluster.AssertHealthy(t) + + tLog(t, "verifying initial cluster has 3 healthy hosts") + initialHostCount := countHealthyHosts(t, cluster) + require.Equal(t, 3, initialHostCount, "should have 3 healthy hosts initially") + + host3 := cluster.Host("host-3") + + tLog(t, "stopping host-3") + host3.Stop(t) + + tLog(t, "recreating host-3 with an unreachable IP address (simulating IP change)") + + host3.RecreateWithEnv(t, map[string]string{ + "PGEDGE_IPV4_ADDRESS": "10.99.99.99", + }) + + cluster.RefreshClient(t) + + tLog(t, "waiting for all hosts to become healthy") + waitForHostsHealthy(t, cluster, 3, 2*time.Minute) + + tLog(t, "verifying cluster health after host-3 restart with changed IP") + cluster.AssertHealthy(t) + + tLog(t, "verifying host-3 is reporting the new IP address") + resp, err := cluster.Client().ListHosts(t.Context()) + require.NoError(t, err) + + var host3Found bool + for _, host := range resp.Hosts { + if string(host.ID) == "host-3" { + host3Found = true + // The host should report the configured IP, but communication + // should work via hostname since the IP is unreachable. + assert.Equal(t, "10.99.99.99", host.Ipv4Address, + "host-3 should report the configured IP address") + tLogf(t, "host-3: ipv4_address=%s, hostname=%s, state=%s", + host.Ipv4Address, host.Hostname, host.Status.State) + } + } + require.True(t, host3Found, "host-3 should be in the cluster") + + tLog(t, "verifying all 3 hosts remain healthy after IP change") + finalHostCount := countHealthyHosts(t, cluster) + assert.Equal(t, 3, finalHostCount, "should have 3 healthy hosts after IP change") + + tLog(t, "test passed: cluster remains healthy after host IP change with hostname") +} + +func TestMultipleHostsUseHostnameWhenIPUnreachable(t *testing.T) { + tLog(t, "initializing cluster with 3 etcd servers") + + cluster := NewCluster(t, ClusterConfig{ + Hosts: []HostConfig{ + {ID: "host-1"}, + {ID: "host-2"}, + {ID: "host-3"}, + }, + }) + cluster.Init(t) + cluster.AssertHealthy(t) + + tLog(t, "verifying initial cluster has 3 healthy hosts") + initialHostCount := countHealthyHosts(t, cluster) + require.Equal(t, 3, initialHostCount, "should have 3 healthy hosts initially") + + tLog(t, "recreating host-2 with an unreachable IP") + host2 := cluster.Host("host-2") + host2.Recreate(t, RecreateConfig{ + ExtraEnv: map[string]string{ + "PGEDGE_IPV4_ADDRESS": "10.88.88.88", + }, + }) + + tLog(t, "recreating host-3 with an unreachable IP") + host3 := cluster.Host("host-3") + host3.Recreate(t, RecreateConfig{ + ExtraEnv: map[string]string{ + "PGEDGE_IPV4_ADDRESS": "10.99.99.99", + }, + }) + + cluster.RefreshClient(t) + + tLog(t, "waiting for all hosts to become healthy after IP changes") + waitForHostsHealthy(t, cluster, 3, 2*time.Minute) + + cluster.AssertHealthy(t) + + tLog(t, "verifying all 3 hosts are healthy after IP changes") + hostCount := countHealthyHosts(t, cluster) + require.Equal(t, 3, hostCount, "should have 3 healthy hosts") + + resp, err := cluster.Client().ListHosts(t.Context()) + require.NoError(t, err) + + for _, host := range resp.Hosts { + tLogf(t, "host %s: ipv4_address=%s, hostname=%s, state=%s", + host.ID, host.Ipv4Address, host.Hostname, host.Status.State) + assert.Equal(t, "healthy", host.Status.State, + "host %s should be healthy", host.ID) + } + + tLog(t, "test passed: cluster healthy after multiple hosts changed IPs") +} + +func TestEtcdWriteReadUsesHostnameWhenIPUnreachable(t *testing.T) { + tLog(t, "initializing cluster with 3 etcd servers") + + cluster := NewCluster(t, ClusterConfig{ + Hosts: []HostConfig{ + {ID: "host-1"}, + {ID: "host-2"}, + {ID: "host-3"}, + }, + }) + cluster.Init(t) + cluster.AssertHealthy(t) + + tLog(t, "changing host-2's IP to an unreachable address") + host2 := cluster.Host("host-2") + host2.Recreate(t, RecreateConfig{ + ExtraEnv: map[string]string{ + "PGEDGE_IPV4_ADDRESS": "10.88.88.88", + }, + }) + + cluster.RefreshClient(t) + waitForHostsHealthy(t, cluster, 3, 2*time.Minute) + + tLog(t, "verifying cluster is healthy after host-2 IP change") + cluster.AssertHealthy(t) + + resp, err := cluster.Client().ListHosts(t.Context()) + require.NoError(t, err) + + for _, host := range resp.Hosts { + tLogf(t, "host %s: ipv4_address=%s, hostname=%s, state=%s", + host.ID, host.Ipv4Address, host.Hostname, host.Status.State) + assert.Equal(t, "healthy", host.Status.State, + "host %s should be healthy", host.ID) + } + + tLog(t, "test passed: etcd operations work with hostname") +} diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index bf49606b..b8e172e6 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "net/url" "os" "path/filepath" @@ -32,13 +33,14 @@ var _ do.Shutdownable = (*EmbeddedEtcd)(nil) const quotaBackendBytes = 8 * 1024 * 1024 * 1024 // 8GB type EmbeddedEtcd struct { - mu sync.Mutex - certSvc *certificates.Service - client *clientv3.Client - etcd *embed.Etcd - logger zerolog.Logger - cfg *config.Manager - initialized chan struct{} + mu sync.Mutex + certSvc *certificates.Service + client *clientv3.Client + etcd *embed.Etcd + logger zerolog.Logger + cfg *config.Manager + initialized chan struct{} + advertiseHost string // Cached advertise host (IP or hostname) decided at startup } func NewEmbeddedEtcd(cfg *config.Manager, logger zerolog.Logger) *EmbeddedEtcd { @@ -156,10 +158,12 @@ func (e *EmbeddedEtcd) initialize(ctx context.Context) error { func (e *EmbeddedEtcd) start(ctx context.Context) error { appCfg := e.cfg.Config() - etcdCfg, err := embedConfig(appCfg, e.logger) + etcdCfg, advertiseHost, err := embedConfig(appCfg, e.logger) if err != nil { return fmt.Errorf("failed to initialize embedded etcd config: %w", err) } + e.advertiseHost = advertiseHost + etcd, err := startEmbedded(ctx, etcdCfg) if err != nil { return fmt.Errorf("failed to start etcd: %w", err) @@ -197,10 +201,11 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { appCfg := e.cfg.Config() - etcdCfg, err := embedConfig(appCfg, e.logger) + etcdCfg, advertiseHost, err := embedConfig(appCfg, e.logger) if err != nil { return fmt.Errorf("failed to initialize embedded etcd config: %w", err) } + e.advertiseHost = advertiseHost clientCfg, err := clientConfig(e.cfg.Config(), e.logger, options.Leader.ClientURLs...) if err != nil { @@ -307,15 +312,24 @@ func (e *EmbeddedEtcd) Error() <-chan error { func (e *EmbeddedEtcd) ClientEndpoints() []string { appCfg := e.cfg.Config() + host := e.advertiseHost + if host == "" { + host = appCfg.IPv4Address + } return []string{ - fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.ClientPort), + fmt.Sprintf("https://%s:%d", host, appCfg.EtcdServer.ClientPort), } } func (e *EmbeddedEtcd) PeerEndpoints() []string { appCfg := e.cfg.Config() + host := e.advertiseHost + if host == "" { + // Fallback if not set (shouldn't happen after Start) + host = appCfg.IPv4Address + } return []string{ - fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.PeerPort), + fmt.Sprintf("https://%s:%d", host, appCfg.EtcdServer.PeerPort), } } @@ -594,10 +608,18 @@ func (e *EmbeddedEtcd) PromoteWhenReady(ctx context.Context, client *clientv3.Cl }) } -func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error) { +func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, string, error) { + logger.Info(). + Str("host_id", cfg.HostID). + Str("ipv4_address", cfg.IPv4Address). + Str("hostname", cfg.Hostname). + Int("client_port", cfg.EtcdServer.ClientPort). + Int("peer_port", cfg.EtcdServer.PeerPort). + Msg("embedded etcd with configuration") + lg, err := newZapLogger(logger, cfg.EtcdServer.LogLevel, "etcd_server") if err != nil { - return nil, fmt.Errorf("failed to initialize etcd server logger: %w", err) + return nil, "", fmt.Errorf("failed to initialize etcd server logger: %w", err) } c := embed.NewConfig() @@ -631,24 +653,39 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error clientPort := cfg.EtcdServer.ClientPort peerPort := cfg.EtcdServer.PeerPort - myIP := cfg.IPv4Address + + // Determine the advertise host: use IP if we can reach to it, otherwise use hostname. + advertiseHost := cfg.IPv4Address + if cfg.Hostname != "" && !isIPReachable(cfg.IPv4Address, logger) { + // Cannot reach to IP, so using hostname + advertiseHost = cfg.Hostname + logger.Warn(). + Str("ip_address", cfg.IPv4Address). + Str("using_host", advertiseHost). + Msg("cannot reach configured IP, using hostname for etcd advertise URLs") + } else { + logger.Info(). + Str("advertise_host", advertiseHost). + Msg("using IP address for etcd advertise URLs") + } + c.ListenClientUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", clientPort)}, } c.AdvertiseClientUrls = []url.URL{ - {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, clientPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", advertiseHost, clientPort)}, } c.ListenPeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", peerPort)}, } c.AdvertisePeerUrls = []url.URL{ - {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, peerPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", advertiseHost, peerPort)}, } // This will get overridden when joining an existing cluster c.InitialCluster = fmt.Sprintf( "%s=http://%s:%d", cfg.HostID, - myIP, + advertiseHost, peerPort, ) // Using a large number here as a precaution. We're unlikely to hit this, @@ -658,7 +695,7 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error c.MaxRequestBytes = 10 * 1024 * 1024 // 10MB c.QuotaBackendBytes = quotaBackendBytes - return c, nil + return c, advertiseHost, nil } func initializationConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error) { @@ -732,3 +769,15 @@ func clientForEmbedded(cfg config.Config, logger zerolog.Logger, etcd *embed.Etc return client, nil } + +func isIPReachable(ip string, logger zerolog.Logger) bool { + addr := fmt.Sprintf("%s:0", ip) + listener, err := net.Listen("tcp", addr) + if err != nil { + logger.Warn().Str("ip", ip).Err(err).Msg("cannot reach IP address") + return false + } + listener.Close() + logger.Info().Str("ip", ip).Msg("successfully reachable to IP address") + return true +} From c352245e6bebb9b4c7d59500c1535820a225742b Mon Sep 17 00:00:00 2001 From: Siva Date: Wed, 21 Jan 2026 23:50:22 +0530 Subject: [PATCH 2/4] reverting current changes --- changes/unreleased/Added-20260120-185700.yaml | 3 - clustertest/host_test.go | 38 +--- clustertest/ip_hostname_test.go | 195 ------------------ server/internal/etcd/embedded.go | 85 ++------ 4 files changed, 25 insertions(+), 296 deletions(-) delete mode 100644 changes/unreleased/Added-20260120-185700.yaml delete mode 100644 clustertest/ip_hostname_test.go diff --git a/changes/unreleased/Added-20260120-185700.yaml b/changes/unreleased/Added-20260120-185700.yaml deleted file mode 100644 index 3b15f5a9..00000000 --- a/changes/unreleased/Added-20260120-185700.yaml +++ /dev/null @@ -1,3 +0,0 @@ -kind: Added -body: Enabled hostname-based connectivity when IP is unreachable -time: 2026-01-20T18:57:00.979755+05:30 diff --git a/clustertest/host_test.go b/clustertest/host_test.go index 856a8193..c559f31f 100644 --- a/clustertest/host_test.go +++ b/clustertest/host_test.go @@ -44,10 +44,6 @@ type Host struct { container testcontainers.Container } -type RecreateConfig struct { - EtcdMode EtcdMode - ExtraEnv map[string]string -} func NewHost(t testing.TB, config HostConfig) *Host { t.Helper() @@ -208,18 +204,13 @@ func (h *Host) GetEtcdMode(t testing.TB, cli client.Client) string { return "" } -// Recreate stops the current container and recreates it with the given configuration. -// This is useful for testing configuration changes that require a restart. +// RecreateWithMode stops the current container and recreates it with a new etcd mode. +// This simulates changing the PGEDGE_ETCD_MODE environment variable and restarting. // The new container will be cleaned up by the original cleanup registered in NewHost. -func (h *Host) Recreate(t testing.TB, cfg RecreateConfig) { +func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { t.Helper() - etcdMode := cfg.EtcdMode - if etcdMode == "" { - etcdMode = EtcdModeServer - } - - tLogf(t, "recreating host %s with etcd mode %s, extra env: %v", h.id, etcdMode, cfg.ExtraEnv) + tLogf(t, "recreating host %s with etcd mode %s", h.id, newMode) // Stop the current container ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -239,7 +230,7 @@ func (h *Host) Recreate(t testing.TB, cfg RecreateConfig) { var ports []int - switch etcdMode { + switch newMode { case EtcdModeClient: ports = allocatePorts(t, 1) env["PGEDGE_ETCD_MODE"] = "client" @@ -249,14 +240,11 @@ func (h *Host) Recreate(t testing.TB, cfg RecreateConfig) { env["PGEDGE_ETCD_SERVER__PEER_PORT"] = strconv.Itoa(ports[1]) env["PGEDGE_ETCD_SERVER__CLIENT_PORT"] = strconv.Itoa(ports[2]) default: - t.Fatalf("unrecognized etcd mode '%s'", etcdMode) + t.Fatalf("unrecognized etcd mode '%s'", newMode) } env["PGEDGE_HTTP__PORT"] = strconv.Itoa(ports[0]) - // Apply extra env overrides last - maps.Copy(env, cfg.ExtraEnv) - req := testcontainers.ContainerRequest{ AlwaysPullImage: true, Image: testConfig.imageTag, @@ -282,7 +270,7 @@ func (h *Host) Recreate(t testing.TB, cfg RecreateConfig) { WithStartupTimeout(60 * time.Second), } - tLogf(t, "starting host %s", h.id) + tLogf(t, "starting host %s with new mode %s", h.id, newMode) newContainer, err := testcontainers.GenericContainer( t.Context(), @@ -300,18 +288,6 @@ func (h *Host) Recreate(t testing.TB, cfg RecreateConfig) { h.port = ports[0] } -// RecreateWithMode is a convenience wrapper for Recreate that only changes the etcd mode. -func (h *Host) RecreateWithMode(t testing.TB, newMode EtcdMode) { - t.Helper() - h.Recreate(t, RecreateConfig{EtcdMode: newMode}) -} - -// RecreateWithEnv is a convenience wrapper for Recreate that only adds extra environment variables. -func (h *Host) RecreateWithEnv(t testing.TB, extraEnv map[string]string) { - t.Helper() - h.Recreate(t, RecreateConfig{ExtraEnv: extraEnv}) -} - func containerLogs(ctx context.Context, t testing.TB, container testcontainers.Container) (string, error) { t.Helper() diff --git a/clustertest/ip_hostname_test.go b/clustertest/ip_hostname_test.go deleted file mode 100644 index 957066dc..00000000 --- a/clustertest/ip_hostname_test.go +++ /dev/null @@ -1,195 +0,0 @@ -//go:build cluster_test - -package clustertest - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestUseHostnameWhenIPUnreachable(t *testing.T) { - tLog(t, "initializing cluster with 3 etcd servers") - - cluster := NewCluster(t, ClusterConfig{ - Hosts: []HostConfig{ - {ID: "host-1"}, - {ID: "host-2"}, - {ID: "host-3"}, - }, - }) - cluster.Init(t) - cluster.AssertHealthy(t) - - tLog(t, "verifying initial cluster has 3 healthy hosts") - initialHostCount := countHealthyHosts(t, cluster) - require.Equal(t, 3, initialHostCount, "should have 3 healthy hosts initially") - - resp, err := cluster.Client().ListHosts(t.Context()) - require.NoError(t, err) - for _, host := range resp.Hosts { - tLogf(t, "host %s: ipv4_address=%s, hostname=%s", host.ID, host.Ipv4Address, host.Hostname) - } - - tLog(t, "test passed: cluster initialized successfully with automatic IP detection") -} - -func TestRestartedHostUsesHostnameIfIPUnreachable(t *testing.T) { - tLog(t, "initializing cluster with 3 etcd servers") - - cluster := NewCluster(t, ClusterConfig{ - Hosts: []HostConfig{ - {ID: "host-1"}, - {ID: "host-2"}, - {ID: "host-3"}, - }, - }) - cluster.Init(t) - cluster.AssertHealthy(t) - - tLog(t, "verifying initial cluster has 3 healthy hosts") - initialHostCount := countHealthyHosts(t, cluster) - require.Equal(t, 3, initialHostCount, "should have 3 healthy hosts initially") - - host3 := cluster.Host("host-3") - - tLog(t, "stopping host-3") - host3.Stop(t) - - tLog(t, "recreating host-3 with an unreachable IP address (simulating IP change)") - - host3.RecreateWithEnv(t, map[string]string{ - "PGEDGE_IPV4_ADDRESS": "10.99.99.99", - }) - - cluster.RefreshClient(t) - - tLog(t, "waiting for all hosts to become healthy") - waitForHostsHealthy(t, cluster, 3, 2*time.Minute) - - tLog(t, "verifying cluster health after host-3 restart with changed IP") - cluster.AssertHealthy(t) - - tLog(t, "verifying host-3 is reporting the new IP address") - resp, err := cluster.Client().ListHosts(t.Context()) - require.NoError(t, err) - - var host3Found bool - for _, host := range resp.Hosts { - if string(host.ID) == "host-3" { - host3Found = true - // The host should report the configured IP, but communication - // should work via hostname since the IP is unreachable. - assert.Equal(t, "10.99.99.99", host.Ipv4Address, - "host-3 should report the configured IP address") - tLogf(t, "host-3: ipv4_address=%s, hostname=%s, state=%s", - host.Ipv4Address, host.Hostname, host.Status.State) - } - } - require.True(t, host3Found, "host-3 should be in the cluster") - - tLog(t, "verifying all 3 hosts remain healthy after IP change") - finalHostCount := countHealthyHosts(t, cluster) - assert.Equal(t, 3, finalHostCount, "should have 3 healthy hosts after IP change") - - tLog(t, "test passed: cluster remains healthy after host IP change with hostname") -} - -func TestMultipleHostsUseHostnameWhenIPUnreachable(t *testing.T) { - tLog(t, "initializing cluster with 3 etcd servers") - - cluster := NewCluster(t, ClusterConfig{ - Hosts: []HostConfig{ - {ID: "host-1"}, - {ID: "host-2"}, - {ID: "host-3"}, - }, - }) - cluster.Init(t) - cluster.AssertHealthy(t) - - tLog(t, "verifying initial cluster has 3 healthy hosts") - initialHostCount := countHealthyHosts(t, cluster) - require.Equal(t, 3, initialHostCount, "should have 3 healthy hosts initially") - - tLog(t, "recreating host-2 with an unreachable IP") - host2 := cluster.Host("host-2") - host2.Recreate(t, RecreateConfig{ - ExtraEnv: map[string]string{ - "PGEDGE_IPV4_ADDRESS": "10.88.88.88", - }, - }) - - tLog(t, "recreating host-3 with an unreachable IP") - host3 := cluster.Host("host-3") - host3.Recreate(t, RecreateConfig{ - ExtraEnv: map[string]string{ - "PGEDGE_IPV4_ADDRESS": "10.99.99.99", - }, - }) - - cluster.RefreshClient(t) - - tLog(t, "waiting for all hosts to become healthy after IP changes") - waitForHostsHealthy(t, cluster, 3, 2*time.Minute) - - cluster.AssertHealthy(t) - - tLog(t, "verifying all 3 hosts are healthy after IP changes") - hostCount := countHealthyHosts(t, cluster) - require.Equal(t, 3, hostCount, "should have 3 healthy hosts") - - resp, err := cluster.Client().ListHosts(t.Context()) - require.NoError(t, err) - - for _, host := range resp.Hosts { - tLogf(t, "host %s: ipv4_address=%s, hostname=%s, state=%s", - host.ID, host.Ipv4Address, host.Hostname, host.Status.State) - assert.Equal(t, "healthy", host.Status.State, - "host %s should be healthy", host.ID) - } - - tLog(t, "test passed: cluster healthy after multiple hosts changed IPs") -} - -func TestEtcdWriteReadUsesHostnameWhenIPUnreachable(t *testing.T) { - tLog(t, "initializing cluster with 3 etcd servers") - - cluster := NewCluster(t, ClusterConfig{ - Hosts: []HostConfig{ - {ID: "host-1"}, - {ID: "host-2"}, - {ID: "host-3"}, - }, - }) - cluster.Init(t) - cluster.AssertHealthy(t) - - tLog(t, "changing host-2's IP to an unreachable address") - host2 := cluster.Host("host-2") - host2.Recreate(t, RecreateConfig{ - ExtraEnv: map[string]string{ - "PGEDGE_IPV4_ADDRESS": "10.88.88.88", - }, - }) - - cluster.RefreshClient(t) - waitForHostsHealthy(t, cluster, 3, 2*time.Minute) - - tLog(t, "verifying cluster is healthy after host-2 IP change") - cluster.AssertHealthy(t) - - resp, err := cluster.Client().ListHosts(t.Context()) - require.NoError(t, err) - - for _, host := range resp.Hosts { - tLogf(t, "host %s: ipv4_address=%s, hostname=%s, state=%s", - host.ID, host.Ipv4Address, host.Hostname, host.Status.State) - assert.Equal(t, "healthy", host.Status.State, - "host %s should be healthy", host.ID) - } - - tLog(t, "test passed: etcd operations work with hostname") -} diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index b8e172e6..bf49606b 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net" "net/url" "os" "path/filepath" @@ -33,14 +32,13 @@ var _ do.Shutdownable = (*EmbeddedEtcd)(nil) const quotaBackendBytes = 8 * 1024 * 1024 * 1024 // 8GB type EmbeddedEtcd struct { - mu sync.Mutex - certSvc *certificates.Service - client *clientv3.Client - etcd *embed.Etcd - logger zerolog.Logger - cfg *config.Manager - initialized chan struct{} - advertiseHost string // Cached advertise host (IP or hostname) decided at startup + mu sync.Mutex + certSvc *certificates.Service + client *clientv3.Client + etcd *embed.Etcd + logger zerolog.Logger + cfg *config.Manager + initialized chan struct{} } func NewEmbeddedEtcd(cfg *config.Manager, logger zerolog.Logger) *EmbeddedEtcd { @@ -158,12 +156,10 @@ func (e *EmbeddedEtcd) initialize(ctx context.Context) error { func (e *EmbeddedEtcd) start(ctx context.Context) error { appCfg := e.cfg.Config() - etcdCfg, advertiseHost, err := embedConfig(appCfg, e.logger) + etcdCfg, err := embedConfig(appCfg, e.logger) if err != nil { return fmt.Errorf("failed to initialize embedded etcd config: %w", err) } - e.advertiseHost = advertiseHost - etcd, err := startEmbedded(ctx, etcdCfg) if err != nil { return fmt.Errorf("failed to start etcd: %w", err) @@ -201,11 +197,10 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { appCfg := e.cfg.Config() - etcdCfg, advertiseHost, err := embedConfig(appCfg, e.logger) + etcdCfg, err := embedConfig(appCfg, e.logger) if err != nil { return fmt.Errorf("failed to initialize embedded etcd config: %w", err) } - e.advertiseHost = advertiseHost clientCfg, err := clientConfig(e.cfg.Config(), e.logger, options.Leader.ClientURLs...) if err != nil { @@ -312,24 +307,15 @@ func (e *EmbeddedEtcd) Error() <-chan error { func (e *EmbeddedEtcd) ClientEndpoints() []string { appCfg := e.cfg.Config() - host := e.advertiseHost - if host == "" { - host = appCfg.IPv4Address - } return []string{ - fmt.Sprintf("https://%s:%d", host, appCfg.EtcdServer.ClientPort), + fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.ClientPort), } } func (e *EmbeddedEtcd) PeerEndpoints() []string { appCfg := e.cfg.Config() - host := e.advertiseHost - if host == "" { - // Fallback if not set (shouldn't happen after Start) - host = appCfg.IPv4Address - } return []string{ - fmt.Sprintf("https://%s:%d", host, appCfg.EtcdServer.PeerPort), + fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.PeerPort), } } @@ -608,18 +594,10 @@ func (e *EmbeddedEtcd) PromoteWhenReady(ctx context.Context, client *clientv3.Cl }) } -func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, string, error) { - logger.Info(). - Str("host_id", cfg.HostID). - Str("ipv4_address", cfg.IPv4Address). - Str("hostname", cfg.Hostname). - Int("client_port", cfg.EtcdServer.ClientPort). - Int("peer_port", cfg.EtcdServer.PeerPort). - Msg("embedded etcd with configuration") - +func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error) { lg, err := newZapLogger(logger, cfg.EtcdServer.LogLevel, "etcd_server") if err != nil { - return nil, "", fmt.Errorf("failed to initialize etcd server logger: %w", err) + return nil, fmt.Errorf("failed to initialize etcd server logger: %w", err) } c := embed.NewConfig() @@ -653,39 +631,24 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, strin clientPort := cfg.EtcdServer.ClientPort peerPort := cfg.EtcdServer.PeerPort - - // Determine the advertise host: use IP if we can reach to it, otherwise use hostname. - advertiseHost := cfg.IPv4Address - if cfg.Hostname != "" && !isIPReachable(cfg.IPv4Address, logger) { - // Cannot reach to IP, so using hostname - advertiseHost = cfg.Hostname - logger.Warn(). - Str("ip_address", cfg.IPv4Address). - Str("using_host", advertiseHost). - Msg("cannot reach configured IP, using hostname for etcd advertise URLs") - } else { - logger.Info(). - Str("advertise_host", advertiseHost). - Msg("using IP address for etcd advertise URLs") - } - + myIP := cfg.IPv4Address c.ListenClientUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", clientPort)}, } c.AdvertiseClientUrls = []url.URL{ - {Scheme: "https", Host: fmt.Sprintf("%s:%d", advertiseHost, clientPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, clientPort)}, } c.ListenPeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", peerPort)}, } c.AdvertisePeerUrls = []url.URL{ - {Scheme: "https", Host: fmt.Sprintf("%s:%d", advertiseHost, peerPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, peerPort)}, } // This will get overridden when joining an existing cluster c.InitialCluster = fmt.Sprintf( "%s=http://%s:%d", cfg.HostID, - advertiseHost, + myIP, peerPort, ) // Using a large number here as a precaution. We're unlikely to hit this, @@ -695,7 +658,7 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, strin c.MaxRequestBytes = 10 * 1024 * 1024 // 10MB c.QuotaBackendBytes = quotaBackendBytes - return c, advertiseHost, nil + return c, nil } func initializationConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error) { @@ -769,15 +732,3 @@ func clientForEmbedded(cfg config.Config, logger zerolog.Logger, etcd *embed.Etc return client, nil } - -func isIPReachable(ip string, logger zerolog.Logger) bool { - addr := fmt.Sprintf("%s:0", ip) - listener, err := net.Listen("tcp", addr) - if err != nil { - logger.Warn().Str("ip", ip).Err(err).Msg("cannot reach IP address") - return false - } - listener.Close() - logger.Info().Str("ip", ip).Msg("successfully reachable to IP address") - return true -} From dcdc2a495ec9cff4a59248c5d9b9df314da3125d Mon Sep 17 00:00:00 2001 From: Siva Date: Thu, 22 Jan 2026 18:53:44 +0530 Subject: [PATCH 3/4] feat(etcd): enable host identity changes by updating peer URLs for improved stability --- changes/unreleased/Added-20260122-185129.yaml | 3 + server/internal/etcd/embedded.go | 46 ++++++++-- server/internal/etcd/embedded_test.go | 89 +++++++++++++++++++ server/internal/etcd/membership.go | 39 ++++++++ server/internal/etcd/membership_test.go | 77 ++++++++++++++++ .../orchestrator/swarm/patroni_config.go | 25 ++++++ 6 files changed, 273 insertions(+), 6 deletions(-) create mode 100644 changes/unreleased/Added-20260122-185129.yaml create mode 100644 server/internal/etcd/membership_test.go diff --git a/changes/unreleased/Added-20260122-185129.yaml b/changes/unreleased/Added-20260122-185129.yaml new file mode 100644 index 00000000..93295749 --- /dev/null +++ b/changes/unreleased/Added-20260122-185129.yaml @@ -0,0 +1,3 @@ +kind: Added +body: Improved etcd resilience by updating peer URLs when host IPs or hostnames change. +time: 2026-01-22T18:51:29.295792+05:30 diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index bf49606b..239d9bfb 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -171,6 +171,13 @@ func (e *EmbeddedEtcd) start(ctx context.Context) error { return fmt.Errorf("failed to get internal etcd client: %w", err) } + urlsChanged, updateErr := UpdateMemberPeerURLs(ctx, client, appCfg.HostID, e.PeerEndpoints()) + if updateErr != nil { + e.logger.Warn().Err(updateErr).Msg("failed to update peer URLs after IP/hostname change") + } else if urlsChanged { + e.logger.Info().Msg("peer URLs updated due to IP/hostname change") + } + e.certSvc, err = certificateService(ctx, e.cfg.Config(), client) if err != nil { return err @@ -237,7 +244,10 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { // This server will have an empty member name in the members list, so we // add this server to the list separately. name := e.cfg.Config().HostID - peers := []string{fmt.Sprintf("%s=%s", name, e.PeerEndpoints()[0])} + var peers []string + for _, peerURL := range e.PeerEndpoints() { + peers = append(peers, fmt.Sprintf("%s=%s", name, peerURL)) + } for _, m := range members.Members { // Empty name indicates a member that hasn't started, including this // server. @@ -247,7 +257,9 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { if len(m.PeerURLs) < 1 { return fmt.Errorf("member %q has no peer URLs", m.Name) } - peers = append(peers, fmt.Sprintf("%s=%s", m.Name, m.PeerURLs[0])) + for _, peerURL := range m.PeerURLs { + peers = append(peers, fmt.Sprintf("%s=%s", m.Name, peerURL)) + } } etcdCfg.InitialCluster = strings.Join(peers, ",") etcdCfg.ClusterState = embed.ClusterStateFlagExisting @@ -307,16 +319,28 @@ func (e *EmbeddedEtcd) Error() <-chan error { func (e *EmbeddedEtcd) ClientEndpoints() []string { appCfg := e.cfg.Config() - return []string{ - fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.ClientPort), + clientPort := appCfg.EtcdServer.ClientPort + endpoints := []string{ + fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, clientPort), } + // Add hostname-based URL if hostname differs from IP + if appCfg.Hostname != "" && appCfg.Hostname != appCfg.IPv4Address { + endpoints = append(endpoints, fmt.Sprintf("https://%s:%d", appCfg.Hostname, clientPort)) + } + return endpoints } func (e *EmbeddedEtcd) PeerEndpoints() []string { appCfg := e.cfg.Config() - return []string{ - fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, appCfg.EtcdServer.PeerPort), + peerPort := appCfg.EtcdServer.PeerPort + endpoints := []string{ + fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, peerPort), + } + // Add hostname-based URL if hostname differs from IP + if appCfg.Hostname != "" && appCfg.Hostname != appCfg.IPv4Address { + endpoints = append(endpoints, fmt.Sprintf("https://%s:%d", appCfg.Hostname, peerPort)) } + return endpoints } func (e *EmbeddedEtcd) etcdDir() string { @@ -638,12 +662,22 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error c.AdvertiseClientUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, clientPort)}, } + // Add hostname-based URL if hostname differs from IP + if cfg.Hostname != "" && cfg.Hostname != myIP { + c.AdvertiseClientUrls = append(c.AdvertiseClientUrls, + url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, clientPort)}) + } c.ListenPeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", peerPort)}, } c.AdvertisePeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, peerPort)}, } + // Add hostname-based URL if hostname differs from IP + if cfg.Hostname != "" && cfg.Hostname != myIP { + c.AdvertisePeerUrls = append(c.AdvertisePeerUrls, + url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, peerPort)}) + } // This will get overridden when joining an existing cluster c.InitialCluster = fmt.Sprintf( "%s=http://%s:%d", diff --git a/server/internal/etcd/embedded_test.go b/server/internal/etcd/embedded_test.go index 2a7801ba..69d42217 100644 --- a/server/internal/etcd/embedded_test.go +++ b/server/internal/etcd/embedded_test.go @@ -367,6 +367,95 @@ func TestEmbeddedEtcd(t *testing.T) { err = serverA.RemoveHost(ctx, cfgB.HostID) require.ErrorIs(t, err, etcd.ErrMinimumClusterSize) }) + + t.Run("endpoints include both IP and hostname URLs", func(t *testing.T) { + ctx := t.Context() + + cfg := config.Config{ + HostID: uuid.NewString(), + DataDir: t.TempDir(), + EtcdMode: config.EtcdModeServer, + IPv4Address: "127.0.0.1", + Hostname: "localhost", + EtcdClient: config.EtcdClient{ + LogLevel: "debug", + }, + EtcdServer: config.EtcdServer{ + LogLevel: "debug", + ClientPort: storagetest.GetFreePort(t), + PeerPort: storagetest.GetFreePort(t), + }, + } + + server := etcd.NewEmbeddedEtcd(cfgMgr(t, cfg), testutils.Logger(t)) + require.NotNil(t, server) + + err := server.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + server.Shutdown() + }) + + // Verify PeerEndpoints includes both IP and hostname + peerEndpoints := server.PeerEndpoints() + assert.Len(t, peerEndpoints, 2, "PeerEndpoints should include both IP and hostname URLs") + assert.Contains(t, peerEndpoints[0], "127.0.0.1", "First peer endpoint should contain IP") + assert.Contains(t, peerEndpoints[1], "localhost", "Second peer endpoint should contain hostname") + + // Verify ClientEndpoints includes both IP and hostname + clientEndpoints := server.ClientEndpoints() + assert.Len(t, clientEndpoints, 2, "ClientEndpoints should include both IP and hostname URLs") + assert.Contains(t, clientEndpoints[0], "127.0.0.1", "First client endpoint should contain IP") + assert.Contains(t, clientEndpoints[1], "localhost", "Second client endpoint should contain hostname") + + // Verify the etcd member list contains both peer URLs + client, err := server.GetClient() + require.NoError(t, err) + + members, err := client.MemberList(ctx) + require.NoError(t, err) + require.Len(t, members.Members, 1) + + member := members.Members[0] + assert.Len(t, member.PeerURLs, 2, "Member should have both IP and hostname peer URLs") + assert.Len(t, member.ClientURLs, 2, "Member should have both IP and hostname client URLs") + }) + + t.Run("endpoints only include IP when hostname equals IP", func(t *testing.T) { + ctx := t.Context() + + cfg := config.Config{ + HostID: uuid.NewString(), + DataDir: t.TempDir(), + EtcdMode: config.EtcdModeServer, + IPv4Address: "127.0.0.1", + Hostname: "127.0.0.1", + EtcdClient: config.EtcdClient{ + LogLevel: "debug", + }, + EtcdServer: config.EtcdServer{ + LogLevel: "debug", + ClientPort: storagetest.GetFreePort(t), + PeerPort: storagetest.GetFreePort(t), + }, + } + + server := etcd.NewEmbeddedEtcd(cfgMgr(t, cfg), testutils.Logger(t)) + require.NotNil(t, server) + + err := server.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + server.Shutdown() + }) + + // Should only have 1 endpoint (no duplicate when hostname equals IP) + peerEndpoints := server.PeerEndpoints() + assert.Len(t, peerEndpoints, 1, "PeerEndpoints should only have IP when hostname equals IP") + + clientEndpoints := server.ClientEndpoints() + assert.Len(t, clientEndpoints, 1, "ClientEndpoints should only have IP when hostname equals IP") + }) } func cfgMgr(t testing.TB, cfg config.Config) *config.Manager { diff --git a/server/internal/etcd/membership.go b/server/internal/etcd/membership.go index 32570893..8dbd1f28 100644 --- a/server/internal/etcd/membership.go +++ b/server/internal/etcd/membership.go @@ -105,3 +105,42 @@ func getLeaderMember(ctx context.Context, client *clientv3.Client) (*etcdserverp return nil, errors.New("cluster has no leader") } + +func UpdateMemberPeerURLs(ctx context.Context, client *clientv3.Client, memberName string, newPeerURLs []string) (bool, error) { + resp, err := client.MemberList(ctx) + if err != nil { + return false, fmt.Errorf("failed to list members: %w", err) + } + + member := findMember(resp.Members, memberName) + if member == nil { + return false, fmt.Errorf("member %s not found", memberName) + } + + if UrlsEqual(member.PeerURLs, newPeerURLs) { + return false, nil // No change needed + } + + _, err = client.MemberUpdate(ctx, member.ID, newPeerURLs) + if err != nil { + return false, fmt.Errorf("failed to update member peer URLs: %w", err) + } + + return true, nil +} + +func UrlsEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + aSet := make(map[string]struct{}, len(a)) + for _, url := range a { + aSet[url] = struct{}{} + } + for _, url := range b { + if _, ok := aSet[url]; !ok { + return false + } + } + return true +} diff --git a/server/internal/etcd/membership_test.go b/server/internal/etcd/membership_test.go new file mode 100644 index 00000000..aaa09cfb --- /dev/null +++ b/server/internal/etcd/membership_test.go @@ -0,0 +1,77 @@ +//go:build etcd_lifecycle_test + +package etcd_test + +import ( + "testing" + + "github.com/pgEdge/control-plane/server/internal/etcd" + "github.com/stretchr/testify/assert" +) + +func TestUrlsEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + a []string + b []string + expected bool + }{ + { + name: "same URLs same order", + a: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + b: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + expected: true, + }, + { + name: "same URLs different order", + a: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + b: []string{"https://host-1:2380", "https://192.168.1.2:2380"}, + expected: true, + }, + { + name: "different URLs", + a: []string{"https://192.168.1.2:2380"}, + b: []string{"https://192.168.1.3:2380"}, + expected: false, + }, + { + name: "different lengths", + a: []string{"https://192.168.1.2:2380"}, + b: []string{"https://192.168.1.2:2380", "https://host-1:2380"}, + expected: false, + }, + { + name: "both empty", + a: []string{}, + b: []string{}, + expected: true, + }, + { + name: "one empty", + a: []string{"https://192.168.1.2:2380"}, + b: []string{}, + expected: false, + }, + { + name: "both nil", + a: nil, + b: nil, + expected: true, + }, + { + name: "one nil one empty", + a: nil, + b: []string{}, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := etcd.UrlsEqual(tt.a, tt.b) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/server/internal/orchestrator/swarm/patroni_config.go b/server/internal/orchestrator/swarm/patroni_config.go index 9971a5e8..b467729f 100644 --- a/server/internal/orchestrator/swarm/patroni_config.go +++ b/server/internal/orchestrator/swarm/patroni_config.go @@ -5,11 +5,13 @@ import ( "encoding/json" "errors" "fmt" + "io" "maps" "net/url" "path/filepath" "github.com/alessio/shellescape" + "github.com/rs/zerolog/log" "github.com/samber/do" "github.com/spf13/afero" clientv3 "go.etcd.io/etcd/client/v3" @@ -170,6 +172,11 @@ func (c *PatroniConfig) Create(ctx context.Context, rc *resource.Context) error return fmt.Errorf("failed to change ownership for %s: %w", configPath, err) } + // Signal Patroni to reload config + if err := c.signalPatroniReload(ctx, rc); err != nil { + log.Warn().Err(err).Str("instance_id", c.Spec.InstanceID).Msg("failed to signal Patroni to reload config") + } + return nil } @@ -177,6 +184,24 @@ func (c *PatroniConfig) Update(ctx context.Context, rc *resource.Context) error return c.Create(ctx, rc) } +func (c *PatroniConfig) signalPatroniReload(ctx context.Context, rc *resource.Context) error { + dockerClient, err := do.Invoke[*docker.Docker](rc.Injector) + if err != nil { + return err + } + + err = PostgresContainerExec(ctx, io.Discard, dockerClient, c.Spec.InstanceID, []string{"kill", "-HUP", "1"}) + if err != nil { + // Container might not be running yet during initial creation + if errors.Is(err, ErrNoPostgresContainer) { + return nil + } + return fmt.Errorf("failed to send SIGHUP to Patroni: %w", err) + } + + return nil +} + func (c *PatroniConfig) Delete(ctx context.Context, rc *resource.Context) error { fs, err := do.Invoke[afero.Fs](rc.Injector) if err != nil { From a5b2475bf28d0f3a2faf37a53cfbf0cc1173ab05 Mon Sep 17 00:00:00 2001 From: Siva Date: Fri, 23 Jan 2026 01:04:20 +0530 Subject: [PATCH 4/4] addressing review comments --- server/internal/ds/set.go | 6 ++++ server/internal/etcd/embedded.go | 30 +++++----------- server/internal/etcd/embedded_test.go | 35 ------------------- server/internal/etcd/membership.go | 19 ++-------- server/internal/etcd/membership_test.go | 4 +-- .../orchestrator/swarm/patroni_config.go | 25 ------------- 6 files changed, 18 insertions(+), 101 deletions(-) diff --git a/server/internal/ds/set.go b/server/internal/ds/set.go index 4e414fa9..ef829c77 100644 --- a/server/internal/ds/set.go +++ b/server/internal/ds/set.go @@ -125,3 +125,9 @@ func (s Set[T]) ToSortedSlice(cmp func(a T, b T) int) []T { func SetDifference[T comparable](a, b []T) Set[T] { return NewSet(a...).Difference(NewSet(b...)) } + +// SetSymmetricDifference (a-b)∪(b-a) returns the set of elements that are in +// either a or b, but not both. +func SetSymmetricDifference[T comparable](a, b []T) Set[T] { + return NewSet(a...).SymmetricDifference(NewSet(b...)) +} diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index 239d9bfb..5b01e583 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -320,27 +320,19 @@ func (e *EmbeddedEtcd) Error() <-chan error { func (e *EmbeddedEtcd) ClientEndpoints() []string { appCfg := e.cfg.Config() clientPort := appCfg.EtcdServer.ClientPort - endpoints := []string{ + return []string{ fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, clientPort), + fmt.Sprintf("https://%s:%d", appCfg.Hostname, clientPort), } - // Add hostname-based URL if hostname differs from IP - if appCfg.Hostname != "" && appCfg.Hostname != appCfg.IPv4Address { - endpoints = append(endpoints, fmt.Sprintf("https://%s:%d", appCfg.Hostname, clientPort)) - } - return endpoints } func (e *EmbeddedEtcd) PeerEndpoints() []string { appCfg := e.cfg.Config() peerPort := appCfg.EtcdServer.PeerPort - endpoints := []string{ + return []string{ fmt.Sprintf("https://%s:%d", appCfg.IPv4Address, peerPort), + fmt.Sprintf("https://%s:%d", appCfg.Hostname, peerPort), } - // Add hostname-based URL if hostname differs from IP - if appCfg.Hostname != "" && appCfg.Hostname != appCfg.IPv4Address { - endpoints = append(endpoints, fmt.Sprintf("https://%s:%d", appCfg.Hostname, peerPort)) - } - return endpoints } func (e *EmbeddedEtcd) etcdDir() string { @@ -661,23 +653,17 @@ func embedConfig(cfg config.Config, logger zerolog.Logger) (*embed.Config, error } c.AdvertiseClientUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, clientPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, clientPort)}, } - // Add hostname-based URL if hostname differs from IP - if cfg.Hostname != "" && cfg.Hostname != myIP { - c.AdvertiseClientUrls = append(c.AdvertiseClientUrls, - url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, clientPort)}) - } + c.ListenPeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("0.0.0.0:%d", peerPort)}, } c.AdvertisePeerUrls = []url.URL{ {Scheme: "https", Host: fmt.Sprintf("%s:%d", myIP, peerPort)}, + {Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, peerPort)}, } - // Add hostname-based URL if hostname differs from IP - if cfg.Hostname != "" && cfg.Hostname != myIP { - c.AdvertisePeerUrls = append(c.AdvertisePeerUrls, - url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", cfg.Hostname, peerPort)}) - } + // This will get overridden when joining an existing cluster c.InitialCluster = fmt.Sprintf( "%s=http://%s:%d", diff --git a/server/internal/etcd/embedded_test.go b/server/internal/etcd/embedded_test.go index 69d42217..67d0d3c8 100644 --- a/server/internal/etcd/embedded_test.go +++ b/server/internal/etcd/embedded_test.go @@ -421,41 +421,6 @@ func TestEmbeddedEtcd(t *testing.T) { assert.Len(t, member.ClientURLs, 2, "Member should have both IP and hostname client URLs") }) - t.Run("endpoints only include IP when hostname equals IP", func(t *testing.T) { - ctx := t.Context() - - cfg := config.Config{ - HostID: uuid.NewString(), - DataDir: t.TempDir(), - EtcdMode: config.EtcdModeServer, - IPv4Address: "127.0.0.1", - Hostname: "127.0.0.1", - EtcdClient: config.EtcdClient{ - LogLevel: "debug", - }, - EtcdServer: config.EtcdServer{ - LogLevel: "debug", - ClientPort: storagetest.GetFreePort(t), - PeerPort: storagetest.GetFreePort(t), - }, - } - - server := etcd.NewEmbeddedEtcd(cfgMgr(t, cfg), testutils.Logger(t)) - require.NotNil(t, server) - - err := server.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { - server.Shutdown() - }) - - // Should only have 1 endpoint (no duplicate when hostname equals IP) - peerEndpoints := server.PeerEndpoints() - assert.Len(t, peerEndpoints, 1, "PeerEndpoints should only have IP when hostname equals IP") - - clientEndpoints := server.ClientEndpoints() - assert.Len(t, clientEndpoints, 1, "ClientEndpoints should only have IP when hostname equals IP") - }) } func cfgMgr(t testing.TB, cfg config.Config) *config.Manager { diff --git a/server/internal/etcd/membership.go b/server/internal/etcd/membership.go index 8dbd1f28..8c18f057 100644 --- a/server/internal/etcd/membership.go +++ b/server/internal/etcd/membership.go @@ -10,6 +10,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/ds" ) var ( @@ -117,7 +118,7 @@ func UpdateMemberPeerURLs(ctx context.Context, client *clientv3.Client, memberNa return false, fmt.Errorf("member %s not found", memberName) } - if UrlsEqual(member.PeerURLs, newPeerURLs) { + if ds.SetSymmetricDifference(member.PeerURLs, newPeerURLs).Size() == 0 { return false, nil // No change needed } @@ -128,19 +129,3 @@ func UpdateMemberPeerURLs(ctx context.Context, client *clientv3.Client, memberNa return true, nil } - -func UrlsEqual(a, b []string) bool { - if len(a) != len(b) { - return false - } - aSet := make(map[string]struct{}, len(a)) - for _, url := range a { - aSet[url] = struct{}{} - } - for _, url := range b { - if _, ok := aSet[url]; !ok { - return false - } - } - return true -} diff --git a/server/internal/etcd/membership_test.go b/server/internal/etcd/membership_test.go index aaa09cfb..563f1339 100644 --- a/server/internal/etcd/membership_test.go +++ b/server/internal/etcd/membership_test.go @@ -5,7 +5,7 @@ package etcd_test import ( "testing" - "github.com/pgEdge/control-plane/server/internal/etcd" + "github.com/pgEdge/control-plane/server/internal/ds" "github.com/stretchr/testify/assert" ) @@ -70,7 +70,7 @@ func TestUrlsEqual(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := etcd.UrlsEqual(tt.a, tt.b) + result := ds.SetSymmetricDifference(tt.a, tt.b).Size() == 0 assert.Equal(t, tt.expected, result) }) } diff --git a/server/internal/orchestrator/swarm/patroni_config.go b/server/internal/orchestrator/swarm/patroni_config.go index b467729f..9971a5e8 100644 --- a/server/internal/orchestrator/swarm/patroni_config.go +++ b/server/internal/orchestrator/swarm/patroni_config.go @@ -5,13 +5,11 @@ import ( "encoding/json" "errors" "fmt" - "io" "maps" "net/url" "path/filepath" "github.com/alessio/shellescape" - "github.com/rs/zerolog/log" "github.com/samber/do" "github.com/spf13/afero" clientv3 "go.etcd.io/etcd/client/v3" @@ -172,11 +170,6 @@ func (c *PatroniConfig) Create(ctx context.Context, rc *resource.Context) error return fmt.Errorf("failed to change ownership for %s: %w", configPath, err) } - // Signal Patroni to reload config - if err := c.signalPatroniReload(ctx, rc); err != nil { - log.Warn().Err(err).Str("instance_id", c.Spec.InstanceID).Msg("failed to signal Patroni to reload config") - } - return nil } @@ -184,24 +177,6 @@ func (c *PatroniConfig) Update(ctx context.Context, rc *resource.Context) error return c.Create(ctx, rc) } -func (c *PatroniConfig) signalPatroniReload(ctx context.Context, rc *resource.Context) error { - dockerClient, err := do.Invoke[*docker.Docker](rc.Injector) - if err != nil { - return err - } - - err = PostgresContainerExec(ctx, io.Discard, dockerClient, c.Spec.InstanceID, []string{"kill", "-HUP", "1"}) - if err != nil { - // Container might not be running yet during initial creation - if errors.Is(err, ErrNoPostgresContainer) { - return nil - } - return fmt.Errorf("failed to send SIGHUP to Patroni: %w", err) - } - - return nil -} - func (c *PatroniConfig) Delete(ctx context.Context, rc *resource.Context) error { fs, err := do.Invoke[afero.Fs](rc.Injector) if err != nil {