From e301b55de438815e9233feafd9afcfdab356cb7c Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Tue, 18 Nov 2025 21:18:05 +0800 Subject: [PATCH 1/2] feat: pre-read location for missing cache read. --- .../bookkeeper/bookie/BufferedChannel.java | 11 +- .../bookie/BufferedReadChannel.java | 37 +++--- .../bookkeeper/bookie/DefaultEntryLogger.java | 41 +++++-- .../bookkeeper/bookie/HandleFactoryImpl.java | 8 +- .../bookie/storage/EntryLogger.java | 6 + .../directentrylogger/DirectEntryLogger.java | 9 ++ .../bookie/storage/ldb/DbLedgerStorage.java | 21 +++- .../storage/ldb/DbLedgerStorageStats.java | 12 +- .../bookie/storage/ldb/DisableReadCache.java | 47 +++++++ .../storage/ldb/EntryLocationIndex.java | 19 ++- .../bookie/storage/ldb/LocationCache.java | 61 ++++++++++ .../ldb/SingleDirectoryDbLedgerStorage.java | 115 ++++++++++++++---- .../ldb/DbLedgerStorageWriteCacheTest.java | 5 +- 13 files changed, 323 insertions(+), 69 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DisableReadCache.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index dbba31083d9..c18d78d7b33 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -242,12 +242,11 @@ public long forceWrite(boolean forceMetadata) throws IOException { } @Override - public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { + public synchronized int read(ByteBuf dest, long pos, int length, int readExtraBytes) throws IOException { if (dest.writableBytes() < length) { throw new IllegalArgumentException("dest buffer remaining capacity is not enough" + "(must be at least 
as \"length\"=" + length + ")"); } - long prevPos = pos; while (length > 0) { // check if it is in the write buffer @@ -255,6 +254,10 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept int positionInBuffer = (int) (pos - writeBufferStartPosition.get()); int bytesToCopy = Math.min(writeBuffer.writerIndex() - positionInBuffer, dest.writableBytes()); + if (bytesToCopy == 0 && length <= readExtraBytes) { + // try to read next entry position, but we have reached the last entry + break; + } if (bytesToCopy == 0) { throw new IOException("Read past EOF"); } @@ -278,6 +281,10 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept int readBytes = fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity), readBufferStartPosition); + if (readBytes <= 0 && length <= readExtraBytes) { + // we have reached the last entry + break; + } if (readBytes <= 0) { throw new IOException("Reading from filechannel returned a non-positive value. Short read."); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 4de3890e082..203263903bf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -57,18 +57,8 @@ public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean se this.readBuffer = Unpooled.buffer(readCapacity); } - /** - * Read as many bytes into dest as dest.capacity() starting at position pos in the - * FileChannel. This function can read from the buffer or the file channel - * depending on the implementation.. - * @param dest - * @param pos - * @return The total number of bytes read. - * -1 if the given position is greater than or equal to the file's current size. 
- * @throws IOException if I/O error occurs - */ public int read(ByteBuf dest, long pos) throws IOException { - return read(dest, pos, dest.writableBytes()); + return read(dest, pos, dest.writableBytes(), 0); } @Override @@ -88,6 +78,22 @@ public long size() throws IOException { } public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { + return read(dest, pos, length, 0); + } + + /** + * Read as many bytes into dest as dest.capacity() starting at position pos in the + * FileChannel. This function can read from the buffer or the file channel + * depending on the implementation.. + * @param dest + * @param pos + * @param length + * @param readExtraBytes + * @return The total number of bytes read. + * -1 if the given position is greater than or equal to the file's current size. + * @throws IOException if I/O error occurs + */ + public synchronized int read(ByteBuf dest, long pos, int length, int readExtraBytes) throws IOException { invocationCount++; long currentPosition = pos; long eof = size(); @@ -111,9 +117,12 @@ public synchronized int read(ByteBuf dest, long pos, int length) throws IOExcept } else { // We don't have it in the buffer, so put necessary data in the buffer readBufferStartPosition = currentPosition; - int readBytes = 0; - if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity), - currentPosition)) <= 0) { + int readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity), + currentPosition); + if (readBytes <= 0 && length <= readExtraBytes) { + break; + } + if (readBytes <= 0) { throw new IOException("Reading from filechannel returned a non-positive value. 
Short read."); } readBuffer.writerIndex(readBytes); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 2696aee3c94..2b6a6a619d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -384,15 +384,21 @@ void addListener(EntryLogListener listener) { */ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos) throws IOException { + return readFromLogChannel(entryLogId, channel, buff, pos, 0); + } + + private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos, + int readExtraBytes) + throws IOException { BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId); if (null != bc) { synchronized (bc) { if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) { - return bc.read(buff, pos); + return bc.read(buff, pos, buff.writableBytes(), readExtraBytes); } } } - return channel.read(buff, pos); + return channel.read(buff, pos, buff.writableBytes(), readExtraBytes); } /** @@ -828,16 +834,27 @@ private void validateEntry(long ledgerId, long entryId, long entryLogId, long po @Override public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) throws IOException, Bookie.NoEntryException { - return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */); + return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */, + 0).getRight(); + } + + @Override + public Pair readEntryAndExtraBytes(long ledgerId, long entryId, long entryLocation, + int extraBytes) + throws IOException, Bookie.NoEntryException { + return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */, + extraBytes); } @Override public ByteBuf readEntry(long 
location) throws IOException, Bookie.NoEntryException { - return internalReadEntry(-1L, -1L, location, false /* validateEntry */); + return internalReadEntry(-1L, -1L, location, false /* validateEntry */, + 0).getRight(); } - private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry) + private Pair internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry, + int readExtraBytes) throws IOException, Bookie.NoEntryException { long entryLogId = logIdForOffset(location); long pos = posForOffset(location); @@ -857,18 +874,20 @@ private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, bo throw new IOException("Bad entry read from log file id: " + entryLogId, e); } - ByteBuf data = allocator.buffer(entrySize, entrySize); - int rc = readFromLogChannel(entryLogId, fc, data, pos); - if (rc != entrySize) { + int readSize = entrySize + readExtraBytes; + ByteBuf data = allocator.buffer(readSize, readSize); + int rc = readFromLogChannel(entryLogId, fc, data, pos, readExtraBytes); + if (rc < entrySize) { ReferenceCountUtil.release(data); throw new IOException("Bad entry read from log file id: " + entryLogId, new EntryLookupException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" - + pos + "(" + rc + "!=" + entrySize + ")")); + + pos + "(" + rc + "<" + entrySize + "@" + + rc + "!=" + readSize + ")")); } - data.writerIndex(entrySize); + data.writerIndex(rc); - return data; + return Pair.of(entrySize, data); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java index ac87c3aed45..c50230052e5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java @@ -72,8 +72,12 @@ public LedgerDescriptor 
getReadOnlyHandle(final long ledgerId) throws IOExceptio LedgerDescriptor handle = readOnlyLedgers.get(ledgerId); if (handle == null) { - handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage); - readOnlyLedgers.putIfAbsent(ledgerId, handle); + synchronized (this) { + if ((handle = readOnlyLedgers.get(ledgerId)) == null) { + handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage); + readOnlyLedgers.putIfAbsent(ledgerId, handle); + } + } } return handle; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java index c8d127c96ba..28b5e639372 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java @@ -24,8 +24,10 @@ import java.io.IOException; import java.util.Collection; import org.apache.bookkeeper.bookie.AbstractLogCompactor; +import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.EntryLogMetadata; +import org.apache.commons.lang3.tuple.Pair; /** @@ -56,6 +58,10 @@ public interface EntryLogger extends AutoCloseable { */ ByteBuf readEntry(long entryLocation) throws IOException, NoEntryException; + Pair readEntryAndExtraBytes(long ledgerId, long entryId, long entryLocation, + int extraBytes) + throws IOException, Bookie.NoEntryException; + /** * Read an entry from an entrylog location, and verify that is matches the * expected ledger and entry ID. 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java index 035981514e9..fba11c40f7a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java @@ -49,6 +49,7 @@ import java.util.regex.Matcher; import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.AbstractLogCompactor; +import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.EntryLogMetadata; import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; @@ -59,6 +60,7 @@ import org.apache.bookkeeper.slogger.Slogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.LedgerDirUtil; +import org.apache.commons.lang3.tuple.Pair; /** * DirectEntryLogger. 
@@ -211,6 +213,13 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) return internalReadEntry(ledgerId, entryId, entryLocation, true); } + @Override + public Pair readEntryAndExtraBytes(long ledgerId, long entryId, long entryLocation, + int readBufferSize) + throws IOException, Bookie.NoEntryException { + throw new UnsupportedOperationException("readEntryAndExtraBytes is not supported in DirectEntryLogger"); + } + private LogReader getReader(int logId) throws IOException { Cache cache = caches.get(); try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 69964c8f81f..d750e9684b2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -82,6 +82,8 @@ public class DbLedgerStorage implements LedgerStorage { public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb"; public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb"; + public static final String DISABLE_READ_CACHE = "dbStorage_disableReadCache"; + public static final String ENABLE_LOCATION_CACHE = "dbStorage_enableLocationCache"; public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger"; public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB = "dbStorage_directIOEntryLoggerTotalWriteBufferSizeMB"; @@ -101,6 +103,8 @@ public class DbLedgerStorage implements LedgerStorage { private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = (long) (0.25 * PlatformDependent.estimateMaxDirectMemory()) / MB; + private static final boolean DEFAULT_DISABLE_READ_CACHE = false; + private static final boolean DEFAULT_ENABLE_LOCATION_CACHE = false; static final String 
READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize"; static final String READ_AHEAD_CACHE_BATCH_BYTES_SIZE = "dbStorage_readAheadCacheBatchBytesSize"; private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; @@ -155,13 +159,20 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false); + boolean disableReadCache = getBooleanVariableOrDefault(conf, DISABLE_READ_CACHE, DEFAULT_DISABLE_READ_CACHE); + boolean enableLocationCache = getBooleanVariableOrDefault(conf, ENABLE_LOCATION_CACHE, + DEFAULT_ENABLE_LOCATION_CACHE); this.allocator = allocator; this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size(); log.info("Started Db Ledger Storage"); log.info(" - Number of directories: {}", numberOfDirs); log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB); - log.info(" - Read Cache: {} MB", readCacheMaxSize / MB); + if (disableReadCache) { + log.info(" - Read Cache: DISABLED"); + } else { + log.info(" - Read Cache: {} MB", readCacheMaxSize / MB); + } if (readCacheMaxSize + writeCacheMaxSize > PlatformDependent.estimateMaxDirectMemory()) { throw new IOException("Read and write cache sizes exceed the configured max direct memory size"); @@ -242,7 +253,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le idm, entrylogger, statsLogger, perDirectoryWriteCacheSize, perDirectoryReadCacheSize, - readAheadCacheBatchSize, readAheadCacheBatchBytesSize)); + readAheadCacheBatchSize, readAheadCacheBatchBytesSize, disableReadCache, enableLocationCache)); ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); @@ -281,11 +292,13 @@ public Long getSample() { protected SingleDirectoryDbLedgerStorage 
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, long writeCacheSize, long readCacheSize, - int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) + int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize, boolean disableReadCache, + boolean enableLocationCache) throws IOException { return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, writeCacheSize, readCacheSize, - readAheadCacheBatchSize, readAheadCacheBatchBytesSize); + readAheadCacheBatchSize, readAheadCacheBatchBytesSize, disableReadCache, + enableLocationCache); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java index 6546dfde0f8..0d7d08c39af 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java @@ -88,13 +88,13 @@ class DbLedgerStorageStats { help = "time spent reading entries from the locations index of the db ledger storage engine", parent = READ_ENTRY ) - private final Counter readFromLocationIndexTime; + private final OpStatsLogger readFromLocationIndexTime; @StatsDoc( name = READ_ENTRYLOG_TIME, help = "time spent reading entries from the entry log files of the db ledger storage engine", parent = READ_ENTRY ) - private final Counter readFromEntryLogTime; + private final OpStatsLogger readFromEntryLogTime; @StatsDoc( name = WRITE_CACHE_HITS, help = "number of write cache hits (on reads)", @@ -133,7 +133,7 @@ class DbLedgerStorageStats { name = READAHEAD_TIME, help = "Time spent on readahead operations" ) - private 
final Counter readAheadTime; + private final OpStatsLogger readAheadTime; @StatsDoc( name = FLUSH, help = "operation stats of flushing write cache to entry log files" @@ -203,15 +203,15 @@ class DbLedgerStorageStats { Supplier readCacheCountSupplier) { addEntryStats = stats.getThreadScopedOpStatsLogger(ADD_ENTRY); readEntryStats = stats.getThreadScopedOpStatsLogger(READ_ENTRY); - readFromLocationIndexTime = stats.getThreadScopedCounter(READ_ENTRY_LOCATIONS_INDEX_TIME); - readFromEntryLogTime = stats.getThreadScopedCounter(READ_ENTRYLOG_TIME); + readFromLocationIndexTime = stats.getThreadScopedOpStatsLogger(READ_ENTRY_LOCATIONS_INDEX_TIME); + readFromEntryLogTime = stats.getThreadScopedOpStatsLogger(READ_ENTRYLOG_TIME); readCacheHitCounter = stats.getCounter(READ_CACHE_HITS); readCacheMissCounter = stats.getCounter(READ_CACHE_MISSES); writeCacheHitCounter = stats.getCounter(WRITE_CACHE_HITS); writeCacheMissCounter = stats.getCounter(WRITE_CACHE_MISSES); readAheadBatchCountStats = stats.getOpStatsLogger(READAHEAD_BATCH_COUNT); readAheadBatchSizeStats = stats.getOpStatsLogger(READAHEAD_BATCH_SIZE); - readAheadTime = stats.getThreadScopedCounter(READAHEAD_TIME); + readAheadTime = stats.getOpStatsLogger(READAHEAD_TIME); flushStats = stats.getOpStatsLogger(FLUSH); flushEntryLogStats = stats.getOpStatsLogger(FLUSH_ENTRYLOG); flushLocationIndexStats = stats.getOpStatsLogger(FLUSH_LOCATIONS_INDEX); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DisableReadCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DisableReadCache.java new file mode 100644 index 00000000000..cf31acd171e --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DisableReadCache.java @@ -0,0 +1,47 @@ +package org.apache.bookkeeper.bookie.storage.ldb; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +/** + * Disable read cache. 
+ */ +public class DisableReadCache extends ReadCache { + public DisableReadCache(ByteBufAllocator allocator, long maxCacheSize) { + super(allocator, 0); + } + + @Override + public void put(long ledgerId, long entryId, ByteBuf entry) { + // do nothing + } + + @Override + public ByteBuf get(long ledgerId, long entryId) { + // do nothing + return null; + } + + @Override + public boolean hasEntry(long ledgerId, long entryId) { + // do nothing + return false; + } + + @Override + public long size() { + // do nothing + return 0; + } + + @Override + public long count() { + // do nothing + return 0; + } + + @Override + public void close() { + super.close(); + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index a353b7cf7ee..381d3f1d3e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -49,7 +49,7 @@ public class EntryLocationIndex implements Closeable { private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); private final EntryLocationIndexStats stats; private boolean isCompacting; - + private final LocationCache locationCache = new LocationCache(); public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, StatsLogger stats) throws IOException { locationsDb = storageFactory.newKeyValueStorage(basePath, "locations", DbConfigType.EntryLocation, conf); @@ -236,6 +236,9 @@ public void removeOffsetFromDeletedLedgers() throws IOException { firstKeyWrapper.set(ledgerId, 0); lastKeyWrapper.set(ledgerId, Long.MAX_VALUE); + + // remove ledger from location cache + locationCache.removeLedger(ledgerId); batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array); } 
@@ -252,5 +255,19 @@ public void removeOffsetFromDeletedLedgers() throws IOException { TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 1000.0); } + public long tryGetLocationFromLocationCache(long ledgerId, long entryId) { + return locationCache.getIfExists(ledgerId, entryId); + } + + public void updateLocationToLocationCache(long ledgerId, long entryId, long location) { + locationCache.put(ledgerId, entryId, location); + } + + public void updateIndexCache(Iterable newLocations) { + for (EntryLocation location : newLocations) { + locationCache.put(location.ledger, location.entry, location.location); + } + } + private static final Logger log = LoggerFactory.getLogger(EntryLocationIndex.class); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java new file mode 100644 index 00000000000..40d928ef0ce --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Location cache. + * + * <p>This class is used to cache the location of entries in the entry logger. + * + * <p>
The location of an entry is the position of the entry in the entry logger. + */ +public class LocationCache { + private static final Logger log = LoggerFactory.getLogger(LocationCache.class); + + private final Map<Long, Map<Long, Long>> cache = new ConcurrentHashMap<>(); + + public long getIfExists(long ledgerId, long entryId) { + Map<Long, Long> innerMap = cache.get(ledgerId); + if (innerMap != null) { + Long aLong = innerMap.get(entryId); + return aLong == null ? 0L : aLong; + } + return 0L; + } + + public void put(long ledgerId, long entryId, long position) { + cache.computeIfAbsent(ledgerId, k -> new ConcurrentHashMap<>()) + .put(entryId, position); + } + + public void removeLedger(long ledgerId) { + Map<Long, Long> remove = cache.remove(ledgerId); + if (remove != null) { + remove.clear(); + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 774d10c158f..a07e24a9e45 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -57,6 +57,7 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.GarbageCollectionStatus; import org.apache.bookkeeper.bookie.GarbageCollectorThread; @@ -81,6 +82,7 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-91,6 +93,11 @@ * <p>
This is meant only to be used from {@link DbLedgerStorage}. */ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage { + + static final long INVALID_LID = -1L; + + static final long INVALID_ENTRY_ID = -2L; + static final int NEXT_POSITION_READ_BYTES = 4/*entry size*/ + 8/*ledgerId*/ + 8/*entryId*/; private final EntryLogger entryLogger; private final LedgerMetadataIndex ledgerIndex; @@ -152,12 +159,15 @@ protected Thread newThread(Runnable r, String name) { private final boolean singleLedgerDirs; private final String ledgerBaseDir; private final String indexBaseDir; + private final boolean disableReadCache; + private final boolean enableLocationCache; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, ByteBufAllocator allocator, long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize, - long readAheadCacheBatchBytesSize) + long readAheadCacheBatchBytesSize, + boolean disableReadCache, boolean enableLocationCache) throws IOException { checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db implementation only allows for one storage dir"); @@ -196,7 +206,13 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le DEFAULT_MAX_THROTTLE_TIME_MILLIS); maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis); - readCache = new ReadCache(allocator, readCacheMaxSize); + this.enableLocationCache = enableLocationCache; + this.disableReadCache = disableReadCache; + if (disableReadCache) { + readCache = new DisableReadCache(allocator, readCacheMaxSize); + } else { + readCache = new ReadCache(allocator, readCacheMaxSize); + } ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, indexBaseDir, ledgerIndexDirStatsLogger); @@ -623,44 +639,85 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) 
throws IOException, Book dbLedgerStorageStats.getWriteCacheMissCounter().inc(); - // Try reading from read-ahead cache - entry = readCache.get(ledgerId, entryId); - if (entry != null) { - dbLedgerStorageStats.getReadCacheHitCounter().inc(); - return entry; + if (!disableReadCache) { + // Try reading from read-ahead cache + entry = readCache.get(ledgerId, entryId); + if (entry != null) { + dbLedgerStorageStats.getReadCacheHitCounter().inc(); + return entry; + } + dbLedgerStorageStats.getReadCacheMissCounter().inc(); } - dbLedgerStorageStats.getReadCacheMissCounter().inc(); - // Read from main storage - long entryLocation; - long locationIndexStartNano = MathUtils.nowInNano(); - try { + long entryLocation = 0; + if (enableLocationCache) { + entryLocation = entryLocationIndex.tryGetLocationFromLocationCache(ledgerId, entryId); + } + if (entryLocation == 0) { + long locationIndexStartNano = MathUtils.nowInNano(); entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); if (entryLocation == 0) { // Only a negative result while in limbo equates to unknown throwIfLimbo(ledgerId); + dbLedgerStorageStats.getReadFromLocationIndexTime() + .registerFailedEvent(MathUtils.elapsedNanos(locationIndexStartNano), + TimeUnit.NANOSECONDS); throw new NoEntryException(ledgerId, entryId); } - } finally { - dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( - MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); + dbLedgerStorageStats.getReadFromLocationIndexTime() + .registerSuccessfulEvent(MathUtils.elapsedNanos(locationIndexStartNano), + TimeUnit.NANOSECONDS); } long readEntryStartNano = MathUtils.nowInNano(); + Pair entrySizeAndEntry = null; try { - entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); - } finally { - dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( + if (enableLocationCache + && entryLogger instanceof DefaultEntryLogger) { + entrySizeAndEntry = entryLogger.readEntryAndExtraBytes(ledgerId, entryId, 
entryLocation, + NEXT_POSITION_READ_BYTES); + } else { + ByteBuf buf = entryLogger.readEntry(ledgerId, entryId, entryLocation); + entrySizeAndEntry = Pair.of(buf.readableBytes(), buf); + } + } catch (Throwable t) { + dbLedgerStorageStats.getReadFromEntryLogTime().registerFailedEvent( MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); + throw t; } + dbLedgerStorageStats.getReadFromEntryLogTime().registerSuccessfulEvent( + MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); - readCache.put(ledgerId, entryId, entry); + entry = entrySizeAndEntry.getRight(); + // calculate next position and put it in the index cache + int entrySize = entrySizeAndEntry.getLeft(); + int writeIndex = entry.writerIndex(); + if (entry.readableBytes() == entrySize + NEXT_POSITION_READ_BYTES) { + // has read next entry position + long nextLedgerId = entry.getLong(writeIndex - 16); + long nextEntryId = entry.getLong(writeIndex - 8); + entry.writerIndex(writeIndex - NEXT_POSITION_READ_BYTES); + if (nextLedgerId >= 0 && nextEntryId >= 0) { + long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); + entryLocationIndex.updateLocationToLocationCache(nextLedgerId, nextEntryId, nextEntryLocation); + } else if (nextLedgerId != INVALID_LID || nextEntryId != INVALID_ENTRY_ID) { + log.error("Invalid next entry: {}@{}", nextLedgerId, nextEntryId); + } + } else if (entry.readableBytes() < entrySize) { + log.error("Invalid entry size: {} != {}", entry.readableBytes(), entrySize); + throw new IOException("Invalid entry size: " + entry.readableBytes() + " != " + entrySize); + } else { + entry.writerIndex(entry.readerIndex() + entrySize); + } - // Try to read more entries - long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); - fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + if (!disableReadCache) { + readCache.put(ledgerId, entryId, entry); + // Try to read more entries + long nextEntryLocation = 
entryLocation + 4 /* size header */ + entry.readableBytes(); + fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + } return entry; } @@ -701,15 +758,17 @@ private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long f ReferenceCountUtil.release(entry); } } + dbLedgerStorageStats.getReadAheadTime().registerSuccessfulEvent( + MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug("Exception during read ahead for ledger: {}: e", originalLedgerId, e); } + dbLedgerStorageStats.getReadAheadTime().registerFailedEvent( + MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS); } finally { dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count); dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size); - dbLedgerStorageStats.getReadAheadTime().addLatency( - MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS); } } @@ -775,12 +834,13 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException { } long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId); - dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( - MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); + dbLedgerStorageStats.getReadFromLocationIndexTime().registerSuccessfulEvent( + MathUtils.elapsedNanos(locationIndexStartNano), + TimeUnit.NANOSECONDS); long readEntryStartNano = MathUtils.nowInNano(); ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation); - dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( + dbLedgerStorageStats.getReadFromEntryLogTime().registerSuccessfulEvent( MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); return content; } @@ -981,6 +1041,7 @@ public void updateEntriesLocations(Iterable locations) throws IOE flushMutex.lock(); flushMutex.unlock(); + entryLocationIndex.updateIndexCache(locations); // We 
don't need to keep the flush mutex locked here while updating the DB. // It's fine to have a concurrent flush operation at this point, because we // know that none of the entries being flushed was included in the compaction diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java index 102f7f5addc..ec33d3c19f1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java @@ -56,7 +56,8 @@ private static class MockedDbLedgerStorage extends DbLedgerStorage { protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, - long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize, + boolean disableReadCache, boolean enableLocationCache) throws IOException { return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, writeCacheSize, @@ -72,7 +73,7 @@ public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerMana throws IOException { super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, writeCacheSize, readCacheSize, readAheadCacheBatchSize, - readAheadCacheBatchBytesSize); + readAheadCacheBatchBytesSize, false, false); } @Override From 404b75148552fa0b0ccf63566937dde459b4168e Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Fri, 2 Jan 2026 01:36:41 
+0800 Subject: [PATCH 2/2] update --- .../bookie/storage/ldb/LocationCache.java | 136 +++++++++++++++--- 1 file changed, 120 insertions(+), 16 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java index 40d928ef0ce..16cf8ec8185 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationCache.java @@ -20,11 +20,13 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Location cache. @@ -33,29 +35,131 @@ * *

