Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Field;

import org.apache.comet.IcebergApi;

/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
public abstract class AbstractCometSchemaImporter {
private final BufferAllocator allocator;
Expand Down Expand Up @@ -67,6 +69,7 @@ public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
return vector;
}

@IcebergApi
public void close() {
provider.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.arrow.memory.BufferAllocator;

/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
@IcebergApi
public class CometSchemaImporter extends AbstractCometSchemaImporter {
@IcebergApi
public CometSchemaImporter(BufferAllocator allocator) {
super(allocator);
}
Expand Down
44 changes: 44 additions & 0 deletions common/src/main/java/org/apache/comet/IcebergApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks an element as belonging to the public API that Apache Iceberg consumes.
 *
 * <p>Classes, methods, constructors, and fields carrying this annotation form the compatibility
 * contract between Comet and Iceberg: modifying them can break Iceberg's Comet integration.
 * Contributors should therefore take care to preserve backward compatibility whenever they change
 * an annotated element.
 *
 * <p>Iceberg relies on Comet's native Parquet reader to accelerate vectorized reads; consult the
 * contributor guide documentation for the details of how Iceberg uses these APIs.
 *
 * @see <a href="https://iceberg.apache.org/">Apache Iceberg</a>
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD})
@Documented
public @interface IcebergApi {}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.apache.spark.sql.types.TimestampNTZType$;

import org.apache.comet.CometConf;
import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometVector;

/** Base class for Comet Parquet column reader implementations. */
@IcebergApi
public abstract class AbstractColumnReader implements AutoCloseable {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractColumnReader.class);

