org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
@@ -17,15 +17,19 @@

package org.apache.hadoop.hdds.utils.db;

import com.google.common.primitives.UnsignedBytes;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An abstract {@link Table.KeyValueIterator} to iterate raw {@link Table.KeyValue}s.
*
* NOTE: This class only works with RocksDB when the comparator is set to RocksDB's ByteWiseComparator, since the computed prefix upper bound relies on unsigned bytewise key ordering.
* @param <RAW> the raw type.
*/
abstract class RDBStoreAbstractIterator<RAW>
@@ -34,29 +38,41 @@ abstract class RDBStoreAbstractIterator<RAW>
private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);

private final ManagedReadOptions readOptions;
private final ManagedRocksIterator rocksDBIterator;
private final RDBTable rocksDBTable;
private Table.KeyValue<RAW, RAW> currentEntry;
// This is for schemas that use a fixed-length
// prefix for each key.
private final RAW prefix;

private final IteratorType type;

RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW prefix, IteratorType type) {
this.rocksDBIterator = iterator;
RDBStoreAbstractIterator(
CheckedFunction<ManagedReadOptions, ManagedRocksIterator, RocksDatabaseException> itrSupplier, RDBTable table,
byte[] prefix, IteratorType type) throws RocksDatabaseException {
this.readOptions = new ManagedReadOptions(false, prefix, getNextHigherPrefix(prefix));
this.rocksDBIterator = itrSupplier.apply(readOptions);
this.rocksDBTable = table;
this.prefix = prefix;
this.type = type;
}

private byte[] getNextHigherPrefix(byte[] prefix) {
if (prefix == null) {
return null;
}
for (int i = prefix.length - 1; i >= 0; i--) {
if (UnsignedBytes.compare(prefix[i], UnsignedBytes.MAX_VALUE) != 0) {
byte[] nextHigher = Arrays.copyOf(prefix, i + 1);
nextHigher[i] = (byte) (prefix[i] + 1);
return nextHigher;
}
}
// No higher prefix exists since all bytes are MAX_VALUE.
return null;
}

IteratorType getType() {
return type;
}

/** @return the key for the current entry. */
abstract RAW key();

/** @return the {@link Table.KeyValue} for the current entry. */
abstract Table.KeyValue<RAW, RAW> getKeyValue();

@@ -66,9 +82,6 @@ IteratorType getType() {
/** Delete the given key. */
abstract void delete(RAW key) throws RocksDatabaseException;

/** Does the given key start with the prefix? */
abstract boolean startsWithPrefix(RAW key);

final ManagedRocksIterator getRocksDBIterator() {
return rocksDBIterator;
}
@@ -77,10 +90,6 @@ final RDBTable getRocksDBTable() {
return rocksDBTable;
}

final RAW getPrefix() {
return prefix;
}

@Override
public final void forEachRemaining(
Consumer<? super Table.KeyValue<RAW, RAW>> action) {
@@ -99,8 +108,7 @@ private void setCurrentEntry() {

@Override
public final boolean hasNext() {
return rocksDBIterator.get().isValid() &&
(prefix == null || startsWithPrefix(key()));
return rocksDBIterator.get().isValid();
}

@Override
@@ -115,22 +123,12 @@ public final Table.KeyValue<RAW, RAW> next() {

@Override
public final void seekToFirst() {
if (prefix == null) {
rocksDBIterator.get().seekToFirst();
} else {
seek0(prefix);
}
setCurrentEntry();
rocksDBIterator.get().seekToFirst();
}

@Override
public final void seekToLast() {
if (prefix == null) {
rocksDBIterator.get().seekToLast();
} else {
throw new UnsupportedOperationException("seekToLast: prefix != null");
}
setCurrentEntry();
rocksDBIterator.get().seekToLast();
}

@Override
@@ -155,5 +153,6 @@ public final void removeFromDB() throws RocksDatabaseException, CodecException {
@Override
public void close() {
rocksDBIterator.close();
readOptions.close();
}
}
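
The heart of the change in this file: rather than checking every key against the prefix (the removed startsWithPrefix path), the constructor now hands RocksDB an inclusive lower bound (the prefix itself) and an exclusive upper bound (the next higher prefix) through ManagedReadOptions, so the iterator can never leave the prefix range. Below is a standalone sketch of that bound computation with two worked inputs; the class name is illustrative, the logic mirrors getNextHigherPrefix above.

import java.util.Arrays;

// Standalone sketch of the next-higher-prefix computation above (the class
// name is illustrative). The returned array is the smallest byte sequence
// that sorts after every key starting with the prefix under unsigned
// bytewise order, hence it serves as an exclusive iterate upper bound.
public final class NextHigherPrefixSketch {
  static byte[] nextHigherPrefix(byte[] prefix) {
    if (prefix == null) {
      return null;
    }
    // Increment the rightmost byte that is not 0xFF and drop everything
    // after it; trailing 0xFF bytes cannot be incremented without a carry.
    for (int i = prefix.length - 1; i >= 0; i--) {
      if (prefix[i] != (byte) 0xFF) {
        final byte[] next = Arrays.copyOf(prefix, i + 1);
        next[i]++;
        return next;
      }
    }
    return null; // all bytes are 0xFF: no finite upper bound exists
  }

  public static void main(String[] args) {
    // "ab" -> "ac": every key starting with "ab" sorts below "ac".
    System.out.println(Arrays.toString(nextHigherPrefix(new byte[] {0x61, 0x62})));        // [97, 99]
    // {0x61, 0xFF} -> {0x62}: the 0xFF is dropped and the carry moves left.
    System.out.println(Arrays.toString(nextHigherPrefix(new byte[] {0x61, (byte) 0xFF}))); // [98]
  }
}
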
org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
@@ -17,28 +17,22 @@

package org.apache.hadoop.hdds.utils.db;

import java.util.Arrays;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.function.CheckedFunction;

/**
* RocksDB store iterator using the byte[] API.
*/
class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
private static byte[] copyPrefix(byte[] prefix) {
return prefix == null || prefix.length == 0 ? null : Arrays.copyOf(prefix, prefix.length);
}

RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
RDBTable table, byte[] prefix, IteratorType type) {
super(iterator, table, copyPrefix(prefix), type);
RDBStoreByteArrayIterator(
CheckedFunction<ManagedReadOptions, ManagedRocksIterator, RocksDatabaseException> itrSupplier,
RDBTable table, byte[] prefix, IteratorType type) throws RocksDatabaseException {
super(itrSupplier, table, prefix, type);
seekToFirst();
}

@Override
byte[] key() {
return getRocksDBIterator().get().key();
}

@Override
Table.KeyValue<byte[], byte[]> getKeyValue() {
final ManagedRocksIterator i = getRocksDBIterator();
@@ -56,27 +50,4 @@ void seek0(byte[] key) {
void delete(byte[] key) throws RocksDatabaseException {
getRocksDBTable().delete(key);
}

@Override
boolean startsWithPrefix(byte[] value) {
final byte[] prefix = getPrefix();
if (prefix == null) {
return true;
}
if (value == null) {
return false;
}

int length = prefix.length;
if (value.length < length) {
return false;
}

for (int i = 0; i < length; i++) {
if (value[i] != prefix[i]) {
return false;
}
}
return true;
}
}
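
Deleting startsWithPrefix is safe because, under unsigned bytewise ordering, a key starts with the prefix exactly when prefix <= key < nextHigherPrefix(prefix); the iterate bounds therefore admit precisely the keys the removed loop accepted. A self-contained randomized check of that equivalence (not part of the PR; it reuses nextHigherPrefix from the sketch above):

import com.google.common.primitives.UnsignedBytes;
import java.util.Comparator;
import java.util.Random;

// Randomized check (not part of the PR) that bounded iteration admits
// exactly the keys the deleted startsWithPrefix() accepted.
public final class PrefixBoundInvariantCheck {
  private static final Comparator<byte[]> BYTEWISE = UnsignedBytes.lexicographicalComparator();

  static boolean startsWith(byte[] key, byte[] prefix) {
    if (key.length < prefix.length) {
      return false;
    }
    for (int i = 0; i < prefix.length; i++) {
      if (key[i] != prefix[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // A small alphabet including 0xFF makes boundary cases frequent.
    final byte[] alphabet = {0x00, 0x41, (byte) 0xFE, (byte) 0xFF};
    final Random rnd = new Random(42);
    for (int n = 0; n < 100_000; n++) {
      final byte[] key = randomBytes(rnd, alphabet);
      final byte[] prefix = randomBytes(rnd, alphabet);
      final byte[] upper = NextHigherPrefixSketch.nextHigherPrefix(prefix);
      final boolean inBounds = BYTEWISE.compare(key, prefix) >= 0
          && (upper == null || BYTEWISE.compare(key, upper) < 0);
      if (inBounds != startsWith(key, prefix)) {
        throw new AssertionError("bound/prefix mismatch"); // never reached
      }
    }
  }

  private static byte[] randomBytes(Random rnd, byte[] alphabet) {
    final byte[] b = new byte[rnd.nextInt(4)];
    for (int i = 0; i < b.length; i++) {
      b[i] = alphabet[rnd.nextInt(alphabet.length)];
    }
    return b;
  }
}
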
org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java
@@ -17,10 +17,11 @@

package org.apache.hadoop.hdds.utils.db;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedFunction;

/**
* Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}.
@@ -31,15 +32,15 @@ class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator<CodecBuffer>
private final Buffer valueBuffer;
private final AtomicBoolean closed = new AtomicBoolean();

RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table,
CodecBuffer prefix, IteratorType type) {
super(iterator, table, prefix, type);
RDBStoreCodecBufferIterator(
CheckedFunction<ManagedReadOptions, ManagedRocksIterator, RocksDatabaseException> itrSupplier, RDBTable table,
byte[] prefix, IteratorType type) throws RocksDatabaseException {
super(itrSupplier, table, prefix, type);

final String name = table != null ? table.getName() : null;
this.keyBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10),
// it has to read key for matching prefix.
getType().readKey() || prefix != null ? buffer -> getRocksDBIterator().get().key(buffer) : null);
getType().readKey() ? buffer -> getRocksDBIterator().get().key(buffer) : null);
this.valueBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10),
getType().readValue() ? buffer -> getRocksDBIterator().get().value(buffer) : null);
@@ -50,17 +51,10 @@ void assertOpen() {
Preconditions.assertTrue(!closed.get(), "Already closed");
}

@Override
CodecBuffer key() {
assertOpen();
return keyBuffer.getFromDb();
}

@Override
Table.KeyValue<CodecBuffer, CodecBuffer> getKeyValue() {
assertOpen();
final CodecBuffer key = getType().readKey() ? key() : null;
return Table.newKeyValue(key, valueBuffer.getFromDb());
return Table.newKeyValue(keyBuffer.getFromDb(), valueBuffer.getFromDb());
}

@Override
@@ -75,24 +69,10 @@ void delete(CodecBuffer key) throws RocksDatabaseException {
getRocksDBTable().delete(key.asReadOnlyByteBuffer());
}

@Override
boolean startsWithPrefix(CodecBuffer key) {
assertOpen();
final CodecBuffer prefix = getPrefix();
if (prefix == null) {
return true;
}
if (key == null) {
return false;
}
return key.startsWith(prefix);
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
super.close();
Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release);
keyBuffer.release();
valueBuffer.release();
}
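
The close() above releases the native key and value buffers behind an AtomicBoolean.compareAndSet guard, so a second or concurrent close() is a harmless no-op. A minimal sketch of the pattern in isolation (illustrative names, not Ozone API):

import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the close-once pattern used by the iterator above.
final class CloseOnceResource implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean();

  void assertOpen() {
    if (closed.get()) {
      throw new IllegalStateException("Already closed");
    }
  }

  @Override
  public void close() {
    // compareAndSet lets exactly one caller through, so the release logic
    // below runs once no matter how many threads call close().
    if (closed.compareAndSet(false, true)) {
      // release native buffers / RocksDB handles here, exactly once
    }
  }
}
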
org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -206,13 +206,13 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {
@Override
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, IteratorType type)
throws RocksDatabaseException {
return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
prefix, type);
return new RDBStoreByteArrayIterator(readOptions -> db.newIterator(family, readOptions),
this, prefix, type);
}

KeyValueIterator<CodecBuffer, CodecBuffer> iterator(
CodecBuffer prefix, IteratorType type) throws RocksDatabaseException {
return new RDBStoreCodecBufferIterator(db.newIterator(family, false),
KeyValueIterator<CodecBuffer, CodecBuffer> newCodecBufferIterator(
byte[] prefix, IteratorType type) throws RocksDatabaseException {
return new RDBStoreCodecBufferIterator(readOptions -> db.newIterator(family, readOptions),
this, prefix, type);
}

@@ -237,26 +237,13 @@ public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix)
}

@Override
public void dumpToFileWithPrefix(File externalFile, byte[] prefix)
throws RocksDatabaseException, CodecException {
CodecBuffer prefixBuffer = prefix == null || prefix.length == 0 ? null :
CodecBufferCodec.get(true).fromPersistedFormat(prefix);
KeyValueIterator<CodecBuffer, CodecBuffer> iter;
try {
iter = iterator(prefixBuffer, IteratorType.KEY_AND_VALUE);
} catch (RocksDatabaseException e) {
if (prefixBuffer != null) {
prefixBuffer.close();
}
throw e;
}
try (RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
public void dumpToFileWithPrefix(File externalFile, byte[] prefix) throws RocksDatabaseException {
try (KeyValueIterator<CodecBuffer, CodecBuffer> iter = newCodecBufferIterator(prefix, IteratorType.KEY_AND_VALUE);
RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
while (iter.hasNext()) {
final KeyValue<CodecBuffer, CodecBuffer> entry = iter.next();
fileWriter.put(entry.getKey(), entry.getValue());
}
} finally {
iter.close();
}
}

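
The rewritten dumpToFileWithPrefix leans on two try-with-resources properties: each declared resource is closed even when the body throws, and resources close in reverse declaration order, so the SST writer is closed before the iterator it reads from. A tiny self-contained demonstration (illustrative names, not Ozone code):

// Demonstrates the close-ordering guarantee the rewrite relies on.
public final class CloseOrderDemo {
  static AutoCloseable open(String name) {
    System.out.println("open " + name);
    return () -> System.out.println("close " + name);
  }

  public static void main(String[] args) throws Exception {
    try (AutoCloseable iter = open("iterator");          // closed second
         AutoCloseable writer = open("sstFileWriter")) { // closed first
      System.out.println("copy entries");
    }
    // prints: open iterator, open sstFileWriter, copy entries,
    //         close sstFileWriter, close iterator
  }
}
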
org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -790,10 +790,8 @@ public ManagedRocksIterator newIterator(ColumnFamily family)
}

public ManagedRocksIterator newIterator(ColumnFamily family,
boolean fillCache) throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire();
ManagedReadOptions readOptions = new ManagedReadOptions()) {
readOptions.setFillCache(fillCache);
ManagedReadOptions readOptions) throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
return managed(db.get().newIterator(family.getHandle(), readOptions));
}
}
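
newIterator now borrows the caller's ManagedReadOptions instead of creating and immediately closing its own. That matters because RocksDB read options, and the bound slices they reference, must stay alive for the iterator's entire lifetime; this is why the abstract iterator stores readOptions as a field and closes it after the iterator. For orientation, here is the same bounded scan in plain RocksJava: a sketch using the org.rocksdb classes rather than Ozone's Managed wrappers, assuming a non-null upper bound.

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;

// Sketch of a prefix-bounded scan in plain RocksJava. The Slices must stay
// open for as long as the iterator lives, which is the reason the PR keeps
// readOptions as a field and closes it in close().
public final class BoundedScanSketch {
  static void scan(RocksDB db, byte[] prefix, byte[] upper) {
    try (ReadOptions opts = new ReadOptions();
         Slice lower = new Slice(prefix);
         Slice upperBound = new Slice(upper)) {
      opts.setFillCache(false);              // same hint the old code passed
      opts.setIterateLowerBound(lower);      // inclusive: first possible match
      opts.setIterateUpperBound(upperBound); // exclusive: the next higher prefix
      try (RocksIterator it = db.newIterator(opts)) {
        for (it.seekToFirst(); it.isValid(); it.next()) {
          // Every key seen here starts with the prefix; isValid() simply
          // turns false at the upper bound, so no per-key check is needed.
          byte[] key = it.key();
        }
      }
    }
  }
}
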
org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -388,10 +388,10 @@ public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException,
@Override
public KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type)
throws RocksDatabaseException, CodecException {
final byte[] prefixBytes = encodeKey(prefix);
if (supportCodecBuffer) {
return newCodecBufferTableIterator(prefix, type);
return newCodecBufferTableIterator(rawTable.newCodecBufferIterator(prefixBytes, type));
} else {
final byte[] prefixBytes = encodeKey(prefix);
return new TypedTableIterator(rawTable.iterator(prefixBytes, type));
}
}
@@ -481,27 +481,6 @@ TableCache<KEY, VALUE> getCache() {
return cache;
}

private RawIterator<CodecBuffer> newCodecBufferTableIterator(KEY prefix, IteratorType type)
throws RocksDatabaseException, CodecException {
final CodecBuffer encoded = encodeKeyCodecBuffer(prefix);
final CodecBuffer prefixBuffer;
if (encoded != null && encoded.readableBytes() == 0) {
encoded.release();
prefixBuffer = null;
} else {
prefixBuffer = encoded;
}

try {
return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer, type));
} catch (Throwable t) {
if (prefixBuffer != null) {
prefixBuffer.release();
}
throw t;
}
}

private RawIterator<CodecBuffer> newCodecBufferTableIterator(KeyValueIterator<CodecBuffer, CodecBuffer> i) {
return new RawIterator<CodecBuffer>(i) {
@Override
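
The helper deleted from TypedTable existed mainly to release the encoded CodecBuffer prefix on every failure path; encoding the prefix to a plain byte[] up front, as the iterator(KEY, IteratorType) change above does, removes that obligation entirely, since arrays are garbage collected. The deleted try/catch is an instance of a general transfer-ownership-or-release pattern, sketched generically below (illustrative names, not Ozone API):

import java.util.function.Function;

// Generic sketch of the pattern the deleted TypedTable code wrote by hand:
// hand a manually managed resource to a consumer; on success the result
// owns it, on failure release it before rethrowing. Switching the prefix
// to byte[] makes this ceremony unnecessary.
final class TransferOrRelease {
  interface Releasable {
    void release();
  }

  static <R extends Releasable, T> T apply(R resource, Function<R, T> consumer) {
    try {
      return consumer.apply(resource); // success: ownership moves to the result
    } catch (Throwable t) {
      if (resource != null) {
        resource.release();            // failure: caller still owns it, so release
      }
      throw t;
    }
  }
}
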