Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.apache.fluss.record.ProjectionPushdownCache;
import org.apache.fluss.record.TestingSchemaGetter;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericMap;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.testutils.InternalRowAssert;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.FlussPaths;
Expand Down Expand Up @@ -65,6 +68,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.row.BinaryString.fromString;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
import static org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -226,21 +230,54 @@ void testComplexTypeFetch() throws Exception {
Arrays.asList(
new Object[] {
1,
new String[] {"a", "b"},
new Object[] {new int[] {1, 2}, new int[] {3, 4}},
new Object[] {10, new Object[] {20, "nested"}, "row1"}
new Object[] {fromString("a"), fromString("b")},
new Object[] {
new GenericArray(new int[] {1, 2}),
new GenericArray(new int[] {3, 4})
},
new Object[] {
10, new Object[] {20, fromString("nested")}, fromString("row1")
},
GenericMap.of(1, fromString("one"), 2, fromString("two")),
GenericMap.of(
fromString("k1"),
GenericMap.of(10, fromString("v1"), 20, fromString("v2"))),
GenericMap.of(
fromString("arr1"),
new GenericArray(new int[] {1, 2}),
fromString("arr2"),
new GenericArray(new int[] {3, 4, 5}))
},
new Object[] {
2,
new String[] {"c", null},
new Object[] {null, new int[] {3, 4}},
new Object[] {30, new Object[] {40, "test"}, "row2"}
new Object[] {fromString("c"), null},
new Object[] {null, new GenericArray(new int[] {3, 4})},
new Object[] {
30, new Object[] {40, fromString("test")}, fromString("row2")
},
GenericMap.of(3, null, 4, fromString("four")),
GenericMap.of(fromString("k2"), GenericMap.of(30, fromString("v3"))),
GenericMap.of(fromString("arr3"), new GenericArray(new int[] {6}))
},
new Object[] {
3,
new String[] {"e", "f"},
new Object[] {new int[] {5, 6, 7}, new int[] {8}},
new Object[] {50, new Object[] {60, "value"}, "row3"}
new Object[] {fromString("e"), fromString("f")},
new Object[] {
new GenericArray(new int[] {5, 6, 7}),
new GenericArray(new int[] {8})
},
new Object[] {
50, new Object[] {60, fromString("value")}, fromString("row3")
},
GenericMap.of(5, fromString("five")),
GenericMap.of(
fromString("k3"),
GenericMap.of(50, fromString("v5"), 60, fromString("v6"))),
GenericMap.of(
fromString("arr4"),
new GenericArray(new int[] {7, 8}),
fromString("arr5"),
new GenericArray(new int[] {9}))
});
Schema schema =
Schema.newBuilder()
Expand All @@ -253,6 +290,18 @@ void testComplexTypeFetch() throws Exception {
DataTypes.INT(),
DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
DataTypes.STRING()))
.column("e", DataTypes.MAP(DataTypes.INT().copy(false), DataTypes.STRING()))
.column(
"f",
DataTypes.MAP(
DataTypes.STRING().copy(false),
DataTypes.MAP(
DataTypes.INT().copy(false), DataTypes.STRING())))
.column(
"g",
DataTypes.MAP(
DataTypes.STRING().copy(false),
DataTypes.ARRAY(DataTypes.INT())))
.build();
TableInfo tableInfo =
TableInfo.of(
Expand Down Expand Up @@ -299,28 +348,30 @@ void testComplexTypeFetch() throws Exception {
// this is important to test complex types
defaultCompletedFetch.readContext.close();
assertThat(scanRecords.size()).isEqualTo(3);

List<GenericRow> expectedRows = new ArrayList<>();
for (Object[] data : complexData) {
Object[] rowData = (Object[]) data[3];
Object[] nestedRowData = (Object[]) rowData[1];
GenericRow nestedRow = GenericRow.of(nestedRowData[0], nestedRowData[1]);
GenericRow row = GenericRow.of(rowData[0], nestedRow, rowData[2]);
expectedRows.add(
GenericRow.of(
data[0],
new GenericArray((Object[]) data[1]),
new GenericArray((Object[]) data[2]),
row,
data[4],
data[5],
data[6]));
}

for (int i = 0; i < scanRecords.size(); i++) {
ScanRecord record = scanRecords.get(i);
assertThat(record.logOffset()).isEqualTo(i);
InternalRow row = record.getRow();
assertThat(row.getInt(0)).isEqualTo(complexData.get(i)[0]);
assertThat(row.getArray(1)).isInstanceOf(GenericArray.class);
GenericArray array = (GenericArray) row.getArray(1);
assertThat(array.toString())
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1]));
assertThat(row.getArray(2).toString())
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2]));
InternalRow nestedRow = row.getRow(3, 3);
assertThat(nestedRow).isNotNull();
assertThat(nestedRow.getInt(0)).isEqualTo(((Object[]) complexData.get(i)[3])[0]);
InternalRow deeplyNestedRow = nestedRow.getRow(1, 2);
assertThat(deeplyNestedRow).isNotNull();
assertThat(deeplyNestedRow.getInt(0))
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[0]);
assertThat(deeplyNestedRow.getString(1).toString())
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
assertThat(nestedRow.getString(2).toString())
.isEqualTo(((Object[]) complexData.get(i)[3])[2]);
InternalRowAssert.assertThatRow(record.getRow())
.withSchema(schema.getRowType())
.isEqualTo(expectedRows.get(i));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.row.serializer.ArraySerializer;
import org.apache.fluss.row.serializer.MapSerializer;
import org.apache.fluss.row.serializer.RowSerializer;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -89,13 +90,12 @@ public void writeArray(int pos, InternalArray input, ArraySerializer serializer)
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

