From c0eaee5ed630accbe06bef38348af48ea47c8c52 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Tue, 30 Dec 2025 09:38:56 +0000 Subject: [PATCH] This PR implements comprehensive support for Apache Arrow `ARRAY` data types in the Lance data lake integration, enabling Fluss to write array columns (e.g., `ARRAY`, `ARRAY`, nested arrays) to Lance datasets. --- .../lake/lance/utils/LanceArrowUtils.java | 98 ++++- .../lake/lance/writers/ArrowListWriter.java | 65 +++ .../lance/writers/ArrowNestedListWriter.java | 52 +++ .../writers/array/ArrowArrayBigIntWriter.java | 55 +++ .../writers/array/ArrowArrayBinaryWriter.java | 61 +++ .../array/ArrowArrayBooleanWriter.java | 55 +++ .../writers/array/ArrowArrayDateWriter.java | 55 +++ .../array/ArrowArrayDecimalWriter.java | 67 +++ .../writers/array/ArrowArrayDoubleWriter.java | 55 +++ .../writers/array/ArrowArrayFloatWriter.java | 55 +++ .../writers/array/ArrowArrayIntWriter.java | 55 +++ .../array/ArrowArraySmallIntWriter.java | 55 +++ .../writers/array/ArrowArrayTimeWriter.java | 91 ++++ .../array/ArrowArrayTimestampLtzWriter.java | 100 +++++ .../array/ArrowArrayTimestampNtzWriter.java | 98 +++++ .../array/ArrowArrayTinyIntWriter.java | 55 +++ .../array/ArrowArrayVarBinaryWriter.java | 58 +++ .../array/ArrowArrayVarCharWriter.java | 58 +++ .../lake/lance/tiering/LanceTieringTest.java | 400 ++++++++++++++++++ .../lance/writers/ArrowListWriterTest.java | 151 +++++++ 20 files changed, 1738 insertions(+), 1 deletion(-) create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowListWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowNestedListWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBigIntWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBinaryWriter.java create mode 
100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBooleanWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDateWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDecimalWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDoubleWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayFloatWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayIntWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArraySmallIntWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimeWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampLtzWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampNtzWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTinyIntWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarBinaryWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarCharWriter.java create mode 100644 fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/writers/ArrowListWriterTest.java diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java index 6b4da804f3..d23c19c79e 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java @@ -26,6 +26,8 @@ import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; import org.apache.fluss.lake.lance.writers.ArrowFloatWriter; import org.apache.fluss.lake.lance.writers.ArrowIntWriter; +import org.apache.fluss.lake.lance.writers.ArrowListWriter; +import org.apache.fluss.lake.lance.writers.ArrowNestedListWriter; import org.apache.fluss.lake.lance.writers.ArrowSmallIntWriter; import org.apache.fluss.lake.lance.writers.ArrowTimeWriter; import org.apache.fluss.lake.lance.writers.ArrowTimestampLtzWriter; @@ -33,7 +35,24 @@ import org.apache.fluss.lake.lance.writers.ArrowTinyIntWriter; import org.apache.fluss.lake.lance.writers.ArrowVarBinaryWriter; import org.apache.fluss.lake.lance.writers.ArrowVarCharWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayBigIntWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayBinaryWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayBooleanWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayDateWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayDecimalWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayDoubleWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayFloatWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayIntWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArraySmallIntWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayTimeWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayTimestampLtzWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayTimestampNtzWriter; +import 
org.apache.fluss.lake.lance.writers.array.ArrowArrayTinyIntWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayVarBinaryWriter; +import org.apache.fluss.lake.lance.writers.array.ArrowArrayVarCharWriter; +import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.BinaryType; import org.apache.fluss.types.BooleanType; @@ -72,6 +91,7 @@ import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; @@ -80,6 +100,7 @@ import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -100,7 +121,13 @@ private static Field toArrowField(String fieldName, DataType logicalType) { logicalType.isNullable(), logicalType.accept(DataTypeToArrowTypeConverter.INSTANCE), null); - return new Field(fieldName, fieldType, null); + List children = null; + if (logicalType instanceof ArrayType) { + children = + Collections.singletonList( + toArrowField("element", ((ArrayType) logicalType).getElementType())); + } + return new Field(fieldName, fieldType, children); } private static class DataTypeToArrowTypeConverter extends DataTypeDefaultVisitor { @@ -215,6 +242,11 @@ public ArrowType visit(TimestampType timestampType) { } } + @Override + public ArrowType visit(ArrayType arrayType) { + return ArrowType.List.INSTANCE; + } + @Override protected ArrowType defaultMethod(DataType dataType) { throw new UnsupportedOperationException( @@ -270,9 +302,73 @@ public static ArrowFieldWriter createArrowFieldWriter( precision = ((TimestampType) 
dataType).getPrecision();
                 return ArrowTimestampNtzWriter.forField(vector, precision);
             }
+        } else if (vector instanceof ListVector) {
+            ArrayType arrayType = (ArrayType) dataType;
+            ListVector listVector = (ListVector) vector;
+            ValueVector elementVector = listVector.getDataVector();
+            ArrowFieldWriter<InternalArray> elementWriter =
+                    createArrayElementWriter(elementVector, arrayType.getElementType());
+            return ArrowListWriter.forField(listVector, elementWriter);
         } else {
             throw new UnsupportedOperationException(
                     String.format("Unsupported type %s.", dataType));
         }
     }
