diff --git a/server/internal/api/apiv1/post_init_handlers.go b/server/internal/api/apiv1/post_init_handlers.go index b6779a58..ad48af00 100644 --- a/server/internal/api/apiv1/post_init_handlers.go +++ b/server/internal/api/apiv1/post_init_handlers.go @@ -569,7 +569,7 @@ func (s *PostInitHandlers) SwitchoverDatabaseNode(ctx context.Context, req *api. } input.ScheduledAt = tm.UTC() - s.logger.Info(). + s.logger.Debug(). Str("database_id", databaseID). Str("scheduled_at", input.ScheduledAt.String()). Msg("parsed scheduled_at") diff --git a/server/internal/api/http.go b/server/internal/api/http.go index f7ea5ba0..59dcb064 100644 --- a/server/internal/api/http.go +++ b/server/internal/api/http.go @@ -45,11 +45,11 @@ func (s *httpServer) start() { } func (s *httpServer) listenAndServe() { - go func() { - s.logger.Info(). - Str("host_port", s.server.Addr). - Msg("starting http server") + s.logger.Info(). + Str("host_port", s.server.Addr). + Msg("starting http server") + go func() { if err := s.server.ListenAndServe(); err != nil { s.errCh <- fmt.Errorf("http server error: %w", err) } @@ -57,11 +57,11 @@ func (s *httpServer) listenAndServe() { } func (s *httpServer) listenAndServeTLS() { - go func() { - s.logger.Info(). - Str("host_port", s.server.Addr). - Msg("starting https server") + s.logger.Info(). + Str("host_port", s.server.Addr). + Msg("starting https server") + go func() { rootCA, err := os.ReadFile(s.cfg.CACert) if err != nil { s.errCh <- fmt.Errorf("failed to read CA cert: %w", err) diff --git a/server/internal/api/server.go b/server/internal/api/server.go index c897a2f2..c89eaed2 100644 --- a/server/internal/api/server.go +++ b/server/internal/api/server.go @@ -17,6 +17,7 @@ import ( var _ do.Shutdownable = (*Server)(nil) type Server struct { + logger zerolog.Logger started bool cfg config.Config v1Svc *apiv1.Service @@ -63,15 +64,18 @@ func NewServer( } return &Server{ - cfg: cfg, - v1Svc: v1Svc, - http: httpSvr, - mqtt: mqttSvr, - errCh: make(chan error, 2), + logger: logger, + cfg: cfg, + v1Svc: v1Svc, + http: httpSvr, + mqtt: mqttSvr, + errCh: make(chan error, 2), } } func (s *Server) ServePreInit(ctx context.Context) error { + s.logger.Debug().Msg("serving pre-init handlers") + if err := s.v1Svc.UsePreInitHandlers(); err != nil { return fmt.Errorf("failed to set v1 api to use pre-init handlers: %w", err) } @@ -82,6 +86,8 @@ func (s *Server) ServePreInit(ctx context.Context) error { } func (s *Server) ServePostInit(ctx context.Context) error { + s.logger.Debug().Msg("serving post-init handlers") + if err := s.v1Svc.UsePostInitHandlers(); err != nil { return fmt.Errorf("failed to set v1 api to use post-init handlers: %w", err) } @@ -129,6 +135,8 @@ func (s *Server) serve(ctx context.Context) { } func (s *Server) Shutdown() error { + s.logger.Debug().Msg("shutting down api server") + ctx := context.Background() var errs []error diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 51b3ca69..efcf81bd 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/rs/zerolog" "github.com/samber/do" @@ -21,17 +22,24 @@ import ( "github.com/pgEdge/control-plane/server/internal/workflows" ) +type ErrorProducer interface { + Error() <-chan error +} + type Orchestrator interface { host.Orchestrator database.Orchestrator } type App struct { - i *do.Injector - cfg config.Config - logger zerolog.Logger - etcd etcd.Etcd - api *api.Server + i *do.Injector + cfg config.Config + logger zerolog.Logger + etcd etcd.Etcd + api *api.Server + errCh chan error + serviceCtx context.Context + serviceCtxCancel context.CancelFunc } func NewApp(i *do.Injector) (*App, error) { @@ -48,12 +56,19 @@ func NewApp(i *do.Injector) (*App, error) { i: i, cfg: cfg, logger: logger, + errCh: make(chan error, 1), } return app, nil } -func (a *App) Run(ctx context.Context) error { +func (a *App) Run(parentCtx context.Context) error { + // The caller of this method cancels parentCtx to trigger a shutdown. + // However, some shut down procedures need an active context. We provide + // this separate context to services that are managed by `App` and we cancel + // it after all the shutdown processes have finished. + a.serviceCtx, a.serviceCtxCancel = context.WithCancel(context.Background()) + // grpclog needs to be configured before grpc.Dial is called grpcLogger, err := do.Invoke[grpclog.LoggerV2](a.i) if err != nil { @@ -65,10 +80,12 @@ func (a *App) Run(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to initialize etcd: %w", err) } + apiServer, err := do.Invoke[*api.Server](a.i) if err != nil { return fmt.Errorf("failed to initialize api server: %w", err) } + a.addErrorProducer(parentCtx, apiServer) a.etcd = e a.api = apiServer @@ -78,34 +95,54 @@ func (a *App) Run(ctx context.Context) error { return fmt.Errorf("failed to check if etcd is initialized: %w", err) } if initialized { - if err := a.etcd.Start(ctx); err != nil { + if err := a.etcd.Start(a.serviceCtx); err != nil { return fmt.Errorf("failed to start etcd: %w", err) } - return a.runInitialized(ctx) + a.addErrorProducer(parentCtx, a.etcd) + + return a.shutdown(a.runInitialized(parentCtx)) } else { - return a.runPreInitialization(ctx) + return a.shutdown(a.runPreInitialization(parentCtx)) } } -func (a *App) runPreInitialization(ctx context.Context) error { - if err := a.api.ServePreInit(ctx); err != nil { +func (a *App) addErrorProducer(parentCtx context.Context, producer ErrorProducer) { + go func() { + select { + case <-parentCtx.Done(): + return + case err := <-producer.Error(): + if err != nil { + a.errCh <- err + } + } + }() +} + +func (a *App) runPreInitialization(parentCtx context.Context) error { + if err := a.api.ServePreInit(a.serviceCtx); err != nil { return fmt.Errorf("failed to serve pre-init API: %w", err) } + a.logger.Info(). + Str("state", "uninitialized"). + Msg("server ready") + select { - case <-ctx.Done(): + case <-parentCtx.Done(): a.logger.Info().Msg("got shutdown signal") - return a.Shutdown(nil) - case err := <-a.api.Error(): - return a.Shutdown(err) + return nil + case err := <-a.errCh: + return err case <-a.etcd.Initialized(): a.logger.Info().Msg("etcd initialized") + a.addErrorProducer(parentCtx, a.etcd) config.UpdateInjectedConfig(a.i) - return a.runInitialized(ctx) + return a.runInitialized(parentCtx) } } -func (a *App) runInitialized(ctx context.Context) error { +func (a *App) runInitialized(parentCtx context.Context) error { handleError := func(err error) error { a.api.HandleInitializationError(err) return err @@ -116,7 +153,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize migration runner: %w", err)) } - if err := migrationRunner.Run(ctx); err != nil { + if err := migrationRunner.Run(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to run migrations: %w", err)) } @@ -124,7 +161,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize certificate service: %w", err)) } - if err := certSvc.Start(ctx); err != nil { + if err := certSvc.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start certificate service: %w", err)) } @@ -132,7 +169,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize host service: %w", err)) } - if err := hostSvc.UpdateHost(ctx); err != nil { + if err := hostSvc.UpdateHost(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to update host: %w", err)) } @@ -140,13 +177,13 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize host ticker: %w", err)) } - hostTicker.Start(ctx) + hostTicker.Start(a.serviceCtx) monitorSvc, err := do.Invoke[*monitor.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize monitor service: %w", err)) } - if err := monitorSvc.Start(ctx); err != nil { + if err := monitorSvc.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start monitor service: %w", err)) } @@ -154,7 +191,8 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize scheduler service: %w", err)) } - if err := schedulerSvc.Start(ctx); err != nil { + a.addErrorProducer(parentCtx, schedulerSvc) + if err := schedulerSvc.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start scheduler service: %w", err)) } @@ -162,32 +200,54 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize worker: %w", err)) } - if err := worker.Start(ctx); err != nil { + if err := worker.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start worker: %w", err)) } - if err := a.api.ServePostInit(ctx); err != nil { + if err := a.api.ServePostInit(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to serve post-init API: %w", err)) } + a.logger.Info(). + Str("state", "initialized"). + Msg("server ready") + select { - case <-ctx.Done(): + case <-parentCtx.Done(): a.logger.Info().Msg("got shutdown signal") - return a.Shutdown(nil) - case err := <-a.api.Error(): - return a.Shutdown(err) - case err := <-schedulerSvc.Error(): - return a.Shutdown(err) - + return nil + case err := <-a.errCh: + return err } } -func (a *App) Shutdown(reason error) error { - a.logger.Info().Msg("attempting to gracefully shut down") +func (a *App) shutdown(reason error) error { + defer a.logger.Info().Msg("shutdown complete") + defer a.serviceCtxCancel() + + if reason != nil { + a.logger.Err(reason).Msg("shutting down due to error") + } + + a.logger.Info(). + Int64("stop_grace_period_seconds", a.cfg.StopGracePeriodSeconds). + Msg("attempting to gracefully shut down") - errs := []error{ - reason, - a.i.Shutdown(), + errCh := make(chan error, 1) + + go func() { + errCh <- a.i.Shutdown() + }() + + var errs = []error{reason} + + gracePeriod := time.Duration(a.cfg.StopGracePeriodSeconds) * time.Second + + select { + case err := <-errCh: + errs = append(errs, err) + case <-time.After(gracePeriod): + errs = append(errs, fmt.Errorf("graceful shutdown timed out after %d seconds", a.cfg.StopGracePeriodSeconds)) } return errors.Join(errs...) diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index 5b01e583..29df44c9 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -44,8 +44,10 @@ type EmbeddedEtcd struct { func NewEmbeddedEtcd(cfg *config.Manager, logger zerolog.Logger) *EmbeddedEtcd { return &EmbeddedEtcd{ cfg: cfg, - logger: logger, initialized: make(chan struct{}), + logger: logger.With(). + Str("component", "etcd_server"). + Logger(), } } @@ -155,6 +157,10 @@ func (e *EmbeddedEtcd) initialize(ctx context.Context) error { func (e *EmbeddedEtcd) start(ctx context.Context) error { appCfg := e.cfg.Config() + e.logger.Info(). + Int("peer_port", appCfg.EtcdServer.PeerPort). + Int("client_port", appCfg.EtcdServer.ClientPort). + Msg("starting embedded etcd server") etcdCfg, err := embedConfig(appCfg, e.logger) if err != nil { @@ -264,6 +270,11 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { etcdCfg.InitialCluster = strings.Join(peers, ",") etcdCfg.ClusterState = embed.ClusterStateFlagExisting + e.logger.Info(). + Int("peer_port", appCfg.EtcdServer.PeerPort). + Int("client_port", appCfg.EtcdServer.ClientPort). + Msg("starting embedded etcd server") + etcd, err := startEmbedded(ctx, etcdCfg) if err != nil { return err @@ -299,6 +310,8 @@ func (e *EmbeddedEtcd) Initialized() <-chan struct{} { } func (e *EmbeddedEtcd) Shutdown() error { + e.logger.Info().Msg("shutting down embedded etcd server") + var errs []error if e.client != nil { if err := e.client.Close(); err != nil { diff --git a/server/internal/etcd/provide.go b/server/internal/etcd/provide.go index a634d80d..eed6271a 100644 --- a/server/internal/etcd/provide.go +++ b/server/internal/etcd/provide.go @@ -58,7 +58,7 @@ func provideEtcd(i *do.Injector) { oldMode := generated.EtcdMode newMode := appCfg.EtcdMode - logger.Info(). + logger.Debug(). Str("old_mode", string(oldMode)). Str("new_mode", string(newMode)). Bool("old_mode_empty", oldMode == ""). diff --git a/server/internal/etcd/remote.go b/server/internal/etcd/remote.go index 208eca2e..4960b0e9 100644 --- a/server/internal/etcd/remote.go +++ b/server/internal/etcd/remote.go @@ -34,9 +34,11 @@ type RemoteEtcd struct { func NewRemoteEtcd(cfg *config.Manager, logger zerolog.Logger) *RemoteEtcd { return &RemoteEtcd{ cfg: cfg, - logger: logger, initialized: make(chan struct{}), err: make(chan error), + logger: logger.With(). + Str("component", "etcd_client"). + Logger(), } } @@ -46,6 +48,8 @@ func (r *RemoteEtcd) IsInitialized() (bool, error) { } func (r *RemoteEtcd) Start(ctx context.Context) error { + r.logger.Debug().Msg("starting etcd client") + initialized, err := r.IsInitialized() if err != nil { return err @@ -196,6 +200,8 @@ func (r *RemoteEtcd) VerifyJoinToken(in string) error { } func (r *RemoteEtcd) Shutdown() error { + r.logger.Debug().Msg("shutting down etcd client") + if r.client == nil { return nil } diff --git a/server/internal/migrate/runner.go b/server/internal/migrate/runner.go index 4ed4339e..d7e46f10 100644 --- a/server/internal/migrate/runner.go +++ b/server/internal/migrate/runner.go @@ -146,7 +146,7 @@ func (r *Runner) runMigrations(ctx context.Context) error { startIndex := r.findStartIndex(currentRevision) if startIndex >= len(r.migrations) { - r.logger.Info().Msg("control-plane db is up to date, no migrations to run") + r.logger.Debug().Msg("control-plane db is up to date, no migrations to run") return nil } @@ -199,7 +199,7 @@ func (r *Runner) findStartIndex(currentRevision string) int { func (r *Runner) hasPendingMigrations(ctx context.Context) (bool, error) { if len(r.migrations) == 0 { - r.logger.Info().Msg("no migrations to run") + r.logger.Debug().Msg("no migrations to run") return false, nil } @@ -210,7 +210,7 @@ func (r *Runner) hasPendingMigrations(ctx context.Context) (bool, error) { startIndex := r.findStartIndex(currentRevision) if startIndex >= len(r.migrations) { - r.logger.Info().Msg("control-plane db is up to date, no migrations to run") + r.logger.Debug().Msg("control-plane db is up to date, no migrations to run") return false, nil } diff --git a/server/internal/monitor/service.go b/server/internal/monitor/service.go index 23e88766..59e7c68a 100644 --- a/server/internal/monitor/service.go +++ b/server/internal/monitor/service.go @@ -49,6 +49,8 @@ func NewService( } func (s *Service) Start(ctx context.Context) error { + s.logger.Debug().Msg("starting monitors") + // The monitors should run for the lifetime of the application rather than // the lifetime of a single operation. s.appCtx = ctx @@ -72,7 +74,7 @@ func (s *Service) Start(ctx context.Context) error { } func (s *Service) Shutdown() error { - s.logger.Info().Msg("shutting down instance monitors") + s.logger.Debug().Msg("shutting down monitors") for _, mon := range s.instances { mon.Stop() diff --git a/server/internal/scheduler/service.go b/server/internal/scheduler/service.go index 1fefc650..2300508b 100644 --- a/server/internal/scheduler/service.go +++ b/server/internal/scheduler/service.go @@ -47,7 +47,7 @@ func NewService( } func (s *Service) Start(ctx context.Context) error { - s.logger.Info().Msg("starting scheduler service") + s.logger.Debug().Msg("starting scheduler service") if err := s.elector.Start(ctx); err != nil { return fmt.Errorf("failed to start elector: %w", err) @@ -83,11 +83,11 @@ func (s *Service) Start(ctx context.Context) error { } func (s *Service) Shutdown() error { - s.logger.Info().Msg("shutting down scheduler service") + s.logger.Debug().Msg("shutting down scheduler service") s.scheduler.Stop() if s.watchOp != nil { - s.logger.Info().Msg("closing scheduled job watch") + s.logger.Debug().Msg("closing scheduled job watch") s.watchOp.Close() } @@ -115,7 +115,7 @@ func (s *Service) registerJob(ctx context.Context, job *StoredScheduledJob) erro } s.runners[job.ID] = gocronJob - s.logger.Info().Str("job_id", job.ID).Msg("registered scheduled job") + s.logger.Debug().Str("job_id", job.ID).Msg("registered scheduled job") return nil } @@ -126,7 +126,7 @@ func (s *Service) UnregisterJob(jobID string) { if job, ok := s.runners[jobID]; ok { s.scheduler.RemoveByReference(job) delete(s.runners, jobID) - s.logger.Info().Str("job_id", jobID).Msg("unregistered scheduled job") + s.logger.Debug().Str("job_id", jobID).Msg("unregistered scheduled job") } } diff --git a/server/internal/workflows/provide.go b/server/internal/workflows/provide.go index edc10a92..8e0160c6 100644 --- a/server/internal/workflows/provide.go +++ b/server/internal/workflows/provide.go @@ -25,6 +25,10 @@ func Provide(i *do.Injector) { func provideWorker(i *do.Injector) { do.Provide(i, func(i *do.Injector) (*Worker, error) { + logger, err := do.Invoke[zerolog.Logger](i) + if err != nil { + return nil, err + } be, err := do.Invoke[backend.Backend](i) if err != nil { return nil, err @@ -37,7 +41,7 @@ func provideWorker(i *do.Injector) { if err != nil { return nil, err } - return NewWorker(be, workflows, orch) + return NewWorker(logger, be, workflows, orch) }) } diff --git a/server/internal/workflows/worker.go b/server/internal/workflows/worker.go index 980fac6c..3743b360 100644 --- a/server/internal/workflows/worker.go +++ b/server/internal/workflows/worker.go @@ -8,6 +8,7 @@ import ( "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/worker" "github.com/cschleiden/go-workflows/workflow" + "github.com/rs/zerolog" "github.com/samber/do" ) @@ -18,11 +19,14 @@ type Orchestrator interface { } type Worker struct { + logger zerolog.Logger worker *worker.Worker workflows *Workflows + ctx context.Context + cancel context.CancelFunc } -func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { +func NewWorker(logger zerolog.Logger, be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { queues, err := orch.WorkerQueues() if err != nil { return nil, fmt.Errorf("failed to get worker queues: %w", err) @@ -40,19 +44,41 @@ func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Wo } return &Worker{ + logger: logger.With(). + Str("component", "workflows_worker"). + Logger(), worker: w, workflows: workflows, }, nil } func (w *Worker) Start(ctx context.Context) error { - if err := w.worker.Start(ctx); err != nil { + if w.cancel != nil { + return fmt.Errorf("workflows worker already started") + } + + w.logger.Debug().Msg("starting workflows worker") + + // The parent context isn't canceled until the stop grace period times out, + // so we start the worker with a child context that we can cancel ourselves. + childCtx, cancel := context.WithCancel(ctx) + + if err := w.worker.Start(childCtx); err != nil { + cancel() return fmt.Errorf("failed to start worker: %w", err) } + w.ctx = childCtx + w.cancel = cancel return nil } func (w *Worker) Shutdown() error { + w.logger.Debug().Msg("shutting down workflow worker") + + if w.cancel != nil { + w.cancel() + } + if err := w.worker.WaitForCompletion(); err != nil { return fmt.Errorf("failed to wait for active tasks to complete: %w", err) }