Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobCh
err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID)
c.emitUserPartitionMetrics(ctx, userLogger, userBucket, userID)
return nil
})

Expand Down Expand Up @@ -789,13 +789,14 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
isPartitionGroupInfoDeleted := false
partitionedGroupInfoFile := extraInfo.path
deletedBlocksCount := 0
partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())

if extraInfo.status.CanDelete {
if extraInfo.status.IsCompleted {
// Try to remove all blocks included in partitioned group info
deletedBlocksCount, err = partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID)
if err != nil {
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
level.Warn(partitionedGroupLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion")
// if one block can not be marked for deletion, we should
// skip delete this partitioned group. next iteration
// would try it again.
Expand All @@ -804,13 +805,13 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
}

if deletedBlocksCount > 0 {
level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
level.Info(partitionedGroupLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle")
} else {
level.Info(userLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
level.Info(partitionedGroupLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid")
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_file", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info", "partitioned_group_file", partitionedGroupInfoFile)
isPartitionGroupInfoDeleted = true
}
}
Expand All @@ -819,15 +820,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) {
// Remove partition visit markers
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile)
level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group")
}
}
}
}

func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
if err != nil {
level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err)
Expand All @@ -842,7 +843,7 @@ func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogge
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
if oldestPartitionGroup != nil {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTimeString())
} else {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
}
Expand Down Expand Up @@ -874,8 +875,9 @@ func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objs
return nil
}

partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())
status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
level.Debug(partitionedGroupLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
path string
status PartitionedGroupStatus
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
err = v4Manager.updateVisitMarker(ctx)
require.NoError(t, err)

cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID)
cleaner.emitUserPartitionMetrics(ctx, logger, userBucket, userID)

metricNames := []string{
"cortex_compactor_remaining_planned_compactions",
Expand Down
16 changes: 8 additions & 8 deletions pkg/compactor/partition_compaction_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID
var blockIDs []string
for _, p := range existingPartitionedGroups {
blockIDs = p.getAllBlockIDs()
level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ","))
level.Info(g.logger).Log("msg", "existing partitioned group", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ","))
}

allPartitionedGroup, err := g.generatePartitionedGroups(blocks, groups, existingPartitionedGroups, timeRanges)
Expand All @@ -181,13 +181,13 @@ func (g *PartitionCompactionGrouper) generateCompactionJobs(blocks map[ulid.ULID
g.sortPartitionedGroups(allPartitionedGroup)
for _, p := range allPartitionedGroup {
blockIDs = p.getAllBlockIDs()
level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ","))
level.Info(g.logger).Log("msg", "partitioned group ready for compaction", "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString(), "partition_count", p.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ","))
}

partitionCompactionJobs := g.generatePartitionCompactionJobs(blocks, allPartitionedGroup, g.doRandomPick)
for _, p := range partitionCompactionJobs {
blockIDs = p.getBlockIDs()
level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ","))
level.Info(g.logger).Log("msg", "partitioned compaction job", "partitioned_group_id", p.partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", p.partitionedGroupInfo.CreationTimeString(), "partition_id", p.partition.PartitionID, "partition_count", p.partitionedGroupInfo.PartitionCount, "rangeStart", p.rangeStartTime().String(), "rangeEnd", p.rangeEndTime().String(), "blocks", strings.Join(blockIDs, ","))
}
return partitionCompactionJobs, nil
}
Expand Down Expand Up @@ -582,10 +582,10 @@ func (g *PartitionCompactionGrouper) generatePartitionCompactionJobs(blocks map[
partition := partitionedGroupInfo.Partitions[i]
if len(partition.Blocks) == 1 {
partition.Blocks = append(partition.Blocks, DUMMY_BLOCK_ID)
level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
level.Info(g.logger).Log("msg", "handled single block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
} else if len(partition.Blocks) < 1 {
if err := g.handleEmptyPartition(partitionedGroupInfo, partition); err != nil {
level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err)
level.Warn(g.logger).Log("msg", "failed to handle empty partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID, "err", err)
}
continue
}
Expand All @@ -609,7 +609,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo *
return nil
}

level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
level.Info(g.logger).Log("msg", "handling empty block partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
visitMarker := &partitionVisitMarker{
PartitionedGroupID: partitionedGroupInfo.PartitionedGroupID,
PartitionID: partition.PartitionID,
Expand All @@ -618,7 +618,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo *
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker)
visitMarkerManager.MarkWithStatus(g.ctx, Completed)

level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
return nil
}

Expand Down Expand Up @@ -720,7 +720,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
for _, p := range outGroups {
partitionInfo, err := tsdb.ConvertToPartitionInfo(p.Extensions())
if err == nil && partitionInfo != nil {
level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partition_count", partitionInfo.PartitionCount)
level.Info(g.logger).Log("msg", "picked compaction job", "partitioned_group_id", partitionInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionInfo.CreationTimeString(), "partition_count", partitionInfo.PartitionCount)
}
}
return outGroups
Expand Down
Loading
Loading