// TODO: Map and Row write methods will be added in Issue #1973
// @Override
// public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
// BinaryMap binary = serializer.toBinaryMap(input);
// writeSegmentsToVarLenPart(
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
// }
@Override
public void writeMap(int pos, InternalMap input, MapSerializer serializer) {
BinaryMap binary = serializer.toBinaryMap(input);
writeSegmentsToVarLenPart(
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

@Override
public void writeRow(int pos, InternalRow value, RowSerializer serializer) {
Expand Down
12 changes: 10 additions & 2 deletions fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,15 @@ public InternalArray getArray(int pos) {
/** Creates a nested {@link BinaryArray} with the nested data type information. */
protected abstract BinaryArray createNestedArrayInstance();

// TODO: getMap() will be added in Issue #1973
/** Creates a nested {@link BinaryMap} with the nested data type information. */
protected abstract BinaryMap createNestedMapInstance();

@Override
public InternalMap getMap(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readBinaryMap(
segments, offset, getLong(pos), createNestedMapInstance());
}

@Override
public boolean getBoolean(int pos) {
Expand Down Expand Up @@ -602,7 +610,7 @@ public static BinaryArray fromPrimitiveArray(double[] arr) {
private static BinaryArray fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long valueRegionInBytes = ((long) elementSize) * length;

// must align by 8 bytes
long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
Expand Down
137 changes: 137 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/row/BinaryMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.memory.MemorySegment;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* Binary implementation of {@link InternalMap} backed by {@link MemorySegment}s.
*
* <p>The binary layout of {@link BinaryMap}:
*
* <pre>
* [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
* </pre>
*
* <p>Influenced by Apache Spark UnsafeMapData.
*
* @since 0.9
*/
@PublicEvolving
public abstract class BinaryMap extends BinarySection implements InternalMap {

private static final long serialVersionUID = 1L;

private transient BinaryArray keys;
private transient BinaryArray values;

@Override
public int size() {
return keys.size();
}

@Override
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
// Read the numBytes of key array from the first 4 bytes.
final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";

// see BinarySection.readObject, on this call stack, keys and values are not initialized
if (keys == null) {
keys = createKeyArrayInstance();
}
keys.pointTo(segments, offset + 4, keyArrayBytes);
if (values == null) {
values = createValueArrayInstance();
}
values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);

assert keys.size() == values.size();

this.segments = segments;
this.offset = offset;
this.sizeInBytes = sizeInBytes;
}

/** Creates a {@link BinaryArray} instance for keys with the nested data type information. */
protected abstract BinaryArray createKeyArrayInstance();

/** Creates a {@link BinaryArray} instance for values with the nested data type information. */
protected abstract BinaryArray createValueArrayInstance();

/** Creates a {@link BinaryMap} instance for copy operation. */
protected abstract BinaryMap createMapInstance();

@Override
public BinaryArray keyArray() {
return keys;
}

@Override
public BinaryArray valueArray() {
return values;
}

public BinaryMap copy() {
return copy(createMapInstance());
}

public BinaryMap copy(BinaryMap reuse) {
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
return reuse;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
// override equals and only checks the other object is instance of BinaryMap
if (!(o instanceof BinaryMap)) {
return false;
}
final BinarySection that = (BinarySection) o;
return sizeInBytes == that.sizeInBytes
&& BinarySegmentUtils.equals(
segments, offset, that.segments, that.offset, sizeInBytes);
}

@Override
public int hashCode() {
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
}

public static BinaryMap valueOf(BinaryArray key, BinaryArray value, BinaryMap reuse) {
checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
MemorySegment segment = MemorySegment.wrap(bytes);
segment.putInt(0, key.sizeInBytes);
key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
value.getSegments()[0].copyTo(
value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
reuse.pointTo(segment, 0, bytes.length);
return reuse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1002,18 +1002,16 @@ public static BinaryArray readBinaryArray(
return reusedArray;
}

/** Read map data from segments. */
public static InternalMap readMap(MemorySegment[] segments, int offset, int numBytes) {
// TODO: Map type support will be added in Issue #1973
throw new UnsupportedOperationException(
"Map type is not supported yet. Will be added in Issue #1973.");
}

/** Read map data from segments with long offset. */
public static InternalMap readMap(MemorySegment[] segments, int offset, long numBytes) {
// TODO: Map type support will be added in Issue #1973
throw new UnsupportedOperationException(
"Map type is not supported yet. Will be added in Issue #1973.");
/**
* Read the map data into the reused {@link BinaryMap} instance from underlying {@link
* MemorySegment}.
*/
public static BinaryMap readBinaryMap(
MemorySegment[] segments, int baseOffset, long offsetAndSize, BinaryMap reusedMap) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
reusedMap.pointTo(segments, offset + baseOffset, size);
return reusedMap;
}

/** Read aligned row from segments. */
Expand Down
Loading