From c544e03894e09a1f9ebd0acda8145b39274cf3c9 Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Mon, 29 Dec 2025 14:58:38 +0800 Subject: [PATCH 1/4] Support rocksdb metrics --- .../org/apache/fluss/metrics/MetricNames.java | 48 +++ .../org/apache/fluss/server/kv/KvManager.java | 19 + .../org/apache/fluss/server/kv/KvTablet.java | 39 +- .../fluss/server/kv/rocksdb/RocksDBKv.java | 23 +- .../server/kv/rocksdb/RocksDBKvBuilder.java | 7 +- .../server/kv/rocksdb/RocksDBMetrics.java | 374 ++++++++++++++++++ .../kv/rocksdb/RocksDBResourceContainer.java | 29 +- .../metrics/group/TableMetricGroup.java | 151 ++++++- .../group/TabletServerMetricGroup.java | 44 +++ .../apache/fluss/server/kv/KvTabletTest.java | 181 +++++++++ .../rocksdb/RocksDBResourceContainerTest.java | 3 +- .../observability/monitor-metrics.md | 128 ++++++ 12 files changed, 1036 insertions(+), 10 deletions(-) create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index b1326d46bc..60eb942e8b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -131,6 +131,54 @@ public class MetricNames { public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE = "preWriteBufferTruncateAsErrorPerSecond"; + // -------------------------------------------------------------------------------------------- + // RocksDB metrics + // -------------------------------------------------------------------------------------------- + // Table-level RocksDB metrics (aggregated from all buckets of a table, Max aggregation) + /** Maximum write stall duration across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_WRITE_STALL_MICROS_MAX = "rocksdbWriteStallMicrosMax"; + + /** Maximum get latency across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_GET_LATENCY_MICROS_MAX = "rocksdbGetLatencyMicrosMax"; + + /** Maximum write latency across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_WRITE_LATENCY_MICROS_MAX = "rocksdbWriteLatencyMicrosMax"; + + /** Maximum number of L0 files across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_NUM_FILES_AT_LEVEL0_MAX = "rocksdbNumFilesAtLevel0Max"; + + /** Maximum flush pending indicator across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_FLUSH_PENDING_MAX = "rocksdbFlushPendingMax"; + + /** Maximum compaction pending indicator across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_COMPACTION_PENDING_MAX = "rocksdbCompactionPendingMax"; + + /** Maximum compaction time across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX = + "rocksdbCompactionTimeMicrosMax"; + + // Table-level RocksDB metrics (aggregated from all buckets of a table, Sum aggregation) + /** Total bytes read across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_BYTES_READ_TOTAL = "rocksdbBytesReadTotal"; + + /** Total bytes written across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_BYTES_WRITTEN_TOTAL = "rocksdbBytesWrittenTotal"; + + /** Total flush bytes written across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL = "rocksdbFlushBytesWrittenTotal"; + + /** Total compaction bytes read across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_COMPACTION_BYTES_READ_TOTAL = + "rocksdbCompactionBytesReadTotal"; + + /** Total compaction bytes written across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL = + "rocksdbCompactionBytesWrittenTotal"; + + // Server-level RocksDB metrics (aggregated from all tables, Sum aggregation) + /** Total memory usage across all RocksDB instances in this server (Sum aggregation). */ + public static final String ROCKSDB_MEMORY_USAGE_TOTAL = "rocksdbMemoryUsageTotal"; + // -------------------------------------------------------------------------------------------- // metrics for table bucket // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 4353530903..1879c8ff61 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -266,6 +266,14 @@ public KvTablet getOrCreateKv( sharedRocksDBRateLimiter); currentKvs.put(tableBucket, tablet); + // Register RocksDB metrics to TableMetricGroup for aggregation + // Note: BucketMetricGroup is already created by Replica when LogTablet is + // created + if (tablet.getRocksDBMetrics() != null) { + serverMetricGroup.registerRocksDBMetrics( + tablePath.getTablePath(), tableBucket, tablet.getRocksDBMetrics()); + } + LOG.info( "Created kv tablet for bucket {} in dir {}.", tableBucket, @@ -304,6 +312,9 @@ public void dropKv(TableBucket tableBucket) { if (dropKvTablet != null) { TablePath tablePath = dropKvTablet.getTablePath(); try { + // Unregister RocksDB metrics from TableMetricGroup + serverMetricGroup.removeTableBucketMetricGroup(tablePath, tableBucket); + dropKvTablet.drop(); if (dropKvTablet.getPartitionName() == null) { LOG.info( @@ -386,6 +397,14 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti currentKvs.get(tableBucket).getKvTabletDir().getAbsolutePath())); } this.currentKvs.put(tableBucket, kvTablet); + + // Register RocksDB metrics to TableMetricGroup for aggregation + // Note: BucketMetricGroup is already created by Replica when LogTablet is created + if (kvTablet.getRocksDBMetrics() != null) { + serverMetricGroup.registerRocksDBMetrics( + physicalTablePath.getTablePath(), tableBucket, kvTablet.getRocksDBMetrics()); + } + return kvTablet; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index d6cc086664..aa7a5791a1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -49,6 +49,7 @@ import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; +import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer; import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger; import org.apache.fluss.server.kv.rowmerger.RowMerger; @@ -118,6 +119,9 @@ public final class KvTablet { // the changelog image mode for this tablet private final ChangelogImage changelogImage; + // RocksDB metrics for this tablet + @Nullable private final RocksDBMetrics rocksDBMetrics; + /** * The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been * flushed into kv. @@ -142,7 +146,8 @@ private KvTablet( RowMerger rowMerger, ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, - ChangelogImage changelogImage) { + ChangelogImage changelogImage, + @Nullable RocksDBMetrics rocksDBMetrics) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -158,6 +163,7 @@ private KvTablet( this.arrowCompressionInfo = arrowCompressionInfo; this.schemaGetter = schemaGetter; this.changelogImage = changelogImage; + this.rocksDBMetrics = rocksDBMetrics; } public static KvTablet create( @@ -177,6 +183,19 @@ public static KvTablet create( RateLimiter sharedRateLimiter) throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter); + + // Create RocksDB metrics accessor (will be registered to TableMetricGroup by KvManager) + // Pass ResourceGuard to ensure thread-safe access during concurrent close operations + // Pass ColumnFamilyHandle for column family specific properties like num-files-at-level0 + // Pass Cache for accurate block cache memory tracking + RocksDBMetrics rocksDBMetrics = + new RocksDBMetrics( + kv.getDb(), + kv.getStatistics(), + kv.getResourceGuard(), + kv.getDefaultColumnFamilyHandle(), + kv.getBlockCache()); + return new KvTablet( tablePath, tableBucket, @@ -192,14 +211,16 @@ public static KvTablet create( rowMerger, arrowCompressionInfo, schemaGetter, - changelogImage); + changelogImage, + rocksDBMetrics); } private static RocksDBKv buildRocksDBKv( Configuration configuration, File kvDir, RateLimiter sharedRateLimiter) throws IOException { + // Enable statistics to support RocksDB metrics RocksDBResourceContainer rocksDBResourceContainer = - new RocksDBResourceContainer(configuration, kvDir, false, sharedRateLimiter); + new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter); RocksDBKvBuilder rocksDBKvBuilder = new RocksDBKvBuilder( kvDir, @@ -225,6 +246,16 @@ public File getKvTabletDir() { return kvTabletDir; } + /** + * Get RocksDB metrics accessor for this tablet. + * + * @return the RocksDB metrics accessor, or null if not available + */ + @Nullable + public RocksDBMetrics getRocksDBMetrics() { + return rocksDBMetrics; + } + void setFlushedLogOffset(long flushedLogOffset) { this.flushedLogOffset = flushedLogOffset; } @@ -621,6 +652,8 @@ public void close() throws Exception { if (isClosed) { return; } + // Note: RocksDB metrics lifecycle is managed by TableMetricGroup + // No need to close it here if (rocksDBKv != null) { rocksDBKv.close(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java index 602a4b5910..6e45b2396a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java @@ -23,12 +23,14 @@ import org.apache.fluss.utils.BytesUtils; import org.apache.fluss.utils.IOUtils; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; import javax.annotation.Nullable; @@ -63,6 +65,9 @@ public class RocksDBKv implements AutoCloseable { /** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. */ protected final RocksDB db; + /** RocksDB Statistics for metrics collection. */ + private final @Nullable Statistics statistics; + // mark whether this kv is already closed and prevent duplicate closing private volatile boolean closed = false; @@ -70,12 +75,14 @@ public RocksDBKv( RocksDBResourceContainer optionsContainer, RocksDB db, ResourceGuard rocksDBResourceGuard, - ColumnFamilyHandle defaultColumnFamilyHandle) { + ColumnFamilyHandle defaultColumnFamilyHandle, + @Nullable Statistics statistics) { this.optionsContainer = optionsContainer; this.db = db; this.rocksDBResourceGuard = rocksDBResourceGuard; this.writeOptions = optionsContainer.getWriteOptions(); this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; + this.statistics = statistics; } public ResourceGuard getResourceGuard() { @@ -206,4 +213,18 @@ public void close() throws Exception { public RocksDB getDb() { return db; } + + @Nullable + public Statistics getStatistics() { + return optionsContainer.getStatistics(); + } + + @Nullable + public Cache getBlockCache() { + return optionsContainer.getBlockCache(); + } + + public ColumnFamilyHandle getDefaultColumnFamilyHandle() { + return defaultColumnFamilyHandle; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java index 8fbc0cf954..92e86fb964 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java @@ -107,7 +107,12 @@ public RocksDBKv build() throws KvBuildingException { throw new KvBuildingException(errMsg, t); } LOG.info("Finished building RocksDB kv at {}.", instanceBasePath); - return new RocksDBKv(optionsContainer, db, rocksDBResourceGuard, defaultColumnFamilyHandle); + return new RocksDBKv( + optionsContainer, + db, + rocksDBResourceGuard, + defaultColumnFamilyHandle, + optionsContainer.getStatistics()); } void prepareDirectories() throws IOException { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java new file mode 100644 index 0000000000..803cb8b6da --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rocksdb; + +import org.apache.fluss.server.utils.ResourceGuard; + +import org.rocksdb.Cache; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.HistogramData; +import org.rocksdb.HistogramType; +import org.rocksdb.MemoryUsageType; +import org.rocksdb.MemoryUtil; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.Statistics; +import org.rocksdb.TickerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Manages RocksDB metrics for a single KvTablet. + * + *

This class encapsulates all RocksDB-specific metric access logic, providing semantic methods + * to access various RocksDB statistics and properties. Upper layers should use these methods + * instead of directly accessing RocksDB properties with string names. + * + *

Thread-safety: This class uses RocksDB's ResourceGuard to ensure safe concurrent access. All + * metric read operations acquire the resource guard to prevent accessing closed RocksDB instances. + */ +public class RocksDBMetrics implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBMetrics.class); + + private final RocksDB db; + @Nullable private final Statistics statistics; + private final ResourceGuard resourceGuard; + private final ColumnFamilyHandle defaultColumnFamilyHandle; + @Nullable private final Cache blockCache; + + public RocksDBMetrics( + RocksDB db, + @Nullable Statistics statistics, + ResourceGuard resourceGuard, + ColumnFamilyHandle defaultColumnFamilyHandle, + @Nullable Cache blockCache) { + this.db = db; + this.statistics = statistics; + this.resourceGuard = resourceGuard; + this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; + this.blockCache = blockCache; + } + + // ==================== Ticker-based Metrics ==================== + + /** + * Get write stall duration in microseconds. + * + * @return write stall duration, or 0 if not available + */ + public long getWriteStallMicros() { + return getTickerValue(TickerType.STALL_MICROS); + } + + /** + * Get total bytes read. + * + * @return bytes read, or 0 if not available + */ + public long getBytesRead() { + return getTickerValue(TickerType.BYTES_READ); + } + + /** + * Get total bytes written. + * + * @return bytes written, or 0 if not available + */ + public long getBytesWritten() { + return getTickerValue(TickerType.BYTES_WRITTEN); + } + + /** + * Get flush bytes written. + * + * @return flush bytes written, or 0 if not available + */ + public long getFlushBytesWritten() { + return getTickerValue(TickerType.FLUSH_WRITE_BYTES); + } + + /** + * Get compaction bytes read. + * + * @return compaction bytes read, or 0 if not available + */ + public long getCompactionBytesRead() { + return getTickerValue(TickerType.COMPACT_READ_BYTES); + } + + /** + * Get compaction bytes written. + * + * @return compaction bytes written, or 0 if not available + */ + public long getCompactionBytesWritten() { + return getTickerValue(TickerType.COMPACT_WRITE_BYTES); + } + + // ==================== Property-based Metrics ==================== + + /** + * Get get operation latency in microseconds (P99). + * + *

This uses RocksDB Statistics histogram data to get the P99 latency of get operations. P99 + * is used instead of average because it better reflects tail latency issues, which are more + * critical for monitoring database performance. + * + * @return P99 get latency in microseconds, or 0 if not available + */ + public long getGetLatencyMicros() { + return getHistogramValue(HistogramType.DB_GET); + } + + /** + * Get write operation latency in microseconds (P99). + * + *

This uses RocksDB Statistics histogram data to get the P99 latency of write operations. + * P99 is used instead of average because it better reflects tail latency issues, which are more + * critical for monitoring database performance. + * + * @return P99 write latency in microseconds, or 0 if not available + */ + public long getWriteLatencyMicros() { + return getHistogramValue(HistogramType.DB_WRITE); + } + + /** + * Get number of files at level 0. + * + *

This property is column family specific and must be accessed through the column family + * handle. + * + * @return number of L0 files, or 0 if not available + */ + public long getNumFilesAtLevel0() { + return getPropertyValue(defaultColumnFamilyHandle, "rocksdb.num-files-at-level0"); + } + + /** + * Get whether a memtable flush is pending. + * + * @return 1 if flush is pending, 0 otherwise + */ + public long getFlushPending() { + return getPropertyValue("rocksdb.mem-table-flush-pending"); + } + + /** + * Get whether a compaction is pending. + * + * @return 1 if compaction is pending, 0 otherwise + */ + public long getCompactionPending() { + return getPropertyValue("rocksdb.compaction-pending"); + } + + /** + * Get compaction time in microseconds (P99). + * + *

This uses RocksDB Statistics histogram data to get the P99 compaction time. P99 is used + * instead of average because it better reflects tail latency issues in compaction operations. + * + * @return P99 compaction time in microseconds, or 0 if not available + */ + public long getCompactionTimeMicros() { + return getHistogramValue(HistogramType.COMPACTION_TIME); + } + + /** + * Get total memory usage across all RocksDB components including block cache, memtables, + * indexes, filters, etc. + * + *

This uses RocksDB MemoryUtil to get approximate memory usage by type and sums all types. + * This includes: + * + *

+ * + *

Note: To get accurate block cache memory usage, an explicit Cache object must be provided + * during construction. If no cache is provided (null), the block cache memory usage may not be + * fully accounted for. + * + * @return total memory usage in bytes, or 0 if not available + */ + public long getTotalMemoryUsage() { + try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) { + if (db == null) { + return 0L; + } + + // Create cache set for memory usage calculation + // If blockCache is null, pass null to MemoryUtil (will only count memtables, etc.) + Set caches = null; + if (blockCache != null) { + caches = new HashSet<>(); + caches.add(blockCache); + } + + Map memoryUsage = + MemoryUtil.getApproximateMemoryUsageByType( + Collections.singletonList(db), caches); + return memoryUsage.values().stream().mapToLong(Long::longValue).sum(); + } catch (Exception e) { + LOG.debug( + "Failed to get total memory usage from RocksDB (possibly closed or unavailable)", + e); + return 0L; + } + } + + // ==================== Internal Helper Methods ==================== + + /** + * Get ticker value from RocksDB Statistics with resource guard protection. + * + * @param tickerType the ticker type to query + * @return the ticker value, or 0 if not available or RocksDB is closed + */ + private long getTickerValue(TickerType tickerType) { + try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) { + if (statistics != null) { + return statistics.getTickerCount(tickerType); + } + } catch (Exception e) { + LOG.debug( + "Failed to get ticker {} from RocksDB (possibly closed or unavailable)", + tickerType, + e); + } + return 0L; + } + + /** + * Get histogram P99 value from RocksDB Statistics with resource guard protection. + * + *

Histograms are used for latency metrics and provide average, median, percentile values. + * This method returns the P99 value (99th percentile) instead of average, which better reflects + * tail latency and is more useful for performance monitoring. For microsecond-level latencies, + * we round to the nearest long value to avoid precision loss where it matters. + * + *

Why P99 instead of average: + * + *

+ * + * @param histogramType the histogram type to query + * @return the P99 histogram value (rounded to nearest long), or 0 if not available or RocksDB + * is closed + */ + private long getHistogramValue(HistogramType histogramType) { + try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) { + if (statistics != null) { + HistogramData histogramData = statistics.getHistogramData(histogramType); + if (histogramData != null) { + // Use P99 instead of average for better tail latency monitoring + // Round to nearest long to preserve precision for microsecond-level values + return Math.round(histogramData.getPercentile99()); + } + } + } catch (Exception e) { + LOG.debug( + "Failed to get histogram {} from RocksDB Statistics (possibly closed or unavailable)", + histogramType, + e); + } + return 0L; + } + + /** + * Get property value from RocksDB with resource guard protection. + * + * @param propertyName the property name to query + * @return the property value as long, or 0 if not available or RocksDB is closed + */ + private long getPropertyValue(String propertyName) { + try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) { + String value = db.getProperty(propertyName); + if (value != null && !value.isEmpty()) { + return Long.parseLong(value); + } + } catch (RocksDBException e) { + LOG.debug( + "Failed to get property {} from RocksDB (possibly closed or unavailable)", + propertyName, + e); + } catch (NumberFormatException e) { + LOG.debug("Failed to parse property {} value as long", propertyName, e); + } catch (Exception e) { + // ResourceGuard may throw exception if RocksDB is closed + LOG.debug( + "Failed to access RocksDB for property {} (possibly closed)", propertyName, e); + } + return 0L; + } + + /** + * Get property value from RocksDB for a specific column family with resource guard protection. + * + *

Some RocksDB properties are column family specific and must be accessed through the column + * family handle. + * + * @param columnFamilyHandle the column family handle + * @param propertyName the property name to query + * @return the property value as long, or 0 if not available or RocksDB is closed + */ + private long getPropertyValue(ColumnFamilyHandle columnFamilyHandle, String propertyName) { + try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) { + if (columnFamilyHandle == null) { + return 0L; + } + String value = db.getProperty(columnFamilyHandle, propertyName); + if (value != null && !value.isEmpty()) { + return Long.parseLong(value); + } + } catch (RocksDBException e) { + LOG.debug( + "Failed to get property {} from RocksDB column family (possibly closed or unavailable)", + propertyName, + e); + } catch (NumberFormatException e) { + LOG.debug("Failed to parse property {} value as long", propertyName, e); + } catch (Exception e) { + // ResourceGuard may throw exception if RocksDB is closed + LOG.debug( + "Failed to access RocksDB for property {} (possibly closed)", propertyName, e); + } + return 0L; + } + + @Override + public void close() { + // No resources to clean up, metrics are managed by TableMetricGroup + LOG.debug("RocksDB metrics accessor closed"); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java index 4495613e65..a07ea1a918 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java @@ -28,11 +28,13 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; import org.rocksdb.PlainTableConfig; import org.rocksdb.RateLimiter; import org.rocksdb.ReadOptions; @@ -80,6 +82,12 @@ public class RocksDBResourceContainer implements AutoCloseable { /** The shared rate limiter for all RocksDB instances. */ private final RateLimiter sharedRateLimiter; + /** The statistics object for RocksDB, null if statistics is disabled. */ + @Nullable private Statistics statistics; + + /** The block cache for RocksDB, shared across column families. */ + @Nullable private Cache blockCache; + /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; @@ -138,7 +146,7 @@ public DBOptions getDbOptions() throws IOException { opt.setRateLimiter(sharedRateLimiter); if (enableStatistics) { - Statistics statistics = new Statistics(); + statistics = new Statistics(); opt.setStatistics(statistics); handlesToClose.add(statistics); } @@ -146,6 +154,18 @@ public DBOptions getDbOptions() throws IOException { return opt; } + /** Gets the Statistics object if statistics is enabled, null otherwise. */ + @Nullable + public Statistics getStatistics() { + return statistics; + } + + /** Gets the block cache used by RocksDB, null if not yet initialized. */ + @Nullable + public Cache getBlockCache() { + return blockCache; + } + /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */ public ColumnFamilyOptions getColumnOptions() { // initial options from common profile @@ -282,8 +302,11 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions( blockBasedTableConfig.setMetadataBlockSize( internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes()); - blockBasedTableConfig.setBlockCacheSize( - internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes()); + // Create explicit LRUCache for accurate memory tracking + long blockCacheSize = internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes(); + blockCache = new LRUCache(blockCacheSize); + handlesToClose.add(blockCache); + blockBasedTableConfig.setBlockCache(blockCache); if (internalGetOption(ConfigOptions.KV_USE_BLOOM_FILTER)) { final double bitsPerKey = internalGetOption(ConfigOptions.KV_BLOOM_FILTER_BITS_PER_KEY); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java index 7620bcbfda..e8f4594a73 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java @@ -27,6 +27,7 @@ import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; +import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; import javax.annotation.Nullable; @@ -43,6 +44,10 @@ public class TableMetricGroup extends AbstractMetricGroup { private final Map buckets = new HashMap<>(); + // Directly manage RocksDB metrics for aggregation + // This is cleaner than passing through BucketMetricGroup + private final Map rocksDBMetricsMap = new HashMap<>(); + private final TablePath tablePath; // server-level metrics @@ -70,6 +75,8 @@ public TableMetricGroup( if (isKvTable) { kvMetrics = new KvMetricGroup(this); logMetrics = new LogMetricGroup(this, TabletType.CDC_LOG); + // Register RocksDB aggregated metrics for kv tables + registerRocksDBMetrics(); } else { // otherwise, create log produce metrics kvMetrics = null; @@ -236,17 +243,159 @@ public BucketMetricGroup addBucketMetricGroup( public void removeBucketMetricGroup(TableBucket tableBucket) { BucketMetricGroup metricGroup = buckets.remove(tableBucket); - metricGroup.close(); + if (metricGroup != null) { + metricGroup.close(); + } + // Also remove RocksDB metrics if exists + RocksDBMetrics rocksDBMetrics = rocksDBMetricsMap.remove(tableBucket); + if (rocksDBMetrics != null) { + try { + rocksDBMetrics.close(); + } catch (Exception e) { + // Ignore close errors + } + } + } + + /** + * Register RocksDB metrics for a bucket. This allows table-level aggregation without + * registering bucket-level metrics. + * + * @param tableBucket the table bucket + * @param rocksDBMetrics the RocksDB metrics accessor + */ + public void registerRocksDBMetrics(TableBucket tableBucket, RocksDBMetrics rocksDBMetrics) { + rocksDBMetricsMap.put(tableBucket, rocksDBMetrics); + } + + /** + * Unregister RocksDB metrics for a bucket. + * + * @param tableBucket the table bucket + */ + public void unregisterRocksDBMetrics(TableBucket tableBucket) { + RocksDBMetrics rocksDBMetrics = rocksDBMetricsMap.remove(tableBucket); + if (rocksDBMetrics != null) { + try { + rocksDBMetrics.close(); + } catch (Exception e) { + // Ignore close errors + } + } } public int bucketGroupsCount() { return buckets.size(); } + public java.util.Collection getBucketMetricGroups() { + return buckets.values(); + } + + /** + * Get the RocksDB metrics map for server-level aggregation. + * + * @return the map of RocksDB metrics + */ + public Map getRocksDBMetricsMap() { + return rocksDBMetricsMap; + } + public TabletServerMetricGroup getServerMetricGroup() { return (TabletServerMetricGroup) parent; } + /** + * Register RocksDB aggregated metrics at table level. These metrics aggregate values from all + * buckets of this table. + * + *

This method is called once during TableMetricGroup construction for KV tables. + */ + private void registerRocksDBMetrics() { + // Max aggregation metrics - track the maximum value across all buckets + gauge( + MetricNames.ROCKSDB_WRITE_STALL_MICROS_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getWriteStallMicros) + .max() + .orElse(0L)); + gauge( + MetricNames.ROCKSDB_GET_LATENCY_MICROS_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getGetLatencyMicros) + .max() + .orElse(0L)); + gauge( + MetricNames.ROCKSDB_WRITE_LATENCY_MICROS_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getWriteLatencyMicros) + .max() + .orElse(0L)); + gauge( + MetricNames.ROCKSDB_NUM_FILES_AT_LEVEL0_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getNumFilesAtLevel0) + .max() + .orElse(0L)); + gauge( + MetricNames.ROCKSDB_FLUSH_PENDING_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getFlushPending) + .max() + .orElse(0L)); + gauge( + MetricNames.ROCKSDB_COMPACTION_PENDING_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getCompactionPending) + .max() + .orElse(0L)); + gauge( + MetricNames.ROCKSDB_COMPACTION_TIME_MICROS_MAX, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getCompactionTimeMicros) + .max() + .orElse(0L)); + + // Sum aggregation metrics - track the total value across all buckets + gauge( + MetricNames.ROCKSDB_BYTES_READ_TOTAL, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getBytesRead) + .sum()); + gauge( + MetricNames.ROCKSDB_BYTES_WRITTEN_TOTAL, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getBytesWritten) + .sum()); + gauge( + MetricNames.ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getFlushBytesWritten) + .sum()); + gauge( + MetricNames.ROCKSDB_COMPACTION_BYTES_READ_TOTAL, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getCompactionBytesRead) + .sum()); + gauge( + MetricNames.ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL, + () -> + rocksDBMetricsMap.values().stream() + .mapToLong(RocksDBMetrics::getCompactionBytesWritten) + .sum()); + } + /** Metric group for specific kind of tablet of a table. */ private static class TabletMetricGroup extends AbstractMetricGroup { private final TabletType tabletType; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index 59730b69fa..11a77895bf 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -30,13 +30,19 @@ import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; +import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; import org.apache.fluss.utils.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; /** The metric group for tablet server. */ public class TabletServerMetricGroup extends AbstractMetricGroup { + private static final Logger LOG = LoggerFactory.getLogger(TabletServerMetricGroup.class); + private static final String NAME = "tabletserver"; private static final int WINDOW_SIZE = 1024; @@ -133,6 +139,24 @@ public TabletServerMetricGroup( meter(MetricNames.ISR_SHRINKS_RATE, new MeterView(isrShrinks)); failedIsrUpdates = new SimpleCounter(); meter(MetricNames.FAILED_ISR_UPDATES_RATE, new MeterView(failedIsrUpdates)); + + // Register server-level RocksDB aggregated metrics + registerServerRocksDBMetrics(); + } + + /** + * Register server-level RocksDB aggregated metrics. These metrics aggregate memory usage from + * all tables. + */ + private void registerServerRocksDBMetrics() { + // Total memory usage across all RocksDB instances in this server + gauge( + MetricNames.ROCKSDB_MEMORY_USAGE_TOTAL, + () -> + metricGroupByTable.values().stream() + .flatMap(table -> table.getRocksDBMetricsMap().values().stream()) + .mapToLong(RocksDBMetrics::getTotalMemoryUsage) + .sum()); } @Override @@ -234,6 +258,26 @@ public BucketMetricGroup addTableBucketMetricGroup( return tableMetricGroup.addBucketMetricGroup(physicalTablePath.getPartitionName(), bucket); } + /** + * Register RocksDB metrics for a specific table bucket. This method ensures the + * TableMetricGroup exists (creates it if necessary) before registering the RocksDB metrics. + * + *

This is an idempotent operation - calling it multiple times with the same parameters is + * safe. + * + * @param tablePath the table path + * @param tableBucket the table bucket + * @param rocksDBMetrics the RocksDB metrics to register + */ + public void registerRocksDBMetrics( + TablePath tablePath, TableBucket tableBucket, RocksDBMetrics rocksDBMetrics) { + // Ensure TableMetricGroup exists (idempotent creation) + TableMetricGroup tableMetricGroup = + metricGroupByTable.computeIfAbsent( + tablePath, table -> new TableMetricGroup(registry, tablePath, true, this)); + tableMetricGroup.registerRocksDBMetrics(tableBucket, rocksDBMetrics); + } + public void removeTableBucketMetricGroup(TablePath tablePath, TableBucket bucket) { // get the metric group of the table TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 06db30b340..c3b054dcaa 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -47,12 +47,14 @@ import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; +import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.FetchIsolation; import org.apache.fluss.server.log.LogAppendInfo; import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.log.LogTestUtils; import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; @@ -1241,4 +1243,183 @@ private void checkEqual(LogRecords actaulLogRecords, List expe private Value valueOf(BinaryRow row) { return Value.of(ValueEncoder.encodeValue(schemaId, row)); } + + @Test + void testRocksDBMetrics() throws Exception { + // Initialize tablet with schema + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + // Get RocksDB metrics + RocksDBMetrics metrics = kvTablet.getRocksDBMetrics(); + assertThat(metrics).as("RocksDB metrics should be available").isNotNull(); + + // Verify statistics is properly initialized + org.rocksdb.Statistics stats = kvTablet.getRocksDBKv().getStatistics(); + assertThat(stats).as("RocksDB Statistics should be enabled").isNotNull(); + + // All metrics should start at 0 for a fresh database + assertThat(metrics.getBytesWritten()).isEqualTo(0); + assertThat(metrics.getBytesRead()).isEqualTo(0); + assertThat(metrics.getFlushBytesWritten()).isEqualTo(0); + assertThat(metrics.getWriteLatencyMicros()).isEqualTo(0); + assertThat(metrics.getGetLatencyMicros()).isEqualTo(0); + assertThat(metrics.getNumFilesAtLevel0()).isEqualTo(0); + assertThat(metrics.getFlushPending()).isEqualTo(0); + assertThat(metrics.getCompactionPending()).isEqualTo(0); + assertThat(metrics.getTotalMemoryUsage()).isGreaterThan(0); // Block cache is pre-allocated + + // ========== Phase 1: Write and Flush ========== + int numRecords = 10000; + List rows = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "value-" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // After write and flush: must have written data to RocksDB + long bytesWrittenAfterFlush = metrics.getBytesWritten(); + assertThat(bytesWrittenAfterFlush) + .as("Must write data to RocksDB after flush") + .isGreaterThan(0); + + // Flush must have written bytes (memtable to SST file) + long flushBytesWritten = metrics.getFlushBytesWritten(); + // Note: FLUSH_WRITE_BYTES may not be tracked in all configurations + assertThat(flushBytesWritten) + .as("Flush bytes written is non-negative") + .isGreaterThanOrEqualTo(0); + + // Write latency must be tracked for write operations + long writeLatency = metrics.getWriteLatencyMicros(); + assertThat(writeLatency) + .as("Write latency must be > 0 after write operations") + .isGreaterThan(0); + + // After flush, there should be at least 1 L0 file (unless immediate compaction occurred) + long numL0Files = metrics.getNumFilesAtLevel0(); + assertThat(numL0Files) + .as("Should have L0 files after flush (or 0 if compacted)") + .isGreaterThanOrEqualTo(0); + + // Flush pending must be 0 after flush completes + assertThat(metrics.getFlushPending()).as("No pending flush after completion").isEqualTo(0); + + // ========== Phase 2: Read Operations ========== + List keysToRead = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + keysToRead.add(String.valueOf(i).getBytes()); + } + List readValues = kvTablet.multiGet(keysToRead); + assertThat(readValues).hasSize(100); + + // After reads: get latency must be tracked + long getLatency = metrics.getGetLatencyMicros(); + assertThat(getLatency) + .as("Get latency must be tracked after read operations") + .isGreaterThan(0); + + // Bytes read may increase (depending on cache hits) + long bytesRead = metrics.getBytesRead(); + // Note: bytesRead could be 0 if all data was served from block cache + assertThat(bytesRead).as("Bytes read is non-negative").isGreaterThanOrEqualTo(0); + + // ========== Phase 3: Write More Data to Trigger Compaction ========== + List moreRows = new ArrayList<>(); + for (int i = numRecords; i < numRecords * 2; i++) { + moreRows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "value-" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(moreRows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // Bytes written must increase with more data + long bytesWrittenAfterSecondFlush = metrics.getBytesWritten(); + assertThat(bytesWrittenAfterSecondFlush) + .as("Bytes written must increase with second batch") + .isGreaterThan(bytesWrittenAfterFlush); + + // ========== Phase 4: Manual Compaction ========== + long compactionBytesReadBefore = metrics.getCompactionBytesRead(); + long compactionBytesWrittenBefore = metrics.getCompactionBytesWritten(); + long compactionTimeBefore = metrics.getCompactionTimeMicros(); + + // Trigger manual compaction + try { + kvTablet.getRocksDBKv().getDb().compactRange(); + } catch (Exception e) { + // Compaction failure is acceptable in test + } + + // After compaction: verify compaction metrics increased + long compactionBytesReadAfter = metrics.getCompactionBytesRead(); + long compactionBytesWrittenAfter = metrics.getCompactionBytesWritten(); + long compactionTimeAfter = metrics.getCompactionTimeMicros(); + + // If any compaction occurred, all three metrics should increase + boolean compactionOccurred = + compactionBytesReadAfter > compactionBytesReadBefore + || compactionBytesWrittenAfter > compactionBytesWrittenBefore + || compactionTimeAfter > compactionTimeBefore; + + if (compactionOccurred) { + assertThat(compactionBytesReadAfter) + .as("Compaction must read data") + .isGreaterThan(compactionBytesReadBefore); + assertThat(compactionBytesWrittenAfter) + .as("Compaction must write data") + .isGreaterThan(compactionBytesWrittenBefore); + assertThat(compactionTimeAfter) + .as("Compaction must take time") + .isGreaterThan(compactionTimeBefore); + } + + // Compaction pending must be 0 after compaction completes + assertThat(metrics.getCompactionPending()) + .as("No pending compaction after completion") + .isEqualTo(0); + + // ========== Phase 5: Verify Final State Before Close ========== + // Bytes written must be positive after all operations + assertThat(metrics.getBytesWritten()) + .as("Total bytes written must be positive") + .isGreaterThan(0); + + // Write and get latency must be positive (operations occurred) + assertThat(metrics.getWriteLatencyMicros()) + .as("Write latency must be positive after writes") + .isGreaterThan(0); + assertThat(metrics.getGetLatencyMicros()) + .as("Get latency must be positive after reads") + .isGreaterThan(0); + + // No pending operations + assertThat(metrics.getFlushPending()).isEqualTo(0); + assertThat(metrics.getCompactionPending()).isEqualTo(0); + + // Memory usage should be reasonable (> 0 due to block cache + memtables) + assertThat(metrics.getTotalMemoryUsage()) + .as("Total memory usage must be positive") + .isGreaterThan(0); + + // ========== Phase 6: Verify Metrics After Close ========== + kvTablet.close(); + + // After close: all metrics must return 0 (ResourceGuard protection) + assertThat(metrics.getBytesWritten()).isEqualTo(0); + assertThat(metrics.getBytesRead()).isEqualTo(0); + assertThat(metrics.getFlushBytesWritten()).isEqualTo(0); + assertThat(metrics.getWriteLatencyMicros()).isEqualTo(0); + assertThat(metrics.getGetLatencyMicros()).isEqualTo(0); + assertThat(metrics.getNumFilesAtLevel0()).isEqualTo(0); + assertThat(metrics.getFlushPending()).isEqualTo(0); + assertThat(metrics.getCompactionPending()).isEqualTo(0); + assertThat(metrics.getCompactionBytesRead()).isEqualTo(0); + assertThat(metrics.getCompactionBytesWritten()).isEqualTo(0); + assertThat(metrics.getCompactionTimeMicros()).isEqualTo(0); + assertThat(metrics.getTotalMemoryUsage()).isEqualTo(0); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java index f36853cd48..676a17a6b0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java @@ -189,7 +189,8 @@ void testConfigurationOptionsFromConfig() throws Exception { (BlockBasedTableConfig) columnOptions.tableFormatConfig(); assertThat(tableConfig.blockSize()).isEqualTo(4 * SizeUnit.KB); assertThat(tableConfig.metadataBlockSize()).isEqualTo(8 * SizeUnit.KB); - assertThat(tableConfig.blockCacheSize()).isEqualTo(512 * SizeUnit.MB); + // Verify block cache was created with explicit LRUCache for memory tracking + assertThat(optionsContainer.getBlockCache()).isNotNull(); assertThat(tableConfig.filterPolicy() instanceof BloomFilter).isTrue(); } } diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index 2d8ef80771..65109551c2 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -825,6 +825,134 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM +### RocksDB + +RocksDB metrics provide insights into the performance and health of the underlying RocksDB storage engine used by Fluss. These metrics are categorized into table-level metrics (aggregated from all buckets of a table) and server-level metrics (aggregated from all tables in a server). + +#### Table-level RocksDB Metrics (Max Aggregation) + +These metrics use Max aggregation to show the maximum value across all buckets of a table, which helps identify the worst-performing bucket. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescriptionType
tabletservertablerocksdbWriteStallMicrosMaxMaximum write stall duration across all buckets of this table (in microseconds). Write stalls occur when RocksDB needs to slow down writes due to compaction pressure or memory limits.Gauge
rocksdbGetLatencyMicrosMaxMaximum get operation latency across all buckets of this table (in microseconds). This represents the slowest read operation among all buckets.Gauge
rocksdbWriteLatencyMicrosMaxMaximum write operation latency across all buckets of this table (in microseconds). This represents the slowest write operation among all buckets.Gauge
rocksdbNumFilesAtLevel0MaxMaximum number of L0 files across all buckets of this table. A high number of L0 files indicates compaction pressure and may impact read performance.Gauge
rocksdbFlushPendingMaxMaximum flush pending indicator across all buckets of this table. A value greater than 0 indicates that some buckets have pending flush operations.Gauge
rocksdbCompactionPendingMaxMaximum compaction pending indicator across all buckets of this table. A value greater than 0 indicates that some buckets have pending compaction operations.Gauge
rocksdbCompactionTimeMicrosMaxMaximum compaction time across all buckets of this table (in microseconds). This represents the longest compaction operation among all buckets.Gauge
+ +#### Table-level RocksDB Metrics (Sum Aggregation) + +These metrics use Sum aggregation to show the total value across all buckets of a table, providing an overall view of table-level I/O and storage operations. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescriptionType
tabletservertablerocksdbBytesReadTotalTotal bytes read across all buckets of this table. This includes both user reads and internal reads (e.g., compaction reads).Gauge
rocksdbBytesWrittenTotalTotal bytes written across all buckets of this table. This includes both user writes and internal writes (e.g., compaction writes).Gauge
rocksdbFlushBytesWrittenTotalTotal flush bytes written across all buckets of this table. This represents the amount of data flushed from memtable to persistent storage.Gauge
rocksdbCompactionBytesReadTotalTotal compaction bytes read across all buckets of this table. This represents the amount of data read during compaction operations.Gauge
rocksdbCompactionBytesWrittenTotalTotal compaction bytes written across all buckets of this table. This represents the amount of data written during compaction operations.Gauge
+ +#### Server-level RocksDB Metrics (Sum Aggregation) + +These metrics use Sum aggregation to show the total value across all tables in a server, providing a server-wide view of RocksDB resource usage. + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescriptionType
tabletserver-rocksdbMemoryUsageTotalTotal memory usage across all RocksDB instances in this server (in bytes). This includes memory used by memtables, block cache, and other RocksDB internal structures.Gauge
### Flink connector standard metrics From f52a69685d4ce5e8f9647205c8ea41fe6993de1f Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Wed, 31 Dec 2025 09:13:04 +0800 Subject: [PATCH 2/4] refine code --- .../java/org/apache/fluss/server/kv/KvManager.java | 2 ++ .../metrics/group/TabletServerMetricGroup.java | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 1879c8ff61..e2f73bb264 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -313,6 +313,8 @@ public void dropKv(TableBucket tableBucket) { TablePath tablePath = dropKvTablet.getTablePath(); try { // Unregister RocksDB metrics from TableMetricGroup + serverMetricGroup.unregisterRocksDBMetrics(tablePath, tableBucket); + // Remove bucket metric group and clean up table metric group if needed serverMetricGroup.removeTableBucketMetricGroup(tablePath, tableBucket); dropKvTablet.drop(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index 11a77895bf..f1dd4c3b3b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -278,6 +278,19 @@ public void registerRocksDBMetrics( tableMetricGroup.registerRocksDBMetrics(tableBucket, rocksDBMetrics); } + /** + * Unregister RocksDB metrics for a specific table bucket. + * + * @param tablePath the table path + * @param tableBucket the table bucket + */ + public void unregisterRocksDBMetrics(TablePath tablePath, TableBucket tableBucket) { + TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath); + if (tableMetricGroup != null) { + tableMetricGroup.unregisterRocksDBMetrics(tableBucket); + } + } + public void removeTableBucketMetricGroup(TablePath tablePath, TableBucket bucket) { // get the metric group of the table TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath); From 663ebb7535cd19d6c20fb4e85a4df5f61589a057 Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Wed, 31 Dec 2025 10:03:29 +0800 Subject: [PATCH 3/4] refact code --- .../src/main/java/org/apache/fluss/server/kv/KvManager.java | 2 -- .../java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index e2f73bb264..c8bf05bae7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -314,8 +314,6 @@ public void dropKv(TableBucket tableBucket) { try { // Unregister RocksDB metrics from TableMetricGroup serverMetricGroup.unregisterRocksDBMetrics(tablePath, tableBucket); - // Remove bucket metric group and clean up table metric group if needed - serverMetricGroup.removeTableBucketMetricGroup(tablePath, tableBucket); dropKvTablet.drop(); if (dropKvTablet.getPartitionName() == null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java index 803cb8b6da..03be680ca9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java @@ -224,7 +224,7 @@ public long getTotalMemoryUsage() { return 0L; } - // Create cache set for memory usage calculation + // Create cache set for memory usage calculation. // If blockCache is null, pass null to MemoryUtil (will only count memtables, etc.) Set caches = null; if (blockCache != null) { From 4dee716764e3d2c365253137e58ad4613fc2b83e Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Mon, 5 Jan 2026 18:53:44 +0800 Subject: [PATCH 4/4] improve rocksdb metrics register/unregister logic --- .../org/apache/fluss/server/kv/KvManager.java | 18 --- .../org/apache/fluss/server/kv/KvTablet.java | 28 ++--- ...sDBMetrics.java => RocksDBStatistics.java} | 22 ++-- .../metrics/group/BucketMetricGroup.java | 71 ++++++++++- .../metrics/group/TableMetricGroup.java | 112 ++++++------------ .../group/TabletServerMetricGroup.java | 46 +------ .../apache/fluss/server/replica/Replica.java | 10 ++ .../apache/fluss/server/kv/KvTabletTest.java | 95 ++++++++------- 8 files changed, 196 insertions(+), 206 deletions(-) rename fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/{RocksDBMetrics.java => RocksDBStatistics.java} (94%) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index c8bf05bae7..c5be3693e1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -266,14 +266,6 @@ public KvTablet getOrCreateKv( sharedRocksDBRateLimiter); currentKvs.put(tableBucket, tablet); - // Register RocksDB metrics to TableMetricGroup for aggregation - // Note: BucketMetricGroup is already created by Replica when LogTablet is - // created - if (tablet.getRocksDBMetrics() != null) { - serverMetricGroup.registerRocksDBMetrics( - tablePath.getTablePath(), tableBucket, tablet.getRocksDBMetrics()); - } - LOG.info( "Created kv tablet for bucket {} in dir {}.", tableBucket, @@ -312,9 +304,6 @@ public void dropKv(TableBucket tableBucket) { if (dropKvTablet != null) { TablePath tablePath = dropKvTablet.getTablePath(); try { - // Unregister RocksDB metrics from TableMetricGroup - serverMetricGroup.unregisterRocksDBMetrics(tablePath, tableBucket); - dropKvTablet.drop(); if (dropKvTablet.getPartitionName() == null) { LOG.info( @@ -398,13 +387,6 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti } this.currentKvs.put(tableBucket, kvTablet); - // Register RocksDB metrics to TableMetricGroup for aggregation - // Note: BucketMetricGroup is already created by Replica when LogTablet is created - if (kvTablet.getRocksDBMetrics() != null) { - serverMetricGroup.registerRocksDBMetrics( - physicalTablePath.getTablePath(), tableBucket, kvTablet.getRocksDBMetrics()); - } - return kvTablet; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index aa7a5791a1..0b43f0a316 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -49,8 +49,8 @@ import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; -import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer; +import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; @@ -119,8 +119,8 @@ public final class KvTablet { // the changelog image mode for this tablet private final ChangelogImage changelogImage; - // RocksDB metrics for this tablet - @Nullable private final RocksDBMetrics rocksDBMetrics; + // RocksDB statistics accessor for this tablet + @Nullable private final RocksDBStatistics rocksDBStatistics; /** * The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been @@ -147,7 +147,7 @@ private KvTablet( ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, ChangelogImage changelogImage, - @Nullable RocksDBMetrics rocksDBMetrics) { + @Nullable RocksDBStatistics rocksDBStatistics) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -163,7 +163,7 @@ private KvTablet( this.arrowCompressionInfo = arrowCompressionInfo; this.schemaGetter = schemaGetter; this.changelogImage = changelogImage; - this.rocksDBMetrics = rocksDBMetrics; + this.rocksDBStatistics = rocksDBStatistics; } public static KvTablet create( @@ -184,12 +184,12 @@ public static KvTablet create( throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter); - // Create RocksDB metrics accessor (will be registered to TableMetricGroup by KvManager) + // Create RocksDB statistics accessor (will be registered to TableMetricGroup by Replica) // Pass ResourceGuard to ensure thread-safe access during concurrent close operations // Pass ColumnFamilyHandle for column family specific properties like num-files-at-level0 // Pass Cache for accurate block cache memory tracking - RocksDBMetrics rocksDBMetrics = - new RocksDBMetrics( + RocksDBStatistics rocksDBStatistics = + new RocksDBStatistics( kv.getDb(), kv.getStatistics(), kv.getResourceGuard(), @@ -212,13 +212,13 @@ public static KvTablet create( arrowCompressionInfo, schemaGetter, changelogImage, - rocksDBMetrics); + rocksDBStatistics); } private static RocksDBKv buildRocksDBKv( Configuration configuration, File kvDir, RateLimiter sharedRateLimiter) throws IOException { - // Enable statistics to support RocksDB metrics + // Enable statistics to support RocksDB statistics collection RocksDBResourceContainer rocksDBResourceContainer = new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter); RocksDBKvBuilder rocksDBKvBuilder = @@ -247,13 +247,13 @@ public File getKvTabletDir() { } /** - * Get RocksDB metrics accessor for this tablet. + * Get RocksDB statistics accessor for this tablet. * - * @return the RocksDB metrics accessor, or null if not available + * @return the RocksDB statistics accessor, or null if not available */ @Nullable - public RocksDBMetrics getRocksDBMetrics() { - return rocksDBMetrics; + public RocksDBStatistics getRocksDBStatistics() { + return rocksDBStatistics; } void setFlushedLogOffset(long flushedLogOffset) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java similarity index 94% rename from fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java rename to fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java index 03be680ca9..fd7a20cd7e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetrics.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java @@ -40,18 +40,20 @@ import java.util.Set; /** - * Manages RocksDB metrics for a single KvTablet. + * Collects and provides access to RocksDB statistics for a single KvTablet. * - *

This class encapsulates all RocksDB-specific metric access logic, providing semantic methods - * to access various RocksDB statistics and properties. Upper layers should use these methods - * instead of directly accessing RocksDB properties with string names. + *

This class encapsulates low-level RocksDB statistics collection, providing semantic methods to + * access various RocksDB statistics and properties. It does NOT register Fluss metrics directly; + * instead, upper layers (e.g., TableMetricGroup) consume these statistics to compute and register + * actual Fluss Metrics. * *

Thread-safety: This class uses RocksDB's ResourceGuard to ensure safe concurrent access. All - * metric read operations acquire the resource guard to prevent accessing closed RocksDB instances. + * statistics read operations acquire the resource guard to prevent accessing closed RocksDB + * instances. */ -public class RocksDBMetrics implements AutoCloseable { +public class RocksDBStatistics implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(RocksDBMetrics.class); + private static final Logger LOG = LoggerFactory.getLogger(RocksDBStatistics.class); private final RocksDB db; @Nullable private final Statistics statistics; @@ -59,7 +61,7 @@ public class RocksDBMetrics implements AutoCloseable { private final ColumnFamilyHandle defaultColumnFamilyHandle; @Nullable private final Cache blockCache; - public RocksDBMetrics( + public RocksDBStatistics( RocksDB db, @Nullable Statistics statistics, ResourceGuard resourceGuard, @@ -368,7 +370,7 @@ private long getPropertyValue(ColumnFamilyHandle columnFamilyHandle, String prop @Override public void close() { - // No resources to clean up, metrics are managed by TableMetricGroup - LOG.debug("RocksDB metrics accessor closed"); + // No resources to clean up, statistics are managed by TableMetricGroup + LOG.debug("RocksDB statistics accessor closed"); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java index fc8e281a57..ed5b111019 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java @@ -20,6 +20,11 @@ import org.apache.fluss.metrics.CharacterFilter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; +import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; +import org.apache.fluss.utils.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -27,12 +32,24 @@ import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; -/** Metrics for the table buckets with table as parent group. */ +/** + * Metrics for the table buckets with table as parent group. + * + *

For KV tables, this class also manages the RocksDB statistics lifecycle. The statistics are + * registered when KvTablet is initialized and automatically cleaned up when this metric group is + * closed. + */ public class BucketMetricGroup extends AbstractMetricGroup { + + private static final Logger LOG = LoggerFactory.getLogger(BucketMetricGroup.class); + // will be null if the bucket doesn't belong to a partition private final @Nullable String partitionName; private final int bucket; + // RocksDB statistics for this bucket (null for non-KV tables) + private volatile @Nullable RocksDBStatistics rocksDBStatistics; + public BucketMetricGroup( MetricRegistry registry, @Nullable String partitionName, @@ -62,4 +79,56 @@ protected String getGroupName(CharacterFilter filter) { public TableMetricGroup getTableMetricGroup() { return (TableMetricGroup) parent; } + + /** + * Register RocksDB statistics for this bucket. This should be called when KvTablet is + * initialized. + * + *

This method must be paired with {@link #unregisterRocksDBStatistics()} to ensure proper + * resource cleanup. + * + * @param statistics the RocksDB statistics collector + */ + public void registerRocksDBStatistics(RocksDBStatistics statistics) { + if (this.rocksDBStatistics != null) { + LOG.warn( + "RocksDB statistics already registered for bucket {}, this may indicate a resource leak", + bucket); + } + this.rocksDBStatistics = statistics; + LOG.debug("Registered RocksDB statistics for bucket {}", bucket); + } + + /** + * Unregister and close RocksDB statistics for this bucket. This should be called when KvTablet + * is destroyed. + * + *

This method must be paired with {@link #registerRocksDBStatistics(RocksDBStatistics)} to + * ensure proper resource cleanup. + */ + public void unregisterRocksDBStatistics() { + if (rocksDBStatistics != null) { + LOG.debug("Unregistering RocksDB statistics for bucket {}", bucket); + IOUtils.closeQuietly(rocksDBStatistics); + rocksDBStatistics = null; + } + } + + /** + * Get the RocksDB statistics for this bucket. + * + * @return the RocksDB statistics, or null if not a KV table or not yet initialized + */ + @Nullable + public RocksDBStatistics getRocksDBStatistics() { + return rocksDBStatistics; + } + + @Override + public void close() { + // Clean up RocksDB statistics before closing the metric group + // This handles the case when the bucket is removed entirely + unregisterRocksDBStatistics(); + super.close(); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java index e8f4594a73..fc40463f2d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java @@ -27,12 +27,13 @@ import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; -import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; +import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; +import org.apache.fluss.utils.MapUtils; import javax.annotation.Nullable; -import java.util.HashMap; import java.util.Map; +import java.util.stream.Stream; import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; @@ -42,11 +43,7 @@ */ public class TableMetricGroup extends AbstractMetricGroup { - private final Map buckets = new HashMap<>(); - - // Directly manage RocksDB metrics for aggregation - // This is cleaner than passing through BucketMetricGroup - private final Map rocksDBMetricsMap = new HashMap<>(); + private final Map buckets = MapUtils.newConcurrentHashMap(); private final TablePath tablePath; @@ -244,44 +241,9 @@ public BucketMetricGroup addBucketMetricGroup( public void removeBucketMetricGroup(TableBucket tableBucket) { BucketMetricGroup metricGroup = buckets.remove(tableBucket); if (metricGroup != null) { + // BucketMetricGroup.close() will automatically clean up RocksDB statistics metricGroup.close(); } - // Also remove RocksDB metrics if exists - RocksDBMetrics rocksDBMetrics = rocksDBMetricsMap.remove(tableBucket); - if (rocksDBMetrics != null) { - try { - rocksDBMetrics.close(); - } catch (Exception e) { - // Ignore close errors - } - } - } - - /** - * Register RocksDB metrics for a bucket. This allows table-level aggregation without - * registering bucket-level metrics. - * - * @param tableBucket the table bucket - * @param rocksDBMetrics the RocksDB metrics accessor - */ - public void registerRocksDBMetrics(TableBucket tableBucket, RocksDBMetrics rocksDBMetrics) { - rocksDBMetricsMap.put(tableBucket, rocksDBMetrics); - } - - /** - * Unregister RocksDB metrics for a bucket. - * - * @param tableBucket the table bucket - */ - public void unregisterRocksDBMetrics(TableBucket tableBucket) { - RocksDBMetrics rocksDBMetrics = rocksDBMetricsMap.remove(tableBucket); - if (rocksDBMetrics != null) { - try { - rocksDBMetrics.close(); - } catch (Exception e) { - // Ignore close errors - } - } } public int bucketGroupsCount() { @@ -293,12 +255,18 @@ public java.util.Collection getBucketMetricGroups() { } /** - * Get the RocksDB metrics map for server-level aggregation. + * Get all RocksDB statistics from bucket metric groups for table-level and server-level + * aggregation. * - * @return the map of RocksDB metrics + *

This method dynamically collects statistics from all buckets, allowing automatic cleanup + * when buckets are removed without maintaining a separate map. + * + * @return stream of RocksDB statistics from all buckets in this table */ - public Map getRocksDBMetricsMap() { - return rocksDBMetricsMap; + public Stream allRocksDBStatistics() { + return buckets.values().stream() + .map(BucketMetricGroup::getRocksDBStatistics) + .filter(stats -> stats != null); } public TabletServerMetricGroup getServerMetricGroup() { @@ -316,83 +284,77 @@ private void registerRocksDBMetrics() { gauge( MetricNames.ROCKSDB_WRITE_STALL_MICROS_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getWriteStallMicros) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getWriteStallMicros) .max() .orElse(0L)); gauge( MetricNames.ROCKSDB_GET_LATENCY_MICROS_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getGetLatencyMicros) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getGetLatencyMicros) .max() .orElse(0L)); gauge( MetricNames.ROCKSDB_WRITE_LATENCY_MICROS_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getWriteLatencyMicros) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getWriteLatencyMicros) .max() .orElse(0L)); gauge( MetricNames.ROCKSDB_NUM_FILES_AT_LEVEL0_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getNumFilesAtLevel0) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getNumFilesAtLevel0) .max() .orElse(0L)); gauge( MetricNames.ROCKSDB_FLUSH_PENDING_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getFlushPending) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getFlushPending) .max() .orElse(0L)); gauge( MetricNames.ROCKSDB_COMPACTION_PENDING_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getCompactionPending) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getCompactionPending) .max() .orElse(0L)); gauge( MetricNames.ROCKSDB_COMPACTION_TIME_MICROS_MAX, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getCompactionTimeMicros) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getCompactionTimeMicros) .max() .orElse(0L)); // Sum aggregation metrics - track the total value across all buckets gauge( MetricNames.ROCKSDB_BYTES_READ_TOTAL, - () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getBytesRead) - .sum()); + () -> allRocksDBStatistics().mapToLong(RocksDBStatistics::getBytesRead).sum()); gauge( MetricNames.ROCKSDB_BYTES_WRITTEN_TOTAL, - () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getBytesWritten) - .sum()); + () -> allRocksDBStatistics().mapToLong(RocksDBStatistics::getBytesWritten).sum()); gauge( MetricNames.ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getFlushBytesWritten) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getFlushBytesWritten) .sum()); gauge( MetricNames.ROCKSDB_COMPACTION_BYTES_READ_TOTAL, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getCompactionBytesRead) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getCompactionBytesRead) .sum()); gauge( MetricNames.ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL, () -> - rocksDBMetricsMap.values().stream() - .mapToLong(RocksDBMetrics::getCompactionBytesWritten) + allRocksDBStatistics() + .mapToLong(RocksDBStatistics::getCompactionBytesWritten) .sum()); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index f1dd4c3b3b..f4b2c6b730 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -30,19 +30,14 @@ import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; -import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; +import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.utils.MapUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Map; /** The metric group for tablet server. */ public class TabletServerMetricGroup extends AbstractMetricGroup { - private static final Logger LOG = LoggerFactory.getLogger(TabletServerMetricGroup.class); - private static final String NAME = "tabletserver"; private static final int WINDOW_SIZE = 1024; @@ -149,13 +144,13 @@ public TabletServerMetricGroup( * all tables. */ private void registerServerRocksDBMetrics() { - // Total memory usage across all RocksDB instances in this server + // Total memory usage across all RocksDB instances in this server. gauge( MetricNames.ROCKSDB_MEMORY_USAGE_TOTAL, () -> metricGroupByTable.values().stream() - .flatMap(table -> table.getRocksDBMetricsMap().values().stream()) - .mapToLong(RocksDBMetrics::getTotalMemoryUsage) + .flatMap(TableMetricGroup::allRocksDBStatistics) + .mapToLong(RocksDBStatistics::getTotalMemoryUsage) .sum()); } @@ -258,39 +253,6 @@ public BucketMetricGroup addTableBucketMetricGroup( return tableMetricGroup.addBucketMetricGroup(physicalTablePath.getPartitionName(), bucket); } - /** - * Register RocksDB metrics for a specific table bucket. This method ensures the - * TableMetricGroup exists (creates it if necessary) before registering the RocksDB metrics. - * - *

