diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index ea7f08edd82..b4e0a1f3a3a 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -132,6 +132,10 @@
org.apache.commons
commons-lang3
+
+ org.apache.commons
+ commons-pool2
+
org.apache.hadoop
hadoop-auth
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
index c6ca7247fdc..16b00aedb8e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
@@ -17,48 +17,63 @@
package org.apache.hadoop.hdds.utils.db;
+import com.google.common.primitives.UnsignedBytes;
+import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An abstract {@link Table.KeyValueIterator} to iterate raw {@link Table.KeyValue}s.
- *
+ * NOTE: This class only works with RocksDB when the comparator is set to RocksDB's BytewiseComparator.
 * @param <RAW> the raw type.
*/
-abstract class RDBStoreAbstractIterator
- implements Table.KeyValueIterator {
+abstract class RDBStoreAbstractIterator> implements TableIterator {
private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);
+ private final ManagedReadOptions readOptions;
private final ManagedRocksIterator rocksDBIterator;
private final RDBTable rocksDBTable;
- private Table.KeyValue currentEntry;
- // This is for schemas that use a fixed-length
- // prefix for each key.
- private final RAW prefix;
+ private KV currentEntry;
private final IteratorType type;
- RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW prefix, IteratorType type) {
- this.rocksDBIterator = iterator;
+ RDBStoreAbstractIterator(
+ CheckedFunction itrSupplier, RDBTable table,
+ byte[] prefix, IteratorType type) throws RocksDatabaseException {
+ this.readOptions = new ManagedReadOptions(false, prefix, getNextHigherPrefix(prefix));
+ this.rocksDBIterator = itrSupplier.apply(readOptions);
this.rocksDBTable = table;
- this.prefix = prefix;
this.type = type;
}
+ private byte[] getNextHigherPrefix(byte[] prefix) {
+ if (prefix == null) {
+ return null;
+ }
+ for (int i = prefix.length - 1; i >= 0; i--) {
+ if (UnsignedBytes.compare(prefix[i], UnsignedBytes.MAX_VALUE) != 0) {
+ byte[] nextHigher = Arrays.copyOf(prefix, i + 1);
+ nextHigher[i] = (byte) (prefix[i] + 1);
+ return nextHigher;
+ }
+ }
+ // No higher prefix exists since all bytes are MAX_VALUE.
+ return null;
+ }
+
IteratorType getType() {
return type;
}
- /** @return the key for the current entry. */
- abstract RAW key();
-
/** @return the {@link Table.KeyValue} for the current entry. */
- abstract Table.KeyValue getKeyValue();
+ abstract KV getKeyValue();
/** Seek to the given key. */
abstract void seek0(RAW key);
@@ -66,9 +81,6 @@ IteratorType getType() {
/** Delete the given key. */
abstract void delete(RAW key) throws RocksDatabaseException;
- /** Does the given key start with the prefix? */
- abstract boolean startsWithPrefix(RAW key);
-
final ManagedRocksIterator getRocksDBIterator() {
return rocksDBIterator;
}
@@ -77,13 +89,9 @@ final RDBTable getRocksDBTable() {
return rocksDBTable;
}
- final RAW getPrefix() {
- return prefix;
- }
-
@Override
public final void forEachRemaining(
- Consumer super Table.KeyValue> action) {
+ Consumer super KV> action) {
while (hasNext()) {
action.accept(next());
}
@@ -99,12 +107,11 @@ private void setCurrentEntry() {
@Override
public final boolean hasNext() {
- return rocksDBIterator.get().isValid() &&
- (prefix == null || startsWithPrefix(key()));
+ return rocksDBIterator.get().isValid();
}
@Override
- public final Table.KeyValue next() {
+ public final KV next() {
setCurrentEntry();
if (currentEntry != null) {
rocksDBIterator.get().next();
@@ -115,26 +122,16 @@ public final Table.KeyValue next() {
@Override
public final void seekToFirst() {
- if (prefix == null) {
- rocksDBIterator.get().seekToFirst();
- } else {
- seek0(prefix);
- }
- setCurrentEntry();
+ rocksDBIterator.get().seekToFirst();
}
@Override
public final void seekToLast() {
- if (prefix == null) {
- rocksDBIterator.get().seekToLast();
- } else {
- throw new UnsupportedOperationException("seekToLast: prefix != null");
- }
- setCurrentEntry();
+ rocksDBIterator.get().seekToLast();
}
@Override
- public final Table.KeyValue seek(RAW key) {
+ public final KV seek(RAW key) {
seek0(key);
setCurrentEntry();
return currentEntry;
@@ -155,5 +152,6 @@ public final void removeFromDB() throws RocksDatabaseException, CodecException {
@Override
public void close() {
rocksDBIterator.close();
+ readOptions.close();
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
index 67593f744e3..441659b3766 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
@@ -17,30 +17,27 @@
package org.apache.hadoop.hdds.utils.db;
-import java.util.Arrays;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.ratis.util.function.CheckedFunction;
/**
* RocksDB store iterator using the byte[] API.
*/
-class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator {
- private static byte[] copyPrefix(byte[] prefix) {
- return prefix == null || prefix.length == 0 ? null : Arrays.copyOf(prefix, prefix.length);
- }
+class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator>
+ implements KeyValueIterator {
- RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
- RDBTable table, byte[] prefix, IteratorType type) {
- super(iterator, table, copyPrefix(prefix), type);
+ RDBStoreByteArrayIterator(
+ CheckedFunction itrSupplier,
+ RDBTable table, byte[] prefix, IteratorType type) throws RocksDatabaseException {
+ super(itrSupplier, table, prefix, type);
seekToFirst();
}
@Override
- byte[] key() {
- return getRocksDBIterator().get().key();
- }
-
- @Override
- Table.KeyValue getKeyValue() {
+ KeyValue getKeyValue() {
final ManagedRocksIterator i = getRocksDBIterator();
final byte[] key = getType().readKey() ? i.get().key() : null;
final byte[] value = getType().readValue() ? i.get().value() : null;
@@ -56,27 +53,4 @@ void seek0(byte[] key) {
void delete(byte[] key) throws RocksDatabaseException {
getRocksDBTable().delete(key);
}
-
- @Override
- boolean startsWithPrefix(byte[] value) {
- final byte[] prefix = getPrefix();
- if (prefix == null) {
- return true;
- }
- if (value == null) {
- return false;
- }
-
- int length = prefix.length;
- if (value.length < length) {
- return false;
- }
-
- for (int i = 0; i < length; i++) {
- if (value[i] != prefix[i]) {
- return false;
- }
- }
- return true;
- }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java
index aa703249ebe..0b87916f327 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java
@@ -17,29 +17,33 @@
package org.apache.hadoop.hdds.utils.db;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.function.CheckedFunction;
/**
* Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}.
*/
-class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator {
+class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator>
+ implements KeyValueIterator {
private final Buffer keyBuffer;
private final Buffer valueBuffer;
private final AtomicBoolean closed = new AtomicBoolean();
- RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table,
- CodecBuffer prefix, IteratorType type) {
- super(iterator, table, prefix, type);
+ RDBStoreCodecBufferIterator(
+ CheckedFunction itrSupplier, RDBTable table,
+ byte[] prefix, IteratorType type) throws RocksDatabaseException {
+ super(itrSupplier, table, prefix, type);
final String name = table != null ? table.getName() : null;
this.keyBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10),
- // it has to read key for matching prefix.
- getType().readKey() || prefix != null ? buffer -> getRocksDBIterator().get().key(buffer) : null);
+ getType().readKey() ? buffer -> getRocksDBIterator().get().key(buffer) : null);
this.valueBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10),
getType().readValue() ? buffer -> getRocksDBIterator().get().value(buffer) : null);
@@ -51,16 +55,9 @@ void assertOpen() {
}
@Override
- CodecBuffer key() {
+ KeyValue getKeyValue() {
assertOpen();
- return keyBuffer.getFromDb();
- }
-
- @Override
- Table.KeyValue getKeyValue() {
- assertOpen();
- final CodecBuffer key = getType().readKey() ? key() : null;
- return Table.newKeyValue(key, valueBuffer.getFromDb());
+ return Table.newKeyValue(keyBuffer.getFromDb(), valueBuffer.getFromDb());
}
@Override
@@ -75,24 +72,10 @@ void delete(CodecBuffer key) throws RocksDatabaseException {
getRocksDBTable().delete(key.asReadOnlyByteBuffer());
}
- @Override
- boolean startsWithPrefix(CodecBuffer key) {
- assertOpen();
- final CodecBuffer prefix = getPrefix();
- if (prefix == null) {
- return true;
- }
- if (key == null) {
- return false;
- }
- return key.startsWith(prefix);
- }
-
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
super.close();
- Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release);
keyBuffer.release();
valueBuffer.release();
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStorePoolBackedCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStorePoolBackedCodecBufferIterator.java
new file mode 100644
index 00000000000..8aec0bea7c6
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStorePoolBackedCodecBufferIterator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.hadoop.hdds.utils.db.Table.CloseableKeyValue;
+import org.apache.hadoop.hdds.utils.db.Table.CloseableKeyValueIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.ratis.util.function.CheckedFunction;
+
+/**
+ * A concrete implementation of {@link RDBStoreAbstractIterator} that provides an iterator
+ * for {@link CodecBuffer} keys and values managed within a RocksDB store. The iterator
+ * leverages an object pool to manage reusable {@link CloseableKeyValue} objects,
+ * enabling efficient memory and resource management by reusing buffers for keys and values
+ * during iteration.
+ *
+ * This iterator supports operations such as seeking to a specific key, retrieving key-value
+ * pairs from the database, and removing entries from the database, while encapsulating
+ * all RocksDB-specific logic internally.
+ */
+class RDBStorePoolBackedCodecBufferIterator extends RDBStoreAbstractIterator> implements CloseableKeyValueIterator {
+
+ private final GenericObjectPool> kvBufferPool;
+
+ RDBStorePoolBackedCodecBufferIterator(
+ CheckedFunction itrSupplier, RDBTable table,
+ byte[] prefix, IteratorType type, int maxNumberOfObjectsNeededConcurrently)
+ throws RocksDatabaseException {
+ super(itrSupplier, table, prefix, type);
+ GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>();
+ config.setMaxTotal(Math.max(maxNumberOfObjectsNeededConcurrently, 1));
+ config.setBlockWhenExhausted(true);
+
+ final String name = table != null ? table.getName() : null;
+ this.kvBufferPool = new GenericObjectPool<>(new KeyValueBufferFactory(name), config);
+ seekToFirst();
+ }
+
+ private final class KeyValueBufferFactory extends BasePooledObjectFactory> {
+
+ private final String iteratorName;
+
+ private KeyValueBufferFactory(String iteratorName) {
+ this.iteratorName = iteratorName;
+ }
+
+ @Override
+ public CloseableKeyValue create() {
+ Buffer keyBuffer = new Buffer(
+ new CodecBuffer.Capacity(iteratorName + "-iterator-key", 1 << 10),
+ getType().readKey() ? buffer -> getRocksDBIterator().get().key(buffer) : null);
+ Buffer valueBuffer = new Buffer(
+ new CodecBuffer.Capacity(iteratorName + "-iterator-value", 4 << 10),
+ getType().readValue() ? buffer -> getRocksDBIterator().get().value(buffer) : null);
+ Runnable closeAction = () -> {
+ keyBuffer.release();
+ valueBuffer.release();
+ };
+ return Table.newCloseableKeyValue(keyBuffer, valueBuffer, closeAction);
+ }
+
+ @Override
+ public void destroyObject(PooledObject> p) {
+ p.getObject().close();
+ }
+
+ @Override
+ public PooledObject> wrap(
+ CloseableKeyValue bufferBufferCloseableKeyValue) {
+ return new DefaultPooledObject<>(bufferBufferCloseableKeyValue);
+ }
+ }
+
+ @Override
+ Table.CloseableKeyValue getKeyValue() {
+ try {
+ AtomicBoolean closed = new AtomicBoolean(false);
+ CloseableKeyValue kvBuffers = kvBufferPool.borrowObject();
+ return Table.newCloseableKeyValue(kvBuffers.getKey().getFromDb(), kvBuffers.getValue().getFromDb(),
+ () -> {
+ // To handle multiple close calls.
+ if (closed.compareAndSet(false, true)) {
+ kvBufferPool.returnObject(kvBuffers);
+ }
+ });
+ } catch (Exception e) {
+ // Ideally, this should never happen since we have a generic object pool where Buffers would always be created
+ // and the wait for object borrow never times out.
+ throw new IllegalStateException("Failed to borrow key/value buffer from pool", e);
+ }
+ }
+
+ @Override
+ void seek0(CodecBuffer key) {
+ getRocksDBIterator().get().seek(key.asReadOnlyByteBuffer());
+ }
+
+ @Override
+ void delete(CodecBuffer key) throws RocksDatabaseException {
+ getRocksDBTable().delete(key.asReadOnlyByteBuffer());
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ this.kvBufferPool.close();
+ }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index f732735cbe3..1d4f3d4b6c2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -200,22 +200,27 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {
} else {
throw new IllegalArgumentException("batch should be RDBBatchOperation");
}
-
}
@Override
public KeyValueIterator iterator(byte[] prefix, IteratorType type)
throws RocksDatabaseException {
- return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
- prefix, type);
+ return new RDBStoreByteArrayIterator(readOptions -> db.newIterator(family, readOptions),
+ this, prefix, type);
}
- KeyValueIterator iterator(
- CodecBuffer prefix, IteratorType type) throws RocksDatabaseException {
- return new RDBStoreCodecBufferIterator(db.newIterator(family, false),
+ KeyValueIterator newCodecBufferIterator(
+ byte[] prefix, IteratorType type) throws RocksDatabaseException {
+ return new RDBStoreCodecBufferIterator(readOptions -> db.newIterator(family, readOptions),
this, prefix, type);
}
+ CloseableKeyValueIterator newCloseableCodecBufferIterator(
+ byte[] prefix, IteratorType type, int numberOfObjectsNeededConcurrently) throws RocksDatabaseException {
+ return new RDBStorePoolBackedCodecBufferIterator(readOptions -> db.newIterator(family, readOptions),
+ this, prefix, type, numberOfObjectsNeededConcurrently);
+ }
+
@Override
public String getName() {
return family.getName();
@@ -237,26 +242,13 @@ public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix)
}
@Override
- public void dumpToFileWithPrefix(File externalFile, byte[] prefix)
- throws RocksDatabaseException, CodecException {
- CodecBuffer prefixBuffer = prefix == null || prefix.length == 0 ? null :
- CodecBufferCodec.get(true).fromPersistedFormat(prefix);
- KeyValueIterator iter;
- try {
- iter = iterator(prefixBuffer, IteratorType.KEY_AND_VALUE);
- } catch (RocksDatabaseException e) {
- if (prefixBuffer != null) {
- prefixBuffer.close();
- }
- throw e;
- }
- try (RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
+ public void dumpToFileWithPrefix(File externalFile, byte[] prefix) throws RocksDatabaseException {
+ try (KeyValueIterator iter = newCodecBufferIterator(prefix, IteratorType.KEY_AND_VALUE);
+ RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
while (iter.hasNext()) {
final KeyValue entry = iter.next();
fileWriter.put(entry.getKey(), entry.getValue());
}
- } finally {
- iter.close();
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 659954a861b..5abc933d54b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -790,10 +790,8 @@ public ManagedRocksIterator newIterator(ColumnFamily family)
}
public ManagedRocksIterator newIterator(ColumnFamily family,
- boolean fillCache) throws RocksDatabaseException {
- try (UncheckedAutoCloseable ignored = acquire();
- ManagedReadOptions readOptions = new ManagedReadOptions()) {
- readOptions.setFillCache(fillCache);
+ ManagedReadOptions readOptions) throws RocksDatabaseException {
+ try (UncheckedAutoCloseable ignored = acquire()) {
return managed(db.get().newIterator(family.getHandle(), readOptions));
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index fc049034406..9be0cd19de2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -332,7 +332,7 @@ default List> getRangeKVs(KEY startKey, int count, KEY pref
/**
* Class used to represent the key and value pair of a db entry.
*/
- final class KeyValue {
+ class KeyValue {
private final K key;
private final V value;
private final int valueByteSize;
@@ -379,6 +379,20 @@ public int hashCode() {
}
}
+ /**
+ * Class used to represent the key and value pair of a db entry which also supports a close function to clean up
+ * resources associated with the entry.
+ */
+ abstract class CloseableKeyValue extends KeyValue implements AutoCloseable {
+
+ private CloseableKeyValue(K key, V value, int valueByteSize) {
+ super(key, value, valueByteSize);
+ }
+
+ @Override
+ public abstract void close();
+ }
+
static KeyValue newKeyValue(K key, V value) {
return newKeyValue(key, value, -1);
}
@@ -387,9 +401,27 @@ static KeyValue newKeyValue(K key, V value, int valueByteSize) {
return new KeyValue<>(key, value, valueByteSize);
}
+ static CloseableKeyValue newCloseableKeyValue(K key, V value, int valueByteSize, Runnable closeAction) {
+ return new CloseableKeyValue(key, value, valueByteSize) {
+ @Override
+ public void close() {
+ closeAction.run();
+ }
+ };
+ }
+
+ static CloseableKeyValue newCloseableKeyValue(K key, V value, Runnable closeAction) {
+ return newCloseableKeyValue(key, value, -1, closeAction);
+ }
+
/** A {@link TableIterator} to iterate {@link KeyValue}s. */
interface KeyValueIterator
extends TableIterator> {
}
+
+ /** A {@link TableIterator} to iterate {@link CloseableKeyValue}s. */
+ interface CloseableKeyValueIterator extends TableIterator> {
+
+ }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 8000d48c618..6f8d859d98d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -388,10 +388,10 @@ public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException,
@Override
public KeyValueIterator iterator(KEY prefix, IteratorType type)
throws RocksDatabaseException, CodecException {
+ final byte[] prefixBytes = encodeKey(prefix);
if (supportCodecBuffer) {
- return newCodecBufferTableIterator(prefix, type);
+ return newCodecBufferTableIterator(rawTable.newCodecBufferIterator(prefixBytes, type));
} else {
- final byte[] prefixBytes = encodeKey(prefix);
return new TypedTableIterator(rawTable.iterator(prefixBytes, type));
}
}
@@ -481,27 +481,6 @@ TableCache getCache() {
return cache;
}
- private RawIterator newCodecBufferTableIterator(KEY prefix, IteratorType type)
- throws RocksDatabaseException, CodecException {
- final CodecBuffer encoded = encodeKeyCodecBuffer(prefix);
- final CodecBuffer prefixBuffer;
- if (encoded != null && encoded.readableBytes() == 0) {
- encoded.release();
- prefixBuffer = null;
- } else {
- prefixBuffer = encoded;
- }
-
- try {
- return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer, type));
- } catch (Throwable t) {
- if (prefixBuffer != null) {
- prefixBuffer.release();
- }
- throw t;
- }
- }
-
private RawIterator newCodecBufferTableIterator(KeyValueIterator i) {
return new RawIterator(i) {
@Override
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreAbstractIterator.java
new file mode 100644
index 00000000000..2b9f806374e
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreAbstractIterator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.ratis.util.function.CheckedFunction;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Abstract class for testing RDBStoreAbstractIterator.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestRDBStoreAbstractIterator> {
+
+ private RocksIterator rocksDBIteratorMock;
+ private CheckedFunction itrInitializer;
+ private RDBTable rocksTableMock;
+ private ManagedReadOptions readOptions;
+
+ public RDBTable getRocksTableMock() {
+ return rocksTableMock;
+ }
+
+ public RocksIterator getRocksDBIteratorMock() {
+ return rocksDBIteratorMock;
+ }
+
+ public CheckedFunction getItrInitializer() {
+ return itrInitializer;
+ }
+
+ @BeforeAll
+ public static void init() {
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
+ }
+
+ @BeforeEach
+ public void setup() {
+ rocksDBIteratorMock = mock(RocksIterator.class);
+ itrInitializer = readOpts -> {
+ this.readOptions = readOpts;
+ return new ManagedRocksIterator(rocksDBIteratorMock);
+ };
+ rocksTableMock = mock(RDBTable.class);
+ Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG);
+ }
+
+ abstract T newIterator() throws RocksDatabaseException;
+
+ abstract T newIterator(byte[] prefix) throws RocksDatabaseException;
+
+ public static Stream prefixTestArgumentsProvider() {
+ byte[] randomBytes = RandomUtils.nextBytes(100);
+ return Stream.of(
+ Arguments.of("Empty prefix", new byte[0], null),
+ Arguments.of("null prefix", null, null),
+ Arguments.of("Prefix with 0", getArrayFilledWithValue(0, 1), getArrayFilledWithValue(1, 1)),
+ Arguments.of("Prefix with 0 with 100 times", getArrayFilledWithValue(0, 100),
+ getLastByteIncreased(getArrayFilledWithValue(0, 100))),
+ Arguments.of("Prefix with 0xFF", getArrayFilledWithValue(0xFF, 1), null),
+ Arguments.of("Prefix with 0xFF with 100 times", getArrayFilledWithValue(0xFF, 100), null),
+ Arguments.of("Prefix with random bytes", randomBytes,
+ getLastByteIncreased(Arrays.copyOf(randomBytes, randomBytes.length))),
+ Arguments.of("Prefix with 0xFF prefixed 100 times and 0 at end",
+ getArrayFilledWithValueAndPrefix(0, 1, getArrayFilledWithValue(0xFF, 100)),
+ getArrayFilledWithValueAndPrefix(1, 1, getArrayFilledWithValue(0xFF, 100))),
+ Arguments.of("Prefix with 0xFF prefixed 100 times and 50 at end",
+ getArrayFilledWithValueAndPrefix(50, 1, getArrayFilledWithValue(0xFF, 100)),
+ getArrayFilledWithValueAndPrefix(51, 1, getArrayFilledWithValue(0xFF, 100))),
+ Arguments.of("Prefix with 0xFF prefixed 100 times and value 0xFE at end",
+ getArrayFilledWithValueAndPrefix(0xFE, 1, getArrayFilledWithValue(0xFF, 100)),
+ getArrayFilledWithValue(0xFF, 101)),
+ Arguments.of("Prefix with 0 prefix 100 times and 100 0xFF at end",
+ getArrayFilledWithValueAndPrefix(0xFF, 100, getArrayFilledWithValue(0, 100)),
+ getLastByteIncreased(getArrayFilledWithValue(0, 100))),
+ Arguments.of("Prefix with 50 prefix 100 times and 100 0xFF at end",
+ getArrayFilledWithValueAndPrefix(0xFF, 100, getArrayFilledWithValue(50, 100)),
+ getLastByteIncreased(getArrayFilledWithValue(50, 100))),
+ Arguments.of("Prefix with 0xFE prefix 100 times and 100 0xFF at end",
+ getArrayFilledWithValueAndPrefix(0xFF, 100, getArrayFilledWithValue(0xFE, 100)),
+ getLastByteIncreased(getArrayFilledWithValue(0xFE, 100)))
+ );
+ }
+
+ private static byte[] getArrayFilledWithValue(int value, int length) {
+ return getArrayFilledWithValueAndPrefix(value, length, null);
+ }
+
+ private static byte[] getArrayFilledWithValueAndPrefix(int value, int length, byte[] prefix) {
+ byte[] array = new byte[length + (prefix == null ? 0 : prefix.length)];
+ if (prefix != null) {
+ System.arraycopy(prefix, 0, array, 0, prefix.length);
+ }
+ Arrays.fill(array, prefix == null ? 0 : prefix.length, array.length, (byte) value);
+ return array;
+ }
+
+ private static byte[] getLastByteIncreased(byte[] arr) {
+ arr[arr.length - 1]++;
+ return arr;
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("prefixTestArgumentsProvider")
+ public void testNormalPrefixedIterator(String name, byte[] prefix, byte[] expectedUpperBound) throws Exception {
+ try (T itr = newIterator(prefix);
+ ManagedSlice lowerBound = prefix == null ? null : new ManagedSlice(prefix);
+ ManagedSlice upperBound = expectedUpperBound == null ? null : new ManagedSlice(expectedUpperBound)) {
+ Assertions.assertEquals(this.readOptions.getLowerBound(), lowerBound);
+ Assertions.assertEquals(this.readOptions.getUpperBound(), upperBound);
+ verify(rocksDBIteratorMock, times(1)).seekToFirst();
+ clearInvocations(rocksDBIteratorMock);
+ itr.seekToFirst();
+ verify(rocksDBIteratorMock, times(1)).seekToFirst();
+ clearInvocations(rocksDBIteratorMock);
+
+ when(rocksDBIteratorMock.isValid()).thenReturn(true);
+ when(rocksDBIteratorMock.key()).thenReturn(prefix);
+ assertTrue(itr.hasNext());
+ verify(rocksDBIteratorMock, times(1)).isValid();
+ verify(rocksDBIteratorMock, times(0)).key();
+ itr.seekToLast();
+ verify(rocksDBIteratorMock, times(1)).seekToLast();
+ }
+ assertFalse(readOptions.isOwningHandle());
+ assertFalse(Optional.ofNullable(readOptions.getLowerBound()).map(ManagedSlice::isOwningHandle).orElse(false));
+ assertFalse(Optional.ofNullable(readOptions.getUpperBound()).map(ManagedSlice::isOwningHandle).orElse(false));
+ }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java
index 136990522b7..8eb28842892 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java
@@ -23,7 +23,6 @@
import static org.apache.hadoop.hdds.utils.db.IteratorType.VALUE_ONLY;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentCaptor.forClass;
@@ -37,18 +36,11 @@
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
-import org.rocksdb.RocksIterator;
/**
* This test prescribe expected behaviour
@@ -57,38 +49,28 @@
* RDBStoreIterator to provide iteration over table elements in a typed manner.
* The tests are to ensure we access RocksDB via the iterator properly.
*/
-public class TestRDBStoreByteArrayIterator {
-
- private RocksIterator rocksDBIteratorMock;
- private ManagedRocksIterator managedRocksIterator;
- private RDBTable rocksTableMock;
-
- @BeforeEach
- public void setup() {
- rocksDBIteratorMock = mock(RocksIterator.class);
- managedRocksIterator = new ManagedRocksIterator(rocksDBIteratorMock);
- rocksTableMock = mock(RDBTable.class);
- Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG);
- }
+public class TestRDBStoreByteArrayIterator extends TestRDBStoreAbstractIterator {
- RDBStoreByteArrayIterator newIterator() {
- return new RDBStoreByteArrayIterator(managedRocksIterator, null, null, KEY_AND_VALUE);
+ @Override
+ RDBStoreByteArrayIterator newIterator() throws RocksDatabaseException {
+ return new RDBStoreByteArrayIterator(getItrInitializer(), null, null, KEY_AND_VALUE);
}
- RDBStoreByteArrayIterator newIterator(byte[] prefix) {
- return new RDBStoreByteArrayIterator(managedRocksIterator, rocksTableMock, prefix, KEY_AND_VALUE);
+ @Override
+ RDBStoreByteArrayIterator newIterator(byte[] prefix) throws RocksDatabaseException {
+ return new RDBStoreByteArrayIterator(getItrInitializer(), getRocksTableMock(), prefix, KEY_AND_VALUE);
}
@Test
- public void testForeachRemainingCallsConsumerWithAllElements() {
- when(rocksDBIteratorMock.isValid())
- .thenReturn(true, true, true, true, true, true, true, false);
- when(rocksDBIteratorMock.key())
- .thenReturn(new byte[]{0x00}, new byte[]{0x00}, new byte[]{0x01},
+ public void testForeachRemainingCallsConsumerWithAllElements() throws RocksDatabaseException {
+ when(getRocksDBIteratorMock().isValid())
+ .thenReturn(true, true, true, true, true, true, false);
+ when(getRocksDBIteratorMock().key())
+ .thenReturn(new byte[]{0x00}, new byte[]{0x01},
new byte[]{0x02})
.thenThrow(new NoSuchElementException());
- when(rocksDBIteratorMock.value())
- .thenReturn(new byte[]{0x7f}, new byte[]{0x7f}, new byte[]{0x7e},
+ when(getRocksDBIteratorMock().value())
+ .thenReturn(new byte[]{0x7f}, new byte[]{0x7e},
new byte[]{0x7d})
.thenThrow(new NoSuchElementException());
@@ -115,8 +97,8 @@ public void testForeachRemainingCallsConsumerWithAllElements() {
}
@Test
- public void testHasNextDependsOnIsvalid() {
- when(rocksDBIteratorMock.isValid()).thenReturn(true, true, false);
+ public void testHasNextDependsOnIsvalid() throws RocksDatabaseException {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true, false);
RDBStoreByteArrayIterator iter = newIterator();
@@ -125,70 +107,70 @@ public void testHasNextDependsOnIsvalid() {
}
@Test
- public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() {
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
+ public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() throws RocksDatabaseException {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
RDBStoreByteArrayIterator iter = newIterator();
- InOrder verifier = inOrder(rocksDBIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
iter.next();
- verifier.verify(rocksDBIteratorMock).isValid();
- verifier.verify(rocksDBIteratorMock).key();
- verifier.verify(rocksDBIteratorMock).value();
- verifier.verify(rocksDBIteratorMock).next();
+ verifier.verify(getRocksDBIteratorMock()).isValid();
+ verifier.verify(getRocksDBIteratorMock()).key();
+ verifier.verify(getRocksDBIteratorMock()).value();
+ verifier.verify(getRocksDBIteratorMock()).next();
}
@Test
- public void testConstructorSeeksToFirstElement() {
+ public void testConstructorSeeksToFirstElement() throws RocksDatabaseException {
newIterator();
- verify(rocksDBIteratorMock, times(1)).seekToFirst();
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
}
@Test
- public void testSeekToFirstSeeks() {
+ public void testSeekToFirstSeeks() throws RocksDatabaseException {
RDBStoreByteArrayIterator iter = newIterator();
iter.seekToFirst();
- verify(rocksDBIteratorMock, times(2)).seekToFirst();
+ verify(getRocksDBIteratorMock(), times(2)).seekToFirst();
}
@Test
- public void testSeekToLastSeeks() {
+ public void testSeekToLastSeeks() throws RocksDatabaseException {
RDBStoreByteArrayIterator iter = newIterator();
iter.seekToLast();
- verify(rocksDBIteratorMock, times(1)).seekToLast();
+ verify(getRocksDBIteratorMock(), times(1)).seekToLast();
}
@Test
public void testSeekReturnsTheActualKey() throws Exception {
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
- when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
- when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f});
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key()).thenReturn(new byte[]{0x00});
+ when(getRocksDBIteratorMock().value()).thenReturn(new byte[]{0x7f});
RDBStoreByteArrayIterator iter = newIterator();
final Table.KeyValue val = iter.seek(new byte[]{0x55});
- InOrder verifier = inOrder(rocksDBIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
- verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time
- verify(rocksDBIteratorMock, never()).seekToLast();
- verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class));
- verifier.verify(rocksDBIteratorMock, times(1)).isValid();
- verifier.verify(rocksDBIteratorMock, times(1)).key();
- verifier.verify(rocksDBIteratorMock, times(1)).value();
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time
+ verify(getRocksDBIteratorMock(), never()).seekToLast();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).seek(any(byte[].class));
+ verifier.verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).value();
assertArrayEquals(new byte[]{0x00}, val.getKey());
assertArrayEquals(new byte[]{0x7f}, val.getValue());
}
@Test
public void testGettingTheKeyIfIteratorIsValid() throws Exception {
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
- when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key()).thenReturn(new byte[]{0x00});
RDBStoreByteArrayIterator iter = newIterator();
byte[] key = null;
@@ -197,18 +179,18 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception {
key = entry.getKey();
}
- InOrder verifier = inOrder(rocksDBIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
- verifier.verify(rocksDBIteratorMock, times(1)).isValid();
- verifier.verify(rocksDBIteratorMock, times(1)).key();
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key();
assertArrayEquals(new byte[]{0x00}, key);
}
@Test
public void testGettingTheValueIfIteratorIsValid() throws Exception {
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
- when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
- when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f});
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key()).thenReturn(new byte[]{0x00});
+ when(getRocksDBIteratorMock().value()).thenReturn(new byte[]{0x7f});
RDBStoreByteArrayIterator iter = newIterator();
Table.KeyValue entry;
@@ -220,10 +202,10 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception {
value = entry.getValue();
}
- InOrder verifier = inOrder(rocksDBIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
- verifier.verify(rocksDBIteratorMock, times(1)).isValid();
- verifier.verify(rocksDBIteratorMock, times(1)).key();
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key();
assertArrayEquals(new byte[]{0x00}, key);
assertArrayEquals(new byte[]{0x7f}, value);
}
@@ -231,20 +213,21 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception {
@Test
public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
byte[] testKey = new byte[]{0x00};
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
- when(rocksDBIteratorMock.key()).thenReturn(testKey);
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key()).thenReturn(testKey);
RDBStoreByteArrayIterator iter = newIterator(null);
+ iter.next();
iter.removeFromDB();
- InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock(), getRocksTableMock());
- verifier.verify(rocksDBIteratorMock, times(1)).isValid();
- verifier.verify(rocksTableMock, times(1)).delete(testKey);
+ verifier.verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verifier.verify(getRocksTableMock(), times(1)).delete(testKey);
}
@Test
- public void testRemoveFromDBWithoutDBTableSet() {
+ public void testRemoveFromDBWithoutDBTableSet() throws RocksDatabaseException {
RDBStoreByteArrayIterator iter = newIterator();
assertThrows(UnsupportedOperationException.class,
iter::removeFromDB);
@@ -255,49 +238,26 @@ public void testCloseCloses() throws Exception {
RDBStoreByteArrayIterator iter = newIterator();
iter.close();
- verify(rocksDBIteratorMock, times(1)).close();
+ verify(getRocksDBIteratorMock(), times(1)).close();
}
@Test
public void testNullPrefixedIterator() throws IOException {
RDBStoreByteArrayIterator iter = newIterator(null);
- verify(rocksDBIteratorMock, times(1)).seekToFirst();
- clearInvocations(rocksDBIteratorMock);
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+ clearInvocations(getRocksDBIteratorMock());
iter.seekToFirst();
- verify(rocksDBIteratorMock, times(1)).seekToFirst();
- clearInvocations(rocksDBIteratorMock);
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+ clearInvocations(getRocksDBIteratorMock());
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
assertTrue(iter.hasNext());
- verify(rocksDBIteratorMock, times(1)).isValid();
- verify(rocksDBIteratorMock, times(0)).key();
+ verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verify(getRocksDBIteratorMock(), times(0)).key();
iter.seekToLast();
- verify(rocksDBIteratorMock, times(1)).seekToLast();
-
- iter.close();
- }
-
- @Test
- public void testNormalPrefixedIterator() throws IOException {
- byte[] testPrefix = "sample".getBytes(StandardCharsets.UTF_8);
- RDBStoreByteArrayIterator iter = newIterator(testPrefix);
- verify(rocksDBIteratorMock, times(1)).seek(testPrefix);
- clearInvocations(rocksDBIteratorMock);
-
- iter.seekToFirst();
- verify(rocksDBIteratorMock, times(1)).seek(testPrefix);
- clearInvocations(rocksDBIteratorMock);
-
- when(rocksDBIteratorMock.isValid()).thenReturn(true);
- when(rocksDBIteratorMock.key()).thenReturn(testPrefix);
- assertTrue(iter.hasNext());
- verify(rocksDBIteratorMock, times(1)).isValid();
- verify(rocksDBIteratorMock, times(1)).key();
- Exception e =
- assertThrows(Exception.class, () -> iter.seekToLast(), "Prefixed iterator does not support seekToLast");
- assertInstanceOf(UnsupportedOperationException.class, e);
+ verify(getRocksDBIteratorMock(), times(1)).seekToLast();
iter.close();
}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java
index 919b3b6cdad..13e68f898d6 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java
@@ -20,20 +20,17 @@
import static org.apache.hadoop.hdds.utils.db.IteratorType.KEY_AND_VALUE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
@@ -41,43 +38,40 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.stubbing.Answer;
-import org.rocksdb.RocksIterator;
/**
* This test is similar to {@link TestRDBStoreByteArrayIterator}
* except that this test is for {@link RDBStoreCodecBufferIterator}.
*/
-public class TestRDBStoreCodecBufferIterator {
+public class TestRDBStoreCodecBufferIterator extends TestRDBStoreAbstractIterator {
- private RocksIterator rocksIteratorMock;
- private ManagedRocksIterator managedRocksIterator;
- private RDBTable rdbTableMock;
+ @BeforeAll
+ public static void init() {
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
+ }
@BeforeEach
- public void setup() {
+ public void setupTest() {
CodecBuffer.enableLeakDetection();
- rocksIteratorMock = mock(RocksIterator.class);
- managedRocksIterator = newManagedRocksIterator();
- rdbTableMock = mock(RDBTable.class);
- Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG);
}
ManagedRocksIterator newManagedRocksIterator() {
- return new ManagedRocksIterator(rocksIteratorMock);
+ return new ManagedRocksIterator(getRocksDBIteratorMock());
}
- RDBStoreCodecBufferIterator newIterator() {
- return new RDBStoreCodecBufferIterator(managedRocksIterator, null, null, KEY_AND_VALUE);
+ @Override
+ RDBStoreCodecBufferIterator newIterator() throws RocksDatabaseException {
+ return new RDBStoreCodecBufferIterator(getItrInitializer(), null, null, KEY_AND_VALUE);
}
- RDBStoreCodecBufferIterator newIterator(CodecBuffer prefix) {
- return new RDBStoreCodecBufferIterator(managedRocksIterator, rdbTableMock, prefix, KEY_AND_VALUE);
+ @Override
+ RDBStoreCodecBufferIterator newIterator(byte[] prefix) throws RocksDatabaseException {
+ return new RDBStoreCodecBufferIterator(getItrInitializer(), getRocksTableMock(), prefix, KEY_AND_VALUE);
}
Answer newAnswerInt(String name, int b) {
@@ -98,16 +92,14 @@ Answer newAnswer(String name, byte... b) {
@Test
public void testForEachRemaining() throws Exception {
- when(rocksIteratorMock.isValid())
- .thenReturn(true, true, true, true, true, true, true, false);
- when(rocksIteratorMock.key(any()))
- .then(newAnswerInt("key1", 0x00))
+ when(getRocksDBIteratorMock().isValid())
+ .thenReturn(true, true, true, true, true, true, false);
+ when(getRocksDBIteratorMock().key(any()))
.then(newAnswerInt("key2", 0x00))
.then(newAnswerInt("key3", 0x01))
.then(newAnswerInt("key4", 0x02))
.thenThrow(new NoSuchElementException());
- when(rocksIteratorMock.value(any()))
- .then(newAnswerInt("val1", 0x7f))
+ when(getRocksDBIteratorMock().value(any()))
.then(newAnswerInt("val2", 0x7f))
.then(newAnswerInt("val3", 0x7e))
.then(newAnswerInt("val4", 0x7d))
@@ -132,7 +124,7 @@ public void testForEachRemaining() throws Exception {
@Test
public void testHasNextDependsOnIsvalid() throws Exception {
- when(rocksIteratorMock.isValid()).thenReturn(true, true, false);
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true, false);
try (RDBStoreCodecBufferIterator i = newIterator()) {
assertTrue(i.hasNext());
@@ -145,16 +137,16 @@ public void testHasNextDependsOnIsvalid() throws Exception {
@Test
public void testNextCallsIsValidThenGetsTheValueAndStepsToNext()
throws Exception {
- when(rocksIteratorMock.isValid()).thenReturn(true);
- InOrder verifier = inOrder(rocksIteratorMock);
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
try (RDBStoreCodecBufferIterator i = newIterator()) {
i.next();
}
- verifier.verify(rocksIteratorMock).isValid();
- verifier.verify(rocksIteratorMock).key(any());
- verifier.verify(rocksIteratorMock).value(any());
- verifier.verify(rocksIteratorMock).next();
+ verifier.verify(getRocksDBIteratorMock()).isValid();
+ verifier.verify(getRocksDBIteratorMock()).key(any());
+ verifier.verify(getRocksDBIteratorMock()).value(any());
+ verifier.verify(getRocksDBIteratorMock()).next();
CodecTestUtil.gc();
}
@@ -163,7 +155,7 @@ public void testNextCallsIsValidThenGetsTheValueAndStepsToNext()
public void testConstructorSeeksToFirstElement() throws Exception {
newIterator().close();
- verify(rocksIteratorMock, times(1)).seekToFirst();
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
CodecTestUtil.gc();
}
@@ -173,7 +165,7 @@ public void testSeekToFirstSeeks() throws Exception {
try (RDBStoreCodecBufferIterator i = newIterator()) {
i.seekToFirst();
}
- verify(rocksIteratorMock, times(2)).seekToFirst();
+ verify(getRocksDBIteratorMock(), times(2)).seekToFirst();
CodecTestUtil.gc();
}
@@ -184,32 +176,32 @@ public void testSeekToLastSeeks() throws Exception {
i.seekToLast();
}
- verify(rocksIteratorMock, times(1)).seekToLast();
+ verify(getRocksDBIteratorMock(), times(1)).seekToLast();
CodecTestUtil.gc();
}
@Test
public void testSeekReturnsTheActualKey() throws Exception {
- when(rocksIteratorMock.isValid()).thenReturn(true);
- when(rocksIteratorMock.key(any()))
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
.then(newAnswerInt("key1", 0x00));
- when(rocksIteratorMock.value(any()))
+ when(getRocksDBIteratorMock().value(any()))
.then(newAnswerInt("val1", 0x7f));
try (RDBStoreCodecBufferIterator i = newIterator();
CodecBuffer target = CodecBuffer.wrap(new byte[]{0x55})) {
final Table.KeyValue val = i.seek(target);
- InOrder verifier = inOrder(rocksIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
- verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time
- verify(rocksIteratorMock, never()).seekToLast();
- verifier.verify(rocksIteratorMock, times(1))
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time
+ verify(getRocksDBIteratorMock(), never()).seekToLast();
+ verifier.verify(getRocksDBIteratorMock(), times(1))
.seek(any(ByteBuffer.class));
- verifier.verify(rocksIteratorMock, times(1)).isValid();
- verifier.verify(rocksIteratorMock, times(1)).key(any());
- verifier.verify(rocksIteratorMock, times(1)).value(any());
+ verifier.verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key(any());
+ verifier.verify(getRocksDBIteratorMock(), times(1)).value(any());
assertArrayEquals(new byte[]{0x00}, val.getKey().getArray());
assertArrayEquals(new byte[]{0x7f}, val.getValue().getArray());
}
@@ -219,8 +211,8 @@ public void testSeekReturnsTheActualKey() throws Exception {
@Test
public void testGettingTheKeyIfIteratorIsValid() throws Exception {
- when(rocksIteratorMock.isValid()).thenReturn(true);
- when(rocksIteratorMock.key(any()))
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
.then(newAnswerInt("key1", 0x00));
byte[] key = null;
@@ -230,10 +222,10 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception {
}
}
- InOrder verifier = inOrder(rocksIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
- verifier.verify(rocksIteratorMock, times(1)).isValid();
- verifier.verify(rocksIteratorMock, times(1)).key(any());
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key(any());
assertArrayEquals(new byte[]{0x00}, key);
CodecTestUtil.gc();
@@ -241,10 +233,10 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception {
@Test
public void testGettingTheValueIfIteratorIsValid() throws Exception {
- when(rocksIteratorMock.isValid()).thenReturn(true);
- when(rocksIteratorMock.key(any()))
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
.then(newAnswerInt("key1", 0x00));
- when(rocksIteratorMock.value(any()))
+ when(getRocksDBIteratorMock().value(any()))
.then(newAnswerInt("val1", 0x7f));
byte[] key = null;
@@ -257,10 +249,10 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception {
}
}
- InOrder verifier = inOrder(rocksIteratorMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
- verifier.verify(rocksIteratorMock, times(1)).isValid();
- verifier.verify(rocksIteratorMock, times(1)).key(any());
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key(any());
assertArrayEquals(new byte[]{0x00}, key);
assertArrayEquals(new byte[]{0x7f}, value);
@@ -271,18 +263,19 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception {
public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
final byte[] testKey = new byte[10];
ThreadLocalRandom.current().nextBytes(testKey);
- when(rocksIteratorMock.isValid()).thenReturn(true);
- when(rocksIteratorMock.key(any()))
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
.then(newAnswer("key1", testKey));
try (RDBStoreCodecBufferIterator i = newIterator(null)) {
+ i.next();
i.removeFromDB();
}
- InOrder verifier = inOrder(rocksIteratorMock, rdbTableMock);
+ InOrder verifier = inOrder(getRocksDBIteratorMock(), getRocksTableMock());
- verifier.verify(rocksIteratorMock, times(1)).isValid();
- verifier.verify(rdbTableMock, times(1))
+ verifier.verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verifier.verify(getRocksTableMock(), times(1))
.delete(ByteBuffer.wrap(testKey));
CodecTestUtil.gc();
@@ -302,7 +295,7 @@ public void testRemoveFromDBWithoutDBTableSet() throws Exception {
public void testCloseCloses() throws Exception {
newIterator().close();
- verify(rocksIteratorMock, times(1)).close();
+ verify(getRocksDBIteratorMock(), times(1)).close();
CodecTestUtil.gc();
}
@@ -310,48 +303,20 @@ public void testCloseCloses() throws Exception {
@Test
public void testNullPrefixedIterator() throws Exception {
try (RDBStoreCodecBufferIterator i = newIterator()) {
- verify(rocksIteratorMock, times(1)).seekToFirst();
- clearInvocations(rocksIteratorMock);
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+ clearInvocations(getRocksDBIteratorMock());
i.seekToFirst();
- verify(rocksIteratorMock, times(1)).seekToFirst();
- clearInvocations(rocksIteratorMock);
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+ clearInvocations(getRocksDBIteratorMock());
- when(rocksIteratorMock.isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
assertTrue(i.hasNext());
- verify(rocksIteratorMock, times(1)).isValid();
- verify(rocksIteratorMock, times(0)).key(any());
+ verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verify(getRocksDBIteratorMock(), times(0)).key(any());
i.seekToLast();
- verify(rocksIteratorMock, times(1)).seekToLast();
- }
-
- CodecTestUtil.gc();
- }
-
- @Test
- public void testNormalPrefixedIterator() throws Exception {
- final byte[] prefixBytes = "sample".getBytes(StandardCharsets.UTF_8);
- try (RDBStoreCodecBufferIterator i = newIterator(
- CodecBuffer.wrap(prefixBytes))) {
- final ByteBuffer prefix = ByteBuffer.wrap(prefixBytes);
- verify(rocksIteratorMock, times(1)).seek(prefix);
- clearInvocations(rocksIteratorMock);
-
- i.seekToFirst();
- verify(rocksIteratorMock, times(1)).seek(prefix);
- clearInvocations(rocksIteratorMock);
-
- when(rocksIteratorMock.isValid()).thenReturn(true);
- when(rocksIteratorMock.key(any()))
- .then(newAnswer("key1", prefixBytes));
- assertTrue(i.hasNext());
- verify(rocksIteratorMock, times(1)).isValid();
- verify(rocksIteratorMock, times(1)).key(any());
-
- Exception e =
- assertThrows(Exception.class, () -> i.seekToLast(), "Prefixed iterator does not support seekToLast");
- assertInstanceOf(UnsupportedOperationException.class, e);
+ verify(getRocksDBIteratorMock(), times(1)).seekToLast();
}
CodecTestUtil.gc();
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStorePoolBackedCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStorePoolBackedCodecBufferIterator.java
new file mode 100644
index 00000000000..8c158e21b13
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStorePoolBackedCodecBufferIterator.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.utils.db.IteratorType.KEY_AND_VALUE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.Table.CloseableKeyValue;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.InOrder;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link RDBStorePoolBackedCodecBufferIterator}. */
+public class TestRDBStorePoolBackedCodecBufferIterator
+ extends TestRDBStoreAbstractIterator {
+
+ @BeforeAll
+ public static void init() {
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
+ }
+
+ @BeforeEach
+ public void setupTest() {
+ CodecBuffer.enableLeakDetection();
+ }
+
+ @Override
+ RDBStorePoolBackedCodecBufferIterator newIterator() throws RocksDatabaseException {
+ return new RDBStorePoolBackedCodecBufferIterator(getItrInitializer(), null, null, KEY_AND_VALUE, 1);
+ }
+
+ RDBStorePoolBackedCodecBufferIterator newIterator(int poolSize) throws RocksDatabaseException {
+ return new RDBStorePoolBackedCodecBufferIterator(getItrInitializer(), null, null, KEY_AND_VALUE,
+ poolSize);
+ }
+
+ @Override
+ RDBStorePoolBackedCodecBufferIterator newIterator(byte[] prefix) throws RocksDatabaseException {
+ return new RDBStorePoolBackedCodecBufferIterator(getItrInitializer(), getRocksTableMock(), prefix, KEY_AND_VALUE,
+ 1);
+ }
+
+ Answer newAnswerInt(String name, int b) {
+ return newAnswer(name, (byte) b);
+ }
+
+ Answer newAnswer(String name, byte... b) {
+ return invocation -> {
+ System.out.printf("answer %s: %s%n", name, StringUtils.bytes2Hex(b));
+ Object[] args = invocation.getArguments();
+ final ByteBuffer buffer = (ByteBuffer) args[0];
+ return writeToBuffer(buffer, b);
+ };
+ }
+
+ private int writeToBuffer(ByteBuffer buffer, byte... bytesToWrite) {
+ buffer.clear();
+ buffer.put(bytesToWrite);
+ buffer.flip();
+ return bytesToWrite.length;
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 2, 3, 5, 10})
+ public void testRDBStoreCodecBufferIterGetsFailBeyondMaxBuffers(int maxBuffers)
+ throws InterruptedException, TimeoutException, RocksDatabaseException {
+ List> vals = new ArrayList<>();
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ when(getRocksDBIteratorMock().key(any()))
+ .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement()));
+ when(getRocksDBIteratorMock().value(any()))
+ .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement()));
+ try (RDBStorePoolBackedCodecBufferIterator iterator = newIterator(maxBuffers)) {
+ for (int i = 0; i < maxBuffers; i++) {
+ vals.add(iterator.next());
+ }
+ assertEquals(Math.max(maxBuffers, 0), vals.size());
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ AtomicReference> nextThread = new AtomicReference<>(CompletableFuture.supplyAsync(
+ () -> {
+ CloseableKeyValue v = iterator.next();
+ vals.add(v);
+ return true;
+ },
+ executor));
+ if (maxBuffers < 1) {
+ // Number of max buffers is always going to be at least 1.
+ GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100);
+ nextThread.set(CompletableFuture.supplyAsync(
+ () -> {
+ CloseableKeyValue v = iterator.next();
+ vals.add(v);
+ return true;
+ },
+ executor));
+ }
+ assertEquals(Math.max(1, maxBuffers), vals.size());
+ for (int i = 0; i < vals.size(); i++) {
+ assertEquals(2 * i, vals.get(i).getKey().getArray()[0]);
+ assertEquals(2 * i + 1, vals.get(i).getValue().getArray()[0]);
+ }
+ assertFalse(nextThread.get().isDone());
+ int size = vals.size();
+ vals.get(0).close();
+ GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100);
+ assertEquals(size + 1, vals.size());
+ assertEquals(2 * size, vals.get(size).getKey().getArray()[0]);
+ assertEquals(2 * size + 1, vals.get(size).getValue().getArray()[0]);
+ for (CloseableKeyValue v : vals) {
+ v.close();
+ }
+ executor.shutdown();
+ }
+ }
+
+ @Test
+ public void testForEachRemaining() throws Exception {
+ when(getRocksDBIteratorMock().isValid())
+ .thenReturn(true, true, true, true, true, true, false);
+ when(getRocksDBIteratorMock().key(any()))
+ .then(newAnswerInt("key1", 0x00))
+ .then(newAnswerInt("key2", 0x01))
+ .then(newAnswerInt("key3", 0x02))
+ .thenThrow(new NoSuchElementException());
+ when(getRocksDBIteratorMock().value(any()))
+ .then(newAnswerInt("val1", 0x7f))
+ .then(newAnswerInt("val2", 0x7e))
+ .then(newAnswerInt("val3", 0x7d))
+ .thenThrow(new NoSuchElementException());
+
+ List> remaining = new ArrayList<>();
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ i.forEachRemaining(kvSupplier -> {
+ try {
+ remaining.add(Table.newKeyValue(
+ kvSupplier.getKey().getArray(), kvSupplier.getValue().getArray()));
+ } finally {
+ kvSupplier.close();
+ }
+ });
+
+ System.out.println("remaining: " + remaining);
+ assertArrayEquals(new byte[]{0x00}, remaining.get(0).getKey());
+ assertArrayEquals(new byte[]{0x7f}, remaining.get(0).getValue());
+ assertArrayEquals(new byte[]{0x01}, remaining.get(1).getKey());
+ assertArrayEquals(new byte[]{0x7e}, remaining.get(1).getValue());
+ assertArrayEquals(new byte[]{0x02}, remaining.get(2).getKey());
+ assertArrayEquals(new byte[]{0x7d}, remaining.get(2).getValue());
+ }
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testHasNextDoesNotDependOnIsValid() throws Exception {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true, true, false);
+
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ assertTrue(i.hasNext());
+ try (CloseableKeyValue val = i.next()) {
+ assertFalse(i.hasNext());
+ assertThrows(NoSuchElementException.class, i::next);
+ assertFalse(i.hasNext());
+ }
+ }
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testNextCallsIsValidThenGetsTheValueAndStepsToNext()
+ throws Exception {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ i.next().close();
+ }
+
+ verifier.verify(getRocksDBIteratorMock()).isValid();
+ verifier.verify(getRocksDBIteratorMock()).key(any());
+ verifier.verify(getRocksDBIteratorMock()).value(any());
+ verifier.verify(getRocksDBIteratorMock()).next();
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testConstructorSeeksToFirstElement() throws Exception {
+ newIterator().close();
+
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testSeekToFirstSeeks() throws Exception {
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ i.seekToFirst();
+ }
+ verify(getRocksDBIteratorMock(), times(2)).seekToFirst();
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testSeekToLastSeeks() throws Exception {
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ i.seekToLast();
+ }
+
+ verify(getRocksDBIteratorMock(), times(1)).seekToLast();
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testSeekWithInvalidValue() throws RocksDatabaseException {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(false);
+
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator();
+ CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55});
+ CloseableKeyValue valSupplier = i.seek(target)) {
+ assertFalse(i.hasNext());
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time
+ verify(getRocksDBIteratorMock(), never()).seekToLast();
+ verifier.verify(getRocksDBIteratorMock(), times(1))
+ .seek(any(ByteBuffer.class));
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), never()).key(any());
+ verifier.verify(getRocksDBIteratorMock(), never()).value(any());
+ assertNull(valSupplier);
+ }
+ }
+
+ @Test
+ public void testSeekReturnsTheActualKey() throws Exception {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
+ .then(newAnswerInt("key1", 0x00));
+ when(getRocksDBIteratorMock().value(any()))
+ .then(newAnswerInt("val1", 0x7f));
+
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator();
+ CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55});
+ CloseableKeyValue valSupplier = i.seek(target)) {
+ assertTrue(i.hasNext());
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time
+ verify(getRocksDBIteratorMock(), never()).seekToLast();
+ verifier.verify(getRocksDBIteratorMock(), times(1))
+ .seek(any(ByteBuffer.class));
+ verifier.verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key(any());
+ verifier.verify(getRocksDBIteratorMock(), times(1)).value(any());
+ assertArrayEquals(new byte[] {0x00}, valSupplier.getKey().getArray());
+ assertArrayEquals(new byte[] {0x7f}, valSupplier.getValue().getArray());
+ }
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testGettingTheKeyIfIteratorIsValid() throws Exception {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
+ .then(newAnswerInt("key1", 0x00));
+
+ byte[] key = null;
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ if (i.hasNext()) {
+ try (CloseableKeyValue kv = i.next()) {
+ key = kv.getKey().getArray();
+ }
+ }
+ }
+
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
+
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key(any());
+ assertArrayEquals(new byte[]{0x00}, key);
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testGettingTheValueIfIteratorIsValid() throws Exception {
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
+ .then(newAnswerInt("key1", 0x00));
+ when(getRocksDBIteratorMock().value(any()))
+ .then(newAnswerInt("val1", 0x7f));
+
+ byte[] key = null;
+ byte[] value = null;
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ if (i.hasNext()) {
+ try (CloseableKeyValue entry = i.next()) {
+ key = entry.getKey().getArray();
+ value = entry.getValue().getArray();
+ }
+ }
+ }
+
+ InOrder verifier = inOrder(getRocksDBIteratorMock());
+
+ verifier.verify(getRocksDBIteratorMock(), times(2)).isValid();
+ verifier.verify(getRocksDBIteratorMock(), times(1)).key(any());
+ assertArrayEquals(new byte[]{0x00}, key);
+ assertArrayEquals(new byte[]{0x7f}, value);
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
+ final byte[] testKey = new byte[10];
+ ThreadLocalRandom.current().nextBytes(testKey);
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ when(getRocksDBIteratorMock().key(any()))
+ .then(newAnswer("key1", testKey));
+
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator(null)) {
+ try (CloseableKeyValue kv = i.next()) {
+ i.removeFromDB();
+ }
+
+ }
+
+ InOrder verifier = inOrder(getRocksDBIteratorMock(), getRocksTableMock());
+
+ verifier.verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verifier.verify(getRocksTableMock(), times(1))
+ .delete(ByteBuffer.wrap(testKey));
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testRemoveFromDBWithoutDBTableSet() throws Exception {
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ assertThrows(UnsupportedOperationException.class,
+ i::removeFromDB);
+ }
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testCloseCloses() throws Exception {
+ newIterator().close();
+
+ verify(getRocksDBIteratorMock(), times(1)).close();
+
+ CodecTestUtil.gc();
+ }
+
+ @Test
+ public void testNullPrefixedIterator() throws Exception {
+ try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) {
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+ clearInvocations(getRocksDBIteratorMock());
+ when(getRocksDBIteratorMock().isValid()).thenReturn(true);
+ i.seekToFirst();
+ verify(getRocksDBIteratorMock(), times(0)).isValid();
+ verify(getRocksDBIteratorMock(), times(0)).key(any());
+ verify(getRocksDBIteratorMock(), times(1)).seekToFirst();
+ clearInvocations(getRocksDBIteratorMock());
+ i.hasNext();
+ verify(getRocksDBIteratorMock(), times(1)).isValid();
+ clearInvocations(getRocksDBIteratorMock());
+
+ assertTrue(i.hasNext());
+ // hasNext() shouldn't make redundant isValid() calls.
+ verify(getRocksDBIteratorMock(), times(1)).isValid();
+ verify(getRocksDBIteratorMock(), times(0)).key(any());
+
+ i.seekToLast();
+ verify(getRocksDBIteratorMock(), times(1)).seekToLast();
+ }
+
+ CodecTestUtil.gc();
+ }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
index 8bf3e59e210..07df181c6a0 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
@@ -232,7 +232,7 @@ public void forEachAndIterator() throws Exception {
@Test
public void testIteratorOnException() throws Exception {
RDBTable rdbTable = mock(RDBTable.class);
- when(rdbTable.iterator((CodecBuffer) null, IteratorType.KEY_AND_VALUE))
+ when(rdbTable.newCodecBufferIterator(null, IteratorType.KEY_AND_VALUE))
.thenThrow(new RocksDatabaseException());
final Table testTable = new TypedTable<>(rdbTable,
StringCodec.get(), StringCodec.get(), CacheType.PARTIAL_CACHE);
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java
index 1d35cc1623f..47dac0e7b68 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java
@@ -17,8 +17,11 @@
package org.apache.hadoop.hdds.utils.db.managed;
+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.LOG;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.ReadOptions;
@@ -28,12 +31,47 @@
public class ManagedReadOptions extends ReadOptions {
private final UncheckedAutoCloseable leakTracker = track(this);
+ private ManagedSlice lowerBound;
+ private ManagedSlice upperBound;
+
+ public ManagedReadOptions() {
+ super();
+ }
+
+ public ManagedReadOptions(boolean fillCache) {
+ this(fillCache, null, null);
+ }
+
+ public ManagedReadOptions(boolean fillCache, byte[] lowerBound, byte[] upperBound) {
+ super();
+ setFillCache(fillCache);
+ if (lowerBound != null) {
+ this.lowerBound = new ManagedSlice(lowerBound);
+ this.setIterateLowerBound(this.lowerBound);
+ }
+ if (upperBound != null) {
+ this.upperBound = new ManagedSlice(upperBound);
+ this.setIterateUpperBound(this.upperBound);
+ }
+ }
+
@Override
public void close() {
try {
super.close();
+ IOUtils.close(LOG, lowerBound, upperBound);
} finally {
leakTracker.close();
}
}
+
+ @VisibleForTesting
+ public ManagedSlice getLowerBound() {
+ return lowerBound;
+ }
+
+ @VisibleForTesting
+ public ManagedSlice getUpperBound() {
+ return upperBound;
+ }
}
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java
index e35a4edc757..7f1b13a0842 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java
@@ -47,4 +47,9 @@ protected void disposeInternal() {
leakTracker.close();
}
}
+
+ @Override
+ public boolean isOwningHandle() {
+ return super.isOwningHandle();
+ }
}