Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/internal/api/apiv1/post_init_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (s *PostInitHandlers) SwitchoverDatabaseNode(ctx context.Context, req *api.
}
input.ScheduledAt = tm.UTC()

s.logger.Info().
s.logger.Debug().
Str("database_id", databaseID).
Str("scheduled_at", input.ScheduledAt.String()).
Msg("parsed scheduled_at")
Expand Down
16 changes: 8 additions & 8 deletions server/internal/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,23 @@ func (s *httpServer) start() {
}

func (s *httpServer) listenAndServe() {
go func() {
s.logger.Info().
Str("host_port", s.server.Addr).
Msg("starting http server")
s.logger.Info().
Str("host_port", s.server.Addr).
Msg("starting http server")

go func() {
if err := s.server.ListenAndServe(); err != nil {
s.errCh <- fmt.Errorf("http server error: %w", err)
}
}()
}

func (s *httpServer) listenAndServeTLS() {
go func() {
s.logger.Info().
Str("host_port", s.server.Addr).
Msg("starting https server")
s.logger.Info().
Str("host_port", s.server.Addr).
Msg("starting https server")

go func() {
rootCA, err := os.ReadFile(s.cfg.CACert)
if err != nil {
s.errCh <- fmt.Errorf("failed to read CA cert: %w", err)
Expand Down
18 changes: 13 additions & 5 deletions server/internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
var _ do.Shutdownable = (*Server)(nil)

type Server struct {
logger zerolog.Logger
started bool
cfg config.Config
v1Svc *apiv1.Service
Expand Down Expand Up @@ -63,15 +64,18 @@ func NewServer(
}

return &Server{
cfg: cfg,
v1Svc: v1Svc,
http: httpSvr,
mqtt: mqttSvr,
errCh: make(chan error, 2),
logger: logger,
cfg: cfg,
v1Svc: v1Svc,
http: httpSvr,
mqtt: mqttSvr,
errCh: make(chan error, 2),
}
}

func (s *Server) ServePreInit(ctx context.Context) error {
s.logger.Debug().Msg("serving pre-init handlers")

if err := s.v1Svc.UsePreInitHandlers(); err != nil {
return fmt.Errorf("failed to set v1 api to use pre-init handlers: %w", err)
}
Expand All @@ -82,6 +86,8 @@ func (s *Server) ServePreInit(ctx context.Context) error {
}

func (s *Server) ServePostInit(ctx context.Context) error {
s.logger.Debug().Msg("serving post-init handlers")

if err := s.v1Svc.UsePostInitHandlers(); err != nil {
return fmt.Errorf("failed to set v1 api to use post-init handlers: %w", err)
}
Expand Down Expand Up @@ -129,6 +135,8 @@ func (s *Server) serve(ctx context.Context) {
}

func (s *Server) Shutdown() error {
s.logger.Debug().Msg("shutting down api server")

ctx := context.Background()

var errs []error
Expand Down
134 changes: 97 additions & 37 deletions server/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/rs/zerolog"
"github.com/samber/do"
Expand All @@ -21,17 +22,24 @@ import (
"github.com/pgEdge/control-plane/server/internal/workflows"
)

type ErrorProducer interface {
Error() <-chan error
}

type Orchestrator interface {
host.Orchestrator
database.Orchestrator
}

type App struct {
i *do.Injector
cfg config.Config
logger zerolog.Logger
etcd etcd.Etcd
api *api.Server
i *do.Injector
cfg config.Config
logger zerolog.Logger
etcd etcd.Etcd
api *api.Server
errCh chan error
serviceCtx context.Context
serviceCtxCancel context.CancelFunc
}

func NewApp(i *do.Injector) (*App, error) {
Expand All @@ -48,12 +56,19 @@ func NewApp(i *do.Injector) (*App, error) {
i: i,
cfg: cfg,
logger: logger,
errCh: make(chan error, 1),
}

return app, nil
}

func (a *App) Run(ctx context.Context) error {
func (a *App) Run(parentCtx context.Context) error {
// The caller of this method cancels parentCtx to trigger a shutdown.
// However, some shut down procedures need an active context. We provide
// this separate context to services that are managed by `App` and we cancel
// it after all the shutdown processes have finished.
a.serviceCtx, a.serviceCtxCancel = context.WithCancel(context.Background())

// grpclog needs to be configured before grpc.Dial is called
grpcLogger, err := do.Invoke[grpclog.LoggerV2](a.i)
if err != nil {
Expand All @@ -65,10 +80,12 @@ func (a *App) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to initialize etcd: %w", err)
}

apiServer, err := do.Invoke[*api.Server](a.i)
if err != nil {
return fmt.Errorf("failed to initialize api server: %w", err)
}
a.addErrorProducer(parentCtx, apiServer)

a.etcd = e
a.api = apiServer
Expand All @@ -78,34 +95,54 @@ func (a *App) Run(ctx context.Context) error {
return fmt.Errorf("failed to check if etcd is initialized: %w", err)
}
if initialized {
if err := a.etcd.Start(ctx); err != nil {
if err := a.etcd.Start(a.serviceCtx); err != nil {
return fmt.Errorf("failed to start etcd: %w", err)
}
return a.runInitialized(ctx)
a.addErrorProducer(parentCtx, a.etcd)

return a.shutdown(a.runInitialized(parentCtx))
} else {
return a.runPreInitialization(ctx)
return a.shutdown(a.runPreInitialization(parentCtx))
}
}

func (a *App) runPreInitialization(ctx context.Context) error {
if err := a.api.ServePreInit(ctx); err != nil {
func (a *App) addErrorProducer(parentCtx context.Context, producer ErrorProducer) {
go func() {
select {
case <-parentCtx.Done():
return
case err := <-producer.Error():
if err != nil {
a.errCh <- err
}
}
}()
}

func (a *App) runPreInitialization(parentCtx context.Context) error {
if err := a.api.ServePreInit(a.serviceCtx); err != nil {
return fmt.Errorf("failed to serve pre-init API: %w", err)
}

a.logger.Info().
Str("state", "uninitialized").
Msg("server ready")

select {
case <-ctx.Done():
case <-parentCtx.Done():
a.logger.Info().Msg("got shutdown signal")
return a.Shutdown(nil)
case err := <-a.api.Error():
return a.Shutdown(err)
return nil
case err := <-a.errCh:
return err
case <-a.etcd.Initialized():
a.logger.Info().Msg("etcd initialized")
a.addErrorProducer(parentCtx, a.etcd)
config.UpdateInjectedConfig(a.i)
return a.runInitialized(ctx)
return a.runInitialized(parentCtx)
}
}

func (a *App) runInitialized(ctx context.Context) error {
func (a *App) runInitialized(parentCtx context.Context) error {
handleError := func(err error) error {
a.api.HandleInitializationError(err)
return err
Expand All @@ -116,78 +153,101 @@ func (a *App) runInitialized(ctx context.Context) error {
if err != nil {
return handleError(fmt.Errorf("failed to initialize migration runner: %w", err))
}
if err := migrationRunner.Run(ctx); err != nil {
if err := migrationRunner.Run(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to run migrations: %w", err))
}

certSvc, err := do.Invoke[*certificates.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize certificate service: %w", err))
}
if err := certSvc.Start(ctx); err != nil {
if err := certSvc.Start(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to start certificate service: %w", err))
}

hostSvc, err := do.Invoke[*host.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize host service: %w", err))
}
if err := hostSvc.UpdateHost(ctx); err != nil {
if err := hostSvc.UpdateHost(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to update host: %w", err))
}

hostTicker, err := do.Invoke[*host.UpdateTicker](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize host ticker: %w", err))
}
hostTicker.Start(ctx)
hostTicker.Start(a.serviceCtx)

monitorSvc, err := do.Invoke[*monitor.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize monitor service: %w", err))
}
if err := monitorSvc.Start(ctx); err != nil {
if err := monitorSvc.Start(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to start monitor service: %w", err))
}

schedulerSvc, err := do.Invoke[*scheduler.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize scheduler service: %w", err))
}
if err := schedulerSvc.Start(ctx); err != nil {
a.addErrorProducer(parentCtx, schedulerSvc)
if err := schedulerSvc.Start(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to start scheduler service: %w", err))
}

worker, err := do.Invoke[*workflows.Worker](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize worker: %w", err))
}
if err := worker.Start(ctx); err != nil {
if err := worker.Start(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to start worker: %w", err))
}

if err := a.api.ServePostInit(ctx); err != nil {
if err := a.api.ServePostInit(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to serve post-init API: %w", err))
}

a.logger.Info().
Str("state", "initialized").
Msg("server ready")

select {
case <-ctx.Done():
case <-parentCtx.Done():
a.logger.Info().Msg("got shutdown signal")
return a.Shutdown(nil)
case err := <-a.api.Error():
return a.Shutdown(err)
case err := <-schedulerSvc.Error():
return a.Shutdown(err)

return nil
case err := <-a.errCh:
return err
}
}

func (a *App) Shutdown(reason error) error {
a.logger.Info().Msg("attempting to gracefully shut down")
func (a *App) shutdown(reason error) error {
defer a.logger.Info().Msg("shutdown complete")
defer a.serviceCtxCancel()

if reason != nil {
a.logger.Err(reason).Msg("shutting down due to error")
}

a.logger.Info().
Int64("stop_grace_period_seconds", a.cfg.StopGracePeriodSeconds).
Msg("attempting to gracefully shut down")

errs := []error{
reason,
a.i.Shutdown(),
errCh := make(chan error, 1)

go func() {
errCh <- a.i.Shutdown()
}()

var errs = []error{reason}

gracePeriod := time.Duration(a.cfg.StopGracePeriodSeconds) * time.Second

select {
case err := <-errCh:
errs = append(errs, err)
case <-time.After(gracePeriod):
errs = append(errs, fmt.Errorf("graceful shutdown timed out after %d seconds", a.cfg.StopGracePeriodSeconds))
}

return errors.Join(errs...)
Expand Down
15 changes: 14 additions & 1 deletion server/internal/etcd/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ type EmbeddedEtcd struct {
func NewEmbeddedEtcd(cfg *config.Manager, logger zerolog.Logger) *EmbeddedEtcd {
return &EmbeddedEtcd{
cfg: cfg,
logger: logger,
initialized: make(chan struct{}),
logger: logger.With().
Str("component", "etcd_server").
Logger(),
}
}

Expand Down Expand Up @@ -155,6 +157,10 @@ func (e *EmbeddedEtcd) initialize(ctx context.Context) error {

func (e *EmbeddedEtcd) start(ctx context.Context) error {
appCfg := e.cfg.Config()
e.logger.Info().
Int("peer_port", appCfg.EtcdServer.PeerPort).
Int("client_port", appCfg.EtcdServer.ClientPort).
Msg("starting embedded etcd server")

etcdCfg, err := embedConfig(appCfg, e.logger)
if err != nil {
Expand Down Expand Up @@ -264,6 +270,11 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error {
etcdCfg.InitialCluster = strings.Join(peers, ",")
etcdCfg.ClusterState = embed.ClusterStateFlagExisting

e.logger.Info().
Int("peer_port", appCfg.EtcdServer.PeerPort).
Int("client_port", appCfg.EtcdServer.ClientPort).
Msg("starting embedded etcd server")

etcd, err := startEmbedded(ctx, etcdCfg)
if err != nil {
return err
Expand Down Expand Up @@ -299,6 +310,8 @@ func (e *EmbeddedEtcd) Initialized() <-chan struct{} {
}

func (e *EmbeddedEtcd) Shutdown() error {
e.logger.Info().Msg("shutting down embedded etcd server")

var errs []error
if e.client != nil {
if err := e.client.Close(); err != nil {
Expand Down
Loading