This is an idempotent operation - calling it multiple times with the same parameters is - * safe. - * - * @param tablePath the table path - * @param tableBucket the table bucket - * @param rocksDBMetrics the RocksDB metrics to register - */ - public void registerRocksDBMetrics( - TablePath tablePath, TableBucket tableBucket, RocksDBMetrics rocksDBMetrics) { - // Ensure TableMetricGroup exists (idempotent creation) - TableMetricGroup tableMetricGroup = - metricGroupByTable.computeIfAbsent( - tablePath, table -> new TableMetricGroup(registry, tablePath, true, this)); - tableMetricGroup.registerRocksDBMetrics(tableBucket, rocksDBMetrics); - } - - /** - * Unregister RocksDB metrics for a specific table bucket. - * - * @param tablePath the table path - * @param tableBucket the table bucket - */ - public void unregisterRocksDBMetrics(TablePath tablePath, TableBucket tableBucket) { - TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath); - if (tableMetricGroup != null) { - tableMetricGroup.unregisterRocksDBMetrics(tableBucket); - } - } - public void removeTableBucketMetricGroup(TablePath tablePath, TableBucket bucket) { // get the metric group of the table TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 1669e004df..f86778ebc3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -596,6 +596,10 @@ private void dropKv() { IOUtils.closeQuietly(closeableRegistryForKv); } if (kvTablet != null) { + // Unregister RocksDB statistics before dropping KvTablet + // This ensures statistics are cleaned up when KvTablet is destroyed + bucketMetricGroup.unregisterRocksDBStatistics(); + // drop the kv tablet checkNotNull(kvManager); kvManager.dropKv(tableBucket); @@ -689,6 +693,12 @@ private Optional initKvTablet() { physicalPath, tableBucket, endTime - startTime); + + // Register RocksDB statistics to BucketMetricGroup + if (kvTablet != null && kvTablet.getRocksDBStatistics() != null) { + bucketMetricGroup.registerRocksDBStatistics(kvTablet.getRocksDBStatistics()); + } + return optCompletedSnapshot; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index c3b054dcaa..dcafd5c3e4 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -47,7 +47,7 @@ import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; -import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics; +import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.FetchIsolation; import org.apache.fluss.server.log.LogAppendInfo; @@ -1249,24 +1249,25 @@ void testRocksDBMetrics() throws Exception { // Initialize tablet with schema initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); - // Get RocksDB metrics - RocksDBMetrics metrics = kvTablet.getRocksDBMetrics(); - assertThat(metrics).as("RocksDB metrics should be available").isNotNull(); + // Get RocksDB statistics + RocksDBStatistics statistics = kvTablet.getRocksDBStatistics(); + assertThat(statistics).as("RocksDB statistics should be available").isNotNull(); // Verify statistics is properly initialized org.rocksdb.Statistics stats = kvTablet.getRocksDBKv().getStatistics(); assertThat(stats).as("RocksDB Statistics should be enabled").isNotNull(); // All metrics should start at 0 for a fresh database - assertThat(metrics.getBytesWritten()).isEqualTo(0); - assertThat(metrics.getBytesRead()).isEqualTo(0); - assertThat(metrics.getFlushBytesWritten()).isEqualTo(0); - assertThat(metrics.getWriteLatencyMicros()).isEqualTo(0); - assertThat(metrics.getGetLatencyMicros()).isEqualTo(0); - assertThat(metrics.getNumFilesAtLevel0()).isEqualTo(0); - assertThat(metrics.getFlushPending()).isEqualTo(0); - assertThat(metrics.getCompactionPending()).isEqualTo(0); - assertThat(metrics.getTotalMemoryUsage()).isGreaterThan(0); // Block cache is pre-allocated + assertThat(statistics.getBytesWritten()).isEqualTo(0); + assertThat(statistics.getBytesRead()).isEqualTo(0); + assertThat(statistics.getFlushBytesWritten()).isEqualTo(0); + assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0); + assertThat(statistics.getGetLatencyMicros()).isEqualTo(0); + assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0); + assertThat(statistics.getFlushPending()).isEqualTo(0); + assertThat(statistics.getCompactionPending()).isEqualTo(0); + assertThat(statistics.getTotalMemoryUsage()) + .isGreaterThan(0); // Block cache is pre-allocated // ========== Phase 1: Write and Flush ========== int numRecords = 10000; @@ -1280,32 +1281,34 @@ void testRocksDBMetrics() throws Exception { kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); // After write and flush: must have written data to RocksDB - long bytesWrittenAfterFlush = metrics.getBytesWritten(); + long bytesWrittenAfterFlush = statistics.getBytesWritten(); assertThat(bytesWrittenAfterFlush) .as("Must write data to RocksDB after flush") .isGreaterThan(0); // Flush must have written bytes (memtable to SST file) - long flushBytesWritten = metrics.getFlushBytesWritten(); + long flushBytesWritten = statistics.getFlushBytesWritten(); // Note: FLUSH_WRITE_BYTES may not be tracked in all configurations assertThat(flushBytesWritten) .as("Flush bytes written is non-negative") .isGreaterThanOrEqualTo(0); // Write latency must be tracked for write operations - long writeLatency = metrics.getWriteLatencyMicros(); + long writeLatency = statistics.getWriteLatencyMicros(); assertThat(writeLatency) .as("Write latency must be > 0 after write operations") .isGreaterThan(0); // After flush, there should be at least 1 L0 file (unless immediate compaction occurred) - long numL0Files = metrics.getNumFilesAtLevel0(); + long numL0Files = statistics.getNumFilesAtLevel0(); assertThat(numL0Files) .as("Should have L0 files after flush (or 0 if compacted)") .isGreaterThanOrEqualTo(0); // Flush pending must be 0 after flush completes - assertThat(metrics.getFlushPending()).as("No pending flush after completion").isEqualTo(0); + assertThat(statistics.getFlushPending()) + .as("No pending flush after completion") + .isEqualTo(0); // ========== Phase 2: Read Operations ========== List keysToRead = new ArrayList<>(); @@ -1316,13 +1319,13 @@ void testRocksDBMetrics() throws Exception { assertThat(readValues).hasSize(100); // After reads: get latency must be tracked - long getLatency = metrics.getGetLatencyMicros(); + long getLatency = statistics.getGetLatencyMicros(); assertThat(getLatency) .as("Get latency must be tracked after read operations") .isGreaterThan(0); // Bytes read may increase (depending on cache hits) - long bytesRead = metrics.getBytesRead(); + long bytesRead = statistics.getBytesRead(); // Note: bytesRead could be 0 if all data was served from block cache assertThat(bytesRead).as("Bytes read is non-negative").isGreaterThanOrEqualTo(0); @@ -1337,15 +1340,15 @@ void testRocksDBMetrics() throws Exception { kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); // Bytes written must increase with more data - long bytesWrittenAfterSecondFlush = metrics.getBytesWritten(); + long bytesWrittenAfterSecondFlush = statistics.getBytesWritten(); assertThat(bytesWrittenAfterSecondFlush) .as("Bytes written must increase with second batch") .isGreaterThan(bytesWrittenAfterFlush); // ========== Phase 4: Manual Compaction ========== - long compactionBytesReadBefore = metrics.getCompactionBytesRead(); - long compactionBytesWrittenBefore = metrics.getCompactionBytesWritten(); - long compactionTimeBefore = metrics.getCompactionTimeMicros(); + long compactionBytesReadBefore = statistics.getCompactionBytesRead(); + long compactionBytesWrittenBefore = statistics.getCompactionBytesWritten(); + long compactionTimeBefore = statistics.getCompactionTimeMicros(); // Trigger manual compaction try { @@ -1355,9 +1358,9 @@ void testRocksDBMetrics() throws Exception { } // After compaction: verify compaction metrics increased - long compactionBytesReadAfter = metrics.getCompactionBytesRead(); - long compactionBytesWrittenAfter = metrics.getCompactionBytesWritten(); - long compactionTimeAfter = metrics.getCompactionTimeMicros(); + long compactionBytesReadAfter = statistics.getCompactionBytesRead(); + long compactionBytesWrittenAfter = statistics.getCompactionBytesWritten(); + long compactionTimeAfter = statistics.getCompactionTimeMicros(); // If any compaction occurred, all three metrics should increase boolean compactionOccurred = @@ -1378,30 +1381,30 @@ void testRocksDBMetrics() throws Exception { } // Compaction pending must be 0 after compaction completes - assertThat(metrics.getCompactionPending()) + assertThat(statistics.getCompactionPending()) .as("No pending compaction after completion") .isEqualTo(0); // ========== Phase 5: Verify Final State Before Close ========== // Bytes written must be positive after all operations - assertThat(metrics.getBytesWritten()) + assertThat(statistics.getBytesWritten()) .as("Total bytes written must be positive") .isGreaterThan(0); // Write and get latency must be positive (operations occurred) - assertThat(metrics.getWriteLatencyMicros()) + assertThat(statistics.getWriteLatencyMicros()) .as("Write latency must be positive after writes") .isGreaterThan(0); - assertThat(metrics.getGetLatencyMicros()) + assertThat(statistics.getGetLatencyMicros()) .as("Get latency must be positive after reads") .isGreaterThan(0); // No pending operations - assertThat(metrics.getFlushPending()).isEqualTo(0); - assertThat(metrics.getCompactionPending()).isEqualTo(0); + assertThat(statistics.getFlushPending()).isEqualTo(0); + assertThat(statistics.getCompactionPending()).isEqualTo(0); // Memory usage should be reasonable (> 0 due to block cache + memtables) - assertThat(metrics.getTotalMemoryUsage()) + assertThat(statistics.getTotalMemoryUsage()) .as("Total memory usage must be positive") .isGreaterThan(0); @@ -1409,17 +1412,17 @@ void testRocksDBMetrics() throws Exception { kvTablet.close(); // After close: all metrics must return 0 (ResourceGuard protection) - assertThat(metrics.getBytesWritten()).isEqualTo(0); - assertThat(metrics.getBytesRead()).isEqualTo(0); - assertThat(metrics.getFlushBytesWritten()).isEqualTo(0); - assertThat(metrics.getWriteLatencyMicros()).isEqualTo(0); - assertThat(metrics.getGetLatencyMicros()).isEqualTo(0); - assertThat(metrics.getNumFilesAtLevel0()).isEqualTo(0); - assertThat(metrics.getFlushPending()).isEqualTo(0); - assertThat(metrics.getCompactionPending()).isEqualTo(0); - assertThat(metrics.getCompactionBytesRead()).isEqualTo(0); - assertThat(metrics.getCompactionBytesWritten()).isEqualTo(0); - assertThat(metrics.getCompactionTimeMicros()).isEqualTo(0); - assertThat(metrics.getTotalMemoryUsage()).isEqualTo(0); + assertThat(statistics.getBytesWritten()).isEqualTo(0); + assertThat(statistics.getBytesRead()).isEqualTo(0); + assertThat(statistics.getFlushBytesWritten()).isEqualTo(0); + assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0); + assertThat(statistics.getGetLatencyMicros()).isEqualTo(0); + assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0); + assertThat(statistics.getFlushPending()).isEqualTo(0); + assertThat(statistics.getCompactionPending()).isEqualTo(0); + assertThat(statistics.getCompactionBytesRead()).isEqualTo(0); + assertThat(statistics.getCompactionBytesWritten()).isEqualTo(0); + assertThat(statistics.getCompactionTimeMicros()).isEqualTo(0); + assertThat(statistics.getTotalMemoryUsage()).isEqualTo(0); } }