diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index 98cf532803..3563f01ad6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -13,11 +13,14 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.streamnative.pulsar.handlers.kop.SystemTopicClient; +import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -51,30 +54,73 @@ public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerSt private CompletableFuture currentReadHandle; - private synchronized CompletableFuture> ensureReaderHandle() { + @VisibleForTesting + public synchronized CompletableFuture> ensureReaderHandle() { if (reader == null) { - reader = pulsarClient.newReaderBuilder() + CompletableFuture> newReader = pulsarClient.newReaderBuilder() .topic(topic) .startMessageId(MessageId.earliest) .readCompacted(true) .createAsync(); + reader = newReader; + + newReader.whenComplete((r, error) -> { + if (error != null) { + discardReader(newReader); + } + }); } return reader; } - private synchronized CompletableFuture> ensureProducerHandle() { + private synchronized void discardReader(CompletableFuture> oldReader) { + if (reader == oldReader || (reader != null && reader.isCompletedExceptionally())) { + reader = null; + log.info("discard broken reader for {}", topic); + } + } + + @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") + private synchronized void discardReader(Reader oldReader) { + if (reader == null) { + return; + } + if (reader.isCompletedExceptionally() || (reader.isDone() + && !reader.isCompletedExceptionally() + && reader.getNow(null) == oldReader)) { + log.info("discard broken reader for {}", topic); + reader = null; + } + } + + @VisibleForTesting + public synchronized CompletableFuture> ensureProducerHandle() { if (producer == null) { - producer = pulsarClient.newProducerBuilder() + CompletableFuture> newProducer = pulsarClient.newProducerBuilder() .enableBatching(false) .topic(topic) .blockIfQueueFull(true) .createAsync(); + + producer = newProducer; + + newProducer.whenComplete((r, error) -> { + if (error != null) { + discardProducer(newProducer); + } + }); } return producer; } + private synchronized void discardProducer(CompletableFuture> oldProducer) { + if (producer == oldProducer) { + producer = null; + } + } + private CompletableFuture readNextMessageIfAvailable(Reader reader) { - return reader + CompletableFuture result = reader .hasMessageAvailableAsync() .thenCompose(hasMessageAvailable -> { if (hasMessageAvailable == null @@ -88,11 +134,19 @@ private CompletableFuture readNextMessageIfAvailable(Reader re }); } }); + + result.whenComplete((r, error) -> { + if (error != null) { + discardReader(reader); + } + }); + + return result; } private synchronized CompletableFuture ensureLatestData(boolean beforeWrite) { - if (currentReadHandle != null) { + if (currentReadHandle != null && !currentReadHandle.isCompletedExceptionally()) { if (beforeWrite) { // we are inside a write loop, so // we must ensure that we start to read now @@ -110,9 +164,19 @@ private synchronized CompletableFuture ensureLatestData(boolean beforeWrit // please note that the read operation is async, // and it is not execute inside this synchronized block CompletableFuture> readerHandle = ensureReaderHandle(); + if (readerHandle == null) { + return CompletableFuture.failedFuture( + new KoPTopicException("Failed to create reader handle for " + topic)); + } final CompletableFuture newReadHandle = readerHandle.thenCompose(this::readNextMessageIfAvailable); currentReadHandle = newReadHandle; + + newReadHandle.exceptionally(___ -> { + endReadLoop(newReadHandle); + return null; + }); + return newReadHandle.thenApply((__) -> { endReadLoop(newReadHandle); return null; @@ -132,7 +196,12 @@ public CompletableFuture write(ProducerStateManagerSnapshot snapshot) { // cannot serialise, skip return CompletableFuture.completedFuture(null); } - return ensureProducerHandle().thenCompose(opProducer -> { + CompletableFuture> producerFuture = ensureProducerHandle(); + if (producerFuture == null) { + return CompletableFuture.failedFuture( + new KoPTopicException("Failed to create producer handle for " + topic)); + } + return producerFuture.thenCompose(opProducer -> { // nobody can write now to the topic // wait for local cache to be up-to-date return ensureLatestData(true) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java index 082c5dc060..2deadce55d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java @@ -13,17 +13,26 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.streamnative.pulsar.handlers.kop.SystemTopicClient; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; import org.testng.annotations.Test; /** @@ -69,4 +78,48 @@ public void testSerializeAndDeserialize() { } } + @Test(timeOut = 5_000) + public void ensureReaderHandleCaughtExceptionTest() { + SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf)); + ReaderBuilder readerBuilder = spy(sysTopicClient.newReaderBuilder()); + when(readerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject"))); + when(sysTopicClient.newReaderBuilder()).thenReturn(readerBuilder); + + PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer = + new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient); + CompletableFuture> readerFuture = snapshotBuffer.ensureReaderHandle(); + if (readerFuture != null) { + try { + readerFuture.get(); + fail("should fail"); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "inject"); + } + } else { + log.info("This is expected behavior."); + } + } + + @Test(timeOut = 5_000) + public void ensureProducerCaughtExceptionTest() { + SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf)); + ProducerBuilder producerBuilder = spy(sysTopicClient.newProducerBuilder()); + when(producerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject"))); + when(sysTopicClient.newProducerBuilder()).thenReturn(producerBuilder); + + PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer = + new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient); + CompletableFuture> producerFuture = snapshotBuffer.ensureProducerHandle(); + if (producerFuture != null) { + try { + producerFuture.get(); + fail("should fail"); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "inject"); + } + } else { + log.info("This is expected behavior."); + } + } + }