diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml index ea7f08edd82..b4e0a1f3a3a 100644 --- a/hadoop-hdds/framework/pom.xml +++ b/hadoop-hdds/framework/pom.xml @@ -132,6 +132,10 @@ org.apache.commons commons-lang3 + + org.apache.commons + commons-pool2 + org.apache.hadoop hadoop-auth diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java index c6ca7247fdc..16b00aedb8e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java @@ -17,48 +17,63 @@ package org.apache.hadoop.hdds.utils.db; +import com.google.common.primitives.UnsignedBytes; +import java.util.Arrays; import java.util.NoSuchElementException; import java.util.function.Consumer; +import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.function.CheckedFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * An abstract {@link Table.KeyValueIterator} to iterate raw {@link Table.KeyValue}s. - * + * NOTE: This class only works with RocksDB when comparator is set to Rocksdb's ByteWiseComparator. * @param the raw type. */ -abstract class RDBStoreAbstractIterator - implements Table.KeyValueIterator { +abstract class RDBStoreAbstractIterator> implements TableIterator { private static final Logger LOG = LoggerFactory.getLogger(RDBStoreAbstractIterator.class); + private final ManagedReadOptions readOptions; private final ManagedRocksIterator rocksDBIterator; private final RDBTable rocksDBTable; - private Table.KeyValue currentEntry; - // This is for schemas that use a fixed-length - // prefix for each key. - private final RAW prefix; + private KV currentEntry; private final IteratorType type; - RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW prefix, IteratorType type) { - this.rocksDBIterator = iterator; + RDBStoreAbstractIterator( + CheckedFunction itrSupplier, RDBTable table, + byte[] prefix, IteratorType type) throws RocksDatabaseException { + this.readOptions = new ManagedReadOptions(false, prefix, getNextHigherPrefix(prefix)); + this.rocksDBIterator = itrSupplier.apply(readOptions); this.rocksDBTable = table; - this.prefix = prefix; this.type = type; } + private byte[] getNextHigherPrefix(byte[] prefix) { + if (prefix == null) { + return null; + } + for (int i = prefix.length - 1; i >= 0; i--) { + if (UnsignedBytes.compare(prefix[i], UnsignedBytes.MAX_VALUE) != 0) { + byte[] nextHigher = Arrays.copyOf(prefix, i + 1); + nextHigher[i] = (byte) (prefix[i] + 1); + return nextHigher; + } + } + // No higher prefix exists since all bytes are MAX_VALUE. + return null; + } + IteratorType getType() { return type; } - /** @return the key for the current entry. */ - abstract RAW key(); - /** @return the {@link Table.KeyValue} for the current entry. */ - abstract Table.KeyValue getKeyValue(); + abstract KV getKeyValue(); /** Seek to the given key. */ abstract void seek0(RAW key); @@ -66,9 +81,6 @@ IteratorType getType() { /** Delete the given key. */ abstract void delete(RAW key) throws RocksDatabaseException; - /** Does the given key start with the prefix? */ - abstract boolean startsWithPrefix(RAW key); - final ManagedRocksIterator getRocksDBIterator() { return rocksDBIterator; } @@ -77,13 +89,9 @@ final RDBTable getRocksDBTable() { return rocksDBTable; } - final RAW getPrefix() { - return prefix; - } - @Override public final void forEachRemaining( - Consumer> action) { + Consumer action) { while (hasNext()) { action.accept(next()); } @@ -99,12 +107,11 @@ private void setCurrentEntry() { @Override public final boolean hasNext() { - return rocksDBIterator.get().isValid() && - (prefix == null || startsWithPrefix(key())); + return rocksDBIterator.get().isValid(); } @Override - public final Table.KeyValue next() { + public final KV next() { setCurrentEntry(); if (currentEntry != null) { rocksDBIterator.get().next(); @@ -115,26 +122,16 @@ public final Table.KeyValue next() { @Override public final void seekToFirst() { - if (prefix == null) { - rocksDBIterator.get().seekToFirst(); - } else { - seek0(prefix); - } - setCurrentEntry(); + rocksDBIterator.get().seekToFirst(); } @Override public final void seekToLast() { - if (prefix == null) { - rocksDBIterator.get().seekToLast(); - } else { - throw new UnsupportedOperationException("seekToLast: prefix != null"); - } - setCurrentEntry(); + rocksDBIterator.get().seekToLast(); } @Override - public final Table.KeyValue seek(RAW key) { + public final KV seek(RAW key) { seek0(key); setCurrentEntry(); return currentEntry; @@ -155,5 +152,6 @@ public final void removeFromDB() throws RocksDatabaseException, CodecException { @Override public void close() { rocksDBIterator.close(); + readOptions.close(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java index 67593f744e3..441659b3766 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java @@ -17,30 +17,27 @@ package org.apache.hadoop.hdds.utils.db; -import java.util.Arrays; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.function.CheckedFunction; /** * RocksDB store iterator using the byte[] API. */ -class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator { - private static byte[] copyPrefix(byte[] prefix) { - return prefix == null || prefix.length == 0 ? null : Arrays.copyOf(prefix, prefix.length); - } +class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator> + implements KeyValueIterator { - RDBStoreByteArrayIterator(ManagedRocksIterator iterator, - RDBTable table, byte[] prefix, IteratorType type) { - super(iterator, table, copyPrefix(prefix), type); + RDBStoreByteArrayIterator( + CheckedFunction itrSupplier, + RDBTable table, byte[] prefix, IteratorType type) throws RocksDatabaseException { + super(itrSupplier, table, prefix, type); seekToFirst(); } @Override - byte[] key() { - return getRocksDBIterator().get().key(); - } - - @Override - Table.KeyValue getKeyValue() { + KeyValue getKeyValue() { final ManagedRocksIterator i = getRocksDBIterator(); final byte[] key = getType().readKey() ? i.get().key() : null; final byte[] value = getType().readValue() ? i.get().value() : null; @@ -56,27 +53,4 @@ void seek0(byte[] key) { void delete(byte[] key) throws RocksDatabaseException { getRocksDBTable().delete(key); } - - @Override - boolean startsWithPrefix(byte[] value) { - final byte[] prefix = getPrefix(); - if (prefix == null) { - return true; - } - if (value == null) { - return false; - } - - int length = prefix.length; - if (value.length < length) { - return false; - } - - for (int i = 0; i < length; i++) { - if (value[i] != prefix[i]) { - return false; - } - } - return true; - } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java index aa703249ebe..0b87916f327 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java @@ -17,29 +17,33 @@ package org.apache.hadoop.hdds.utils.db; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.Table.KeyValueIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.function.CheckedFunction; /** * Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}. */ -class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator { +class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator> + implements KeyValueIterator { private final Buffer keyBuffer; private final Buffer valueBuffer; private final AtomicBoolean closed = new AtomicBoolean(); - RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table, - CodecBuffer prefix, IteratorType type) { - super(iterator, table, prefix, type); + RDBStoreCodecBufferIterator( + CheckedFunction itrSupplier, RDBTable table, + byte[] prefix, IteratorType type) throws RocksDatabaseException { + super(itrSupplier, table, prefix, type); final String name = table != null ? table.getName() : null; this.keyBuffer = new Buffer( new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10), - // it has to read key for matching prefix. - getType().readKey() || prefix != null ? buffer -> getRocksDBIterator().get().key(buffer) : null); + getType().readKey() ? buffer -> getRocksDBIterator().get().key(buffer) : null); this.valueBuffer = new Buffer( new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10), getType().readValue() ? buffer -> getRocksDBIterator().get().value(buffer) : null); @@ -51,16 +55,9 @@ void assertOpen() { } @Override - CodecBuffer key() { + KeyValue getKeyValue() { assertOpen(); - return keyBuffer.getFromDb(); - } - - @Override - Table.KeyValue getKeyValue() { - assertOpen(); - final CodecBuffer key = getType().readKey() ? key() : null; - return Table.newKeyValue(key, valueBuffer.getFromDb()); + return Table.newKeyValue(keyBuffer.getFromDb(), valueBuffer.getFromDb()); } @Override @@ -75,24 +72,10 @@ void delete(CodecBuffer key) throws RocksDatabaseException { getRocksDBTable().delete(key.asReadOnlyByteBuffer()); } - @Override - boolean startsWithPrefix(CodecBuffer key) { - assertOpen(); - final CodecBuffer prefix = getPrefix(); - if (prefix == null) { - return true; - } - if (key == null) { - return false; - } - return key.startsWith(prefix); - } - @Override public void close() { if (closed.compareAndSet(false, true)) { super.close(); - Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release); keyBuffer.release(); valueBuffer.release(); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStorePoolBackedCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStorePoolBackedCodecBufferIterator.java new file mode 100644 index 00000000000..8aec0bea7c6 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStorePoolBackedCodecBufferIterator.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.hadoop.hdds.utils.db.Table.CloseableKeyValue; +import org.apache.hadoop.hdds.utils.db.Table.CloseableKeyValueIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.function.CheckedFunction; + +/** + * A concrete implementation of {@link RDBStoreAbstractIterator} that provides an iterator + * for {@link CodecBuffer} keys and values managed within a RocksDB store. The iterator + * leverages an object pool to manage reusable {@link CloseableKeyValue} objects, + * enabling efficient memory and resource management by reusing buffers for keys and values + * during iteration. + * + * This iterator supports operations such as seeking to a specific key, retrieving key-value + * pairs from the database, and removing entries from the database, while encapsulating + * all RocksDB-specific logic internally. + */ +class RDBStorePoolBackedCodecBufferIterator extends RDBStoreAbstractIterator> implements CloseableKeyValueIterator { + + private final GenericObjectPool> kvBufferPool; + + RDBStorePoolBackedCodecBufferIterator( + CheckedFunction itrSupplier, RDBTable table, + byte[] prefix, IteratorType type, int maxNumberOfObjectsNeededConcurrently) + throws RocksDatabaseException { + super(itrSupplier, table, prefix, type); + GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(Math.max(maxNumberOfObjectsNeededConcurrently, 1)); + config.setBlockWhenExhausted(true); + + final String name = table != null ? table.getName() : null; + this.kvBufferPool = new GenericObjectPool<>(new KeyValueBufferFactory(name), config); + seekToFirst(); + } + + private final class KeyValueBufferFactory extends BasePooledObjectFactory> { + + private final String iteratorName; + + private KeyValueBufferFactory(String iteratorName) { + this.iteratorName = iteratorName; + } + + @Override + public CloseableKeyValue create() { + Buffer keyBuffer = new Buffer( + new CodecBuffer.Capacity(iteratorName + "-iterator-key", 1 << 10), + getType().readKey() ? buffer -> getRocksDBIterator().get().key(buffer) : null); + Buffer valueBuffer = new Buffer( + new CodecBuffer.Capacity(iteratorName + "-iterator-value", 4 << 10), + getType().readValue() ? buffer -> getRocksDBIterator().get().value(buffer) : null); + Runnable closeAction = () -> { + keyBuffer.release(); + valueBuffer.release(); + }; + return Table.newCloseableKeyValue(keyBuffer, valueBuffer, closeAction); + } + + @Override + public void destroyObject(PooledObject> p) { + p.getObject().close(); + } + + @Override + public PooledObject> wrap( + CloseableKeyValue bufferBufferCloseableKeyValue) { + return new DefaultPooledObject<>(bufferBufferCloseableKeyValue); + } + } + + @Override + Table.CloseableKeyValue getKeyValue() { + try { + AtomicBoolean closed = new AtomicBoolean(false); + CloseableKeyValue kvBuffers = kvBufferPool.borrowObject(); + return Table.newCloseableKeyValue(kvBuffers.getKey().getFromDb(), kvBuffers.getValue().getFromDb(), + () -> { + // To handle multiple close calls. + if (closed.compareAndSet(false, true)) { + kvBufferPool.returnObject(kvBuffers); + } + }); + } catch (Exception e) { + // Ideally, this should never happen since we have a generic object pool where Buffers would always be created + // and the wait for object borrow never times out. + throw new IllegalStateException("Failed to borrow key/value buffer from pool", e); + } + } + + @Override + void seek0(CodecBuffer key) { + getRocksDBIterator().get().seek(key.asReadOnlyByteBuffer()); + } + + @Override + void delete(CodecBuffer key) throws RocksDatabaseException { + getRocksDBTable().delete(key.asReadOnlyByteBuffer()); + } + + @Override + public void close() { + super.close(); + this.kvBufferPool.close(); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index f732735cbe3..1d4f3d4b6c2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -200,22 +200,27 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) { } else { throw new IllegalArgumentException("batch should be RDBBatchOperation"); } - } @Override public KeyValueIterator iterator(byte[] prefix, IteratorType type) throws RocksDatabaseException { - return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, - prefix, type); + return new RDBStoreByteArrayIterator(readOptions -> db.newIterator(family, readOptions), + this, prefix, type); } - KeyValueIterator iterator( - CodecBuffer prefix, IteratorType type) throws RocksDatabaseException { - return new RDBStoreCodecBufferIterator(db.newIterator(family, false), + KeyValueIterator newCodecBufferIterator( + byte[] prefix, IteratorType type) throws RocksDatabaseException { + return new RDBStoreCodecBufferIterator(readOptions -> db.newIterator(family, readOptions), this, prefix, type); } + CloseableKeyValueIterator newCloseableCodecBufferIterator( + byte[] prefix, IteratorType type, int numberOfObjectsNeededConcurrently) throws RocksDatabaseException { + return new RDBStorePoolBackedCodecBufferIterator(readOptions -> db.newIterator(family, readOptions), + this, prefix, type, numberOfObjectsNeededConcurrently); + } + @Override public String getName() { return family.getName(); @@ -237,26 +242,13 @@ public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix) } @Override - public void dumpToFileWithPrefix(File externalFile, byte[] prefix) - throws RocksDatabaseException, CodecException { - CodecBuffer prefixBuffer = prefix == null || prefix.length == 0 ? null : - CodecBufferCodec.get(true).fromPersistedFormat(prefix); - KeyValueIterator iter; - try { - iter = iterator(prefixBuffer, IteratorType.KEY_AND_VALUE); - } catch (RocksDatabaseException e) { - if (prefixBuffer != null) { - prefixBuffer.close(); - } - throw e; - } - try (RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) { + public void dumpToFileWithPrefix(File externalFile, byte[] prefix) throws RocksDatabaseException { + try (KeyValueIterator iter = newCodecBufferIterator(prefix, IteratorType.KEY_AND_VALUE); + RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) { while (iter.hasNext()) { final KeyValue entry = iter.next(); fileWriter.put(entry.getKey(), entry.getValue()); } - } finally { - iter.close(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 659954a861b..5abc933d54b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -790,10 +790,8 @@ public ManagedRocksIterator newIterator(ColumnFamily family) } public ManagedRocksIterator newIterator(ColumnFamily family, - boolean fillCache) throws RocksDatabaseException { - try (UncheckedAutoCloseable ignored = acquire(); - ManagedReadOptions readOptions = new ManagedReadOptions()) { - readOptions.setFillCache(fillCache); + ManagedReadOptions readOptions) throws RocksDatabaseException { + try (UncheckedAutoCloseable ignored = acquire()) { return managed(db.get().newIterator(family.getHandle(), readOptions)); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index fc049034406..9be0cd19de2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -332,7 +332,7 @@ default List> getRangeKVs(KEY startKey, int count, KEY pref /** * Class used to represent the key and value pair of a db entry. */ - final class KeyValue { + class KeyValue { private final K key; private final V value; private final int valueByteSize; @@ -379,6 +379,20 @@ public int hashCode() { } } + /** + * Class used to represent the key and value pair of a db entry which also supports closeable function to cleanup + * resources associated with the entry. + */ + abstract class CloseableKeyValue extends KeyValue implements AutoCloseable { + + private CloseableKeyValue(K key, V value, int valueByteSize) { + super(key, value, valueByteSize); + } + + @Override + public abstract void close(); + } + static KeyValue newKeyValue(K key, V value) { return newKeyValue(key, value, -1); } @@ -387,9 +401,27 @@ static KeyValue newKeyValue(K key, V value, int valueByteSize) { return new KeyValue<>(key, value, valueByteSize); } + static CloseableKeyValue newCloseableKeyValue(K key, V value, int valueByteSize, Runnable closeAction) { + return new CloseableKeyValue(key, value, valueByteSize) { + @Override + public void close() { + closeAction.run(); + } + }; + } + + static CloseableKeyValue newCloseableKeyValue(K key, V value, Runnable closeAction) { + return newCloseableKeyValue(key, value, -1, closeAction); + } + /** A {@link TableIterator} to iterate {@link KeyValue}s. */ interface KeyValueIterator extends TableIterator> { } + + /** A {@link TableIterator} to iterate {@link CloseableKeyValue}s. */ + interface CloseableKeyValueIterator extends TableIterator> { + + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 8000d48c618..6f8d859d98d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -388,10 +388,10 @@ public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, @Override public KeyValueIterator iterator(KEY prefix, IteratorType type) throws RocksDatabaseException, CodecException { + final byte[] prefixBytes = encodeKey(prefix); if (supportCodecBuffer) { - return newCodecBufferTableIterator(prefix, type); + return newCodecBufferTableIterator(rawTable.newCodecBufferIterator(prefixBytes, type)); } else { - final byte[] prefixBytes = encodeKey(prefix); return new TypedTableIterator(rawTable.iterator(prefixBytes, type)); } } @@ -481,27 +481,6 @@ TableCache getCache() { return cache; } - private RawIterator newCodecBufferTableIterator(KEY prefix, IteratorType type) - throws RocksDatabaseException, CodecException { - final CodecBuffer encoded = encodeKeyCodecBuffer(prefix); - final CodecBuffer prefixBuffer; - if (encoded != null && encoded.readableBytes() == 0) { - encoded.release(); - prefixBuffer = null; - } else { - prefixBuffer = encoded; - } - - try { - return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer, type)); - } catch (Throwable t) { - if (prefixBuffer != null) { - prefixBuffer.release(); - } - throw t; - } - } - private RawIterator newCodecBufferTableIterator(KeyValueIterator i) { return new RawIterator(i) { @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreAbstractIterator.java new file mode 100644 index 00000000000..2b9f806374e --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreAbstractIterator.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.ratis.util.function.CheckedFunction; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.rocksdb.RocksIterator; + +/** + * Abstract class for testing RDBStoreAbstractIterator. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class TestRDBStoreAbstractIterator> { + + private RocksIterator rocksDBIteratorMock; + private CheckedFunction itrInitializer; + private RDBTable rocksTableMock; + private ManagedReadOptions readOptions; + + public RDBTable getRocksTableMock() { + return rocksTableMock; + } + + public RocksIterator getRocksDBIteratorMock() { + return rocksDBIteratorMock; + } + + public CheckedFunction getItrInitializer() { + return itrInitializer; + } + + @BeforeAll + public static void init() { + ManagedRocksObjectUtils.loadRocksDBLibrary(); + } + + @BeforeEach + public void setup() { + rocksDBIteratorMock = mock(RocksIterator.class); + itrInitializer = readOpts -> { + this.readOptions = readOpts; + return new ManagedRocksIterator(rocksDBIteratorMock); + }; + rocksTableMock = mock(RDBTable.class); + Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG); + } + + abstract T newIterator() throws RocksDatabaseException; + + abstract T newIterator(byte[] prefix) throws RocksDatabaseException; + + public static Stream prefixTestArgumentsProvider() { + byte[] randomBytes = RandomUtils.nextBytes(100); + return Stream.of( + Arguments.of("Empty prefix", new byte[0], null), + Arguments.of("null prefix", null, null), + Arguments.of("Prefix with 0", getArrayFilledWithValue(0, 1), getArrayFilledWithValue(1, 1)), + Arguments.of("Prefix with 0 with 100 times", getArrayFilledWithValue(0, 100), + getLastByteIncreased(getArrayFilledWithValue(0, 100))), + Arguments.of("Prefix with 0xFF", getArrayFilledWithValue(0xFF, 1), null), + Arguments.of("Prefix with 0xFF with 100 times", getArrayFilledWithValue(0xFF, 100), null), + Arguments.of("Prefix with random bytes", randomBytes, + getLastByteIncreased(Arrays.copyOf(randomBytes, randomBytes.length))), + Arguments.of("Prefix with 0xFF prefixed 100 times and 0 at end", + getArrayFilledWithValueAndPrefix(0, 1, getArrayFilledWithValue(0xFF, 100)), + getArrayFilledWithValueAndPrefix(1, 1, getArrayFilledWithValue(0xFF, 100))), + Arguments.of("Prefix with 0xFF prefixed 100 times and 50 at end", + getArrayFilledWithValueAndPrefix(50, 1, getArrayFilledWithValue(0xFF, 100)), + getArrayFilledWithValueAndPrefix(51, 1, getArrayFilledWithValue(0xFF, 100))), + Arguments.of("Prefix with 0xFF prefixed 100 times and value 0xFE at end", + getArrayFilledWithValueAndPrefix(0xFE, 1, getArrayFilledWithValue(0xFF, 100)), + getArrayFilledWithValue(0xFF, 101)), + Arguments.of("Prefix with 0 prefix 100 times and 100 0xFF at end", + getArrayFilledWithValueAndPrefix(0xFF, 100, getArrayFilledWithValue(0, 100)), + getLastByteIncreased(getArrayFilledWithValue(0, 100))), + Arguments.of("Prefix with 50 prefix 100 times and 100 0xFF at end", + getArrayFilledWithValueAndPrefix(0xFF, 100, getArrayFilledWithValue(50, 100)), + getLastByteIncreased(getArrayFilledWithValue(50, 100))), + Arguments.of("Prefix with 0xFE prefix 100 times and 100 0xFF at end", + getArrayFilledWithValueAndPrefix(0xFF, 100, getArrayFilledWithValue(0xFE, 100)), + getLastByteIncreased(getArrayFilledWithValue(0xFE, 100))) + ); + } + + private static byte[] getArrayFilledWithValue(int value, int length) { + return getArrayFilledWithValueAndPrefix(value, length, null); + } + + private static byte[] getArrayFilledWithValueAndPrefix(int value, int length, byte[] prefix) { + byte[] array = new byte[length + (prefix == null ? 0 : prefix.length)]; + if (prefix != null) { + System.arraycopy(prefix, 0, array, 0, prefix.length); + } + Arrays.fill(array, prefix == null ? 0 : prefix.length, array.length, (byte) value); + return array; + } + + private static byte[] getLastByteIncreased(byte[] arr) { + arr[arr.length - 1]++; + return arr; + } + + @ParameterizedTest(name = "{0}") + @MethodSource("prefixTestArgumentsProvider") + public void testNormalPrefixedIterator(String name, byte[] prefix, byte[] expectedUpperBound) throws Exception { + try (T itr = newIterator(prefix); + ManagedSlice lowerBound = prefix == null ? null : new ManagedSlice(prefix); + ManagedSlice upperBound = expectedUpperBound == null ? null : new ManagedSlice(expectedUpperBound)) { + Assertions.assertEquals(this.readOptions.getLowerBound(), lowerBound); + Assertions.assertEquals(this.readOptions.getUpperBound(), upperBound); + verify(rocksDBIteratorMock, times(1)).seekToFirst(); + clearInvocations(rocksDBIteratorMock); + itr.seekToFirst(); + verify(rocksDBIteratorMock, times(1)).seekToFirst(); + clearInvocations(rocksDBIteratorMock); + + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(prefix); + assertTrue(itr.hasNext()); + verify(rocksDBIteratorMock, times(1)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + itr.seekToLast(); + verify(rocksDBIteratorMock, times(1)).seekToLast(); + } + assertFalse(readOptions.isOwningHandle()); + assertFalse(Optional.ofNullable(readOptions.getLowerBound()).map(ManagedSlice::isOwningHandle).orElse(false)); + assertFalse(Optional.ofNullable(readOptions.getUpperBound()).map(ManagedSlice::isOwningHandle).orElse(false)); + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java index 136990522b7..8eb28842892 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hdds.utils.db.IteratorType.VALUE_ONLY; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentCaptor.forClass; @@ -37,18 +36,11 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import java.util.function.Consumer; -import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; -import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; -import org.rocksdb.RocksIterator; /** * This test prescribe expected behaviour @@ -57,38 +49,28 @@ * RDBStoreIterator to provide iteration over table elements in a typed manner. * The tests are to ensure we access RocksDB via the iterator properly. */ -public class TestRDBStoreByteArrayIterator { - - private RocksIterator rocksDBIteratorMock; - private ManagedRocksIterator managedRocksIterator; - private RDBTable rocksTableMock; - - @BeforeEach - public void setup() { - rocksDBIteratorMock = mock(RocksIterator.class); - managedRocksIterator = new ManagedRocksIterator(rocksDBIteratorMock); - rocksTableMock = mock(RDBTable.class); - Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG); - } +public class TestRDBStoreByteArrayIterator extends TestRDBStoreAbstractIterator { - RDBStoreByteArrayIterator newIterator() { - return new RDBStoreByteArrayIterator(managedRocksIterator, null, null, KEY_AND_VALUE); + @Override + RDBStoreByteArrayIterator newIterator() throws RocksDatabaseException { + return new RDBStoreByteArrayIterator(getItrInitializer(), null, null, KEY_AND_VALUE); } - RDBStoreByteArrayIterator newIterator(byte[] prefix) { - return new RDBStoreByteArrayIterator(managedRocksIterator, rocksTableMock, prefix, KEY_AND_VALUE); + @Override + RDBStoreByteArrayIterator newIterator(byte[] prefix) throws RocksDatabaseException { + return new RDBStoreByteArrayIterator(getItrInitializer(), getRocksTableMock(), prefix, KEY_AND_VALUE); } @Test - public void testForeachRemainingCallsConsumerWithAllElements() { - when(rocksDBIteratorMock.isValid()) - .thenReturn(true, true, true, true, true, true, true, false); - when(rocksDBIteratorMock.key()) - .thenReturn(new byte[]{0x00}, new byte[]{0x00}, new byte[]{0x01}, + public void testForeachRemainingCallsConsumerWithAllElements() throws RocksDatabaseException { + when(getRocksDBIteratorMock().isValid()) + .thenReturn(true, true, true, true, true, true, false); + when(getRocksDBIteratorMock().key()) + .thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02}) .thenThrow(new NoSuchElementException()); - when(rocksDBIteratorMock.value()) - .thenReturn(new byte[]{0x7f}, new byte[]{0x7f}, new byte[]{0x7e}, + when(getRocksDBIteratorMock().value()) + .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d}) .thenThrow(new NoSuchElementException()); @@ -115,8 +97,8 @@ public void testForeachRemainingCallsConsumerWithAllElements() { } @Test - public void testHasNextDependsOnIsvalid() { - when(rocksDBIteratorMock.isValid()).thenReturn(true, true, false); + public void testHasNextDependsOnIsvalid() throws RocksDatabaseException { + when(getRocksDBIteratorMock().isValid()).thenReturn(true, false); RDBStoreByteArrayIterator iter = newIterator(); @@ -125,70 +107,70 @@ public void testHasNextDependsOnIsvalid() { } @Test - public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() { - when(rocksDBIteratorMock.isValid()).thenReturn(true); + public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() throws RocksDatabaseException { + when(getRocksDBIteratorMock().isValid()).thenReturn(true); RDBStoreByteArrayIterator iter = newIterator(); - InOrder verifier = inOrder(rocksDBIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); iter.next(); - verifier.verify(rocksDBIteratorMock).isValid(); - verifier.verify(rocksDBIteratorMock).key(); - verifier.verify(rocksDBIteratorMock).value(); - verifier.verify(rocksDBIteratorMock).next(); + verifier.verify(getRocksDBIteratorMock()).isValid(); + verifier.verify(getRocksDBIteratorMock()).key(); + verifier.verify(getRocksDBIteratorMock()).value(); + verifier.verify(getRocksDBIteratorMock()).next(); } @Test - public void testConstructorSeeksToFirstElement() { + public void testConstructorSeeksToFirstElement() throws RocksDatabaseException { newIterator(); - verify(rocksDBIteratorMock, times(1)).seekToFirst(); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); } @Test - public void testSeekToFirstSeeks() { + public void testSeekToFirstSeeks() throws RocksDatabaseException { RDBStoreByteArrayIterator iter = newIterator(); iter.seekToFirst(); - verify(rocksDBIteratorMock, times(2)).seekToFirst(); + verify(getRocksDBIteratorMock(), times(2)).seekToFirst(); } @Test - public void testSeekToLastSeeks() { + public void testSeekToLastSeeks() throws RocksDatabaseException { RDBStoreByteArrayIterator iter = newIterator(); iter.seekToLast(); - verify(rocksDBIteratorMock, times(1)).seekToLast(); + verify(getRocksDBIteratorMock(), times(1)).seekToLast(); } @Test public void testSeekReturnsTheActualKey() throws Exception { - when(rocksDBIteratorMock.isValid()).thenReturn(true); - when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); - when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key()).thenReturn(new byte[]{0x00}); + when(getRocksDBIteratorMock().value()).thenReturn(new byte[]{0x7f}); RDBStoreByteArrayIterator iter = newIterator(); final Table.KeyValue val = iter.seek(new byte[]{0x55}); - InOrder verifier = inOrder(rocksDBIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); - verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time - verify(rocksDBIteratorMock, never()).seekToLast(); - verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class)); - verifier.verify(rocksDBIteratorMock, times(1)).isValid(); - verifier.verify(rocksDBIteratorMock, times(1)).key(); - verifier.verify(rocksDBIteratorMock, times(1)).value(); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time + verify(getRocksDBIteratorMock(), never()).seekToLast(); + verifier.verify(getRocksDBIteratorMock(), times(1)).seek(any(byte[].class)); + verifier.verify(getRocksDBIteratorMock(), times(1)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(); + verifier.verify(getRocksDBIteratorMock(), times(1)).value(); assertArrayEquals(new byte[]{0x00}, val.getKey()); assertArrayEquals(new byte[]{0x7f}, val.getValue()); } @Test public void testGettingTheKeyIfIteratorIsValid() throws Exception { - when(rocksDBIteratorMock.isValid()).thenReturn(true); - when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key()).thenReturn(new byte[]{0x00}); RDBStoreByteArrayIterator iter = newIterator(); byte[] key = null; @@ -197,18 +179,18 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception { key = entry.getKey(); } - InOrder verifier = inOrder(rocksDBIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); - verifier.verify(rocksDBIteratorMock, times(1)).isValid(); - verifier.verify(rocksDBIteratorMock, times(1)).key(); + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(); assertArrayEquals(new byte[]{0x00}, key); } @Test public void testGettingTheValueIfIteratorIsValid() throws Exception { - when(rocksDBIteratorMock.isValid()).thenReturn(true); - when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); - when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key()).thenReturn(new byte[]{0x00}); + when(getRocksDBIteratorMock().value()).thenReturn(new byte[]{0x7f}); RDBStoreByteArrayIterator iter = newIterator(); Table.KeyValue entry; @@ -220,10 +202,10 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { value = entry.getValue(); } - InOrder verifier = inOrder(rocksDBIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); - verifier.verify(rocksDBIteratorMock, times(1)).isValid(); - verifier.verify(rocksDBIteratorMock, times(1)).key(); + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(); assertArrayEquals(new byte[]{0x00}, key); assertArrayEquals(new byte[]{0x7f}, value); } @@ -231,20 +213,21 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { @Test public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { byte[] testKey = new byte[]{0x00}; - when(rocksDBIteratorMock.isValid()).thenReturn(true); - when(rocksDBIteratorMock.key()).thenReturn(testKey); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key()).thenReturn(testKey); RDBStoreByteArrayIterator iter = newIterator(null); + iter.next(); iter.removeFromDB(); - InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock); + InOrder verifier = inOrder(getRocksDBIteratorMock(), getRocksTableMock()); - verifier.verify(rocksDBIteratorMock, times(1)).isValid(); - verifier.verify(rocksTableMock, times(1)).delete(testKey); + verifier.verify(getRocksDBIteratorMock(), times(1)).isValid(); + verifier.verify(getRocksTableMock(), times(1)).delete(testKey); } @Test - public void testRemoveFromDBWithoutDBTableSet() { + public void testRemoveFromDBWithoutDBTableSet() throws RocksDatabaseException { RDBStoreByteArrayIterator iter = newIterator(); assertThrows(UnsupportedOperationException.class, iter::removeFromDB); @@ -255,49 +238,26 @@ public void testCloseCloses() throws Exception { RDBStoreByteArrayIterator iter = newIterator(); iter.close(); - verify(rocksDBIteratorMock, times(1)).close(); + verify(getRocksDBIteratorMock(), times(1)).close(); } @Test public void testNullPrefixedIterator() throws IOException { RDBStoreByteArrayIterator iter = newIterator(null); - verify(rocksDBIteratorMock, times(1)).seekToFirst(); - clearInvocations(rocksDBIteratorMock); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + clearInvocations(getRocksDBIteratorMock()); iter.seekToFirst(); - verify(rocksDBIteratorMock, times(1)).seekToFirst(); - clearInvocations(rocksDBIteratorMock); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + clearInvocations(getRocksDBIteratorMock()); - when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); assertTrue(iter.hasNext()); - verify(rocksDBIteratorMock, times(1)).isValid(); - verify(rocksDBIteratorMock, times(0)).key(); + verify(getRocksDBIteratorMock(), times(1)).isValid(); + verify(getRocksDBIteratorMock(), times(0)).key(); iter.seekToLast(); - verify(rocksDBIteratorMock, times(1)).seekToLast(); - - iter.close(); - } - - @Test - public void testNormalPrefixedIterator() throws IOException { - byte[] testPrefix = "sample".getBytes(StandardCharsets.UTF_8); - RDBStoreByteArrayIterator iter = newIterator(testPrefix); - verify(rocksDBIteratorMock, times(1)).seek(testPrefix); - clearInvocations(rocksDBIteratorMock); - - iter.seekToFirst(); - verify(rocksDBIteratorMock, times(1)).seek(testPrefix); - clearInvocations(rocksDBIteratorMock); - - when(rocksDBIteratorMock.isValid()).thenReturn(true); - when(rocksDBIteratorMock.key()).thenReturn(testPrefix); - assertTrue(iter.hasNext()); - verify(rocksDBIteratorMock, times(1)).isValid(); - verify(rocksDBIteratorMock, times(1)).key(); - Exception e = - assertThrows(Exception.class, () -> iter.seekToLast(), "Prefixed iterator does not support seekToLast"); - assertInstanceOf(UnsupportedOperationException.class, e); + verify(getRocksDBIteratorMock(), times(1)).seekToLast(); iter.close(); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java index 919b3b6cdad..13e68f898d6 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java @@ -20,20 +20,17 @@ import static org.apache.hadoop.hdds.utils.db.IteratorType.KEY_AND_VALUE; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; @@ -41,43 +38,40 @@ import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.stubbing.Answer; -import org.rocksdb.RocksIterator; /** * This test is similar to {@link TestRDBStoreByteArrayIterator} * except that this test is for {@link RDBStoreCodecBufferIterator}. */ -public class TestRDBStoreCodecBufferIterator { +public class TestRDBStoreCodecBufferIterator extends TestRDBStoreAbstractIterator { - private RocksIterator rocksIteratorMock; - private ManagedRocksIterator managedRocksIterator; - private RDBTable rdbTableMock; + @BeforeAll + public static void init() { + ManagedRocksObjectUtils.loadRocksDBLibrary(); + } @BeforeEach - public void setup() { + public void setupTest() { CodecBuffer.enableLeakDetection(); - rocksIteratorMock = mock(RocksIterator.class); - managedRocksIterator = newManagedRocksIterator(); - rdbTableMock = mock(RDBTable.class); - Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG); } ManagedRocksIterator newManagedRocksIterator() { - return new ManagedRocksIterator(rocksIteratorMock); + return new ManagedRocksIterator(getRocksDBIteratorMock()); } - RDBStoreCodecBufferIterator newIterator() { - return new RDBStoreCodecBufferIterator(managedRocksIterator, null, null, KEY_AND_VALUE); + @Override + RDBStoreCodecBufferIterator newIterator() throws RocksDatabaseException { + return new RDBStoreCodecBufferIterator(getItrInitializer(), null, null, KEY_AND_VALUE); } - RDBStoreCodecBufferIterator newIterator(CodecBuffer prefix) { - return new RDBStoreCodecBufferIterator(managedRocksIterator, rdbTableMock, prefix, KEY_AND_VALUE); + @Override + RDBStoreCodecBufferIterator newIterator(byte[] prefix) throws RocksDatabaseException { + return new RDBStoreCodecBufferIterator(getItrInitializer(), getRocksTableMock(), prefix, KEY_AND_VALUE); } Answer newAnswerInt(String name, int b) { @@ -98,16 +92,14 @@ Answer newAnswer(String name, byte... b) { @Test public void testForEachRemaining() throws Exception { - when(rocksIteratorMock.isValid()) - .thenReturn(true, true, true, true, true, true, true, false); - when(rocksIteratorMock.key(any())) - .then(newAnswerInt("key1", 0x00)) + when(getRocksDBIteratorMock().isValid()) + .thenReturn(true, true, true, true, true, true, false); + when(getRocksDBIteratorMock().key(any())) .then(newAnswerInt("key2", 0x00)) .then(newAnswerInt("key3", 0x01)) .then(newAnswerInt("key4", 0x02)) .thenThrow(new NoSuchElementException()); - when(rocksIteratorMock.value(any())) - .then(newAnswerInt("val1", 0x7f)) + when(getRocksDBIteratorMock().value(any())) .then(newAnswerInt("val2", 0x7f)) .then(newAnswerInt("val3", 0x7e)) .then(newAnswerInt("val4", 0x7d)) @@ -132,7 +124,7 @@ public void testForEachRemaining() throws Exception { @Test public void testHasNextDependsOnIsvalid() throws Exception { - when(rocksIteratorMock.isValid()).thenReturn(true, true, false); + when(getRocksDBIteratorMock().isValid()).thenReturn(true, false); try (RDBStoreCodecBufferIterator i = newIterator()) { assertTrue(i.hasNext()); @@ -145,16 +137,16 @@ public void testHasNextDependsOnIsvalid() throws Exception { @Test public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() throws Exception { - when(rocksIteratorMock.isValid()).thenReturn(true); - InOrder verifier = inOrder(rocksIteratorMock); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + InOrder verifier = inOrder(getRocksDBIteratorMock()); try (RDBStoreCodecBufferIterator i = newIterator()) { i.next(); } - verifier.verify(rocksIteratorMock).isValid(); - verifier.verify(rocksIteratorMock).key(any()); - verifier.verify(rocksIteratorMock).value(any()); - verifier.verify(rocksIteratorMock).next(); + verifier.verify(getRocksDBIteratorMock()).isValid(); + verifier.verify(getRocksDBIteratorMock()).key(any()); + verifier.verify(getRocksDBIteratorMock()).value(any()); + verifier.verify(getRocksDBIteratorMock()).next(); CodecTestUtil.gc(); } @@ -163,7 +155,7 @@ public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() public void testConstructorSeeksToFirstElement() throws Exception { newIterator().close(); - verify(rocksIteratorMock, times(1)).seekToFirst(); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); CodecTestUtil.gc(); } @@ -173,7 +165,7 @@ public void testSeekToFirstSeeks() throws Exception { try (RDBStoreCodecBufferIterator i = newIterator()) { i.seekToFirst(); } - verify(rocksIteratorMock, times(2)).seekToFirst(); + verify(getRocksDBIteratorMock(), times(2)).seekToFirst(); CodecTestUtil.gc(); } @@ -184,32 +176,32 @@ public void testSeekToLastSeeks() throws Exception { i.seekToLast(); } - verify(rocksIteratorMock, times(1)).seekToLast(); + verify(getRocksDBIteratorMock(), times(1)).seekToLast(); CodecTestUtil.gc(); } @Test public void testSeekReturnsTheActualKey() throws Exception { - when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) .then(newAnswerInt("key1", 0x00)); - when(rocksIteratorMock.value(any())) + when(getRocksDBIteratorMock().value(any())) .then(newAnswerInt("val1", 0x7f)); try (RDBStoreCodecBufferIterator i = newIterator(); CodecBuffer target = CodecBuffer.wrap(new byte[]{0x55})) { final Table.KeyValue val = i.seek(target); - InOrder verifier = inOrder(rocksIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); - verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time - verify(rocksIteratorMock, never()).seekToLast(); - verifier.verify(rocksIteratorMock, times(1)) + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time + verify(getRocksDBIteratorMock(), never()).seekToLast(); + verifier.verify(getRocksDBIteratorMock(), times(1)) .seek(any(ByteBuffer.class)); - verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rocksIteratorMock, times(1)).key(any()); - verifier.verify(rocksIteratorMock, times(1)).value(any()); + verifier.verify(getRocksDBIteratorMock(), times(1)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(any()); + verifier.verify(getRocksDBIteratorMock(), times(1)).value(any()); assertArrayEquals(new byte[]{0x00}, val.getKey().getArray()); assertArrayEquals(new byte[]{0x7f}, val.getValue().getArray()); } @@ -219,8 +211,8 @@ public void testSeekReturnsTheActualKey() throws Exception { @Test public void testGettingTheKeyIfIteratorIsValid() throws Exception { - when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) .then(newAnswerInt("key1", 0x00)); byte[] key = null; @@ -230,10 +222,10 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception { } } - InOrder verifier = inOrder(rocksIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); - verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rocksIteratorMock, times(1)).key(any()); + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(any()); assertArrayEquals(new byte[]{0x00}, key); CodecTestUtil.gc(); @@ -241,10 +233,10 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception { @Test public void testGettingTheValueIfIteratorIsValid() throws Exception { - when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) .then(newAnswerInt("key1", 0x00)); - when(rocksIteratorMock.value(any())) + when(getRocksDBIteratorMock().value(any())) .then(newAnswerInt("val1", 0x7f)); byte[] key = null; @@ -257,10 +249,10 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { } } - InOrder verifier = inOrder(rocksIteratorMock); + InOrder verifier = inOrder(getRocksDBIteratorMock()); - verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rocksIteratorMock, times(1)).key(any()); + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(any()); assertArrayEquals(new byte[]{0x00}, key); assertArrayEquals(new byte[]{0x7f}, value); @@ -271,18 +263,19 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { final byte[] testKey = new byte[10]; ThreadLocalRandom.current().nextBytes(testKey); - when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) .then(newAnswer("key1", testKey)); try (RDBStoreCodecBufferIterator i = newIterator(null)) { + i.next(); i.removeFromDB(); } - InOrder verifier = inOrder(rocksIteratorMock, rdbTableMock); + InOrder verifier = inOrder(getRocksDBIteratorMock(), getRocksTableMock()); - verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rdbTableMock, times(1)) + verifier.verify(getRocksDBIteratorMock(), times(1)).isValid(); + verifier.verify(getRocksTableMock(), times(1)) .delete(ByteBuffer.wrap(testKey)); CodecTestUtil.gc(); @@ -302,7 +295,7 @@ public void testRemoveFromDBWithoutDBTableSet() throws Exception { public void testCloseCloses() throws Exception { newIterator().close(); - verify(rocksIteratorMock, times(1)).close(); + verify(getRocksDBIteratorMock(), times(1)).close(); CodecTestUtil.gc(); } @@ -310,48 +303,20 @@ public void testCloseCloses() throws Exception { @Test public void testNullPrefixedIterator() throws Exception { try (RDBStoreCodecBufferIterator i = newIterator()) { - verify(rocksIteratorMock, times(1)).seekToFirst(); - clearInvocations(rocksIteratorMock); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + clearInvocations(getRocksDBIteratorMock()); i.seekToFirst(); - verify(rocksIteratorMock, times(1)).seekToFirst(); - clearInvocations(rocksIteratorMock); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + clearInvocations(getRocksDBIteratorMock()); - when(rocksIteratorMock.isValid()).thenReturn(true); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); assertTrue(i.hasNext()); - verify(rocksIteratorMock, times(1)).isValid(); - verify(rocksIteratorMock, times(0)).key(any()); + verify(getRocksDBIteratorMock(), times(1)).isValid(); + verify(getRocksDBIteratorMock(), times(0)).key(any()); i.seekToLast(); - verify(rocksIteratorMock, times(1)).seekToLast(); - } - - CodecTestUtil.gc(); - } - - @Test - public void testNormalPrefixedIterator() throws Exception { - final byte[] prefixBytes = "sample".getBytes(StandardCharsets.UTF_8); - try (RDBStoreCodecBufferIterator i = newIterator( - CodecBuffer.wrap(prefixBytes))) { - final ByteBuffer prefix = ByteBuffer.wrap(prefixBytes); - verify(rocksIteratorMock, times(1)).seek(prefix); - clearInvocations(rocksIteratorMock); - - i.seekToFirst(); - verify(rocksIteratorMock, times(1)).seek(prefix); - clearInvocations(rocksIteratorMock); - - when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) - .then(newAnswer("key1", prefixBytes)); - assertTrue(i.hasNext()); - verify(rocksIteratorMock, times(1)).isValid(); - verify(rocksIteratorMock, times(1)).key(any()); - - Exception e = - assertThrows(Exception.class, () -> i.seekToLast(), "Prefixed iterator does not support seekToLast"); - assertInstanceOf(UnsupportedOperationException.class, e); + verify(getRocksDBIteratorMock(), times(1)).seekToLast(); } CodecTestUtil.gc(); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStorePoolBackedCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStorePoolBackedCodecBufferIterator.java new file mode 100644 index 00000000000..8c158e21b13 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStorePoolBackedCodecBufferIterator.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.apache.hadoop.hdds.utils.db.IteratorType.KEY_AND_VALUE; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hdds.StringUtils; +import org.apache.hadoop.hdds.utils.db.Table.CloseableKeyValue; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.InOrder; +import org.mockito.stubbing.Answer; + +/** + */ +public class TestRDBStorePoolBackedCodecBufferIterator + extends TestRDBStoreAbstractIterator { + + @BeforeAll + public static void init() { + ManagedRocksObjectUtils.loadRocksDBLibrary(); + } + + @BeforeEach + public void setupTest() { + CodecBuffer.enableLeakDetection(); + } + + @Override + RDBStorePoolBackedCodecBufferIterator newIterator() throws RocksDatabaseException { + return new RDBStorePoolBackedCodecBufferIterator(getItrInitializer(), null, null, KEY_AND_VALUE, 1); + } + + RDBStorePoolBackedCodecBufferIterator newIterator(int poolSize) throws RocksDatabaseException { + return new RDBStorePoolBackedCodecBufferIterator(getItrInitializer(), null, null, KEY_AND_VALUE, + poolSize); + } + + @Override + RDBStorePoolBackedCodecBufferIterator newIterator(byte[] prefix) throws RocksDatabaseException { + return new RDBStorePoolBackedCodecBufferIterator(getItrInitializer(), getRocksTableMock(), prefix, KEY_AND_VALUE, + 1); + } + + Answer newAnswerInt(String name, int b) { + return newAnswer(name, (byte) b); + } + + Answer newAnswer(String name, byte... b) { + return invocation -> { + System.out.printf("answer %s: %s%n", name, StringUtils.bytes2Hex(b)); + Object[] args = invocation.getArguments(); + final ByteBuffer buffer = (ByteBuffer) args[0]; + return writeToBuffer(buffer, b); + }; + } + + private int writeToBuffer(ByteBuffer buffer, byte... bytesToWrite) { + buffer.clear(); + buffer.put(bytesToWrite); + buffer.flip(); + return bytesToWrite.length; + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3, 5, 10}) + public void testRDBStoreCodecBufferIterGetsFailBeyondMaxBuffers(int maxBuffers) + throws InterruptedException, TimeoutException, RocksDatabaseException { + List> vals = new ArrayList<>(); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + AtomicInteger counter = new AtomicInteger(0); + + when(getRocksDBIteratorMock().key(any())) + .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement())); + when(getRocksDBIteratorMock().value(any())) + .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement())); + try (RDBStorePoolBackedCodecBufferIterator iterator = newIterator(maxBuffers)) { + for (int i = 0; i < maxBuffers; i++) { + vals.add(iterator.next()); + } + assertEquals(Math.max(maxBuffers, 0), vals.size()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + AtomicReference> nextThread = new AtomicReference<>(CompletableFuture.supplyAsync( + () -> { + CloseableKeyValue v = iterator.next(); + vals.add(v); + return true; + }, + executor)); + if (maxBuffers < 1) { + // Number of max buffers is always going to be at least 1. + GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100); + nextThread.set(CompletableFuture.supplyAsync( + () -> { + CloseableKeyValue v = iterator.next(); + vals.add(v); + return true; + }, + executor)); + } + assertEquals(Math.max(1, maxBuffers), vals.size()); + for (int i = 0; i < vals.size(); i++) { + assertEquals(2 * i, vals.get(i).getKey().getArray()[0]); + assertEquals(2 * i + 1, vals.get(i).getValue().getArray()[0]); + } + assertFalse(nextThread.get().isDone()); + int size = vals.size(); + vals.get(0).close(); + GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100); + assertEquals(size + 1, vals.size()); + assertEquals(2 * size, vals.get(size).getKey().getArray()[0]); + assertEquals(2 * size + 1, vals.get(size).getValue().getArray()[0]); + for (CloseableKeyValue v : vals) { + v.close(); + } + executor.shutdown(); + } + } + + @Test + public void testForEachRemaining() throws Exception { + when(getRocksDBIteratorMock().isValid()) + .thenReturn(true, true, true, true, true, true, false); + when(getRocksDBIteratorMock().key(any())) + .then(newAnswerInt("key1", 0x00)) + .then(newAnswerInt("key2", 0x01)) + .then(newAnswerInt("key3", 0x02)) + .thenThrow(new NoSuchElementException()); + when(getRocksDBIteratorMock().value(any())) + .then(newAnswerInt("val1", 0x7f)) + .then(newAnswerInt("val2", 0x7e)) + .then(newAnswerInt("val3", 0x7d)) + .thenThrow(new NoSuchElementException()); + + List> remaining = new ArrayList<>(); + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + i.forEachRemaining(kvSupplier -> { + try { + remaining.add(Table.newKeyValue( + kvSupplier.getKey().getArray(), kvSupplier.getValue().getArray())); + } finally { + kvSupplier.close(); + } + }); + + System.out.println("remaining: " + remaining); + assertArrayEquals(new byte[]{0x00}, remaining.get(0).getKey()); + assertArrayEquals(new byte[]{0x7f}, remaining.get(0).getValue()); + assertArrayEquals(new byte[]{0x01}, remaining.get(1).getKey()); + assertArrayEquals(new byte[]{0x7e}, remaining.get(1).getValue()); + assertArrayEquals(new byte[]{0x02}, remaining.get(2).getKey()); + assertArrayEquals(new byte[]{0x7d}, remaining.get(2).getValue()); + } + + CodecTestUtil.gc(); + } + + @Test + public void testHasNextDoesNotDependsOnIsvalid() throws Exception { + when(getRocksDBIteratorMock().isValid()).thenReturn(true, true, false); + + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + assertTrue(i.hasNext()); + try (CloseableKeyValue val = i.next()) { + assertFalse(i.hasNext()); + assertThrows(NoSuchElementException.class, i::next); + assertFalse(i.hasNext()); + } + } + + CodecTestUtil.gc(); + } + + @Test + public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() + throws Exception { + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + InOrder verifier = inOrder(getRocksDBIteratorMock()); + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + i.next().close(); + } + + verifier.verify(getRocksDBIteratorMock()).isValid(); + verifier.verify(getRocksDBIteratorMock()).key(any()); + verifier.verify(getRocksDBIteratorMock()).value(any()); + verifier.verify(getRocksDBIteratorMock()).next(); + + CodecTestUtil.gc(); + } + + @Test + public void testConstructorSeeksToFirstElement() throws Exception { + newIterator().close(); + + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + + CodecTestUtil.gc(); + } + + @Test + public void testSeekToFirstSeeks() throws Exception { + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + i.seekToFirst(); + } + verify(getRocksDBIteratorMock(), times(2)).seekToFirst(); + + CodecTestUtil.gc(); + } + + @Test + public void testSeekToLastSeeks() throws Exception { + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + i.seekToLast(); + } + + verify(getRocksDBIteratorMock(), times(1)).seekToLast(); + + CodecTestUtil.gc(); + } + + @Test + public void testSeekWithInvalidValue() throws RocksDatabaseException { + when(getRocksDBIteratorMock().isValid()).thenReturn(false); + + try (RDBStorePoolBackedCodecBufferIterator i = newIterator(); + CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55}); + CloseableKeyValue valSupplier = i.seek(target)) { + assertFalse(i.hasNext()); + InOrder verifier = inOrder(getRocksDBIteratorMock()); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time + verify(getRocksDBIteratorMock(), never()).seekToLast(); + verifier.verify(getRocksDBIteratorMock(), times(1)) + .seek(any(ByteBuffer.class)); + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), never()).key(any()); + verifier.verify(getRocksDBIteratorMock(), never()).value(any()); + assertNull(valSupplier); + } + } + + @Test + public void testSeekReturnsTheActualKey() throws Exception { + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) + .then(newAnswerInt("key1", 0x00)); + when(getRocksDBIteratorMock().value(any())) + .then(newAnswerInt("val1", 0x7f)); + + try (RDBStorePoolBackedCodecBufferIterator i = newIterator(); + CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55}); + CloseableKeyValue valSupplier = i.seek(target)) { + assertTrue(i.hasNext()); + InOrder verifier = inOrder(getRocksDBIteratorMock()); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); //at construct time + verify(getRocksDBIteratorMock(), never()).seekToLast(); + verifier.verify(getRocksDBIteratorMock(), times(1)) + .seek(any(ByteBuffer.class)); + verifier.verify(getRocksDBIteratorMock(), times(1)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(any()); + verifier.verify(getRocksDBIteratorMock(), times(1)).value(any()); + assertArrayEquals(new byte[] {0x00}, valSupplier.getKey().getArray()); + assertArrayEquals(new byte[] {0x7f}, valSupplier.getValue().getArray()); + } + + CodecTestUtil.gc(); + } + + @Test + public void testGettingTheKeyIfIteratorIsValid() throws Exception { + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) + .then(newAnswerInt("key1", 0x00)); + + byte[] key = null; + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + if (i.hasNext()) { + try (CloseableKeyValue kv = i.next()) { + key = kv.getKey().getArray(); + } + } + } + + InOrder verifier = inOrder(getRocksDBIteratorMock()); + + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(any()); + assertArrayEquals(new byte[]{0x00}, key); + + CodecTestUtil.gc(); + } + + @Test + public void testGettingTheValueIfIteratorIsValid() throws Exception { + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) + .then(newAnswerInt("key1", 0x00)); + when(getRocksDBIteratorMock().value(any())) + .then(newAnswerInt("val1", 0x7f)); + + byte[] key = null; + byte[] value = null; + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + if (i.hasNext()) { + try (CloseableKeyValue entry = i.next()) { + key = entry.getKey().getArray(); + value = entry.getValue().getArray(); + } + } + } + + InOrder verifier = inOrder(getRocksDBIteratorMock()); + + verifier.verify(getRocksDBIteratorMock(), times(2)).isValid(); + verifier.verify(getRocksDBIteratorMock(), times(1)).key(any()); + assertArrayEquals(new byte[]{0x00}, key); + assertArrayEquals(new byte[]{0x7f}, value); + + CodecTestUtil.gc(); + } + + @Test + public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { + final byte[] testKey = new byte[10]; + ThreadLocalRandom.current().nextBytes(testKey); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + when(getRocksDBIteratorMock().key(any())) + .then(newAnswer("key1", testKey)); + + try (RDBStorePoolBackedCodecBufferIterator i = newIterator(null)) { + try (CloseableKeyValue kv = i.next()) { + i.removeFromDB(); + } + + } + + InOrder verifier = inOrder(getRocksDBIteratorMock(), getRocksTableMock()); + + verifier.verify(getRocksDBIteratorMock(), times(1)).isValid(); + verifier.verify(getRocksTableMock(), times(1)) + .delete(ByteBuffer.wrap(testKey)); + CodecTestUtil.gc(); + } + + @Test + public void testRemoveFromDBWithoutDBTableSet() throws Exception { + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + assertThrows(UnsupportedOperationException.class, + i::removeFromDB); + } + + CodecTestUtil.gc(); + } + + @Test + public void testCloseCloses() throws Exception { + newIterator().close(); + + verify(getRocksDBIteratorMock(), times(1)).close(); + + CodecTestUtil.gc(); + } + + @Test + public void testNullPrefixedIterator() throws Exception { + try (RDBStorePoolBackedCodecBufferIterator i = newIterator()) { + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + clearInvocations(getRocksDBIteratorMock()); + when(getRocksDBIteratorMock().isValid()).thenReturn(true); + i.seekToFirst(); + verify(getRocksDBIteratorMock(), times(0)).isValid(); + verify(getRocksDBIteratorMock(), times(0)).key(any()); + verify(getRocksDBIteratorMock(), times(1)).seekToFirst(); + clearInvocations(getRocksDBIteratorMock()); + i.hasNext(); + verify(getRocksDBIteratorMock(), times(1)).isValid(); + clearInvocations(getRocksDBIteratorMock()); + + assertTrue(i.hasNext()); + // hasNext shouldn't make isValid() redundant calls. + verify(getRocksDBIteratorMock(), times(1)).isValid(); + verify(getRocksDBIteratorMock(), times(0)).key(any()); + + i.seekToLast(); + verify(getRocksDBIteratorMock(), times(1)).seekToLast(); + } + + CodecTestUtil.gc(); + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java index 8bf3e59e210..07df181c6a0 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java @@ -232,7 +232,7 @@ public void forEachAndIterator() throws Exception { @Test public void testIteratorOnException() throws Exception { RDBTable rdbTable = mock(RDBTable.class); - when(rdbTable.iterator((CodecBuffer) null, IteratorType.KEY_AND_VALUE)) + when(rdbTable.newCodecBufferIterator(null, IteratorType.KEY_AND_VALUE)) .thenThrow(new RocksDatabaseException()); final Table testTable = new TypedTable<>(rdbTable, StringCodec.get(), StringCodec.get(), CacheType.PARTIAL_CACHE); diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java index 1d35cc1623f..47dac0e7b68 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedReadOptions.java @@ -17,8 +17,11 @@ package org.apache.hadoop.hdds.utils.db.managed; +import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.LOG; import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.ratis.util.UncheckedAutoCloseable; import org.rocksdb.ReadOptions; @@ -28,12 +31,47 @@ public class ManagedReadOptions extends ReadOptions { private final UncheckedAutoCloseable leakTracker = track(this); + private ManagedSlice lowerBound; + private ManagedSlice upperBound; + + public ManagedReadOptions() { + super(); + } + + public ManagedReadOptions(boolean fillCache) { + this(fillCache, null, null); + } + + public ManagedReadOptions(boolean fillCache, byte[] lowerBound, byte[] upperBound) { + super(); + setFillCache(fillCache); + if (lowerBound != null) { + this.lowerBound = new ManagedSlice(lowerBound); + this.setIterateLowerBound(this.lowerBound); + } + if (upperBound != null) { + this.upperBound = new ManagedSlice(upperBound); + this.setIterateUpperBound(this.upperBound); + } + } + @Override public void close() { try { super.close(); + IOUtils.close(LOG, lowerBound, upperBound); } finally { leakTracker.close(); } } + + @VisibleForTesting + public ManagedSlice getLowerBound() { + return lowerBound; + } + + @VisibleForTesting + public ManagedSlice getUpperBound() { + return upperBound; + } } diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java index e35a4edc757..7f1b13a0842 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSlice.java @@ -47,4 +47,9 @@ protected void disposeInternal() { leakTracker.close(); } } + + @Override + public boolean isOwningHandle() { + return super.isOwningHandle(); + } }