Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,33 @@
import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
import org.apache.fluss.lake.lance.writers.ArrowFloatWriter;
import org.apache.fluss.lake.lance.writers.ArrowIntWriter;
import org.apache.fluss.lake.lance.writers.ArrowListWriter;
import org.apache.fluss.lake.lance.writers.ArrowNestedListWriter;
import org.apache.fluss.lake.lance.writers.ArrowSmallIntWriter;
import org.apache.fluss.lake.lance.writers.ArrowTimeWriter;
import org.apache.fluss.lake.lance.writers.ArrowTimestampLtzWriter;
import org.apache.fluss.lake.lance.writers.ArrowTimestampNtzWriter;
import org.apache.fluss.lake.lance.writers.ArrowTinyIntWriter;
import org.apache.fluss.lake.lance.writers.ArrowVarBinaryWriter;
import org.apache.fluss.lake.lance.writers.ArrowVarCharWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayBigIntWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayBinaryWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayBooleanWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayDateWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayDecimalWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayDoubleWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayFloatWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayIntWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArraySmallIntWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayTimeWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayTimestampLtzWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayTimestampNtzWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayTinyIntWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayVarBinaryWriter;
import org.apache.fluss.lake.lance.writers.array.ArrowArrayVarCharWriter;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
Expand Down Expand Up @@ -72,6 +91,7 @@
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
Expand All @@ -80,6 +100,7 @@
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -100,7 +121,13 @@ private static Field toArrowField(String fieldName, DataType logicalType) {
logicalType.isNullable(),
logicalType.accept(DataTypeToArrowTypeConverter.INSTANCE),
null);
return new Field(fieldName, fieldType, null);
List<Field> children = null;
if (logicalType instanceof ArrayType) {
children =
Collections.singletonList(
toArrowField("element", ((ArrayType) logicalType).getElementType()));
}
return new Field(fieldName, fieldType, children);
}

private static class DataTypeToArrowTypeConverter extends DataTypeDefaultVisitor<ArrowType> {
Expand Down Expand Up @@ -215,6 +242,11 @@ public ArrowType visit(TimestampType timestampType) {
}
}

/**
 * Maps a Fluss {@link ArrayType} to Arrow's list type.
 *
 * <p>The returned {@code ArrowType.List.INSTANCE} is a shared constant and does not carry the
 * element type; in this file the element type is attached separately as a child field named
 * {@code "element"} by {@code toArrowField}.
 *
 * @param arrayType the array type being converted (not inspected here)
 * @return {@code ArrowType.List.INSTANCE}
 */
@Override
public ArrowType visit(ArrayType arrayType) {
return ArrowType.List.INSTANCE;
}

@Override
protected ArrowType defaultMethod(DataType dataType) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -270,9 +302,73 @@ public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(
precision = ((TimestampType) dataType).getPrecision();
return ArrowTimestampNtzWriter.forField(vector, precision);
}
} else if (vector instanceof ListVector) {
ArrayType arrayType = (ArrayType) dataType;
ListVector listVector = (ListVector) vector;
ValueVector elementVector = listVector.getDataVector();
ArrowFieldWriter<InternalArray> elementWriter =
createArrayElementWriter(elementVector, arrayType.getElementType());
return ArrowListWriter.forField(listVector, elementWriter);
} else {
throw new UnsupportedOperationException(
String.format("Unsupported type %s.", dataType));
}
}

/**
 * Picks the {@link ArrowFieldWriter} that copies elements of an {@link InternalArray} into the
 * given Arrow child vector.
 *
 * <p>The writer is selected from the concrete class of {@code vector}. {@code dataType} is only
 * consulted for timestamp vectors (to read the precision and to tell local-zoned timestamps
 * from plain ones) and for list vectors (to obtain the nested element type).
 *
 * @param vector the Arrow vector that will receive the array elements
 * @param dataType the Fluss type of one array element
 * @return a writer bound to {@code vector}
 * @throws UnsupportedOperationException if no element writer exists for the vector class
 */
