From c75bc36787ab9bc2ebe6a225cfd7252208970bdf Mon Sep 17 00:00:00 2001 From: vraulji567 Date: Tue, 4 Nov 2025 17:19:10 -0500 Subject: [PATCH] [fix] Ledger Replication worker should throttle on verrification reads --- .../replication/ReplicationWorker.java | 2 +- .../bookkeeper/client/TestLedgerChecker.java | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 05807455dc4..cebd8a1b551 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -192,7 +192,7 @@ public ReplicationWorker(final ServerConfiguration conf, this.underreplicationManager = bkc.getLedgerManagerFactory().newLedgerUnderreplicationManager(); this.ledgerManager = bkc.getLedgerManagerFactory().newLedgerManager(); this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf)); - this.ledgerChecker = new LedgerChecker(bkc); + this.ledgerChecker = new LedgerChecker(bkc, conf.getInFlightReadEntryNumInLedgerChecker()); this.workerThread = new BookieThread(this, "ReplicationWorker"); this.openLedgerRereplicationGracePeriod = conf .getOpenLedgerRereplicationGracePeriod(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java index 222cd0aac4c..545c9b8ac0b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java @@ -24,6 +24,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -34,6 +37,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieClient; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.junit.Test; @@ -530,4 +535,50 @@ private void killBookie(List firstEnsemble, BookieId ensemble) killBookie(ensemble); } + @Test + public void testLedgerCheckerDoesNotExceedPermits() throws Exception { + int permits = 100; + + // Atomic counter to track concurrent reads + AtomicInteger concurrentReads = new AtomicInteger(0); + AtomicInteger maxConcurrentReads = new AtomicInteger(0); + + // Mock Bookie Client + BookieClient mockBookieClient = mock(BookieClient.class); + BookieWatcher mockBookieWatcher = mock(BookieWatcher.class); + when(mockBookieWatcher.isBookieUnavailable(any())).thenReturn(false); + + doAnswer(invocation -> { + int now = concurrentReads.incrementAndGet(); + maxConcurrentReads.updateAndGet(prev -> Math.max(prev, now)); + + // Simulate async completion + new Thread(() -> { + try{Thread.sleep(10);} catch (InterruptedException ie){} + int d = concurrentReads.decrementAndGet(); + BookkeeperInternalCallbacks.ReadEntryCallback cb = invocation.getArgument(3); + cb.readEntryComplete(0, invocation.getArgument(1), + invocation.getArgument(2), null, null); + }).start(); + return null; + }).when(mockBookieClient).readEntry(any(BookieId.class), anyLong(), anyLong(), + any(), any(), anyInt()); + + // Create dummy LedgerHandle with enough fragments + LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, TEST_LEDGER_PASSWORD); + startNewBookie(); + for(int i = 0; i < 1000; i++) { + lh.addEntry(TEST_LEDGER_ENTRY_DATA); + } + + LedgerChecker checker = new LedgerChecker(mockBookieClient, mockBookieWatcher, permits); + checker.checkLedger(lh, (rc, result) -> {}, 100); + + // Wait for all async reads to finish + Thread.sleep(2000); + + System.out.println("Max Concurrent reads observed = " + maxConcurrentReads.get()); + assertTrue("Exceeded permit limit!", maxConcurrentReads.get() <= permits); + } + }