diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index b2b9cf02777..22fe7abf915 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -319,7 +319,7 @@ func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobCh err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { userLogger := util_log.WithUserID(userID, c.logger) userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) - c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID) + c.emitUserPartitionMetrics(ctx, userLogger, userBucket, userID) return nil }) @@ -789,13 +789,14 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke isPartitionGroupInfoDeleted := false partitionedGroupInfoFile := extraInfo.path deletedBlocksCount := 0 + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) if extraInfo.status.CanDelete { if extraInfo.status.IsCompleted { // Try to remove all blocks included in partitioned group info deletedBlocksCount, err = partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID) if err != nil { - level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Warn(partitionedGroupLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion") // if one block can not be marked for deletion, we should // skip delete this partitioned group. next iteration // would try it again. 
@@ -804,13 +805,13 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } if deletedBlocksCount > 0 { - level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(partitionedGroupLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle") } else { - level.Info(userLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(partitionedGroupLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid") if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_file", partitionedGroupInfoFile, "err", err) } else { - level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info", "partitioned_group_file", partitionedGroupInfoFile) isPartitionGroupInfoDeleted = true } } @@ -819,15 +820,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) { // Remove partition visit markers if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partition 
visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err) } else { - level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile) + level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group") } } } } -func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { +func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger) if err != nil { level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err) @@ -842,7 +843,7 @@ func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogge c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions)) if oldestPartitionGroup != nil { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime)) - level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) + level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "partitioned_group_creation_time", oldestPartitionGroup.CreationTimeString()) } else { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0) } @@ -874,8 +875,9 @@ func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objs return nil } + partitionedGroupLogger := log.With(userLogger, 
"partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) - level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) + level.Debug(partitionedGroupLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) existentPartitionedGroupInfo[partitionedGroupInfo] = struct { path string status PartitionedGroupStatus diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 9b317474fa8..e5e5037db80 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -1256,7 +1256,7 @@ func TestBlocksCleaner_EmitUserMetrics(t *testing.T) { err = v4Manager.updateVisitMarker(ctx) require.NoError(t, err) - cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID) + cleaner.emitUserPartitionMetrics(ctx, logger, userBucket, userID) metricNames := []string{ "cortex_compactor_remaining_planned_compactions", diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 4f1d955bfc6..839cd74e2a9 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -171,7 +171,7 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID var blockIDs []string for _, p := range existingPartitionedGroups { blockIDs = p.getAllBlockIDs() - level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, 
"partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } allPartitionedGroup, err := g.generatePartitionedGroups(blocks, groups, existingPartitionedGroups, timeRanges) @@ -181,13 +181,13 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID g.sortPartitionedGroups(allPartitionedGroup) for _, p := range allPartitionedGroup { blockIDs = p.getAllBlockIDs() - level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } partitionCompactionJobs := g.generatePartitionCompactionJobs(blocks, allPartitionedGroup, g.doRandomPick) for _, p := range partitionCompactionJobs { blockIDs = p.getBlockIDs() - level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) + level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", p.partitionedGroupInfo.CreationTimeString(), "partition_id", p.partition.PartitionID, "partition_count", 
p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ",")) } return partitionCompactionJobs, nil } @@ -582,10 +582,10 @@ func (g *PartitionCompactionGrouper) generatePartitionCompactionJobs(blocks map[ partition := partitionedGroupInfo.Partitions[i] if len(partition.Blocks) == 1 { partition.Blocks = append(partition.Blocks, DUMMY_BLOCK_ID) - level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) } else if len(partition.Blocks) < 1 { if err := g.handleEmptyPartition(partitionedGroupInfo, partition); err != nil { - level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err) + level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err) } continue } @@ -609,7 +609,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * return nil } - level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", 
partition.PartitionID) + level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) visitMarker := &partitionVisitMarker{ PartitionedGroupID: partitionedGroupInfo.PartitionedGroupID, PartitionID: partition.PartitionID, @@ -618,7 +618,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker) visitMarkerManager.MarkWithStatus(g.ctx, Completed) - level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) + level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) return nil } @@ -720,7 +720,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact for _, p := range outGroups { partitionInfo, err := tsdb.ConvertToPartitionInfo(p.Extensions()) if err == nil && partitionInfo != nil { - level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partition_count", partitionInfo.PartitionCount) + level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionInfo.CreationTimeString(), "partition_count", partitionInfo.PartitionCount) } } return outGroups diff --git a/pkg/compactor/partitioned_group_info.go 
b/pkg/compactor/partitioned_group_info.go index 9d9d1fd7859..a43bb262301 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -120,6 +120,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( partitionVisitMarkerTimeout time.Duration, userLogger log.Logger, ) PartitionedGroupStatus { + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString()) status := PartitionedGroupStatus{ PartitionedGroupID: p.PartitionedGroupID, CanDelete: false, @@ -136,13 +137,13 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( PartitionedGroupID: p.PartitionedGroupID, PartitionID: partition.PartitionID, } - visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) + visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) partitionVisitMarkerExists := true if err := visitMarkerManager.ReadVisitMarker(ctx, visitMarker); err != nil { if errors.Is(err, errorVisitMarkerNotFound) { partitionVisitMarkerExists = false } else { - level.Warn(userLogger).Log("msg", "unable to read partition visit marker", "path", visitMarker.GetVisitMarkerFilePath(), "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition visit marker", "path", visitMarker.GetVisitMarkerFilePath(), "err", err) return status } } @@ -183,20 +184,20 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( if _, ok := checkedBlocks[blockID]; ok { continue } - if !p.doesBlockExist(ctx, userBucket, userLogger, blockID) { - level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID) + if !p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) { + level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", 
"reason", "block is physically deleted", "block", blockID) status.CanDelete = true status.DeleteVisitMarker = true return status } - if p.isBlockDeleted(ctx, userBucket, userLogger, blockID) { - level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID) + if p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) { + level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID) status.CanDelete = true status.DeleteVisitMarker = true return status } - if p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) { - level.Info(userLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID) + if p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) { + level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID) status.CanDelete = true status.DeleteVisitMarker = true return status @@ -207,28 +208,28 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( return status } -func (p *PartitionedGroupInfo) doesBlockExist(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool { +func (p *PartitionedGroupInfo) doesBlockExist(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool { metaExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.MetaFilename)) if err != nil { - level.Warn(userLogger).Log("msg", "unable to get stats of meta.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of meta.json for block", "block", blockID.String()) return true } return metaExists } -func (p *PartitionedGroupInfo) isBlockDeleted(ctx context.Context, userBucket 
objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool { +func (p *PartitionedGroupInfo) isBlockDeleted(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool { deletionMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename)) if err != nil { - level.Warn(userLogger).Log("msg", "unable to get stats of deletion-mark.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of deletion-mark.json for block", "block", blockID.String()) return false } return deletionMarkerExists } -func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) bool { +func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket objstore.InstrumentedBucket, partitionedGroupLogger log.Logger, blockID ulid.ULID) bool { noCompactMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename)) if err != nil { - level.Warn(userLogger).Log("msg", "unable to get stats of no-compact-mark.json for block", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Warn(partitionedGroupLogger).Log("msg", "unable to get stats of no-compact-mark.json for block", "block", blockID.String()) return false } return noCompactMarkerExists @@ -237,17 +238,18 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) { blocks := p.getAllBlocks() deleteBlocksCount := 0 + partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, 
"partitioned_group_creation_time", p.CreationTimeString()) defer func() { - level.Info(userLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount) + level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount) }() for _, blockID := range blocks { - if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) { - if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { - level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + if p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) { + if err := block.MarkForDeletion(ctx, partitionedGroupLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { + level.Warn(partitionedGroupLogger).Log("msg", "unable to mark block for deletion", "block", blockID.String()) return deleteBlocksCount, err } deleteBlocksCount++ - level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + level.Debug(partitionedGroupLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "block", blockID.String()) } } return deleteBlocksCount, nil @@ -258,7 +260,11 @@ func (p *PartitionedGroupInfo) String() string { 
for _, partition := range p.Partitions { partitions = append(partitions, fmt.Sprintf("(PartitionID: %d, Blocks: %s)", partition.PartitionID, partition.Blocks)) } - return fmt.Sprintf("{PartitionedGroupID: %d, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.PartitionCount, strings.Join(partitions, ", ")) + return fmt.Sprintf("{PartitionedGroupID: %d, CreationTime: %s, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.CreationTimeString(), p.PartitionCount, strings.Join(partitions, ", ")) +} + +func (p *PartitionedGroupInfo) CreationTimeString() string { + return time.Unix(p.CreationTime, 0).UTC().Format(time.RFC3339) +} func GetPartitionedGroupFile(partitionedGroupID uint32) string { @@ -304,7 +310,7 @@ func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBu // partitioned group info which is supposed to be the correct grouping based on latest bucket store. existingPartitionedGroup, _ := ReadPartitionedGroupInfo(ctx, bkt, logger, partitionedGroupInfo.PartitionedGroupID) if existingPartitionedGroup != nil { - level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", existingPartitionedGroup.CreationTimeString()) return existingPartitionedGroup, nil } if partitionedGroupInfo.CreationTime <= 0 { @@ -319,6 +325,6 @@ func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBu if err := bkt.Upload(ctx, partitionedGroupFile, reader); err != nil { return nil, err } - level.Info(logger).Log("msg", "created new partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + level.Info(logger).Log("msg", "created new partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", 
partitionedGroupInfo.CreationTimeString()) return &partitionedGroupInfo, nil } diff --git a/pkg/compactor/sharded_block_populator.go b/pkg/compactor/sharded_block_populator.go index a8d4228d13b..c7f8932367e 100644 --- a/pkg/compactor/sharded_block_populator.go +++ b/pkg/compactor/sharded_block_populator.go @@ -52,6 +52,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. }() metrics.PopulatingBlocks.Set(1) + begin := time.Now() globalMaxt := blocks[0].Meta().MaxTime g, gCtx := errgroup.WithContext(ctx) g.SetLimit(8) @@ -91,7 +92,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. if err != nil { return err } - level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart)) + level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart), "duration_ms", time.Since(shardStart).Milliseconds()) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. setsMtx.Lock() sets = append(sets, tsdb.NewBlockChunkSeriesSet(meta.ULID, indexr, chunkr, tombsr, shardedPosting, meta.MinTime, meta.MaxTime-1, false)) @@ -103,7 +104,9 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. if err := g.Wait(); err != nil { return err } + level.Info(c.logger).Log("msg", "finished sharding all blocks and created series sets", "series_sets_count", len(sets), "symbols_count", len(symbols), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + begin = time.Now() symbolsList := make([]string, len(symbols)) symbolIdx := 0 for symbol := range symbols { @@ -116,6 +119,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. 
return errors.Wrap(err, "add symbol") } } + level.Info(c.logger).Log("msg", "finished sorting symbols and added to index", "symbols_count", len(symbols), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) var ( ref = storage.SeriesRef(0) @@ -131,6 +135,8 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. defer cancel() } + begin = time.Now() + seriesCount := 0 go func() { // Iterate over all sorted chunk series. for set.Next() { @@ -173,6 +179,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. if err := indexw.AddSeries(r, s.Labels(), chks...); err != nil { return errors.Wrap(err, "add series") } + seriesCount++ meta.Stats.NumChunks += uint64(len(chks)) meta.Stats.NumSeries++ @@ -209,6 +216,7 @@ func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb. return err } } + level.Info(c.logger).Log("msg", "finished iterating all series sets", "series_sets_count", len(sets), "series_count", seriesCount, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") diff --git a/pkg/storage/tsdb/meta_extensions.go b/pkg/storage/tsdb/meta_extensions.go index b6b8a7acf07..970c632839d 100644 --- a/pkg/storage/tsdb/meta_extensions.go +++ b/pkg/storage/tsdb/meta_extensions.go @@ -3,6 +3,7 @@ package tsdb import ( "fmt" "strconv" + "time" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -19,6 +20,10 @@ type PartitionInfo struct { PartitionedGroupCreationTime int64 `json:"partitioned_group_creation_time"` } +func (p *PartitionInfo) CreationTimeString() string { + return time.Unix(p.PartitionedGroupCreationTime, 0).UTC().Format(time.RFC3339) +} + var ( DefaultPartitionInfo = PartitionInfo{ PartitionedGroupID: 0,