From 84993a0cfdccaddb1cb8cd3bafa5a661c37bc322 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Sun, 25 Jan 2026 11:07:15 -0500 Subject: [PATCH 1/2] fix: allow for complete crash The workflows worker does not exit until its context is canceled. This commit fixes an issue where certain errors, such as those originating from Etcd or the various watches, could lead to the app shutting down without canceling the worker's context. This put the server in a partially shut down state, where it would still try to run workflows but fail from missing services in our DI system. Now, the `App` manages a separate context for services, and the worker manages its own context as well. This allows for a complete crash in case of error, and improves our shutdown process by waiting until all the shutdown methods have been tried before canceling the service context. PLAT-331 --- server/internal/app/app.go | 125 ++++++++++++++++++++-------- server/internal/workflows/worker.go | 20 ++++- 2 files changed, 107 insertions(+), 38 deletions(-) diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 51b3ca69..6b897e37 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/rs/zerolog" "github.com/samber/do" @@ -21,17 +22,24 @@ import ( "github.com/pgEdge/control-plane/server/internal/workflows" ) +type ErrorProducer interface { + Error() <-chan error +} + type Orchestrator interface { host.Orchestrator database.Orchestrator } type App struct { - i *do.Injector - cfg config.Config - logger zerolog.Logger - etcd etcd.Etcd - api *api.Server + i *do.Injector + cfg config.Config + logger zerolog.Logger + etcd etcd.Etcd + api *api.Server + errCh chan error + serviceCtx context.Context + serviceCtxCancel context.CancelFunc } func NewApp(i *do.Injector) (*App, error) { @@ -48,12 +56,19 @@ func NewApp(i *do.Injector) (*App, error) { i: i, cfg: cfg, logger: logger, + errCh: make(chan error, 1), } return app, nil } -func (a *App) Run(ctx context.Context) error { +func (a *App) Run(parentCtx context.Context) error { + // The caller of this method cancels parentCtx to trigger a shutdown. + // However, some shut down procedures need an active context. We provide + // this separate context to services that are managed by `App` and we cancel + // it after all the shutdown processes have finished. + a.serviceCtx, a.serviceCtxCancel = context.WithCancel(context.Background()) + // grpclog needs to be configured before grpc.Dial is called grpcLogger, err := do.Invoke[grpclog.LoggerV2](a.i) if err != nil { @@ -65,10 +80,12 @@ func (a *App) Run(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to initialize etcd: %w", err) } + apiServer, err := do.Invoke[*api.Server](a.i) if err != nil { return fmt.Errorf("failed to initialize api server: %w", err) } + a.addErrorProducer(parentCtx, apiServer) a.etcd = e a.api = apiServer @@ -78,34 +95,50 @@ func (a *App) Run(ctx context.Context) error { return fmt.Errorf("failed to check if etcd is initialized: %w", err) } if initialized { - if err := a.etcd.Start(ctx); err != nil { + if err := a.etcd.Start(a.serviceCtx); err != nil { return fmt.Errorf("failed to start etcd: %w", err) } - return a.runInitialized(ctx) + a.addErrorProducer(parentCtx, a.etcd) + + return a.shutdown(a.runInitialized(parentCtx)) } else { - return a.runPreInitialization(ctx) + return a.shutdown(a.runPreInitialization(parentCtx)) } } -func (a *App) runPreInitialization(ctx context.Context) error { - if err := a.api.ServePreInit(ctx); err != nil { +func (a *App) addErrorProducer(parentCtx context.Context, producer ErrorProducer) { + go func() { + select { + case <-parentCtx.Done(): + return + case err := <-producer.Error(): + if err != nil { + a.errCh <- err + } + } + }() +} + +func (a *App) runPreInitialization(parentCtx context.Context) error { + if err := a.api.ServePreInit(a.serviceCtx); err != nil { return fmt.Errorf("failed to serve pre-init API: %w", err) } select { - case <-ctx.Done(): + case <-parentCtx.Done(): a.logger.Info().Msg("got shutdown signal") - return a.Shutdown(nil) - case err := <-a.api.Error(): - return a.Shutdown(err) + return nil + case err := <-a.errCh: + return err case <-a.etcd.Initialized(): a.logger.Info().Msg("etcd initialized") + a.addErrorProducer(parentCtx, a.etcd) config.UpdateInjectedConfig(a.i) - return a.runInitialized(ctx) + return a.runInitialized(parentCtx) } } -func (a *App) runInitialized(ctx context.Context) error { +func (a *App) runInitialized(parentCtx context.Context) error { handleError := func(err error) error { a.api.HandleInitializationError(err) return err @@ -116,7 +149,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize migration runner: %w", err)) } - if err := migrationRunner.Run(ctx); err != nil { + if err := migrationRunner.Run(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to run migrations: %w", err)) } @@ -124,7 +157,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize certificate service: %w", err)) } - if err := certSvc.Start(ctx); err != nil { + if err := certSvc.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start certificate service: %w", err)) } @@ -132,7 +165,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize host service: %w", err)) } - if err := hostSvc.UpdateHost(ctx); err != nil { + if err := hostSvc.UpdateHost(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to update host: %w", err)) } @@ -140,13 +173,13 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize host ticker: %w", err)) } - hostTicker.Start(ctx) + hostTicker.Start(a.serviceCtx) monitorSvc, err := do.Invoke[*monitor.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize monitor service: %w", err)) } - if err := monitorSvc.Start(ctx); err != nil { + if err := monitorSvc.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start monitor service: %w", err)) } @@ -154,7 +187,8 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize scheduler service: %w", err)) } - if err := schedulerSvc.Start(ctx); err != nil { + a.addErrorProducer(parentCtx, schedulerSvc) + if err := schedulerSvc.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start scheduler service: %w", err)) } @@ -162,32 +196,49 @@ func (a *App) runInitialized(ctx context.Context) error { if err != nil { return handleError(fmt.Errorf("failed to initialize worker: %w", err)) } - if err := worker.Start(ctx); err != nil { + if err := worker.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start worker: %w", err)) } - if err := a.api.ServePostInit(ctx); err != nil { + if err := a.api.ServePostInit(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to serve post-init API: %w", err)) } select { - case <-ctx.Done(): + case <-parentCtx.Done(): a.logger.Info().Msg("got shutdown signal") - return a.Shutdown(nil) - case err := <-a.api.Error(): - return a.Shutdown(err) - case err := <-schedulerSvc.Error(): - return a.Shutdown(err) - + return nil + case err := <-a.errCh: + return err } } -func (a *App) Shutdown(reason error) error { - a.logger.Info().Msg("attempting to gracefully shut down") +func (a *App) shutdown(reason error) error { + defer a.serviceCtxCancel() + + if reason != nil { + a.logger.Err(reason).Msg("shutting down due to error") + } + + a.logger.Info(). + Int64("stop_grace_period_seconds", a.cfg.StopGracePeriodSeconds). + Msg("attempting to gracefully shut down") + + errCh := make(chan error, 1) - errs := []error{ - reason, - a.i.Shutdown(), + go func() { + errCh <- a.i.Shutdown() + }() + + var errs = []error{reason} + + gracePeriod := time.Duration(a.cfg.StopGracePeriodSeconds) * time.Second + + select { + case err := <-errCh: + errs = append(errs, err) + case <-time.After(gracePeriod): + errs = append(errs, fmt.Errorf("graceful shutdown timed out after %d seconds", a.cfg.StopGracePeriodSeconds)) } return errors.Join(errs...) diff --git a/server/internal/workflows/worker.go b/server/internal/workflows/worker.go index 980fac6c..2ee9dae3 100644 --- a/server/internal/workflows/worker.go +++ b/server/internal/workflows/worker.go @@ -20,6 +20,8 @@ type Orchestrator interface { type Worker struct { worker *worker.Worker workflows *Workflows + ctx context.Context + cancel context.CancelFunc } func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { @@ -46,13 +48,29 @@ func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Wo } func (w *Worker) Start(ctx context.Context) error { - if err := w.worker.Start(ctx); err != nil { + if w.cancel != nil { + return fmt.Errorf("workflows worker already started") + } + + // The parent context isn't canceled until the stop grace period times out, + // so we start the worker with a child context that we can cancel ourselves. + childCtx, cancel := context.WithCancel(ctx) + + if err := w.worker.Start(childCtx); err != nil { + cancel() return fmt.Errorf("failed to start worker: %w", err) } + w.ctx = childCtx + w.cancel = cancel return nil } func (w *Worker) Shutdown() error { + + if w.cancel != nil { + w.cancel() + } + if err := w.worker.WaitForCompletion(); err != nil { return fmt.Errorf("failed to wait for active tasks to complete: %w", err) } From 49fe78aecfa3233c746870e709ce135bf99c6a05 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Wed, 21 Jan 2026 09:47:27 -0500 Subject: [PATCH 2/2] feat: logging improvements Simplifies our startup and shut down log output by limiting it to general process status and services that are opening ports. PLAT-331 --- .../internal/api/apiv1/post_init_handlers.go | 2 +- server/internal/api/http.go | 16 ++++++++-------- server/internal/api/server.go | 18 +++++++++++++----- server/internal/app/app.go | 9 +++++++++ server/internal/etcd/embedded.go | 15 ++++++++++++++- server/internal/etcd/provide.go | 2 +- server/internal/etcd/remote.go | 8 +++++++- server/internal/migrate/runner.go | 6 +++--- server/internal/monitor/service.go | 4 +++- server/internal/scheduler/service.go | 10 +++++----- server/internal/workflows/provide.go | 6 +++++- server/internal/workflows/worker.go | 10 +++++++++- 12 files changed, 78 insertions(+), 28 deletions(-) diff --git a/server/internal/api/apiv1/post_init_handlers.go b/server/internal/api/apiv1/post_init_handlers.go index b6779a58..ad48af00 100644 --- a/server/internal/api/apiv1/post_init_handlers.go +++ b/server/internal/api/apiv1/post_init_handlers.go @@ -569,7 +569,7 @@ func (s *PostInitHandlers) SwitchoverDatabaseNode(ctx context.Context, req *api. } input.ScheduledAt = tm.UTC() - s.logger.Info(). + s.logger.Debug(). Str("database_id", databaseID). Str("scheduled_at", input.ScheduledAt.String()). Msg("parsed scheduled_at") diff --git a/server/internal/api/http.go b/server/internal/api/http.go index f7ea5ba0..59dcb064 100644 --- a/server/internal/api/http.go +++ b/server/internal/api/http.go @@ -45,11 +45,11 @@ func (s *httpServer) start() { } func (s *httpServer) listenAndServe() { - go func() { - s.logger.Info(). - Str("host_port", s.server.Addr). - Msg("starting http server") + s.logger.Info(). + Str("host_port", s.server.Addr). + Msg("starting http server") + go func() { if err := s.server.ListenAndServe(); err != nil { s.errCh <- fmt.Errorf("http server error: %w", err) } @@ -57,11 +57,11 @@ func (s *httpServer) listenAndServe() { } func (s *httpServer) listenAndServeTLS() { - go func() { - s.logger.Info(). - Str("host_port", s.server.Addr). - Msg("starting https server") + s.logger.Info(). + Str("host_port", s.server.Addr). + Msg("starting https server") + go func() { rootCA, err := os.ReadFile(s.cfg.CACert) if err != nil { s.errCh <- fmt.Errorf("failed to read CA cert: %w", err) diff --git a/server/internal/api/server.go b/server/internal/api/server.go index c897a2f2..c89eaed2 100644 --- a/server/internal/api/server.go +++ b/server/internal/api/server.go @@ -17,6 +17,7 @@ import ( var _ do.Shutdownable = (*Server)(nil) type Server struct { + logger zerolog.Logger started bool cfg config.Config v1Svc *apiv1.Service @@ -63,15 +64,18 @@ func NewServer( } return &Server{ - cfg: cfg, - v1Svc: v1Svc, - http: httpSvr, - mqtt: mqttSvr, - errCh: make(chan error, 2), + logger: logger, + cfg: cfg, + v1Svc: v1Svc, + http: httpSvr, + mqtt: mqttSvr, + errCh: make(chan error, 2), } } func (s *Server) ServePreInit(ctx context.Context) error { + s.logger.Debug().Msg("serving pre-init handlers") + if err := s.v1Svc.UsePreInitHandlers(); err != nil { return fmt.Errorf("failed to set v1 api to use pre-init handlers: %w", err) } @@ -82,6 +86,8 @@ func (s *Server) ServePreInit(ctx context.Context) error { } func (s *Server) ServePostInit(ctx context.Context) error { + s.logger.Debug().Msg("serving post-init handlers") + if err := s.v1Svc.UsePostInitHandlers(); err != nil { return fmt.Errorf("failed to set v1 api to use post-init handlers: %w", err) } @@ -129,6 +135,8 @@ func (s *Server) serve(ctx context.Context) { } func (s *Server) Shutdown() error { + s.logger.Debug().Msg("shutting down api server") + ctx := context.Background() var errs []error diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 6b897e37..efcf81bd 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -124,6 +124,10 @@ func (a *App) runPreInitialization(parentCtx context.Context) error { return fmt.Errorf("failed to serve pre-init API: %w", err) } + a.logger.Info(). + Str("state", "uninitialized"). + Msg("server ready") + select { case <-parentCtx.Done(): a.logger.Info().Msg("got shutdown signal") @@ -204,6 +208,10 @@ func (a *App) runInitialized(parentCtx context.Context) error { return handleError(fmt.Errorf("failed to serve post-init API: %w", err)) } + a.logger.Info(). + Str("state", "initialized"). + Msg("server ready") + select { case <-parentCtx.Done(): a.logger.Info().Msg("got shutdown signal") @@ -214,6 +222,7 @@ func (a *App) runInitialized(parentCtx context.Context) error { } func (a *App) shutdown(reason error) error { + defer a.logger.Info().Msg("shutdown complete") defer a.serviceCtxCancel() if reason != nil { diff --git a/server/internal/etcd/embedded.go b/server/internal/etcd/embedded.go index 5b01e583..29df44c9 100644 --- a/server/internal/etcd/embedded.go +++ b/server/internal/etcd/embedded.go @@ -44,8 +44,10 @@ type EmbeddedEtcd struct { func NewEmbeddedEtcd(cfg *config.Manager, logger zerolog.Logger) *EmbeddedEtcd { return &EmbeddedEtcd{ cfg: cfg, - logger: logger, initialized: make(chan struct{}), + logger: logger.With(). + Str("component", "etcd_server"). + Logger(), } } @@ -155,6 +157,10 @@ func (e *EmbeddedEtcd) initialize(ctx context.Context) error { func (e *EmbeddedEtcd) start(ctx context.Context) error { appCfg := e.cfg.Config() + e.logger.Info(). + Int("peer_port", appCfg.EtcdServer.PeerPort). + Int("client_port", appCfg.EtcdServer.ClientPort). + Msg("starting embedded etcd server") etcdCfg, err := embedConfig(appCfg, e.logger) if err != nil { @@ -264,6 +270,11 @@ func (e *EmbeddedEtcd) Join(ctx context.Context, options JoinOptions) error { etcdCfg.InitialCluster = strings.Join(peers, ",") etcdCfg.ClusterState = embed.ClusterStateFlagExisting + e.logger.Info(). + Int("peer_port", appCfg.EtcdServer.PeerPort). + Int("client_port", appCfg.EtcdServer.ClientPort). + Msg("starting embedded etcd server") + etcd, err := startEmbedded(ctx, etcdCfg) if err != nil { return err @@ -299,6 +310,8 @@ func (e *EmbeddedEtcd) Initialized() <-chan struct{} { } func (e *EmbeddedEtcd) Shutdown() error { + e.logger.Info().Msg("shutting down embedded etcd server") + var errs []error if e.client != nil { if err := e.client.Close(); err != nil { diff --git a/server/internal/etcd/provide.go b/server/internal/etcd/provide.go index a634d80d..eed6271a 100644 --- a/server/internal/etcd/provide.go +++ b/server/internal/etcd/provide.go @@ -58,7 +58,7 @@ func provideEtcd(i *do.Injector) { oldMode := generated.EtcdMode newMode := appCfg.EtcdMode - logger.Info(). + logger.Debug(). Str("old_mode", string(oldMode)). Str("new_mode", string(newMode)). Bool("old_mode_empty", oldMode == ""). diff --git a/server/internal/etcd/remote.go b/server/internal/etcd/remote.go index 208eca2e..4960b0e9 100644 --- a/server/internal/etcd/remote.go +++ b/server/internal/etcd/remote.go @@ -34,9 +34,11 @@ type RemoteEtcd struct { func NewRemoteEtcd(cfg *config.Manager, logger zerolog.Logger) *RemoteEtcd { return &RemoteEtcd{ cfg: cfg, - logger: logger, initialized: make(chan struct{}), err: make(chan error), + logger: logger.With(). + Str("component", "etcd_client"). + Logger(), } } @@ -46,6 +48,8 @@ func (r *RemoteEtcd) IsInitialized() (bool, error) { } func (r *RemoteEtcd) Start(ctx context.Context) error { + r.logger.Debug().Msg("starting etcd client") + initialized, err := r.IsInitialized() if err != nil { return err @@ -196,6 +200,8 @@ func (r *RemoteEtcd) VerifyJoinToken(in string) error { } func (r *RemoteEtcd) Shutdown() error { + r.logger.Debug().Msg("shutting down etcd client") + if r.client == nil { return nil } diff --git a/server/internal/migrate/runner.go b/server/internal/migrate/runner.go index 4ed4339e..d7e46f10 100644 --- a/server/internal/migrate/runner.go +++ b/server/internal/migrate/runner.go @@ -146,7 +146,7 @@ func (r *Runner) runMigrations(ctx context.Context) error { startIndex := r.findStartIndex(currentRevision) if startIndex >= len(r.migrations) { - r.logger.Info().Msg("control-plane db is up to date, no migrations to run") + r.logger.Debug().Msg("control-plane db is up to date, no migrations to run") return nil } @@ -199,7 +199,7 @@ func (r *Runner) findStartIndex(currentRevision string) int { func (r *Runner) hasPendingMigrations(ctx context.Context) (bool, error) { if len(r.migrations) == 0 { - r.logger.Info().Msg("no migrations to run") + r.logger.Debug().Msg("no migrations to run") return false, nil } @@ -210,7 +210,7 @@ func (r *Runner) hasPendingMigrations(ctx context.Context) (bool, error) { startIndex := r.findStartIndex(currentRevision) if startIndex >= len(r.migrations) { - r.logger.Info().Msg("control-plane db is up to date, no migrations to run") + r.logger.Debug().Msg("control-plane db is up to date, no migrations to run") return false, nil } diff --git a/server/internal/monitor/service.go b/server/internal/monitor/service.go index 23e88766..59e7c68a 100644 --- a/server/internal/monitor/service.go +++ b/server/internal/monitor/service.go @@ -49,6 +49,8 @@ func NewService( } func (s *Service) Start(ctx context.Context) error { + s.logger.Debug().Msg("starting monitors") + // The monitors should run for the lifetime of the application rather than // the lifetime of a single operation. s.appCtx = ctx @@ -72,7 +74,7 @@ func (s *Service) Start(ctx context.Context) error { } func (s *Service) Shutdown() error { - s.logger.Info().Msg("shutting down instance monitors") + s.logger.Debug().Msg("shutting down monitors") for _, mon := range s.instances { mon.Stop() diff --git a/server/internal/scheduler/service.go b/server/internal/scheduler/service.go index 1fefc650..2300508b 100644 --- a/server/internal/scheduler/service.go +++ b/server/internal/scheduler/service.go @@ -47,7 +47,7 @@ func NewService( } func (s *Service) Start(ctx context.Context) error { - s.logger.Info().Msg("starting scheduler service") + s.logger.Debug().Msg("starting scheduler service") if err := s.elector.Start(ctx); err != nil { return fmt.Errorf("failed to start elector: %w", err) @@ -83,11 +83,11 @@ func (s *Service) Start(ctx context.Context) error { } func (s *Service) Shutdown() error { - s.logger.Info().Msg("shutting down scheduler service") + s.logger.Debug().Msg("shutting down scheduler service") s.scheduler.Stop() if s.watchOp != nil { - s.logger.Info().Msg("closing scheduled job watch") + s.logger.Debug().Msg("closing scheduled job watch") s.watchOp.Close() } @@ -115,7 +115,7 @@ func (s *Service) registerJob(ctx context.Context, job *StoredScheduledJob) erro } s.runners[job.ID] = gocronJob - s.logger.Info().Str("job_id", job.ID).Msg("registered scheduled job") + s.logger.Debug().Str("job_id", job.ID).Msg("registered scheduled job") return nil } @@ -126,7 +126,7 @@ func (s *Service) UnregisterJob(jobID string) { if job, ok := s.runners[jobID]; ok { s.scheduler.RemoveByReference(job) delete(s.runners, jobID) - s.logger.Info().Str("job_id", jobID).Msg("unregistered scheduled job") + s.logger.Debug().Str("job_id", jobID).Msg("unregistered scheduled job") } } diff --git a/server/internal/workflows/provide.go b/server/internal/workflows/provide.go index edc10a92..8e0160c6 100644 --- a/server/internal/workflows/provide.go +++ b/server/internal/workflows/provide.go @@ -25,6 +25,10 @@ func Provide(i *do.Injector) { func provideWorker(i *do.Injector) { do.Provide(i, func(i *do.Injector) (*Worker, error) { + logger, err := do.Invoke[zerolog.Logger](i) + if err != nil { + return nil, err + } be, err := do.Invoke[backend.Backend](i) if err != nil { return nil, err @@ -37,7 +41,7 @@ func provideWorker(i *do.Injector) { if err != nil { return nil, err } - return NewWorker(be, workflows, orch) + return NewWorker(logger, be, workflows, orch) }) } diff --git a/server/internal/workflows/worker.go b/server/internal/workflows/worker.go index 2ee9dae3..3743b360 100644 --- a/server/internal/workflows/worker.go +++ b/server/internal/workflows/worker.go @@ -8,6 +8,7 @@ import ( "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/worker" "github.com/cschleiden/go-workflows/workflow" + "github.com/rs/zerolog" "github.com/samber/do" ) @@ -18,13 +19,14 @@ type Orchestrator interface { } type Worker struct { + logger zerolog.Logger worker *worker.Worker workflows *Workflows ctx context.Context cancel context.CancelFunc } -func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { +func NewWorker(logger zerolog.Logger, be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { queues, err := orch.WorkerQueues() if err != nil { return nil, fmt.Errorf("failed to get worker queues: %w", err) @@ -42,6 +44,9 @@ func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Wo } return &Worker{ + logger: logger.With(). + Str("component", "workflows_worker"). + Logger(), worker: w, workflows: workflows, }, nil @@ -52,6 +57,8 @@ func (w *Worker) Start(ctx context.Context) error { return fmt.Errorf("workflows worker already started") } + w.logger.Debug().Msg("starting workflows worker") + // The parent context isn't canceled until the stop grace period times out, // so we start the worker with a child context that we can cancel ourselves. childCtx, cancel := context.WithCancel(ctx) @@ -66,6 +73,7 @@ func (w *Worker) Start(ctx context.Context) error { } func (w *Worker) Shutdown() error { + w.logger.Debug().Msg("shutting down workflow worker") if w.cancel != nil { w.cancel()