From b64b506db342ff0c89cfcf9b5d3af73a41a7c3e3 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 31 Jul 2024 11:10:41 -0700 Subject: [PATCH 01/11] RATIS-2129. Low replication performance because LogAppender is often blocked by RaftLog's readLock. --- .../server/raftlog/segmented/LogSegment.java | 71 ++++++++++++++----- .../raftlog/segmented/SegmentedRaftLog.java | 41 ++++------- .../segmented/SegmentedRaftLogCache.java | 8 ++- .../segmented/TestSegmentedRaftLogCache.java | 5 +- 4 files changed, 76 insertions(+), 49 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 444d417ba5..8358887554 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -36,13 +36,13 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.nio.file.Path; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -105,6 +105,44 @@ long getOffset() { } } + private static class Records { + private final ConcurrentNavigableMap map = new ConcurrentSkipListMap<>(); + + int size() { + return map.size(); + } + + LogRecord getFirst() { + final Map.Entry first = map.firstEntry(); + return first != null? first.getValue() : null; + } + + LogRecord getLast() { + final Map.Entry last = map.lastEntry(); + return last != null? last.getValue() : null; + } + + LogRecord get(long i) { + return map.get(i); + } + + long append(LogRecord record) { + final long index = record.getTermIndex().getIndex(); + final LogRecord previous = map.put(index, record); + Preconditions.assertNull(previous, "previous"); + return index; + } + + LogRecord removeLast() { + final Map.Entry last = map.pollLastEntry(); + return Objects.requireNonNull(last, "last == null").getValue(); + } + + void clear() { + map.clear(); + } + } + static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { Preconditions.assertTrue(start >= 0); @@ -204,10 +242,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c final long expectedLastIndex = expectedStart + expectedEntryCount - 1; Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index"); - final LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); if (last != null) { Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record"); - Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record"); + final LogRecord first = records.getFirst(); + Objects.requireNonNull(first, "first record"); + Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record"); } if (!corrupted) { Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index"); @@ -272,7 +312,7 @@ File getFile() { /** * the list of records is more like the index of a segment */ - private final List records = new ArrayList<>(); + private final Records records = new Records(); /** * the entryCache caches the content of log entries. */ @@ -315,14 +355,12 @@ void appendToOpenSegment(LogEntryProto entry, Op op) { private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { Objects.requireNonNull(entry, "entry == null"); - if (records.isEmpty()) { + final LogRecord currentLast = records.getLast(); + if (currentLast == null) { Preconditions.assertTrue(entry.getIndex() == startIndex, "gap between start index %s and first entry to append %s", startIndex, entry.getIndex()); - } - - final LogRecord currentLast = getLastRecord(); - if (currentLast != null) { + } else { Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1, "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex()); } @@ -331,7 +369,7 @@ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { if (keepEntryInCache) { putEntryCache(record.getTermIndex(), entry, op); } - records.add(record); + records.append(record); totalFileSize += getEntrySize(entry, op); endIndex = entry.getIndex(); } @@ -359,17 +397,13 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException LogRecord getLogRecord(long index) { if (index >= startIndex && index <= endIndex) { - return records.get(Math.toIntExact(index - startIndex)); + return records.get(index); } return null; } - private LogRecord getLastRecord() { - return records.isEmpty() ? null : records.get(records.size() - 1); - } - TermIndex getLastTermIndex() { - LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); return last == null ? null : last.getTermIndex(); } @@ -387,7 +421,8 @@ long getTotalCacheSize() { synchronized void truncate(long fromIndex) { Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex); for (long index = endIndex; index >= fromIndex; index--) { - LogRecord removed = records.remove(Math.toIntExact(index - startIndex)); + final LogRecord removed = records.removeLast(); + Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex"); removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE); totalFileSize = removed.offset; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 6dc3d7961c..ffca252c9e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -272,22 +272,18 @@ private void loadLogSegments(long lastIndexInSnapshot, @Override public LogEntryProto get(long index) throws RaftLogIOException { checkLogState(); - final LogSegment segment; - final LogRecord record; - try (AutoCloseableLock readLock = readLock()) { - segment = cache.getSegment(index); - if (segment == null) { - return null; - } - record = segment.getLogRecord(index); - if (record == null) { - return null; - } - final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex()); - if (entry != null) { - getRaftLogMetrics().onRaftLogCacheHit(); - return entry; - } + final LogSegment segment = cache.getSegment(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + if (record == null) { + return null; + } + final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex()); + if (entry != null) { + getRaftLogMetrics().onRaftLogCacheHit(); + return entry; } // the entry is not in the segment's cache. Load the cache without holding the lock. @@ -337,26 +333,19 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - LogRecord record = cache.getLogRecord(index); - return record != null ? record.getTermIndex() : null; - } + return cache.getTermIndex(index); } @Override public LogEntryHeader[] getEntries(long startIndex, long endIndex) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getTermIndices(startIndex, endIndex); - } + return cache.getTermIndices(startIndex, endIndex); } @Override public TermIndex getLastEntryTermIndex() { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getLastTermIndex(); - } + return cache.getLastTermIndex(); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index a1f0cdd8ab..9a69e27bad 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -547,9 +547,13 @@ LogSegment getSegment(long index) { } } - LogRecord getLogRecord(long index) { + TermIndex getTermIndex(long index) { LogSegment segment = getSegment(index); - return segment == null ? null : segment.getLogRecord(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + return record != null ? record.getTermIndex() : null; } /** diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index 7c2dbac912..532e32c87d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -20,7 +20,6 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*; import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; -import java.io.IOException; import java.util.Iterator; import java.util.stream.IntStream; @@ -282,12 +281,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe }); } - private void testIterator(long startIndex) throws IOException { + private void testIterator(long startIndex) { Iterator iterator = cache.iterator(startIndex); TermIndex prev = null; while (iterator.hasNext()) { TermIndex termIndex = iterator.next(); - Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex); + Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex); if (prev != null) { Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex()); } From 71ff146bcc824ecf86f388b09160dc3e667b6557 Mon Sep 17 00:00:00 2001 From: Symious Date: Fri, 28 Nov 2025 17:50:53 +0800 Subject: [PATCH 02/11] RATIS-2129. Fix exceptions met in Ozone writing --- .../ratis/server/raftlog/segmented/LogSegment.java | 11 +++++++---- .../raftlog/segmented/SegmentedRaftLogCache.java | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 8358887554..2876e7cb57 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -333,7 +333,10 @@ long getStartIndex() { } long getEndIndex() { - return endIndex; + if (records.getLast() == null) { + return getStartIndex() - 1; + } + return records.getLast().getTermIndex().getIndex(); } boolean isOpen() { @@ -341,7 +344,7 @@ boolean isOpen() { } int numOfEntries() { - return Math.toIntExact(endIndex - startIndex + 1); + return Math.toIntExact(getEndIndex() - startIndex + 1); } CorruptionPolicy getLogCorruptionPolicy() { @@ -396,7 +399,7 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException } LogRecord getLogRecord(long index) { - if (index >= startIndex && index <= endIndex) { + if (index >= startIndex && index <= getEndIndex()) { return records.get(index); } return null; @@ -493,7 +496,7 @@ boolean hasCache() { } boolean containsIndex(long index) { - return startIndex <= index && endIndex >= index; + return startIndex <= index && getEndIndex() >= index; } boolean hasEntries() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 9a69e27bad..df0a639e71 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -614,7 +614,7 @@ long getLastIndexInClosedSegments() { TermIndex getLastTermIndex() { try (AutoCloseableLock readLock = closedSegments.readLock()) { - return (openSegment != null && openSegment.numOfEntries() > 0) ? + return (openSegment != null && openSegment.getLastTermIndex() != null) ? openSegment.getLastTermIndex() : (closedSegments.isEmpty() ? null : closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); From 869eac6e4964b7d7c5b3c016196117b99d4f3fcd Mon Sep 17 00:00:00 2001 From: Symious Date: Fri, 28 Nov 2025 19:15:08 +0800 Subject: [PATCH 03/11] RATIS-2129. Fix unit test --- .../server/raftlog/segmented/CacheInvalidationPolicy.java | 2 ++ .../org/apache/ratis/server/raftlog/segmented/LogSegment.java | 3 +++ 2 files changed, 5 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java index baae8e2ee6..4795558f1c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java @@ -64,6 +64,8 @@ private List evictImpl(long[] followerNextIndices, // a segment's cache can be invalidated only if it's close and all its // entries have been flushed to the local disk and the local disk // segment is also closed. + boolean a = segment.isOpen(); + long t1 = segment.getEndIndex(); if (!segment.isOpen() && segment.getEndIndex() <= safeEvictIndex) { break; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 2876e7cb57..a3d74cb1a0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -333,6 +333,9 @@ long getStartIndex() { } long getEndIndex() { + if (!isOpen) { + return endIndex; + } if (records.getLast() == null) { return getStartIndex() - 1; } From b91bbad86415e4f46a41d5a7ac17fd2959076dc5 Mon Sep 17 00:00:00 2001 From: Symious Date: Sun, 30 Nov 2025 13:48:58 +0800 Subject: [PATCH 04/11] RATIS-2129. Remove unused code --- .../ratis/server/raftlog/segmented/CacheInvalidationPolicy.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java index 4795558f1c..baae8e2ee6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/CacheInvalidationPolicy.java @@ -64,8 +64,6 @@ private List evictImpl(long[] followerNextIndices, // a segment's cache can be invalidated only if it's close and all its // entries have been flushed to the local disk and the local disk // segment is also closed. - boolean a = segment.isOpen(); - long t1 = segment.getEndIndex(); if (!segment.isOpen() && segment.getEndIndex() <= safeEvictIndex) { break; } From 80183095978429f5aea297f1123cf9abd1eeb004 Mon Sep 17 00:00:00 2001 From: Symious Date: Sun, 30 Nov 2025 14:31:33 +0800 Subject: [PATCH 05/11] Revert "RATIS-2129. Fix unit test" This reverts commit 869eac6e4964b7d7c5b3c016196117b99d4f3fcd. --- .../org/apache/ratis/server/raftlog/segmented/LogSegment.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index a3d74cb1a0..2876e7cb57 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -333,9 +333,6 @@ long getStartIndex() { } long getEndIndex() { - if (!isOpen) { - return endIndex; - } if (records.getLast() == null) { return getStartIndex() - 1; } From b57c89d6added168d9604990d7bf8b8a74b201b6 Mon Sep 17 00:00:00 2001 From: Symious Date: Wed, 3 Dec 2025 19:14:44 +0800 Subject: [PATCH 06/11] Reapply "RATIS-2129. Fix unit test" This reverts commit 80183095978429f5aea297f1123cf9abd1eeb004. --- .../org/apache/ratis/server/raftlog/segmented/LogSegment.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 2876e7cb57..a3d74cb1a0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -333,6 +333,9 @@ long getStartIndex() { } long getEndIndex() { + if (!isOpen) { + return endIndex; + } if (records.getLast() == null) { return getStartIndex() - 1; } From 0919f634e2ffece88a55dcc78186f041640e9656 Mon Sep 17 00:00:00 2001 From: Symious Date: Fri, 5 Dec 2025 14:31:40 +0800 Subject: [PATCH 07/11] RATIS-2129. Single call of records.getLast() --- .../apache/ratis/server/raftlog/segmented/LogSegment.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index a3d74cb1a0..c40b91f708 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -336,10 +336,8 @@ long getEndIndex() { if (!isOpen) { return endIndex; } - if (records.getLast() == null) { - return getStartIndex() - 1; - } - return records.getLast().getTermIndex().getIndex(); + final LogRecord last = records.getLast(); + return last == null ? getStartIndex() - 1 : last.getTermIndex().getIndex(); } boolean isOpen() { From 1fdcba77329b3e7b619dbd7ca06180c377e743a9 Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 8 Dec 2025 18:04:55 +0800 Subject: [PATCH 08/11] RATIS-2129. Single call of getLastTermIndex --- .../raftlog/segmented/SegmentedRaftLogCache.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index df0a639e71..2e31037d0d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -614,10 +614,12 @@ long getLastIndexInClosedSegments() { TermIndex getLastTermIndex() { try (AutoCloseableLock readLock = closedSegments.readLock()) { - return (openSegment != null && openSegment.getLastTermIndex() != null) ? - openSegment.getLastTermIndex() : - (closedSegments.isEmpty() ? null : - closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); + TermIndex lastOpenIndex = + (openSegment != null) ? openSegment.getLastTermIndex() : null; + return lastOpenIndex != null + ? lastOpenIndex + : (closedSegments.isEmpty() ? null : + closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); } } From d3e32d5a1f7e2cc9b10c49c2722c6e4edf2ef163 Mon Sep 17 00:00:00 2001 From: Symious Date: Tue, 9 Dec 2025 09:35:07 +0800 Subject: [PATCH 09/11] Revert "RATIS-2129. Single call of getLastTermIndex" This reverts commit 1fdcba77329b3e7b619dbd7ca06180c377e743a9. --- .../raftlog/segmented/SegmentedRaftLogCache.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 2e31037d0d..df0a639e71 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -614,12 +614,10 @@ long getLastIndexInClosedSegments() { TermIndex getLastTermIndex() { try (AutoCloseableLock readLock = closedSegments.readLock()) { - TermIndex lastOpenIndex = - (openSegment != null) ? openSegment.getLastTermIndex() : null; - return lastOpenIndex != null - ? lastOpenIndex - : (closedSegments.isEmpty() ? null : - closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); + return (openSegment != null && openSegment.getLastTermIndex() != null) ? + openSegment.getLastTermIndex() : + (closedSegments.isEmpty() ? null : + closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); } } From bdb69606201293de8d34f6ac29bbbe53c3431cbd Mon Sep 17 00:00:00 2001 From: Symious Date: Tue, 9 Dec 2025 09:35:47 +0800 Subject: [PATCH 10/11] RATIS-2129. Save as tmpSegment --- .../server/raftlog/segmented/SegmentedRaftLogCache.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index df0a639e71..46acbcc3d8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -614,8 +614,9 @@ long getLastIndexInClosedSegments() { TermIndex getLastTermIndex() { try (AutoCloseableLock readLock = closedSegments.readLock()) { - return (openSegment != null && openSegment.getLastTermIndex() != null) ? - openSegment.getLastTermIndex() : + LogSegment tmpSegment = openSegment; + return (tmpSegment != null && tmpSegment.getLastTermIndex() != null) ? + tmpSegment.getLastTermIndex() : (closedSegments.isEmpty() ? null : closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); } From d8ef8c345ea9c4e862c16d362826a307120e71cd Mon Sep 17 00:00:00 2001 From: Symious Date: Thu, 18 Dec 2025 22:23:17 +0800 Subject: [PATCH 11/11] RATIS-2129. Add config to enable read lock --- .../ratis/server/RaftServerConfigKeys.java | 10 ++++ .../raftlog/segmented/SegmentedRaftLog.java | 47 +++++++++++++------ 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 849597433a..002286c4ca 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -415,6 +415,16 @@ static void setSegmentSizeMax(RaftProperties properties, SizeInBytes segmentSize setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax); } + String READ_LOCK_ENABLED_KEY = PREFIX + ".read.lock.enabled"; + boolean READ_LOCK_ENABLED_DEFAULT = true; + static boolean readLockEnabled(RaftProperties properties) { + return getBoolean(properties::getBoolean, + READ_LOCK_ENABLED_KEY, READ_LOCK_ENABLED_DEFAULT, getDefaultLog()); + } + static void setReadLockEnabled(RaftProperties properties, boolean readLockEnabled) { + setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled); + } + /** * Besides the open segment, the max number of segments caching log entries. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index ffca252c9e..6bcc3f8e1c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -202,6 +202,7 @@ public TransactionContext getTransactionContext(LogEntryProto entry, boolean cre private final long segmentMaxSize; private final boolean stateMachineCachingEnabled; private final SegmentedRaftLogMetrics metrics; + private final boolean readLockEnabled; @SuppressWarnings({"squid:S2095"}) // Suppress closeable warning private SegmentedRaftLog(Builder b) { @@ -217,6 +218,12 @@ private SegmentedRaftLog(Builder b) { this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine, b.submitUpdateCommitEvent, b.server, storage, b.properties, getRaftLogMetrics()); stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties); + this.readLockEnabled = RaftServerConfigKeys.Log.readLockEnabled(b.properties); + } + + @Override + public AutoCloseableLock readLock() { + return readLockEnabled ? super.readLock() : null; } @Override @@ -272,18 +279,22 @@ private void loadLogSegments(long lastIndexInSnapshot, @Override public LogEntryProto get(long index) throws RaftLogIOException { checkLogState(); - final LogSegment segment = cache.getSegment(index); - if (segment == null) { - return null; - } - final LogRecord record = segment.getLogRecord(index); - if (record == null) { - return null; - } - final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex()); - if (entry != null) { - getRaftLogMetrics().onRaftLogCacheHit(); - return entry; + final LogSegment segment; + final LogRecord record; + try (AutoCloseableLock readLock = readLock()) { + segment = cache.getSegment(index); + if (segment == null) { + return null; + } + record = segment.getLogRecord(index); + if (record == null) { + return null; + } + final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex()); + if (entry != null) { + getRaftLogMetrics().onRaftLogCacheHit(); + return entry; + } } // the entry is not in the segment's cache. Load the cache without holding the lock. @@ -333,19 +344,25 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - return cache.getTermIndex(index); + try(AutoCloseableLock readLock = readLock()) { + return cache.getTermIndex(index); + } } @Override public LogEntryHeader[] getEntries(long startIndex, long endIndex) { checkLogState(); - return cache.getTermIndices(startIndex, endIndex); + try(AutoCloseableLock readLock = readLock()) { + return cache.getTermIndices(startIndex, endIndex); + } } @Override public TermIndex getLastEntryTermIndex() { checkLogState(); - return cache.getLastTermIndex(); + try(AutoCloseableLock readLock = readLock()) { + return cache.getLastTermIndex(); + } } @Override