diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index b1326d46bc..60eb942e8b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -131,6 +131,54 @@ public class MetricNames { public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE = "preWriteBufferTruncateAsErrorPerSecond"; + // -------------------------------------------------------------------------------------------- + // RocksDB metrics + // -------------------------------------------------------------------------------------------- + // Table-level RocksDB metrics (aggregated from all buckets of a table, Max aggregation) + /** Maximum write stall duration across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_WRITE_STALL_MICROS_MAX = "rocksdbWriteStallMicrosMax"; + + /** Maximum get latency across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_GET_LATENCY_MICROS_MAX = "rocksdbGetLatencyMicrosMax"; + + /** Maximum write latency across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_WRITE_LATENCY_MICROS_MAX = "rocksdbWriteLatencyMicrosMax"; + + /** Maximum number of L0 files across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_NUM_FILES_AT_LEVEL0_MAX = "rocksdbNumFilesAtLevel0Max"; + + /** Maximum flush pending indicator across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_FLUSH_PENDING_MAX = "rocksdbFlushPendingMax"; + + /** Maximum compaction pending indicator across all buckets of this table (Max aggregation). */ + public static final String ROCKSDB_COMPACTION_PENDING_MAX = "rocksdbCompactionPendingMax"; + + /** Maximum compaction time across all buckets of this table (Max aggregation). 
*/ + public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX = + "rocksdbCompactionTimeMicrosMax"; + + // Table-level RocksDB metrics (aggregated from all buckets of a table, Sum aggregation) + /** Total bytes read across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_BYTES_READ_TOTAL = "rocksdbBytesReadTotal"; + + /** Total bytes written across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_BYTES_WRITTEN_TOTAL = "rocksdbBytesWrittenTotal"; + + /** Total flush bytes written across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL = "rocksdbFlushBytesWrittenTotal"; + + /** Total compaction bytes read across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_COMPACTION_BYTES_READ_TOTAL = + "rocksdbCompactionBytesReadTotal"; + + /** Total compaction bytes written across all buckets of this table (Sum aggregation). */ + public static final String ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL = + "rocksdbCompactionBytesWrittenTotal"; + + // Server-level RocksDB metrics (aggregated from all tables, Sum aggregation) + /** Total memory usage across all RocksDB instances in this server (Sum aggregation). 
*/ + public static final String ROCKSDB_MEMORY_USAGE_TOTAL = "rocksdbMemoryUsageTotal"; + // -------------------------------------------------------------------------------------------- // metrics for table bucket // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 4353530903..c5be3693e1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -386,6 +386,7 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti currentKvs.get(tableBucket).getKvTabletDir().getAbsolutePath())); } this.currentKvs.put(tableBucket, kvTablet); + return kvTablet; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index d6cc086664..0b43f0a316 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -50,6 +50,7 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBKv; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer; +import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; @@ -118,6 +119,9 @@ public final class KvTablet { // the changelog image mode for this tablet private final ChangelogImage changelogImage; + // RocksDB statistics accessor for this tablet + @Nullable private final RocksDBStatistics rocksDBStatistics; + /** * The kv data in pre-write buffer whose log offset is less than the 
flushedLogOffset has been * flushed into kv. @@ -142,7 +146,8 @@ private KvTablet( RowMerger rowMerger, ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, - ChangelogImage changelogImage) { + ChangelogImage changelogImage, + @Nullable RocksDBStatistics rocksDBStatistics) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -158,6 +163,7 @@ private KvTablet( this.arrowCompressionInfo = arrowCompressionInfo; this.schemaGetter = schemaGetter; this.changelogImage = changelogImage; + this.rocksDBStatistics = rocksDBStatistics; } public static KvTablet create( @@ -177,6 +183,19 @@ public static KvTablet create( RateLimiter sharedRateLimiter) throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter); + + // Create RocksDB statistics accessor (will be registered to TableMetricGroup by Replica) + // Pass ResourceGuard to ensure thread-safe access during concurrent close operations + // Pass ColumnFamilyHandle for column family specific properties like num-files-at-level0 + // Pass Cache for accurate block cache memory tracking + RocksDBStatistics rocksDBStatistics = + new RocksDBStatistics( + kv.getDb(), + kv.getStatistics(), + kv.getResourceGuard(), + kv.getDefaultColumnFamilyHandle(), + kv.getBlockCache()); + return new KvTablet( tablePath, tableBucket, @@ -192,14 +211,16 @@ public static KvTablet create( rowMerger, arrowCompressionInfo, schemaGetter, - changelogImage); + changelogImage, + rocksDBStatistics); } private static RocksDBKv buildRocksDBKv( Configuration configuration, File kvDir, RateLimiter sharedRateLimiter) throws IOException { + // Enable statistics to support RocksDB statistics collection RocksDBResourceContainer rocksDBResourceContainer = - new RocksDBResourceContainer(configuration, kvDir, false, sharedRateLimiter); + new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter); RocksDBKvBuilder rocksDBKvBuilder = new RocksDBKvBuilder( 
kvDir, @@ -225,6 +246,16 @@ public File getKvTabletDir() { return kvTabletDir; } + /** + * Get RocksDB statistics accessor for this tablet. + * + * @return the RocksDB statistics accessor, or null if not available + */ + @Nullable + public RocksDBStatistics getRocksDBStatistics() { + return rocksDBStatistics; + } + void setFlushedLogOffset(long flushedLogOffset) { this.flushedLogOffset = flushedLogOffset; } @@ -621,6 +652,8 @@ public void close() throws Exception { if (isClosed) { return; } + // Note: RocksDB metrics lifecycle is managed by TableMetricGroup + // No need to close it here if (rocksDBKv != null) { rocksDBKv.close(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java index 602a4b5910..6e45b2396a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java @@ -23,12 +23,14 @@ import org.apache.fluss.utils.BytesUtils; import org.apache.fluss.utils.IOUtils; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; import javax.annotation.Nullable; @@ -63,6 +65,9 @@ public class RocksDBKv implements AutoCloseable { /** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. */ protected final RocksDB db; + /** RocksDB Statistics for metrics collection. 
*/
+    private final @Nullable Statistics statistics;
+
     // mark whether this kv is already closed and prevent duplicate closing
     private volatile boolean closed = false;

@@ -70,12 +75,14 @@ public RocksDBKv(
             RocksDBResourceContainer optionsContainer,
             RocksDB db,
             ResourceGuard rocksDBResourceGuard,
-            ColumnFamilyHandle defaultColumnFamilyHandle) {
+            ColumnFamilyHandle defaultColumnFamilyHandle,
+            @Nullable Statistics statistics) {
         this.optionsContainer = optionsContainer;
         this.db = db;
         this.rocksDBResourceGuard = rocksDBResourceGuard;
         this.writeOptions = optionsContainer.getWriteOptions();
         this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
+        this.statistics = statistics;
     }

     public ResourceGuard getResourceGuard() {
@@ -206,4 +213,33 @@ public void close() throws Exception {
     public RocksDB getDb() {
         return db;
     }
+
+    /**
+     * Returns the RocksDB {@link Statistics} instance handed to this kv at construction time.
+     *
+     * @return the statistics object, or {@code null} if statistics collection is disabled
+     */
+    @Nullable
+    public Statistics getStatistics() {
+        // Return the instance captured by the constructor rather than re-querying the options
+        // container, so the value always belongs to the DB this object wraps.
+        return statistics;
+    }
+
+    /**
+     * Returns the block cache held by the options container.
+     *
+     * @return the block cache, or {@code null} if the container has not created one
+     */
+    @Nullable
+    public Cache getBlockCache() {
+        return optionsContainer.getBlockCache();
+    }
+
+    /**
+     * Returns the handle of the default column family of this RocksDB instance.
+     *
+     * @return the default column family handle
+     */
+    public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
+        return defaultColumnFamilyHandle;
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
index 8fbc0cf954..92e86fb964 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
@@ -107,7 +107,12 @@ public RocksDBKv build() throws KvBuildingException {
             throw new KvBuildingException(errMsg, t);
         }
         LOG.info("Finished building RocksDB kv at {}.", instanceBasePath);
-        return new RocksDBKv(optionsContainer, db, rocksDBResourceGuard, defaultColumnFamilyHandle);
+        return new RocksDBKv(
+                optionsContainer,
+                db,
+                rocksDBResourceGuard,
+                defaultColumnFamilyHandle,
+                optionsContainer.getStatistics());
     }

     void prepareDirectories() throws IOException {
diff 
--git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java index 4495613e65..a07ea1a918 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java @@ -28,11 +28,13 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; import org.rocksdb.PlainTableConfig; import org.rocksdb.RateLimiter; import org.rocksdb.ReadOptions; @@ -80,6 +82,12 @@ public class RocksDBResourceContainer implements AutoCloseable { /** The shared rate limiter for all RocksDB instances. */ private final RateLimiter sharedRateLimiter; + /** The statistics object for RocksDB, null if statistics is disabled. */ + @Nullable private Statistics statistics; + + /** The block cache for RocksDB, shared across column families. */ + @Nullable private Cache blockCache; + /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; @@ -138,7 +146,7 @@ public DBOptions getDbOptions() throws IOException { opt.setRateLimiter(sharedRateLimiter); if (enableStatistics) { - Statistics statistics = new Statistics(); + statistics = new Statistics(); opt.setStatistics(statistics); handlesToClose.add(statistics); } @@ -146,6 +154,18 @@ public DBOptions getDbOptions() throws IOException { return opt; } + /** Gets the Statistics object if statistics is enabled, null otherwise. */ + @Nullable + public Statistics getStatistics() { + return statistics; + } + + /** Gets the block cache used by RocksDB, null if not yet initialized. 
*/ + @Nullable + public Cache getBlockCache() { + return blockCache; + } + /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */ public ColumnFamilyOptions getColumnOptions() { // initial options from common profile @@ -282,8 +302,11 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions( blockBasedTableConfig.setMetadataBlockSize( internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes()); - blockBasedTableConfig.setBlockCacheSize( - internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes()); + // Create explicit LRUCache for accurate memory tracking + long blockCacheSize = internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes(); + blockCache = new LRUCache(blockCacheSize); + handlesToClose.add(blockCache); + blockBasedTableConfig.setBlockCache(blockCache); if (internalGetOption(ConfigOptions.KV_USE_BLOOM_FILTER)) { final double bitsPerKey = internalGetOption(ConfigOptions.KV_BLOOM_FILTER_BITS_PER_KEY); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java new file mode 100644 index 0000000000..fd7a20cd7e --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rocksdb; + +import org.apache.fluss.server.utils.ResourceGuard; + +import org.rocksdb.Cache; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.HistogramData; +import org.rocksdb.HistogramType; +import org.rocksdb.MemoryUsageType; +import org.rocksdb.MemoryUtil; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.Statistics; +import org.rocksdb.TickerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Collects and provides access to RocksDB statistics for a single KvTablet. + * + *

This class encapsulates low-level RocksDB statistics collection, providing semantic methods to + * access various RocksDB statistics and properties. It does NOT register Fluss metrics directly; + * instead, upper layers (e.g., TableMetricGroup) consume these statistics to compute and register + * actual Fluss Metrics. + * + *

Thread-safety: every read acquires a lease from the {@link ResourceGuard} supplied at
+ * construction before touching RocksDB and releases it afterwards. When the lease cannot be
+ * acquired, or RocksDB reports an error, the accessor logs at debug level and returns 0.
+ */
+public class RocksDBStatistics implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStatistics.class);
+
+    private final RocksDB db;
+    @Nullable private final Statistics statistics;
+    private final ResourceGuard resourceGuard;
+    private final ColumnFamilyHandle defaultColumnFamilyHandle;
+    @Nullable private final Cache blockCache;
+
+    /**
+     * Creates a statistics accessor for one RocksDB instance.
+     *
+     * @param db the RocksDB instance to query properties and memory usage from
+     * @param statistics the RocksDB statistics object, or {@code null} if statistics are disabled
+     * @param resourceGuard the guard leased around every read
+     * @param defaultColumnFamilyHandle handle used for column family specific properties
+     * @param blockCache the block cache to include in memory accounting, or {@code null}
+     */
+    public RocksDBStatistics(
+            RocksDB db,
+            @Nullable Statistics statistics,
+            ResourceGuard resourceGuard,
+            ColumnFamilyHandle defaultColumnFamilyHandle,
+            @Nullable Cache blockCache) {
+        this.db = db;
+        this.statistics = statistics;
+        this.resourceGuard = resourceGuard;
+        this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
+        this.blockCache = blockCache;
+    }
+
+    // ==================== Ticker-based Metrics ====================
+
+    /**
+     * Get write stall duration in microseconds.
+     *
+     * @return write stall duration, or 0 if not available
+     */
+    public long getWriteStallMicros() {
+        return getTickerValue(TickerType.STALL_MICROS);
+    }
+
+    /**
+     * Get total bytes read.
+     *
+     * @return bytes read, or 0 if not available
+     */
+    public long getBytesRead() {
+        return getTickerValue(TickerType.BYTES_READ);
+    }
+
+    /**
+     * Get total bytes written.
+     *
+     * @return bytes written, or 0 if not available
+     */
+    public long getBytesWritten() {
+        return getTickerValue(TickerType.BYTES_WRITTEN);
+    }
+
+    /**
+     * Get flush bytes written.
+     *
+     * @return flush bytes written, or 0 if not available
+     */
+    public long getFlushBytesWritten() {
+        return getTickerValue(TickerType.FLUSH_WRITE_BYTES);
+    }
+
+    /**
+     * Get compaction bytes read.
+     *
+     * @return compaction bytes read, or 0 if not available
+     */
+    public long getCompactionBytesRead() {
+        return getTickerValue(TickerType.COMPACT_READ_BYTES);
+    }
+
+    /**
+     * Get compaction bytes written.
+     *
+     * @return compaction bytes written, or 0 if not available
+     */
+    public long getCompactionBytesWritten() {
+        return getTickerValue(TickerType.COMPACT_WRITE_BYTES);
+    }
+
+    // ==================== Histogram-based Metrics ====================
+
+    /**
+     * Get get operation latency in microseconds (P99).
+     *
+     * <p>This reads the {@code DB_GET} histogram from RocksDB Statistics and reports its 99th
+     * percentile rather than the average, so that tail latency is visible.
+     *
+     * @return P99 get latency in microseconds, or 0 if not available
+     */
+    public long getGetLatencyMicros() {
+        return getHistogramValue(HistogramType.DB_GET);
+    }
+
+    /**
+     * Get write operation latency in microseconds (P99).
+     *
+     * <p>This reads the {@code DB_WRITE} histogram from RocksDB Statistics and reports its 99th
+     * percentile rather than the average, so that tail latency is visible.
+     *
+     * @return P99 write latency in microseconds, or 0 if not available
+     */
+    public long getWriteLatencyMicros() {
+        return getHistogramValue(HistogramType.DB_WRITE);
+    }
+
+    /**
+     * Get compaction time in microseconds (P99).
+     *
+     * <p>This reads the {@code COMPACTION_TIME} histogram from RocksDB Statistics and reports its
+     * 99th percentile rather than the average.
+     *
+     * @return P99 compaction time in microseconds, or 0 if not available
+     */
+    public long getCompactionTimeMicros() {
+        return getHistogramValue(HistogramType.COMPACTION_TIME);
+    }
+
+    // ==================== Property-based Metrics ====================
+
+    /**
+     * Get number of files at level 0.
+     *
+     * <p>This property is column family specific and must be accessed through the column family
+     * handle.
+     *
+     * @return number of L0 files, or 0 if not available
+     */
+    public long getNumFilesAtLevel0() {
+        return getPropertyValue(defaultColumnFamilyHandle, "rocksdb.num-files-at-level0");
+    }
+
+    /**
+     * Get whether a memtable flush is pending.
+     *
+     * @return 1 if flush is pending, 0 otherwise
+     */
+    public long getFlushPending() {
+        return getPropertyValue("rocksdb.mem-table-flush-pending");
+    }
+
+    /**
+     * Get whether a compaction is pending.
+     *
+     * @return 1 if compaction is pending, 0 otherwise
+     */
+    public long getCompactionPending() {
+        return getPropertyValue("rocksdb.compaction-pending");
+    }
+
+    // ==================== Memory Metrics ====================
+
+    /**
+     * Get total memory usage across the RocksDB components reported by {@link MemoryUtil}.
+     *
+     * <p>The result is the sum of:
+     *
+     * <ul>
+     *   <li>{@code kMemTableTotal}: all memtables
+     *   <li>{@code kTableReadersTotal}: table readers (indexes, filters)
+     *   <li>{@code kCacheTotal}: the block cache passed at construction, if any
+     * </ul>
+     *
+     * <p>{@code kMemTableUnFlushed} is deliberately left out: it is a subset of {@code
+     * kMemTableTotal}, so adding it would count unflushed memtables twice.
+     *
+     * <p>Note: To get accurate block cache memory usage, an explicit Cache object must be provided
+     * during construction. If no cache is provided (null), the block cache memory usage may not be
+     * fully accounted for.
+     *
+     * @return total memory usage in bytes, or 0 if not available
+     */
+    public long getTotalMemoryUsage() {
+        try (ResourceGuard.Lease ignored = resourceGuard.acquireResource()) {
+            if (db == null) {
+                return 0L;
+            }
+
+            // MemoryUtil accepts a null cache set; in that case only DB-owned memory is counted.
+            Set<Cache> caches = null;
+            if (blockCache != null) {
+                caches = new HashSet<>();
+                caches.add(blockCache);
+            }
+
+            Map<MemoryUsageType, Long> usageByType =
+                    MemoryUtil.getApproximateMemoryUsageByType(
+                            Collections.singletonList(db), caches);
+            // Sum only the disjoint categories, see the method Javadoc.
+            return usageByType.getOrDefault(MemoryUsageType.kMemTableTotal, 0L)
+                    + usageByType.getOrDefault(MemoryUsageType.kTableReadersTotal, 0L)
+                    + usageByType.getOrDefault(MemoryUsageType.kCacheTotal, 0L);
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get total memory usage from RocksDB (possibly closed or unavailable)",
+                    e);
+            return 0L;
+        }
+    }
+
+    // ==================== Internal Helper Methods ====================
+
+    /**
+     * Get ticker value from RocksDB Statistics with resource guard protection.
+     *
+     * @param tickerType the ticker type to query
+     * @return the ticker value, or 0 if not available or RocksDB is closed
+     */
+    private long getTickerValue(TickerType tickerType) {
+        try (ResourceGuard.Lease ignored = resourceGuard.acquireResource()) {
+            if (statistics != null) {
+                return statistics.getTickerCount(tickerType);
+            }
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get ticker {} from RocksDB (possibly closed or unavailable)",
+                    tickerType,
+                    e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get histogram P99 value from RocksDB Statistics with resource guard protection.
+     *
+     * <p>The 99th percentile is returned instead of the average because the average hides
+     * outliers, while the percentile reflects the tail latency that callers monitor. The
+     * floating point percentile is rounded to the nearest long.
+     *
+     * @param histogramType the histogram type to query
+     * @return the P99 histogram value (rounded to nearest long), or 0 if not available or RocksDB
+     *     is closed
+     */
+    private long getHistogramValue(HistogramType histogramType) {
+        try (ResourceGuard.Lease ignored = resourceGuard.acquireResource()) {
+            if (statistics != null) {
+                HistogramData histogramData = statistics.getHistogramData(histogramType);
+                if (histogramData != null) {
+                    return Math.round(histogramData.getPercentile99());
+                }
+            }
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get histogram {} from RocksDB Statistics (possibly closed or unavailable)",
+                    histogramType,
+                    e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get property value from RocksDB with resource guard protection.
+     *
+     * @param propertyName the property name to query
+     * @return the property value as long, or 0 if not available or RocksDB is closed
+     */
+    private long getPropertyValue(String propertyName) {
+        try (ResourceGuard.Lease ignored = resourceGuard.acquireResource()) {
+            return parseLongProperty(propertyName, db.getProperty(propertyName));
+        } catch (RocksDBException e) {
+            LOG.debug(
+                    "Failed to get property {} from RocksDB (possibly closed or unavailable)",
+                    propertyName,
+                    e);
+        } catch (Exception e) {
+            // ResourceGuard may throw exception if RocksDB is closed
+            LOG.debug(
+                    "Failed to access RocksDB for property {} (possibly closed)", propertyName, e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get property value from RocksDB for a specific column family with resource guard protection.
+     *
+     * <p>Some RocksDB properties are column family specific and must be accessed through the column
+     * family handle.
+     *
+     * @param columnFamilyHandle the column family handle
+     * @param propertyName the property name to query
+     * @return the property value as long, or 0 if not available or RocksDB is closed
+     */
+    private long getPropertyValue(ColumnFamilyHandle columnFamilyHandle, String propertyName) {
+        try (ResourceGuard.Lease ignored = resourceGuard.acquireResource()) {
+            if (columnFamilyHandle == null) {
+                return 0L;
+            }
+            return parseLongProperty(
+                    propertyName, db.getProperty(columnFamilyHandle, propertyName));
+        } catch (RocksDBException e) {
+            LOG.debug(
+                    "Failed to get property {} from RocksDB column family (possibly closed or unavailable)",
+                    propertyName,
+                    e);
+        } catch (Exception e) {
+            // ResourceGuard may throw exception if RocksDB is closed
+            LOG.debug(
+                    "Failed to access RocksDB for property {} (possibly closed)", propertyName, e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Parse the textual value of a RocksDB property as a long.
+     *
+     * @param propertyName the property name, used only for logging
+     * @param value the raw property value, may be null or empty
+     * @return the parsed value, or 0 if the value is missing or not a valid long
+     */
+    private static long parseLongProperty(String propertyName, @Nullable String value) {
+        if (value == null || value.isEmpty()) {
+            return 0L;
+        }
+        try {
+            return Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            LOG.debug("Failed to parse property {} value as long", propertyName, e);
+            return 0L;
+        }
+    }
+
+    @Override
+    public void close() {
+        // No resources to clean up, statistics are managed by TableMetricGroup
+        LOG.debug("RocksDB statistics accessor closed");
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
index fc8e281a57..ed5b111019 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
@@ -20,6 +20,11 @@
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
+import 
org.apache.fluss.utils.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -27,12 +32,24 @@ import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; -/** Metrics for the table buckets with table as parent group. */ +/** + * Metrics for the table buckets with table as parent group. + * + *

For KV tables, this class also manages the RocksDB statistics lifecycle. The statistics are + * registered when KvTablet is initialized and automatically cleaned up when this metric group is + * closed. + */ public class BucketMetricGroup extends AbstractMetricGroup { + + private static final Logger LOG = LoggerFactory.getLogger(BucketMetricGroup.class); + // will be null if the bucket doesn't belong to a partition private final @Nullable String partitionName; private final int bucket; + // RocksDB statistics for this bucket (null for non-KV tables) + private volatile @Nullable RocksDBStatistics rocksDBStatistics; + public BucketMetricGroup( MetricRegistry registry, @Nullable String partitionName, @@ -62,4 +79,56 @@ protected String getGroupName(CharacterFilter filter) { public TableMetricGroup getTableMetricGroup() { return (TableMetricGroup) parent; } + + /** + * Register RocksDB statistics for this bucket. This should be called when KvTablet is + * initialized. + * + *

This method must be paired with {@link #unregisterRocksDBStatistics()} to ensure proper
+     * resource cleanup.
+     *
+     * @param statistics the RocksDB statistics collector
+     */
+    public void registerRocksDBStatistics(RocksDBStatistics statistics) {
+        if (this.rocksDBStatistics != null) {
+            LOG.warn(
+                    "RocksDB statistics already registered for bucket {}, this may indicate a resource leak",
+                    bucket);
+        }
+        this.rocksDBStatistics = statistics;
+        LOG.debug("Registered RocksDB statistics for bucket {}", bucket);
+    }
+
+    /**
+     * Unregister and close RocksDB statistics for this bucket. This should be called when KvTablet
+     * is destroyed. Calling it when nothing is registered is a no-op.
+     *
+     * <p>This method must be paired with {@link #registerRocksDBStatistics(RocksDBStatistics)} to
+     * ensure proper resource cleanup.
+     */
+    public void unregisterRocksDBStatistics() {
+        // Snapshot the volatile field once so the null check and the close act on the same
+        // instance even if another thread changes the field in between.
+        RocksDBStatistics registered = rocksDBStatistics;
+        if (registered != null) {
+            LOG.debug("Unregistering RocksDB statistics for bucket {}", bucket);
+            IOUtils.closeQuietly(registered);
+            rocksDBStatistics = null;
+        }
+    }
+
+    /**
+     * Get the RocksDB statistics for this bucket.
+     *
+     * @return the RocksDB statistics, or null if not a KV table or not yet initialized
+     */
+    @Nullable
+    public RocksDBStatistics getRocksDBStatistics() {
+        return rocksDBStatistics;
+    }
+
+    @Override
+    public void close() {
+        // Clean up RocksDB statistics before closing the metric group
+        // This handles the case when the bucket is removed entirely
+        unregisterRocksDBStatistics();
+        super.close();
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
index 7620bcbfda..fc40463f2d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
@@ -27,11 +27,13 @@
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
+import org.apache.fluss.utils.MapUtils;

 import javax.annotation.Nullable;

-import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;

 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;

@@ -41,7 +43,7 @@
  */
 public class TableMetricGroup extends AbstractMetricGroup {

-    private final Map<TableBucket, BucketMetricGroup> buckets = new HashMap<>();
+    private final Map<TableBucket, BucketMetricGroup> buckets = MapUtils.newConcurrentHashMap();

     private final TablePath tablePath;

@@ -70,6 +72,8 @@ public TableMetricGroup(
         if 
(isKvTable) {
             kvMetrics = new KvMetricGroup(this);
             logMetrics = new LogMetricGroup(this, TabletType.CDC_LOG);
+            // Register RocksDB aggregated metrics for kv tables
+            registerRocksDBMetrics();
         } else {
             // otherwise, create log produce metrics
             kvMetrics = null;
@@ -236,17 +240,124 @@ public BucketMetricGroup addBucketMetricGroup(
 
     public void removeBucketMetricGroup(TableBucket tableBucket) {
         BucketMetricGroup metricGroup = buckets.remove(tableBucket);
-        metricGroup.close();
+        if (metricGroup != null) {
+            // BucketMetricGroup.close() will automatically clean up RocksDB statistics
+            metricGroup.close();
+        }
     }
 
     public int bucketGroupsCount() {
         return buckets.size();
     }
 
+    public java.util.Collection<BucketMetricGroup> getBucketMetricGroups() {
+        return buckets.values();
+    }
+
+    /**
+     * Get all RocksDB statistics from bucket metric groups for table-level and server-level
+     * aggregation.
+     *
+     * <p>This method dynamically collects statistics from all buckets, allowing automatic cleanup
+     * when buckets are removed without maintaining a separate map.
+     *
+     * @return stream of RocksDB statistics from all buckets in this table
+     */
+    public Stream<RocksDBStatistics> allRocksDBStatistics() {
+        return buckets.values().stream()
+                .map(BucketMetricGroup::getRocksDBStatistics)
+                .filter(stats -> stats != null);
+    }
+
     public TabletServerMetricGroup getServerMetricGroup() {
         return (TabletServerMetricGroup) parent;
     }
 
+    /**
+     * Register RocksDB aggregated metrics at table level. These metrics aggregate values from all
+     * buckets of this table.
+     *
+     * <p>This method is called once during TableMetricGroup construction for KV tables.
+     */
+    private void registerRocksDBMetrics() {
+        // Max aggregation metrics - track the maximum value across all buckets
+        gauge(
+                MetricNames.ROCKSDB_WRITE_STALL_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getWriteStallMicros)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_GET_LATENCY_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getGetLatencyMicros)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_WRITE_LATENCY_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getWriteLatencyMicros)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_NUM_FILES_AT_LEVEL0_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getNumFilesAtLevel0)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_FLUSH_PENDING_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getFlushPending)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_PENDING_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getCompactionPending)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_TIME_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getCompactionTimeMicros)
+                                .max()
+                                .orElse(0L));
+
+        // Sum aggregation metrics - track the total value across all buckets
+        gauge(
+                MetricNames.ROCKSDB_BYTES_READ_TOTAL,
+                () -> allRocksDBStatistics().mapToLong(RocksDBStatistics::getBytesRead).sum());
+        gauge(
+                MetricNames.ROCKSDB_BYTES_WRITTEN_TOTAL,
+                () -> allRocksDBStatistics().mapToLong(RocksDBStatistics::getBytesWritten).sum());
+        gauge(
+                MetricNames.ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getFlushBytesWritten)
+                                .sum());
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_BYTES_READ_TOTAL,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getCompactionBytesRead)
+                                .sum());
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getCompactionBytesWritten)
+                                .sum());
+    }
+
     /** Metric group for specific kind of tablet of a table. */
     private static class TabletMetricGroup extends AbstractMetricGroup {
         private final TabletType tabletType;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
index 59730b69fa..f4b2c6b730 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
@@ -30,6 +30,7 @@
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
 import org.apache.fluss.utils.MapUtils;
 
 import java.util.Map;
@@ -133,6 +134,24 @@ public TabletServerMetricGroup(
         meter(MetricNames.ISR_SHRINKS_RATE, new MeterView(isrShrinks));
         failedIsrUpdates = new SimpleCounter();
         meter(MetricNames.FAILED_ISR_UPDATES_RATE, new MeterView(failedIsrUpdates));
+
+        // Register server-level RocksDB aggregated metrics
+        registerServerRocksDBMetrics();
+    }
+
+    /**
+     * Register server-level RocksDB aggregated metrics. These metrics aggregate memory usage from
+     * all tables.
+     */
+    private void registerServerRocksDBMetrics() {
+        // Total memory usage across all RocksDB instances in this server.
+        gauge(
+                MetricNames.ROCKSDB_MEMORY_USAGE_TOTAL,
+                () ->
+                        metricGroupByTable.values().stream()
+                                .flatMap(TableMetricGroup::allRocksDBStatistics)
+                                .mapToLong(RocksDBStatistics::getTotalMemoryUsage)
+                                .sum());
     }
 
     @Override
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 1669e004df..f86778ebc3 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -596,6 +596,10 @@ private void dropKv() {
             IOUtils.closeQuietly(closeableRegistryForKv);
         }
         if (kvTablet != null) {
+            // Unregister RocksDB statistics before dropping KvTablet
+            // This ensures statistics are cleaned up when KvTablet is destroyed
+            bucketMetricGroup.unregisterRocksDBStatistics();
+
             // drop the kv tablet
             checkNotNull(kvManager);
             kvManager.dropKv(tableBucket);
@@ -689,6 +693,12 @@ private Optional initKvTablet() {
                 physicalPath,
                 tableBucket,
                 endTime - startTime);
+
+        // Register RocksDB statistics to BucketMetricGroup
+        if (kvTablet != null && kvTablet.getRocksDBStatistics() != null) {
+            bucketMetricGroup.registerRocksDBStatistics(kvTablet.getRocksDBStatistics());
+        }
+
         return optCompletedSnapshot;
     }
 
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 06db30b340..dcafd5c3e4 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -47,12 +47,14 @@
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import
org.apache.fluss.server.log.FetchIsolation;
 import org.apache.fluss.server.log.LogAppendInfo;
 import org.apache.fluss.server.log.LogTablet;
 import org.apache.fluss.server.log.LogTestUtils;
 import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -1241,4 +1243,186 @@ private void checkEqual(LogRecords actaulLogRecords, List expe
     private Value valueOf(BinaryRow row) {
         return Value.of(ValueEncoder.encodeValue(schemaId, row));
     }
+
+    @Test
+    void testRocksDBMetrics() throws Exception {
+        // Initialize tablet with schema
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
+
+        // Get RocksDB statistics
+        RocksDBStatistics statistics = kvTablet.getRocksDBStatistics();
+        assertThat(statistics).as("RocksDB statistics should be available").isNotNull();
+
+        // Verify statistics is properly initialized
+        org.rocksdb.Statistics stats = kvTablet.getRocksDBKv().getStatistics();
+        assertThat(stats).as("RocksDB Statistics should be enabled").isNotNull();
+
+        // All metrics should start at 0 for a fresh database
+        assertThat(statistics.getBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getBytesRead()).isEqualTo(0);
+        assertThat(statistics.getFlushBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getGetLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0);
+        assertThat(statistics.getFlushPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionPending()).isEqualTo(0);
+        assertThat(statistics.getTotalMemoryUsage())
+                .isGreaterThan(0); // Block cache is pre-allocated
+
+        // ========== Phase 1: Write and Flush ==========
+        int numRecords = 10000;
+        List<KvRecord> rows = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            rows.add(
+                    kvRecordFactory.ofRecord(
+                            String.valueOf(i).getBytes(), new Object[] {i, "value-" + i}));
+        }
+        kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // After write and flush: must have written data to RocksDB
+        long bytesWrittenAfterFlush = statistics.getBytesWritten();
+        assertThat(bytesWrittenAfterFlush)
+                .as("Must write data to RocksDB after flush")
+                .isGreaterThan(0);
+
+        // Flush must have written bytes (memtable to SST file)
+        long flushBytesWritten = statistics.getFlushBytesWritten();
+        // Note: FLUSH_WRITE_BYTES may not be tracked in all configurations
+        assertThat(flushBytesWritten)
+                .as("Flush bytes written is non-negative")
+                .isGreaterThanOrEqualTo(0);
+
+        // Write latency must be tracked for write operations
+        long writeLatency = statistics.getWriteLatencyMicros();
+        assertThat(writeLatency)
+                .as("Write latency must be > 0 after write operations")
+                .isGreaterThan(0);
+
+        // After flush, there should be at least 1 L0 file (unless immediate compaction occurred)
+        long numL0Files = statistics.getNumFilesAtLevel0();
+        assertThat(numL0Files)
+                .as("Should have L0 files after flush (or 0 if compacted)")
+                .isGreaterThanOrEqualTo(0);
+
+        // Flush pending must be 0 after flush completes
+        assertThat(statistics.getFlushPending())
+                .as("No pending flush after completion")
+                .isEqualTo(0);
+
+        // ========== Phase 2: Read Operations ==========
+        List<byte[]> keysToRead = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            keysToRead.add(String.valueOf(i).getBytes());
+        }
+        List<byte[]> readValues = kvTablet.multiGet(keysToRead);
+        assertThat(readValues).hasSize(100);
+
+        // After reads: get latency must be tracked
+        long getLatency = statistics.getGetLatencyMicros();
+        assertThat(getLatency)
+                .as("Get latency must be tracked after read operations")
+                .isGreaterThan(0);
+
+        // Bytes read may increase (depending on cache hits)
+        long bytesRead = statistics.getBytesRead();
+        // Note: bytesRead could be 0 if all data was served from block cache
+        assertThat(bytesRead).as("Bytes read is non-negative").isGreaterThanOrEqualTo(0);
+
+        // ========== Phase 3: Write More Data to Trigger Compaction ==========
+        List<KvRecord> moreRows = new ArrayList<>();
+        for (int i = numRecords; i < numRecords * 2; i++) {
+            moreRows.add(
+                    kvRecordFactory.ofRecord(
+                            String.valueOf(i).getBytes(), new Object[] {i, "value-" + i}));
+        }
+        kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(moreRows), null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // Bytes written must increase with more data
+        long bytesWrittenAfterSecondFlush = statistics.getBytesWritten();
+        assertThat(bytesWrittenAfterSecondFlush)
+                .as("Bytes written must increase with second batch")
+                .isGreaterThan(bytesWrittenAfterFlush);
+
+        // ========== Phase 4: Manual Compaction ==========
+        long compactionBytesReadBefore = statistics.getCompactionBytesRead();
+        long compactionBytesWrittenBefore = statistics.getCompactionBytesWritten();
+        long compactionTimeBefore = statistics.getCompactionTimeMicros();
+
+        // Trigger manual compaction
+        try {
+            kvTablet.getRocksDBKv().getDb().compactRange();
+        } catch (org.rocksdb.RocksDBException ignored) {
+            // Compaction failure is acceptable in test
+        }
+
+        // After compaction: verify compaction metrics increased
+        long compactionBytesReadAfter = statistics.getCompactionBytesRead();
+        long compactionBytesWrittenAfter = statistics.getCompactionBytesWritten();
+        long compactionTimeAfter = statistics.getCompactionTimeMicros();
+
+        // If any compaction occurred, all three metrics should increase
+        boolean compactionOccurred =
+                compactionBytesReadAfter > compactionBytesReadBefore
+                        || compactionBytesWrittenAfter > compactionBytesWrittenBefore
+                        || compactionTimeAfter > compactionTimeBefore;
+
+        if (compactionOccurred) {
+            assertThat(compactionBytesReadAfter)
+                    .as("Compaction must read data")
+                    .isGreaterThan(compactionBytesReadBefore);
+            assertThat(compactionBytesWrittenAfter)
+                    .as("Compaction must write data")
+                    .isGreaterThan(compactionBytesWrittenBefore);
+            assertThat(compactionTimeAfter)
+                    .as("Compaction must take time")
+                    .isGreaterThan(compactionTimeBefore);
+        }
+
+        // Compaction pending must be 0 after compaction completes
+        assertThat(statistics.getCompactionPending())
+                .as("No pending compaction after completion")
+                .isEqualTo(0);
+
+        // ========== Phase 5: Verify Final State Before Close ==========
+        // Bytes written must be positive after all operations
+        assertThat(statistics.getBytesWritten())
+                .as("Total bytes written must be positive")
+                .isGreaterThan(0);
+
+        // Write and get latency must be positive (operations occurred)
+        assertThat(statistics.getWriteLatencyMicros())
+                .as("Write latency must be positive after writes")
+                .isGreaterThan(0);
+        assertThat(statistics.getGetLatencyMicros())
+                .as("Get latency must be positive after reads")
+                .isGreaterThan(0);
+
+        // No pending operations
+        assertThat(statistics.getFlushPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionPending()).isEqualTo(0);
+
+        // Memory usage should be reasonable (> 0 due to block cache + memtables)
+        assertThat(statistics.getTotalMemoryUsage())
+                .as("Total memory usage must be positive")
+                .isGreaterThan(0);
+
+        // ========== Phase 6: Verify Metrics After Close ==========
+        kvTablet.close();
+
+        // After close: all metrics must return 0 (ResourceGuard protection)
+        assertThat(statistics.getBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getBytesRead()).isEqualTo(0);
+        assertThat(statistics.getFlushBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getGetLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0);
+        assertThat(statistics.getFlushPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionBytesRead()).isEqualTo(0);
+        assertThat(statistics.getCompactionBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getCompactionTimeMicros()).isEqualTo(0);
+        assertThat(statistics.getTotalMemoryUsage()).isEqualTo(0);
+    }
 }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
index f36853cd48..676a17a6b0 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
@@ -189,7 +189,8 @@ void testConfigurationOptionsFromConfig() throws Exception {
                     (BlockBasedTableConfig) columnOptions.tableFormatConfig();
             assertThat(tableConfig.blockSize()).isEqualTo(4 * SizeUnit.KB);
             assertThat(tableConfig.metadataBlockSize()).isEqualTo(8 * SizeUnit.KB);
-            assertThat(tableConfig.blockCacheSize()).isEqualTo(512 * SizeUnit.MB);
+            // Verify block cache was created with explicit LRUCache for memory tracking
+            assertThat(optionsContainer.getBlockCache()).isNotNull();
             assertThat(tableConfig.filterPolicy() instanceof BloomFilter).isTrue();
         }
     }
diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md
index 2d8ef80771..65109551c2 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -825,6 +825,134 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
+### RocksDB
+
+RocksDB metrics provide insights into the performance and health of the underlying RocksDB storage engine used by Fluss. These metrics are categorized into table-level metrics (aggregated from all buckets of a table) and server-level metrics (aggregated from all tables in a server).
+ +#### Table-level RocksDB Metrics (Max Aggregation) + +These metrics use Max aggregation to show the maximum value across all buckets of a table, which helps identify the worst-performing bucket. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| tabletserver | table | rocksdbWriteStallMicrosMax | Maximum write stall duration across all buckets of this table (in microseconds). Write stalls occur when RocksDB needs to slow down writes due to compaction pressure or memory limits. | Gauge |
| | | rocksdbGetLatencyMicrosMax | Maximum get operation latency across all buckets of this table (in microseconds). This represents the slowest read operation among all buckets. | Gauge |
| | | rocksdbWriteLatencyMicrosMax | Maximum write operation latency across all buckets of this table (in microseconds). This represents the slowest write operation among all buckets. | Gauge |
| | | rocksdbNumFilesAtLevel0Max | Maximum number of L0 files across all buckets of this table. A high number of L0 files indicates compaction pressure and may impact read performance. | Gauge |
| | | rocksdbFlushPendingMax | Maximum flush pending indicator across all buckets of this table. A value greater than 0 indicates that some buckets have pending flush operations. | Gauge |
| | | rocksdbCompactionPendingMax | Maximum compaction pending indicator across all buckets of this table. A value greater than 0 indicates that some buckets have pending compaction operations. | Gauge |
| | | rocksdbCompactionTimeMicrosMax | Maximum compaction time across all buckets of this table (in microseconds). This represents the longest compaction operation among all buckets. | Gauge |
+ +#### Table-level RocksDB Metrics (Sum Aggregation) + +These metrics use Sum aggregation to show the total value across all buckets of a table, providing an overall view of table-level I/O and storage operations. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| tabletserver | table | rocksdbBytesReadTotal | Total bytes read across all buckets of this table. This includes both user reads and internal reads (e.g., compaction reads). | Gauge |
| | | rocksdbBytesWrittenTotal | Total bytes written across all buckets of this table. This includes both user writes and internal writes (e.g., compaction writes). | Gauge |
| | | rocksdbFlushBytesWrittenTotal | Total flush bytes written across all buckets of this table. This represents the amount of data flushed from memtable to persistent storage. | Gauge |
| | | rocksdbCompactionBytesReadTotal | Total compaction bytes read across all buckets of this table. This represents the amount of data read during compaction operations. | Gauge |
| | | rocksdbCompactionBytesWrittenTotal | Total compaction bytes written across all buckets of this table. This represents the amount of data written during compaction operations. | Gauge |
+ +#### Server-level RocksDB Metrics (Sum Aggregation) + +These metrics use Sum aggregation to show the total value across all tables in a server, providing a server-wide view of RocksDB resource usage. + + + + + + + + + + + + + + + + + + + + +
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| tabletserver | - | rocksdbMemoryUsageTotal | Total memory usage across all RocksDB instances in this server (in bytes). This includes memory used by memtables, block cache, and other RocksDB internal structures. | Gauge |
### Flink connector standard metrics