Expand Down Expand Up @@ -61,7 +63,7 @@ public abstract class AbstractColumnReader implements AutoCloseable {
protected int batchSize;

/** A pointer to the native implementation of ColumnReader. */
protected long nativeHandle;
@IcebergApi protected long nativeHandle;

AbstractColumnReader(
DataType type,
Expand Down Expand Up @@ -96,6 +98,7 @@ String getPath() {
/**
* Set the batch size of this reader to be 'batchSize'. Also initializes the native column reader.
*/
@IcebergApi
public void setBatchSize(int batchSize) {
assert nativeHandle == 0
: "Native column reader shouldn't be initialized before " + "'setBatchSize' is called";
Expand All @@ -113,6 +116,7 @@ public void setBatchSize(int batchSize) {
/** Returns the {@link CometVector} read by this reader. */
public abstract CometVector currentBatch();

@IcebergApi
@Override
public void close() {
if (nativeHandle != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

import org.apache.comet.CometConf;
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.IcebergApi;
import org.apache.comet.shims.ShimBatchReader;
import org.apache.comet.shims.ShimFileFormat;
import org.apache.comet.vector.CometVector;
Expand All @@ -87,6 +88,7 @@
* }
* </pre>
*/
@IcebergApi
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
Expand Down Expand Up @@ -189,6 +191,7 @@ public BatchReader(
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public BatchReader(AbstractColumnReader[] columnReaders) {
// Todo: set useDecimal128 and useLazyMaterialization
int numColumns = columnReaders.length;
Expand Down Expand Up @@ -387,6 +390,7 @@ public void init() throws URISyntaxException, IOException {
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public void setSparkSchema(StructType schema) {
this.sparkSchema = schema;
}
Expand All @@ -395,6 +399,7 @@ public void setSparkSchema(StructType schema) {
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public AbstractColumnReader[] getColumnReaders() {
return columnReaders;
}
Expand Down Expand Up @@ -498,6 +503,7 @@ public boolean nextBatch() throws IOException {
return nextBatch(batchSize);
}

@IcebergApi
public boolean nextBatch(int batchSize) {
long totalDecodeTime = 0, totalLoadTime = 0;
for (int i = 0; i < columnReaders.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@

import org.apache.comet.CometConf;
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometDecodedVector;
import org.apache.comet.vector.CometDictionary;
import org.apache.comet.vector.CometDictionaryVector;
import org.apache.comet.vector.CometPlainVector;
import org.apache.comet.vector.CometVector;

@IcebergApi
public class ColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
protected final BufferAllocator ALLOCATOR = new RootAllocator();
Expand Down Expand Up @@ -114,6 +116,7 @@ public class ColumnReader extends AbstractColumnReader {
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public void setPageReader(PageReader pageReader) throws IOException {
this.pageReader = pageReader;

Expand All @@ -129,6 +132,7 @@ public void setPageReader(PageReader pageReader) throws IOException {
}

/** This method is called from Apache Iceberg. */
@IcebergApi
public void setRowGroupReader(RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
throws IOException {
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(columnSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;

import org.apache.comet.IcebergApi;

/**
* A column reader that always return constant vectors. Used for reading partition columns, for
* instance.
*/
@IcebergApi
public class ConstantColumnReader extends MetadataColumnReader {
/** Whether all the values in this constant column are nulls */
private boolean isNull;
Expand All @@ -56,13 +59,15 @@ public class ConstantColumnReader extends MetadataColumnReader {
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
super(type, descriptor, useDecimal128, true);
this.value = value;
}

// Used by Iceberg
@IcebergApi
public ConstantColumnReader(
DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) {
super(type, spec, useDecimal128, true);
Expand Down
8 changes: 8 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/FileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.execution.metric.SQLMetric;

import org.apache.comet.IcebergApi;

import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;

Expand All @@ -101,6 +103,7 @@
* A Parquet file reader. Mostly followed {@code ParquetFileReader} in {@code parquet-mr}, but with
* customizations & optimizations for Comet.
*/
@IcebergApi
public class FileReader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);

Expand Down Expand Up @@ -135,6 +138,7 @@ public class FileReader implements Closeable {
}

/** This constructor is called from Apache Iceberg. */
@IcebergApi
public FileReader(
WrappedInputFile file,
ReadOptions cometOptions,
Expand Down Expand Up @@ -258,6 +262,7 @@ public void setRequestedSchema(List<ColumnDescriptor> projection) {
}

/** This method is called from Apache Iceberg. */
@IcebergApi
public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specList) {
paths.clear();
for (ParquetColumnSpec colSpec : specList) {
Expand Down Expand Up @@ -336,6 +341,7 @@ public long getFilteredRecordCount() {
}

/** Skips the next row group. Returns false if there's no row group to skip. Otherwise, true. */
@IcebergApi
public boolean skipNextRowGroup() {
return advanceToNextBlock();
}
Expand All @@ -344,6 +350,7 @@ public boolean skipNextRowGroup() {
* Returns the next row group to read (after applying row group filtering), or null if there's no
* more row group.
*/
@IcebergApi
public RowGroupReader readNextRowGroup() throws IOException {
if (currentBlock == blocks.size()) {
return null;
Expand Down Expand Up @@ -864,6 +871,7 @@ public void closeStream() throws IOException {
}
}

@IcebergApi
@Override
public void close() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometPlainVector;
import org.apache.comet.vector.CometVector;

/** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
@IcebergApi
public class MetadataColumnReader extends AbstractColumnReader {
private final BufferAllocator allocator = new RootAllocator();

Expand All @@ -46,6 +48,7 @@ public class MetadataColumnReader extends AbstractColumnReader {
* @deprecated since 0.10.0, will be made package private in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public MetadataColumnReader(
DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
Expand All @@ -55,6 +58,7 @@ public MetadataColumnReader(
}

// Used by Iceberg
@IcebergApi
public MetadataColumnReader(
DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
Expand All @@ -69,6 +73,7 @@ public void setBatchSize(int batchSize) {
super.setBatchSize(batchSize);
}

@IcebergApi
@Override
public void readBatch(int total) {
if (vector == null) {
Expand All @@ -90,6 +95,7 @@ void setNumNulls(int total) {
vector.setNumNulls(total);
}

@IcebergApi
@Override
public CometVector currentBatch() {
return vector;
Expand Down
4 changes: 4 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/Native.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.comet.IcebergApi;
import org.apache.comet.NativeBase;

public final class Native extends NativeBase {
Expand Down Expand Up @@ -143,6 +144,7 @@ public static native void setPageV2(
*
* @param handle the handle to the native Parquet column reader
*/
@IcebergApi
public static native void resetBatch(long handle);

/**
Expand Down Expand Up @@ -221,12 +223,14 @@ public static native void setPageV2(
public static native void setDecimal(long handle, byte[] value);

/** Set position of row index vector for Iceberg Metadata Column */
@IcebergApi
public static native void setPosition(long handle, long value, int size);

/** Set row index vector for Spark row index metadata column and return vector size */
public static native int setIndices(long handle, long offset, int size, long[] indices);

/** Set deleted info for Iceberg Metadata Column */
@IcebergApi
public static native void setIsDeleted(long handle, boolean[] isDeleted);

/**
Expand Down
Loading
Loading