The location of an entry is the position of the entry in the entry logger. */ + +/** + * LocationCache with composite key (ledgerId | entryId) using SkipList + */ public class LocationCache { - private static final Logger log = LoggerFactory.getLogger(LocationCache.class); + private final static int MAX_ENTRIES = 1000000; + private static final long TIME_WINDOW_MS = 100; + private final NavigableMap skipMap = new ConcurrentSkipListMap<>(); + private final long maxEntries; + private final NavigableMap> createTime2LedgerIds = new ConcurrentSkipListMap<>(); + private final ConcurrentHashMap ledgerId2CreateTime = new ConcurrentHashMap<>(); + private final ThreadLocal queryKey = + ThreadLocal.withInitial(() -> new CompositeKey(0, 0)); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final Map> cache = new ConcurrentHashMap<>(); + static class CompositeKey implements Comparable { + long ledgerId; + long entryId; - public long getIfExists(long ledgerId, long entryId) { - Map innerMap = cache.get(ledgerId); - if (innerMap != null) { - Long aLong = innerMap.get(entryId); - return aLong == null ? 
0L : aLong; + CompositeKey(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + void set(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; } - return 0L; + + @Override + public int compareTo(CompositeKey other) { + int cmp = Long.compare(this.ledgerId, other.ledgerId); + if (cmp != 0) { + return cmp; + } + return Long.compare(this.entryId, other.entryId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompositeKey that = (CompositeKey) o; + return ledgerId == that.ledgerId && entryId == that.entryId; + } + + @Override + public int hashCode() { + return Objects.hash(ledgerId, entryId); + } + } + + public LocationCache() { + this(MAX_ENTRIES); + } + + public LocationCache(long maxEntries) { + this.maxEntries = maxEntries; + } + + public long getIfExists(long ledgerId, long entryId) { + CompositeKey key = queryKey.get(); + key.set(ledgerId, entryId); + Long position = skipMap.get(key); + return position != null ? 
position : 0L; } public void put(long ledgerId, long entryId, long position) { - cache.computeIfAbsent(ledgerId, k -> new ConcurrentHashMap<>()) - .put(entryId, position); + CompositeKey key = new CompositeKey(ledgerId, entryId); + if (skipMap.containsKey(key)) { + skipMap.put(key, position); + return; + } + if (skipMap.size() >= maxEntries) { + evictOldestEntries(); + } + + lock.readLock().lock(); + try { + skipMap.put(key, position); + Long timeStamp = ledgerId2CreateTime.computeIfAbsent(ledgerId, + __ -> System.currentTimeMillis() / TIME_WINDOW_MS); + createTime2LedgerIds.computeIfAbsent(timeStamp, + __ -> ConcurrentHashMap.newKeySet()).add(ledgerId); + } finally { + lock.readLock().unlock(); + } + } + + + private void evictOldestEntries() { + lock.writeLock().lock(); + try { + if (skipMap.size() < maxEntries) { + return; + } + Map.Entry> firstEntry = createTime2LedgerIds.pollFirstEntry(); + if (firstEntry != null) { + Long createTime = firstEntry.getKey(); + Set ledgerIds = firstEntry.getValue(); + for (Long ledgerId : ledgerIds) { + removeLedger(ledgerId); + } + createTime2LedgerIds.remove(createTime); + } + } finally { + lock.writeLock().unlock(); + } } public void removeLedger(long ledgerId) { - Map remove = cache.remove(ledgerId); - if (remove != null) { - remove.clear(); + lock.writeLock().lock(); + try { + CompositeKey startKey = new CompositeKey(ledgerId, 0); + CompositeKey endKey = new CompositeKey(ledgerId, Long.MAX_VALUE); + NavigableMap ledgerEntries = + skipMap.subMap(startKey, true, endKey, true); + ledgerEntries.clear(); + ledgerId2CreateTime.remove(ledgerId); + } finally { + lock.writeLock().unlock(); } } }