+
+    /**
+     * Creates the element writer for {@code vector}, selected by the vector's concrete class.
+     * Timestamp precision and nested element types are read from {@code dataType}.
+     */
+    private static ArrowFieldWriter<InternalArray> createArrayElementWriter(
+            ValueVector vector, DataType dataType) {
+        if (vector instanceof TinyIntVector) {
+            return ArrowArrayTinyIntWriter.forField((TinyIntVector) vector);
+        } else if (vector instanceof SmallIntVector) {
+            return ArrowArraySmallIntWriter.forField((SmallIntVector) vector);
+        } else if (vector instanceof IntVector) {
+            return ArrowArrayIntWriter.forField((IntVector) vector);
+        } else if (vector instanceof BigIntVector) {
+            return ArrowArrayBigIntWriter.forField((BigIntVector) vector);
+        } else if (vector instanceof BitVector) {
+            return ArrowArrayBooleanWriter.forField((BitVector) vector);
+        } else if (vector instanceof Float4Vector) {
+            return ArrowArrayFloatWriter.forField((Float4Vector) vector);
+        } else if (vector instanceof Float8Vector) {
+            return ArrowArrayDoubleWriter.forField((Float8Vector) vector);
+        } else if (vector instanceof VarCharVector) {
+            return ArrowArrayVarCharWriter.forField((VarCharVector) vector);
+        } else if (vector instanceof FixedSizeBinaryVector) {
+            return ArrowArrayBinaryWriter.forField((FixedSizeBinaryVector) vector);
+        } else if (vector instanceof VarBinaryVector) {
+            return ArrowArrayVarBinaryWriter.forField((VarBinaryVector) vector);
+        } else if (vector instanceof DecimalVector) {
+            DecimalVector decimalVector = (DecimalVector) vector;
+            return ArrowArrayDecimalWriter.forField(
+                    decimalVector, getPrecision(decimalVector), decimalVector.getScale());
+        } else if (vector instanceof DateDayVector) {
+            return ArrowArrayDateWriter.forField((DateDayVector) vector);
+        } else if (vector instanceof TimeSecVector
+                || vector instanceof TimeMilliVector
+                || vector instanceof TimeMicroVector
+                || vector instanceof TimeNanoVector) {
+            return ArrowArrayTimeWriter.forField(vector);
+        } else if (vector instanceof TimeStampVector
+                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
+            if (dataType instanceof LocalZonedTimestampType) {
+                int precision = ((LocalZonedTimestampType) dataType).getPrecision();
+                return ArrowArrayTimestampLtzWriter.forField(vector, precision);
+            }
+            int precision = ((TimestampType) dataType).getPrecision();
+            return ArrowArrayTimestampNtzWriter.forField(vector, precision);
+        } else if (vector instanceof ListVector) {
+            // Handle nested arrays
+            ListVector listVector = (ListVector) vector;
+            DataType elementType = ((ArrayType) dataType).getElementType();
+            return new ArrowNestedListWriter(
+                    listVector, createArrayElementWriter(listVector.getDataVector(), elementType));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported array element type %s.", dataType));
+        }
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowListWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowListWriter.java
new file mode 100644
index 0000000000..cf81c7f7bc
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowListWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.complex.ListVector;
+
+/** {@link ArrowFieldWriter} that writes an array field of a row into a {@link ListVector}. */
+public class ArrowListWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Writes the individual elements of each array. */
+    private final ArrowFieldWriter<InternalArray> elementWriter;
+
+    /** Creates a writer for {@code listVector} that uses {@code elementWriter} for elements. */
+    public static ArrowListWriter forField(
+            ListVector listVector, ArrowFieldWriter<InternalArray> elementWriter) {
+        return new ArrowListWriter(listVector, elementWriter);
+    }
+
+    private ArrowListWriter(ListVector listVector, ArrowFieldWriter<InternalArray> elementWriter) {
+        super(listVector);
+        this.elementWriter = elementWriter;
+    }
+
+    /**
+     * Writes the array at {@code ordinal} of {@code row} as list entry {@code getCount()}; a null
+     * field becomes a null entry.
+     *
+     * @param row row holding the array field
+     * @param ordinal position of the array field in {@code row}
+     * @param handleSafe passed through unchanged to the element writer
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        ListVector listVector = (ListVector) getValueVector();
+        int rowIndex = getCount();
+        if (row.isNullAt(ordinal)) {
+            listVector.setNull(rowIndex);
+            return;
+        }
+        InternalArray array = row.getArray(ordinal);
+        listVector.startNewValue(rowIndex);
+        for (int i = 0; i < array.size(); i++) {
+            elementWriter.write(array, i, handleSafe);
+        }
+        listVector.endValue(rowIndex, array.size());
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowNestedListWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowNestedListWriter.java
new file mode 100644
index 0000000000..ac99d52d03
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowNestedListWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers;
+
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.complex.ListVector;
+
+/** {@link ArrowFieldWriter} for nested arrays ({@code ARRAY<ARRAY<T>>}). */
+public class ArrowNestedListWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Writes the individual elements of each inner array. */
+    private final ArrowFieldWriter<InternalArray> elementWriter;
+
+    public ArrowNestedListWriter(
+            ListVector listVector, ArrowFieldWriter<InternalArray> elementWriter) {
+        super(listVector);
+        this.elementWriter = elementWriter;
+    }
+
+    /** Writes element {@code ordinal} of {@code array}, itself an array, as one list entry. */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        ListVector listVector = (ListVector) getValueVector();
+        int rowIndex = getCount();
+        if (array.isNullAt(ordinal)) {
+            listVector.setNull(rowIndex);
+            return;
+        }
+        InternalArray nestedArray = array.getArray(ordinal);
+        listVector.startNewValue(rowIndex);
+        for (int i = 0; i < nestedArray.size(); i++) {
+            elementWriter.write(nestedArray, i, handleSafe);
+        }
+        listVector.endValue(rowIndex, nestedArray.size());
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBigIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBigIntWriter.java
new file mode 100644
index 0000000000..4684129aab
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBigIntWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/** {@link ArrowFieldWriter} for BigInt elements in arrays. */
+public class ArrowArrayBigIntWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that stores array elements in {@code bigIntVector}. */
+    public static ArrowArrayBigIntWriter forField(BigIntVector bigIntVector) {
+        return new ArrowArrayBigIntWriter(bigIntVector);
+    }
+
+    private ArrowArrayBigIntWriter(BigIntVector bigIntVector) {
+        super(bigIntVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array to read the element from
+     * @param ordinal position of the element within {@code array}
+     * @param handleSafe if true uses {@code setSafe}, otherwise {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        BigIntVector vector = (BigIntVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getLong(ordinal));
+        } else {
+            vector.set(getCount(), array.getLong(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBinaryWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBinaryWriter.java
new file mode 100644
index 0000000000..d694ac0368
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBinaryWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+
+/** {@link ArrowFieldWriter} for Binary elements in arrays. */
+public class ArrowArrayBinaryWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Byte width of the target vector; passed as the length when reading each element. */
+    private final int byteWidth;
+
+    /** Creates a writer that stores array elements in {@code binaryVector}. */
+    public static ArrowArrayBinaryWriter forField(FixedSizeBinaryVector binaryVector) {
+        return new ArrowArrayBinaryWriter(binaryVector);
+    }
+
+    private ArrowArrayBinaryWriter(FixedSizeBinaryVector binaryVector) {
+        super(binaryVector);
+        this.byteWidth = binaryVector.getByteWidth();
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}; a null
+     * element is written as null.
+     *
+     * @param array array to read the element from
+     * @param ordinal position of the element within {@code array}
+     * @param handleSafe if true uses {@code setSafe}, otherwise {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        FixedSizeBinaryVector vector = (FixedSizeBinaryVector) getValueVector();
+        int index = getCount();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(index);
+        } else if (handleSafe) {
+            vector.setSafe(index, array.getBinary(ordinal, byteWidth));
+        } else {
+            vector.set(index, array.getBinary(ordinal, byteWidth));
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBooleanWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBooleanWriter.java
new file mode 100644
index 0000000000..2a0202933d
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayBooleanWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.BitVector;
+
+/** {@link ArrowFieldWriter} for Boolean elements in arrays. */
+public class ArrowArrayBooleanWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that stores array elements in {@code bitVector}. */
+    public static ArrowArrayBooleanWriter forField(BitVector bitVector) {
+        return new ArrowArrayBooleanWriter(bitVector);
+    }
+
+    private ArrowArrayBooleanWriter(BitVector bitVector) {
+        super(bitVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array to read the element from
+     * @param ordinal position of the element within {@code array}
+     * @param handleSafe if true uses {@code setSafe}, otherwise {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        BitVector vector = (BitVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getBoolean(ordinal) ? 1 : 0);
+        } else {
+            vector.set(getCount(), array.getBoolean(ordinal) ? 1 : 0);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDateWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDateWriter.java
new file mode 100644
index 0000000000..107249a36e
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDateWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.DateDayVector;
+
+/** {@link ArrowFieldWriter} for Date elements in arrays. */
+public class ArrowArrayDateWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that stores array elements in {@code dateDayVector}. */
+    public static ArrowArrayDateWriter forField(DateDayVector dateDayVector) {
+        return new ArrowArrayDateWriter(dateDayVector);
+    }
+
+    private ArrowArrayDateWriter(DateDayVector dateDayVector) {
+        super(dateDayVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array to read the element from (read with {@code getInt})
+     * @param ordinal position of the element within {@code array}
+     * @param handleSafe if true uses {@code setSafe}, otherwise {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        DateDayVector vector = (DateDayVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getInt(ordinal));
+        } else {
+            vector.set(getCount(), array.getInt(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDecimalWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDecimalWriter.java
new file mode 100644
index 0000000000..d7fa76a300
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDecimalWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.DecimalVector;
+
+import java.math.BigDecimal;
+
+/** {@link ArrowFieldWriter} for Decimal elements in arrays. */
+public class ArrowArrayDecimalWriter extends ArrowFieldWriter<InternalArray> {
+
+    private final int precision;
+    private final int scale;
+
+    /** Creates a writer that stores array elements in {@code decimalVector}. */
+    public static ArrowArrayDecimalWriter forField(
+            DecimalVector decimalVector, int precision, int scale) {
+        return new ArrowArrayDecimalWriter(decimalVector, precision, scale);
+    }
+
+    private ArrowArrayDecimalWriter(DecimalVector decimalVector, int precision, int scale) {
+        super(decimalVector);
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array to read the element from, using this writer's precision and scale
+     * @param ordinal position of the element within {@code array}
+     * @param handleSafe if true uses {@code setSafe}, otherwise {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        DecimalVector vector = (DecimalVector) getValueVector();
+        int index = getCount();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(index);
+            return;
+        }
+        BigDecimal decimal = array.getDecimal(ordinal, precision, scale).toBigDecimal();
+        if (handleSafe) {
+            vector.setSafe(index, decimal);
+        } else {
+            vector.set(index, decimal);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDoubleWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDoubleWriter.java
new file mode 100644
index 0000000000..610d6109e7
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayDoubleWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.Float8Vector;
+
+/** {@link ArrowFieldWriter} for Double elements in arrays. */
+public class ArrowArrayDoubleWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that stores array elements in {@code doubleVector}. */
+    public static ArrowArrayDoubleWriter forField(Float8Vector doubleVector) {
+        return new ArrowArrayDoubleWriter(doubleVector);
+    }
+
+    private ArrowArrayDoubleWriter(Float8Vector doubleVector) {
+        super(doubleVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array to read the element from
+     * @param ordinal position of the element within {@code array}
+     * @param handleSafe if true uses {@code setSafe}, otherwise {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        Float8Vector vector = (Float8Vector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getDouble(ordinal));
+        } else {
+            vector.set(getCount(), array.getDouble(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayFloatWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayFloatWriter.java
new file mode 100644
index 0000000000..3f8761fe21 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayFloatWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers.array; + +import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; +import org.apache.fluss.row.InternalArray; + +import org.apache.arrow.vector.Float4Vector; + +/** {@link ArrowFieldWriter} for Float elements in arrays. 
*/
+public class ArrowArrayFloatWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that appends array elements to {@code floatVector}. */
+    public static ArrowArrayFloatWriter forField(Float4Vector floatVector) {
+        return new ArrowArrayFloatWriter(floatVector);
+    }
+
+    private ArrowArrayFloatWriter(Float4Vector floatVector) {
+        super(floatVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        Float4Vector vector = (Float4Vector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getFloat(ordinal));
+        } else {
+            vector.set(getCount(), array.getFloat(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayIntWriter.java
new file mode 100644
index 0000000000..02dbe14891
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayIntWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.IntVector;
+
+/** {@link ArrowFieldWriter} writing int array elements into an {@link IntVector}. */
+public class ArrowArrayIntWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that appends array elements to {@code intVector}. */
+    public static ArrowArrayIntWriter forField(IntVector intVector) {
+        return new ArrowArrayIntWriter(intVector);
+    }
+
+    private ArrowArrayIntWriter(IntVector intVector) {
+        super(intVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        IntVector vector = (IntVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getInt(ordinal));
+        } else {
+            vector.set(getCount(), array.getInt(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArraySmallIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArraySmallIntWriter.java
new file mode 100644
index 0000000000..9e1f34d9f3
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArraySmallIntWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.SmallIntVector;
+
+/** {@link ArrowFieldWriter} writing short array elements into a {@link SmallIntVector}. */
+public class ArrowArraySmallIntWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that appends array elements to {@code smallIntVector}. */
+    public static ArrowArraySmallIntWriter forField(SmallIntVector smallIntVector) {
+        return new ArrowArraySmallIntWriter(smallIntVector);
+    }
+
+    private ArrowArraySmallIntWriter(SmallIntVector smallIntVector) {
+        super(smallIntVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        SmallIntVector vector = (SmallIntVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getShort(ordinal));
+        } else {
+            vector.set(getCount(), array.getShort(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimeWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimeWriter.java
new file mode 100644 index 0000000000..1d64cec7f2 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimeWriter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers.array; + +import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; +import org.apache.fluss.row.InternalArray; + +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.ValueVector; + +import static org.apache.fluss.utils.Preconditions.checkState; + +/** {@link ArrowFieldWriter} for Time elements in arrays. 
*/
+public class ArrowArrayTimeWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer for {@code valueVector}, which must be one of the Arrow time vectors. */
+    public static ArrowArrayTimeWriter forField(ValueVector valueVector) {
+        return new ArrowArrayTimeWriter(valueVector);
+    }
+
+    private ArrowArrayTimeWriter(ValueVector valueVector) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeSecVector
+                        || valueVector instanceof TimeMilliVector
+                        || valueVector instanceof TimeMicroVector
+                        || valueVector instanceof TimeNanoVector);
+    }
+
+    /**
+     * Writes one element, scaling its millisecond value to the time unit of the target vector.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        ValueVector valueVector = getValueVector();
+        if (array.isNullAt(ordinal)) {
+            ((BaseFixedWidthVector) valueVector).setNull(getCount());
+        } else if (valueVector instanceof TimeSecVector) {
+            int sec = array.getInt(ordinal) / 1000;
+            if (handleSafe) {
+                ((TimeSecVector) valueVector).setSafe(getCount(), sec);
+            } else {
+                ((TimeSecVector) valueVector).set(getCount(), sec);
+            }
+        } else if (valueVector instanceof TimeMilliVector) {
+            int ms = array.getInt(ordinal);
+            if (handleSafe) {
+                ((TimeMilliVector) valueVector).setSafe(getCount(), ms);
+            } else {
+                ((TimeMilliVector) valueVector).set(getCount(), ms);
+            }
+        } else if (valueVector instanceof TimeMicroVector) {
+            long microSec = array.getInt(ordinal) * 1000L;
+            if (handleSafe) {
+                ((TimeMicroVector) valueVector).setSafe(getCount(), microSec);
+            } else {
+                ((TimeMicroVector) valueVector).set(getCount(), microSec);
+            }
+        } else {
+            long nanoSec = array.getInt(ordinal) * 1_000_000L;
+            if (handleSafe) {
+                ((TimeNanoVector) valueVector).setSafe(getCount(), nanoSec);
+            } else {
+                ((TimeNanoVector) valueVector).set(getCount(), nanoSec);
+            }
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampLtzWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampLtzWriter.java new file mode 100644 index 0000000000..22c35db0fc --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampLtzWriter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers.array; + +import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.TimestampLtz; + +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import static org.apache.fluss.utils.Preconditions.checkState; + +/** {@link ArrowFieldWriter} for TimestampLtz elements in arrays. 
*/
+public class ArrowArrayTimestampLtzWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer for {@code valueVector} that reads elements with {@code precision}. */
+    public static ArrowArrayTimestampLtzWriter forField(ValueVector valueVector, int precision) {
+        return new ArrowArrayTimestampLtzWriter(valueVector, precision);
+    }
+
+    /** Precision handed to {@code InternalArray#getTimestampLtz} when reading an element. */
+    private final int precision;
+
+    /**
+     * Rejects, via {@code checkState}, any vector that is not a {@link TimeStampVector} whose
+     * Arrow type has a {@code null} timezone.
+     */
+    private ArrowArrayTimestampLtzWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        // NOTE(review): a local-time-zone writer that insists on a timezone-less Arrow type looks
+        // surprising; confirm it matches the Arrow schema generated for TIMESTAMP_LTZ columns.
+        checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
+                                == null);
+        this.precision = precision;
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        TimeStampVector vector = (TimeStampVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+            return;
+        }
+
+        TimestampLtz timestamp = array.getTimestampLtz(ordinal, precision);
+        long value = toVectorUnit(vector, timestamp);
+        if (handleSafe) {
+            vector.setSafe(getCount(), value);
+        } else {
+            vector.set(getCount(), value);
+        }
+    }
+
+    /**
+     * Converts {@code timestamp} to the time unit stored by {@code vector}: seconds, milliseconds
+     * or microseconds for the matching vector classes, nanoseconds for any other vector class.
+     */
+    private static long toVectorUnit(TimeStampVector vector, TimestampLtz timestamp) {
+        long epochMillis = timestamp.getEpochMillisecond();
+        if (vector instanceof TimeStampSecVector) {
+            return epochMillis / 1000;
+        } else if (vector instanceof TimeStampMilliVector) {
+            return epochMillis;
+        } else if (vector instanceof TimeStampMicroVector) {
+            return epochMillis * 1000 + timestamp.getNanoOfMillisecond() / 1000;
+        } else {
+            // Every remaining TimeStampVector subclass is written with nanosecond resolution.
+            return epochMillis * 1_000_000 + timestamp.getNanoOfMillisecond();
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampNtzWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampNtzWriter.java new file mode 100644 index 0000000000..7fc8d13d80 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTimestampNtzWriter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers.array; + +import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.TimestampNtz; + +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import static org.apache.fluss.utils.Preconditions.checkState; + +/** {@link ArrowFieldWriter} for TimestampNtz elements in arrays. 
*/
+public class ArrowArrayTimestampNtzWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer for {@code valueVector} that reads elements with {@code precision}. */
+    public static ArrowArrayTimestampNtzWriter forField(ValueVector valueVector, int precision) {
+        return new ArrowArrayTimestampNtzWriter(valueVector, precision);
+    }
+
+    /** Precision handed to {@code InternalArray#getTimestampNtz} when reading an element. */
+    private final int precision;
+
+    /**
+     * Rejects, via {@code checkState}, any vector that is not a {@link TimeStampVector} whose
+     * Arrow type has a {@code null} timezone.
+     */
+    private ArrowArrayTimestampNtzWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
+                                == null);
+        this.precision = precision;
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        TimeStampVector vector = (TimeStampVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+            return;
+        }
+
+        TimestampNtz timestamp = array.getTimestampNtz(ordinal, precision);
+        long value = toVectorUnit(vector, timestamp);
+        if (handleSafe) {
+            vector.setSafe(getCount(), value);
+        } else {
+            vector.set(getCount(), value);
+        }
+    }
+
+    /**
+     * Converts {@code timestamp} to the time unit stored by {@code vector}: seconds, milliseconds
+     * or microseconds for the matching vector classes, nanoseconds for any other vector class.
+     */
+    private static long toVectorUnit(TimeStampVector vector, TimestampNtz timestamp) {
+        long millis = timestamp.getMillisecond();
+        if (vector instanceof TimeStampSecVector) {
+            return millis / 1000;
+        } else if (vector instanceof TimeStampMilliVector) {
+            return millis;
+        } else if (vector instanceof TimeStampMicroVector) {
+            return millis * 1000 + timestamp.getNanoOfMillisecond() / 1000;
+        } else {
+            // Every remaining TimeStampVector subclass is written with nanosecond resolution.
+            return millis * 1_000_000 + timestamp.getNanoOfMillisecond();
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTinyIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTinyIntWriter.java new file mode 100644 index 0000000000..aa0981f50c --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayTinyIntWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers.array; + +import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; +import org.apache.fluss.row.InternalArray; + +import org.apache.arrow.vector.TinyIntVector; + +/** {@link ArrowFieldWriter} for TinyInt elements in arrays. 
*/
+public class ArrowArrayTinyIntWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that appends array elements to {@code tinyIntVector}. */
+    public static ArrowArrayTinyIntWriter forField(TinyIntVector tinyIntVector) {
+        return new ArrowArrayTinyIntWriter(tinyIntVector);
+    }
+
+    private ArrowArrayTinyIntWriter(TinyIntVector tinyIntVector) {
+        super(tinyIntVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        TinyIntVector vector = (TinyIntVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), array.getByte(ordinal));
+        } else {
+            vector.set(getCount(), array.getByte(ordinal));
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarBinaryWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarBinaryWriter.java
new file mode 100644
index 0000000000..fc2b960470
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarBinaryWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.writers.array;
+
+import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
+import org.apache.fluss.row.InternalArray;
+
+import org.apache.arrow.vector.VarBinaryVector;
+
+/** {@link ArrowFieldWriter} writing byte[] array elements into a {@link VarBinaryVector}. */
+public class ArrowArrayVarBinaryWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that appends array elements to {@code varBinaryVector}. */
+    public static ArrowArrayVarBinaryWriter forField(VarBinaryVector varBinaryVector) {
+        return new ArrowArrayVarBinaryWriter(varBinaryVector);
+    }
+
+    private ArrowArrayVarBinaryWriter(VarBinaryVector varBinaryVector) {
+        super(varBinaryVector);
+    }
+
+    /**
+     * Writes element {@code ordinal} of {@code array} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        VarBinaryVector vector = (VarBinaryVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            byte[] bytes = array.getBytes(ordinal);
+            if (handleSafe) {
+                vector.setSafe(getCount(), bytes);
+            } else {
+                vector.set(getCount(), bytes);
+            }
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarCharWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarCharWriter.java
new file mode 100644
index 0000000000..f7d3d4adc0
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/array/ArrowArrayVarCharWriter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers.array; + +import org.apache.fluss.lake.lance.writers.ArrowFieldWriter; +import org.apache.fluss.row.InternalArray; + +import org.apache.arrow.vector.VarCharVector; + +/** {@link ArrowFieldWriter} for VarChar elements in arrays. 
*/
+public class ArrowArrayVarCharWriter extends ArrowFieldWriter<InternalArray> {
+
+    /** Creates a writer that appends array elements to {@code varCharVector}. */
+    public static ArrowArrayVarCharWriter forField(VarCharVector varCharVector) {
+        return new ArrowArrayVarCharWriter(varCharVector);
+    }
+
+    private ArrowArrayVarCharWriter(VarCharVector varCharVector) {
+        super(varCharVector);
+    }
+
+    /**
+     * Writes the bytes of string element {@code ordinal} at vector index {@code getCount()}.
+     *
+     * @param array array holding the element to write
+     * @param ordinal position of the element inside {@code array}
+     * @param handleSafe {@code true} to write with {@code setSafe}, {@code false} for {@code set}
+     */
+    @Override
+    public void doWrite(InternalArray array, int ordinal, boolean handleSafe) {
+        VarCharVector vector = (VarCharVector) getValueVector();
+        if (array.isNullAt(ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            byte[] bytes = array.getString(ordinal).toBytes();
+            if (handleSafe) {
+                vector.setSafe(getCount(), bytes);
+            } else {
+                vector.set(getCount(), bytes);
+            }
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 1cfac3723f..057df8fc0c 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -37,6 +37,7 @@
 import org.apache.fluss.record.GenericRecord;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.utils.types.Tuple2;
@@ -48,6 +49,7 @@
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; @@ -341,4 +343,402 @@ private Schema createTable(LanceConfig config) { return schema; } + + @Test + void testArrayTypeInt() throws Exception { + TablePath tablePath = TablePath.of("lance", "arrayTable"); + Map customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + + // Create schema with ARRAY column + List columns = new ArrayList<>(); + columns.add(new Schema.Column("id", DataTypes.INT())); + columns.add(new Schema.Column("int_array", DataTypes.ARRAY(DataTypes.INT()))); + Schema schema = Schema.newBuilder().fromColumns(columns).build(); + + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + List lanceWriteResults = new ArrayList<>(); + Map tableBucketOffsets = new HashMap<>(); + + // Write data with arrays + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, 0, null, tableInfo)) { + List logRecords = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRow row = new GenericRow(2); + row.setField(0, i); + // Create array with values [i, i+1, i+2] + row.setField(1, new GenericArray(new int[] {i, i + 1, i + 2})); + logRecords.add( + new GenericRecord( + i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row)); + } + tableBucketOffsets.put(new TableBucket(0, null, 0), 10L); + for (LogRecord logRecord : logRecords) { + lakeWriter.write(logRecord); + } + lanceWriteResults.add(lakeWriter.complete()); + } + 
+ // Commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + long snapshot = + lakeCommitter.commit( + lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets)); + assertThat(snapshot).isEqualTo(2); + } + + // Verify dataset was created successfully + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + assertThat(dataset).isNotNull(); + } + } + + @Test + void testArrayTypeString() throws Exception { + TablePath tablePath = TablePath.of("lance", "stringArrayTable"); + Map customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + + // Create schema with ARRAY column + List columns = new ArrayList<>(); + columns.add(new Schema.Column("id", DataTypes.INT())); + columns.add(new Schema.Column("string_array", DataTypes.ARRAY(DataTypes.STRING()))); + Schema schema = Schema.newBuilder().fromColumns(columns).build(); + + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + List lanceWriteResults = new ArrayList<>(); + Map tableBucketOffsets = new HashMap<>(); + + // Write data with string arrays + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, 0, null, tableInfo)) { + List logRecords = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRow row = 
new GenericRow(2); + row.setField(0, i); + row.setField( + 1, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("str_" + i), + BinaryString.fromString("str_" + (i + 1)) + })); + logRecords.add( + new GenericRecord( + i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row)); + } + tableBucketOffsets.put(new TableBucket(0, null, 0), 10L); + for (LogRecord logRecord : logRecords) { + lakeWriter.write(logRecord); + } + lanceWriteResults.add(lakeWriter.complete()); + } + + // Commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + long snapshot = + lakeCommitter.commit( + lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets)); + assertThat(snapshot).isEqualTo(2); + } + + // Verify dataset was created successfully + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + assertThat(dataset).isNotNull(); + } + } + + @Test + void testArrayTypeNullable() throws Exception { + TablePath tablePath = TablePath.of("lance", "nullableArrayTable"); + Map customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + + // Create schema with ARRAY column (elements can be null using Object[] arrays) + List columns = new ArrayList<>(); + columns.add(new Schema.Column("id", DataTypes.INT())); + columns.add(new Schema.Column("nullable_array", DataTypes.ARRAY(DataTypes.INT()))); + Schema schema = Schema.newBuilder().fromColumns(columns).build(); + + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + TableDescriptor descriptor = + 
TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + List lanceWriteResults = new ArrayList<>(); + Map tableBucketOffsets = new HashMap<>(); + + // Write data with null arrays and null elements + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, 0, null, tableInfo)) { + List logRecords = new ArrayList<>(); + + // Row with null array + GenericRow row1 = new GenericRow(2); + row1.setField(0, 0); + row1.setField(1, null); + logRecords.add( + new GenericRecord(0, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row1)); + + // Row with empty array + GenericRow row2 = new GenericRow(2); + row2.setField(0, 1); + row2.setField(1, new GenericArray(new Integer[] {})); + logRecords.add( + new GenericRecord(1, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row2)); + + // Row with array containing null elements + GenericRow row3 = new GenericRow(2); + row3.setField(0, 2); + row3.setField(1, new GenericArray(new Integer[] {1, null, 3})); + logRecords.add( + new GenericRecord(2, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row3)); + + tableBucketOffsets.put(new TableBucket(0, null, 0), 3L); + for (LogRecord logRecord : logRecords) { + lakeWriter.write(logRecord); + } + lanceWriteResults.add(lakeWriter.complete()); + } + + // Commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + long snapshot = + lakeCommitter.commit( + lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets)); + assertThat(snapshot).isEqualTo(2); + } + + // Verify dataset was created successfully + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + 
assertThat(dataset).isNotNull(); + } + } + + @Test + void testNestedArrayType() throws Exception { + TablePath tablePath = TablePath.of("lance", "nestedArrayTable"); + Map customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + + // Create schema with ARRAY> column + List columns = new ArrayList<>(); + columns.add(new Schema.Column("id", DataTypes.INT())); + columns.add( + new Schema.Column( + "nested_array", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))); + Schema schema = Schema.newBuilder().fromColumns(columns).build(); + + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + List lanceWriteResults = new ArrayList<>(); + Map tableBucketOffsets = new HashMap<>(); + + // Write data with nested arrays + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, 0, null, tableInfo)) { + List logRecords = new ArrayList<>(); + GenericRow row = new GenericRow(2); + row.setField(0, 0); + + // Create nested array: [[1, 2], [3, 4, 5], [6]] + GenericArray inner1 = new GenericArray(new int[] {1, 2}); + GenericArray inner2 = new GenericArray(new int[] {3, 4, 5}); + GenericArray inner3 = new GenericArray(new int[] {6}); + GenericArray outer = new GenericArray(new Object[] {inner1, inner2, inner3}); + row.setField(1, outer); + + logRecords.add( + new GenericRecord(0, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row)); + + tableBucketOffsets.put(new TableBucket(0, null, 0), 
1L); + for (LogRecord logRecord : logRecords) { + lakeWriter.write(logRecord); + } + lanceWriteResults.add(lakeWriter.complete()); + } + + // Commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + long snapshot = + lakeCommitter.commit( + lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets)); + assertThat(snapshot).isEqualTo(2); + } + + // Verify dataset was created successfully + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + assertThat(dataset).isNotNull(); + } + } + + @Test + void testMultiplePrimitiveArrayTypes() throws Exception { + TablePath tablePath = TablePath.of("lance", "multiArrayTable"); + Map customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + + // Create schema with multiple array type columns + List columns = new ArrayList<>(); + columns.add(new Schema.Column("int_array", DataTypes.ARRAY(DataTypes.INT()))); + columns.add(new Schema.Column("bigint_array", DataTypes.ARRAY(DataTypes.BIGINT()))); + columns.add(new Schema.Column("double_array", DataTypes.ARRAY(DataTypes.DOUBLE()))); + columns.add(new Schema.Column("boolean_array", DataTypes.ARRAY(DataTypes.BOOLEAN()))); + columns.add(new Schema.Column("date_array", DataTypes.ARRAY(DataTypes.DATE()))); + Schema schema = Schema.newBuilder().fromColumns(columns).build(); + + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + 
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + List lanceWriteResults = new ArrayList<>(); + Map tableBucketOffsets = new HashMap<>(); + + // Write data with multiple array types + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, 0, null, tableInfo)) { + List logRecords = new ArrayList<>(); + GenericRow row = new GenericRow(5); + row.setField(0, new GenericArray(new int[] {1, 2, 3})); + row.setField(1, new GenericArray(new long[] {100L, 200L, 300L})); + row.setField(2, new GenericArray(new double[] {1.1, 2.2, 3.3})); + row.setField(3, new GenericArray(new boolean[] {true, false, true})); + row.setField(4, new GenericArray(new Integer[] {18993, 18994, 18995})); + + logRecords.add( + new GenericRecord(0, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row)); + + tableBucketOffsets.put(new TableBucket(0, null, 0), 1L); + for (LogRecord logRecord : logRecords) { + lakeWriter.write(logRecord); + } + lanceWriteResults.add(lakeWriter.complete()); + } + + // Commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + long snapshot = + lakeCommitter.commit( + lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets)); + assertThat(snapshot).isEqualTo(2); + } + + // Verify dataset was created successfully + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + assertThat(dataset).isNotNull(); + } + } } diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/writers/ArrowListWriterTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/writers/ArrowListWriterTest.java new file mode 100644 index 0000000000..0dfc7b33d4 --- /dev/null +++ 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/writers/ArrowListWriterTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.writers; + +import org.apache.fluss.lake.lance.writers.array.ArrowArrayIntWriter; +import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalArray; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link ArrowListWriter}. 
*/ +class ArrowListWriterTest { + + private BufferAllocator allocator; + + @BeforeEach + void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @AfterEach + void tearDown() { + allocator.close(); + } + + @Test + void testWriteSimpleArray() { + Field elementField = + new Field("element", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field listField = + new Field( + "list", + FieldType.nullable(ArrowType.List.INSTANCE), + Collections.singletonList(elementField)); + + ListVector listVector = (ListVector) listField.createVector(allocator); + listVector.allocateNew(); + + IntVector elementVector = (IntVector) listVector.getDataVector(); + ArrowFieldWriter elementWriter = ArrowArrayIntWriter.forField(elementVector); + ArrowListWriter writer = ArrowListWriter.forField(listVector, elementWriter); + + // Create test data: row with array [1, 2, 3] + GenericRow row = new GenericRow(1); + row.setField(0, new GenericArray(new int[] {1, 2, 3})); + + // Write the row + writer.write(row, 0, true); + writer.finish(); + + // Verify + assertThat(listVector.getValueCount()).isEqualTo(1); + assertThat(listVector.isNull(0)).isFalse(); + // Additional verification would require reading back the values + + listVector.close(); + } + + @Test + void testWriteNullArray() { + Field elementField = + new Field("element", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field listField = + new Field( + "list", + FieldType.nullable(ArrowType.List.INSTANCE), + Collections.singletonList(elementField)); + + ListVector listVector = (ListVector) listField.createVector(allocator); + listVector.allocateNew(); + + IntVector elementVector = (IntVector) listVector.getDataVector(); + ArrowFieldWriter elementWriter = ArrowArrayIntWriter.forField(elementVector); + ArrowListWriter writer = ArrowListWriter.forField(listVector, elementWriter); + + // Create test data: row with null array + GenericRow row = new GenericRow(1); + row.setField(0, null); + + // Write the row + 
writer.write(row, 0, true); + writer.finish(); + + // Verify + assertThat(listVector.getValueCount()).isEqualTo(1); + assertThat(listVector.isNull(0)).isTrue(); + + listVector.close(); + } + + @Test + void testWriteEmptyArray() { + Field elementField = + new Field("element", FieldType.nullable(new ArrowType.Int(32, true)), null); + Field listField = + new Field( + "list", + FieldType.nullable(ArrowType.List.INSTANCE), + Collections.singletonList(elementField)); + + ListVector listVector = (ListVector) listField.createVector(allocator); + listVector.allocateNew(); + + IntVector elementVector = (IntVector) listVector.getDataVector(); + ArrowFieldWriter elementWriter = ArrowArrayIntWriter.forField(elementVector); + ArrowListWriter writer = ArrowListWriter.forField(listVector, elementWriter); + + // Create test data: row with empty array [] + GenericRow row = new GenericRow(1); + row.setField(0, new GenericArray(new int[] {})); + + // Write the row + writer.write(row, 0, true); + writer.finish(); + + // Verify + assertThat(listVector.getValueCount()).isEqualTo(1); + assertThat(listVector.isNull(0)).isFalse(); + + listVector.close(); + } +}