private static ArrowFieldWriter<InternalArray> createArrayElementWriter(
        ValueVector vector, DataType dataType) {
    // Integral and boolean vectors.
    if (vector instanceof TinyIntVector) {
        return ArrowArrayTinyIntWriter.forField((TinyIntVector) vector);
    }
    if (vector instanceof SmallIntVector) {
        return ArrowArraySmallIntWriter.forField((SmallIntVector) vector);
    }
    if (vector instanceof IntVector) {
        return ArrowArrayIntWriter.forField((IntVector) vector);
    }
    if (vector instanceof BigIntVector) {
        return ArrowArrayBigIntWriter.forField((BigIntVector) vector);
    }
    if (vector instanceof BitVector) {
        return ArrowArrayBooleanWriter.forField((BitVector) vector);
    }

    // Floating point vectors.
    if (vector instanceof Float4Vector) {
        return ArrowArrayFloatWriter.forField((Float4Vector) vector);
    }
    if (vector instanceof Float8Vector) {
        return ArrowArrayDoubleWriter.forField((Float8Vector) vector);
    }

    // Character and binary vectors.
    if (vector instanceof VarCharVector) {
        return ArrowArrayVarCharWriter.forField((VarCharVector) vector);
    }
    if (vector instanceof FixedSizeBinaryVector) {
        return ArrowArrayBinaryWriter.forField((FixedSizeBinaryVector) vector);
    }
    if (vector instanceof VarBinaryVector) {
        return ArrowArrayVarBinaryWriter.forField((VarBinaryVector) vector);
    }

    if (vector instanceof DecimalVector) {
        DecimalVector decimals = (DecimalVector) vector;
        int decimalPrecision = getPrecision(decimals);
        return ArrowArrayDecimalWriter.forField(decimals, decimalPrecision, decimals.getScale());
    }

    // Date and time vectors; every time resolution shares one writer.
    if (vector instanceof DateDayVector) {
        return ArrowArrayDateWriter.forField((DateDayVector) vector);
    }
    boolean isTimeVector =
            vector instanceof TimeSecVector
                    || vector instanceof TimeMilliVector
                    || vector instanceof TimeMicroVector
                    || vector instanceof TimeNanoVector;
    if (isTimeVector) {
        return ArrowArrayTimeWriter.forField(vector);
    }

    // Only timestamp vectors without a timezone are handled; any other timestamp vector
    // falls through to the unsupported case at the bottom.
    if (vector instanceof TimeStampVector) {
        ArrowType.Timestamp arrowTimestamp = (ArrowType.Timestamp) vector.getField().getType();
        if (arrowTimestamp.getTimezone() == null) {
            if (dataType instanceof LocalZonedTimestampType) {
                return ArrowArrayTimestampLtzWriter.forField(
                        vector, ((LocalZonedTimestampType) dataType).getPrecision());
            }
            return ArrowArrayTimestampNtzWriter.forField(
                    vector, ((TimestampType) dataType).getPrecision());
        }
    }

    // Nested arrays: build the inner element writer recursively.
    if (vector instanceof ListVector) {
        ArrayType nestedType = (ArrayType) dataType;
        ListVector nestedVector = (ListVector) vector;
        ArrowFieldWriter<InternalArray> innerWriter =
                createArrayElementWriter(
                        nestedVector.getDataVector(), nestedType.getElementType());
        return new ArrowNestedListWriter(nestedVector, innerWriter);
    }

    throw new UnsupportedOperationException(
            String.format("Unsupported array element type %s.", dataType));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.lance.writers;

import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;

import org.apache.arrow.vector.complex.ListVector;

/**
 * {@link ArrowFieldWriter} for Array.
 *
 * <p>Writes one array-typed column of an {@link InternalRow} into an Arrow {@link ListVector}.
 * The list entry itself is managed here; the individual elements are delegated to an element
 * writer that targets the list's child data vector.
 */
public class ArrowListWriter extends ArrowFieldWriter<InternalRow> {

    /** Writer for the list's child data vector; receives every element of every array. */
    private final ArrowFieldWriter<InternalArray> elementWriter;

    /**
     * Creates a list writer.
     *
     * @param listVector the Arrow list vector to write into
     * @param elementWriter the writer for the elements of each array
     * @return a new writer bound to {@code listVector}
     */
    public static ArrowListWriter forField(
            ListVector listVector, ArrowFieldWriter<InternalArray> elementWriter) {
        return new ArrowListWriter(listVector, elementWriter);
    }

    private ArrowListWriter(ListVector listVector, ArrowFieldWriter<InternalArray> elementWriter) {
        super(listVector);
        this.elementWriter = elementWriter;
    }

    /**
     * Writes the array found at {@code ordinal} of {@code row} as the next list entry, or a null
     * entry when the field is null.
     *
     * @param row the row to read the array from
     * @param ordinal the position of the array field inside {@code row}
     * @param handleSafe row-level safety flag; not forwarded to the element writer (see below)
     */
    @Override
    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
        ListVector listVector = (ListVector) getValueVector();
        int rowIndex = getCount();

        if (row.isNullAt(ordinal)) {
            listVector.setNull(rowIndex);
            return;
        }

        InternalArray array = row.getArray(ordinal);
        int elementCount = array.size();
        listVector.startNewValue(rowIndex);
        for (int i = 0; i < elementCount; i++) {
            // Always take the safe (reallocating) path for elements: the child data vector
            // gets one slot per array element rather than one per row, so it can outgrow its
            // allocation even when the list vector itself has not.
            // NOTE(review): assumes handleSafe reflects row-level capacity only - confirm
            // against the caller that computes it.
            elementWriter.write(array, i, true);
        }
        listVector.endValue(rowIndex, elementCount);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.lance.writers;

import org.apache.fluss.row.InternalArray;

import org.apache.arrow.vector.complex.ListVector;

/**
 * {@link ArrowFieldWriter} for nested arrays (ARRAY&lt;ARRAY&lt;T&gt;&gt;).
 *
 * <p>Reads an inner array out of an enclosing {@link InternalArray} and writes it as one entry
 * of an Arrow {@link ListVector}; the inner array's elements are delegated to an element writer
 * that targets the list's child data vector.
 */
public class ArrowNestedListWriter extends ArrowFieldWriter<InternalArray> {

    /** Writer for the list's child data vector; receives every element of every inner array. */
    private final ArrowFieldWriter<InternalArray> elementWriter;

    /**
     * Creates a nested list writer.
     *
     * @param listVector the Arrow list vector to write into
     * @param elementWriter the writer for the elements of each inner array
     */
    public ArrowNestedListWriter(
            ListVector listVector, ArrowFieldWriter<InternalArray> elementWriter) {
        super(listVector);
        this.elementWriter = elementWriter;
    }

    /**
     * Writes the inner array found at {@code ordinal} of {@code array} as the next list entry,
     * or a null entry when that element is null.
     *
     * @param array the enclosing array to read the inner array from
     * @param ordinal the position of the inner array inside {@code array}
     * @param handleSafe outer safety flag; not forwarded to the element writer (see below)
     */
    @Override
    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
        ListVector listVector = (ListVector) getValueVector();
        int entryIndex = getCount();

        if (array.isNullAt(ordinal)) {
            listVector.setNull(entryIndex);
            return;
        }

        InternalArray nestedArray = array.getArray(ordinal);
        int elementCount = nestedArray.size();
        listVector.startNewValue(entryIndex);
        for (int i = 0; i < elementCount; i++) {
            // Always take the safe (reallocating) path for elements: the child data vector
            // gets one slot per inner element rather than one per list entry, so it can
            // outgrow its allocation independently of this list vector.
            // NOTE(review): assumes handleSafe reflects the outer vector's capacity only -
            // confirm against the caller that computes it.
            elementWriter.write(nestedArray, i, true);
        }
        listVector.endValue(entryIndex, elementCount);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.lance.writers.array;

import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
import org.apache.fluss.row.InternalArray;

import org.apache.arrow.vector.BigIntVector;

/** Element writer that copies {@code long} values of an {@link InternalArray} into a {@link BigIntVector}. */
public class ArrowArrayBigIntWriter extends ArrowFieldWriter<InternalArray> {

    private ArrowArrayBigIntWriter(BigIntVector bigIntVector) {
        super(bigIntVector);
    }

    /**
     * Creates a writer bound to the given vector.
     *
     * @param bigIntVector the vector that receives the values
     * @return a new writer
     */
    public static ArrowArrayBigIntWriter forField(BigIntVector bigIntVector) {
        return new ArrowArrayBigIntWriter(bigIntVector);
    }

    /**
     * Writes element {@code ordinal} of {@code array} at this writer's current count.
     *
     * @param array the array to read from
     * @param ordinal the index of the element inside {@code array}
     * @param handleSafe whether to use {@code setSafe} instead of {@code set}
     */
    @Override
    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
        final BigIntVector target = (BigIntVector) getValueVector();

        if (array.isNullAt(ordinal)) {
            target.setNull(getCount());
            return;
        }

        if (handleSafe) {
            target.setSafe(getCount(), array.getLong(ordinal));
            return;
        }
        target.set(getCount(), array.getLong(ordinal));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.lance.writers.array;

import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
import org.apache.fluss.row.InternalArray;

import org.apache.arrow.vector.FixedSizeBinaryVector;

/**
 * Element writer that copies fixed-width binary values of an {@link InternalArray} into a
 * {@link FixedSizeBinaryVector}.
 */
public class ArrowArrayBinaryWriter extends ArrowFieldWriter<InternalArray> {

    /** Byte width of the target vector, captured at construction and used as the read length. */
    private final int byteWidth;

    private ArrowArrayBinaryWriter(FixedSizeBinaryVector binaryVector) {
        super(binaryVector);
        this.byteWidth = binaryVector.getByteWidth();
    }

    /**
     * Creates a writer bound to the given vector.
     *
     * @param binaryVector the vector that receives the values
     * @return a new writer
     */
    public static ArrowArrayBinaryWriter forField(FixedSizeBinaryVector binaryVector) {
        return new ArrowArrayBinaryWriter(binaryVector);
    }

    /**
     * Writes element {@code ordinal} of {@code array} at this writer's current count.
     *
     * @param array the array to read from
     * @param ordinal the index of the element inside {@code array}
     * @param handleSafe whether to use {@code setSafe} instead of {@code set}
     */
    @Override
    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
        final FixedSizeBinaryVector target = (FixedSizeBinaryVector) getValueVector();

        if (array.isNullAt(ordinal)) {
            target.setNull(getCount());
            return;
        }

        final byte[] payload = array.getBinary(ordinal, byteWidth);
        if (handleSafe) {
            target.setSafe(getCount(), payload);
            return;
        }
        target.set(getCount(), payload);
    }
}
Loading