diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 500203e76e..6a4ad97f88 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2629,6 +2629,103 @@ fn convert_spark_types_to_arrow_schema(
     arrow_schema
 }
 
+/// Converts a protobuf PartitionValue to an iceberg Literal.
+///
+/// Returns `Ok(None)` when the partition value is null.
+fn partition_value_to_literal(
+    proto_value: &spark_operator::PartitionValue,
+) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
+    use spark_operator::partition_value::Value;
+
+    if proto_value.is_null {
+        return Ok(None);
+    }
+
+    let literal = match &proto_value.value {
+        Some(Value::IntVal(v)) => iceberg::spec::Literal::int(*v),
+        Some(Value::LongVal(v)) => iceberg::spec::Literal::long(*v),
+        Some(Value::DateVal(v)) => {
+            // Convert i64 to i32 for date (days since epoch)
+            let days = (*v)
+                .try_into()
+                .map_err(|_| GeneralError(format!("Date value out of range: {}", v)))?;
+            iceberg::spec::Literal::date(days)
+        }
+        Some(Value::TimestampVal(v)) => iceberg::spec::Literal::timestamp(*v),
+        Some(Value::TimestampTzVal(v)) => iceberg::spec::Literal::timestamptz(*v),
+        Some(Value::StringVal(s)) => iceberg::spec::Literal::string(s.clone()),
+        Some(Value::DoubleVal(v)) => iceberg::spec::Literal::double(*v),
+        Some(Value::FloatVal(v)) => iceberg::spec::Literal::float(*v),
+        Some(Value::DecimalVal(bytes)) => {
+            // Deserialize unscaled BigInteger bytes to i128.
+            // BigInteger is serialized as signed big-endian bytes.
+            if bytes.len() > 16 {
+                return Err(GeneralError(format!(
+                    "Decimal bytes too large: {} bytes (max 16 for i128)",
+                    bytes.len()
+                )));
+            }
+
+            // Convert big-endian bytes to i128
+            let mut buf = [0u8; 16];
+            let offset = 16 - bytes.len();
+            buf[offset..].copy_from_slice(bytes);
+
+            // Handle sign extension for negative numbers
+            let value = if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
+                // Negative number - sign-extend the leading bytes
+                for byte in buf.iter_mut().take(offset) {
+                    *byte = 0xFF;
+                }
+                i128::from_be_bytes(buf)
+            } else {
+                // Non-negative number - leading bytes stay zero
+                i128::from_be_bytes(buf)
+            };
+
+            iceberg::spec::Literal::decimal(value)
+        }
+        Some(Value::BoolVal(v)) => iceberg::spec::Literal::bool(*v),
+        Some(Value::UuidVal(bytes)) => {
+            // Deserialize UUID from 16 bytes
+            if bytes.len() != 16 {
+                return Err(GeneralError(format!(
+                    "Invalid UUID bytes length: {} (expected 16)",
+                    bytes.len()
+                )));
+            }
+            let uuid = uuid::Uuid::from_slice(bytes)
+                .map_err(|e| GeneralError(format!("Failed to parse UUID: {}", e)))?;
+            iceberg::spec::Literal::uuid(uuid)
+        }
+        Some(Value::FixedVal(bytes)) => iceberg::spec::Literal::fixed(bytes.to_vec()),
+        Some(Value::BinaryVal(bytes)) => iceberg::spec::Literal::binary(bytes.to_vec()),
+        None => {
+            return Err(GeneralError(
+                "PartitionValue has no value set and is_null is false".to_string(),
+            ));
+        }
+    };
+
+    Ok(Some(literal))
+}
+
+/// Converts a protobuf PartitionData to an iceberg Struct.
+///
+/// Uses the existing Struct::from_iter() API from iceberg-rust to construct the struct
+/// from the list of partition values.
+/// This conversion could potentially be upstreamed to iceberg-rust.
+fn partition_data_to_struct(
+    proto_partition: &spark_operator::PartitionData,
+) -> Result<iceberg::spec::Struct, ExecutionError> {
+    let literals: Vec<Option<iceberg::spec::Literal>> = proto_partition
+        .values
+        .iter()
+        .map(partition_value_to_literal)
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(iceberg::spec::Struct::from_iter(literals))
+}
+
 /// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects.
 ///
 /// Each task contains a residual predicate that is used for row-group level filtering
@@ -2655,19 +2752,6 @@ fn parse_file_scan_tasks(
         })
         .collect::<Result<Vec<_>, _>>()?;
 
-    let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
-        .partition_type_pool
-        .iter()
-        .map(|json| {
-            serde_json::from_str(json).map_err(|e| {
-                ExecutionError::GeneralError(format!(
-                    "Failed to parse partition type JSON from pool: {}",
-                    e
-                ))
-            })
-        })
-        .collect::<Result<Vec<_>, _>>()?;
-
     let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
         .partition_spec_pool
         .iter()
@@ -2721,19 +2805,7 @@ fn parse_file_scan_tasks(
         })
         .collect::<Result<Vec<_>, _>>()?;
 
-    let partition_data_cache: Vec<serde_json::Value> = proto_scan
-        .partition_data_pool
-        .iter()
-        .map(|json| {
-            serde_json::from_str(json).map_err(|e| {
-                ExecutionError::GeneralError(format!(
-                    "Failed to parse partition data JSON from pool: {}",
-                    e
-                ))
-            })
-        })
-        .collect::<Result<Vec<_>, _>>()?;
-
+    // Entries in the partition data pool are already protobuf messages; no JSON parsing needed
     let results: Result<Vec<_>, _> = proto_tasks
         .iter()
         .map(|proto_task| {
@@ -2787,48 +2859,24 @@ fn parse_file_scan_tasks(
             };
 
             let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
-                let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
-                    ExecutionError::GeneralError(
-                        "partition_type_idx is required when partition_data_idx is present"
-                            .to_string(),
-                    )
-                })?;
-
-                let partition_data_value = partition_data_cache
+                // Get partition data from the protobuf pool
+                let partition_data_proto = proto_scan
+                    .partition_data_pool
                     .get(partition_data_idx as usize)
                     .ok_or_else(|| {
                         ExecutionError::GeneralError(format!(
-                            "Invalid partition_data_idx: {} (cache size: {})",
+                            "Invalid partition_data_idx: {} (pool size: {})",
                             partition_data_idx,
-                            partition_data_cache.len()
+                            proto_scan.partition_data_pool.len()
                         ))
                     })?;
 
-                let partition_type = partition_type_cache
-                    .get(partition_type_idx as usize)
-                    .ok_or_else(|| {
-                        ExecutionError::GeneralError(format!(
-                            "Invalid partition_type_idx: {} (cache size: {})",
-                            partition_type_idx,
-                            partition_type_cache.len()
-                        ))
-                    })?;
-
-                match iceberg::spec::Literal::try_from_json(
-                    partition_data_value.clone(),
-                    &iceberg::spec::Type::Struct(partition_type.clone()),
-                ) {
-                    Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
-                    Ok(None) => None,
-                    Ok(other) => {
-                        return Err(GeneralError(format!(
-                            "Expected struct literal for partition data, got: {:?}",
-                            other
-                        )))
-                    }
+                // Convert protobuf PartitionData to an iceberg Struct
+                match partition_data_to_struct(partition_data_proto) {
+                    Ok(s) => Some(s),
                     Err(e) => {
-                        return Err(GeneralError(format!(
-                            "Failed to deserialize partition data from JSON: {}",
+                        return Err(ExecutionError::GeneralError(format!(
+                            "Failed to deserialize partition data from protobuf: {}",
                             e
                         )))
                     }
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index c00b953966..73c087cf36 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -130,6 +130,32 @@ message CsvOptions {
   bool truncated_rows = 8;
 }
 
+// Partition value for Iceberg partition data
+message PartitionValue {
+  int32 field_id = 1;
+  oneof value {
+    int32 int_val = 2;
+    int64 long_val = 3;
+    int64 date_val = 4;          // days since epoch
+    int64 timestamp_val = 5;     // microseconds since epoch
+    int64 timestamp_tz_val = 6;  // microseconds since epoch, UTC-adjusted
+    string string_val = 7;
+    double double_val = 8;
+    float float_val = 9;
+    bytes decimal_val = 10;      // unscaled BigInteger bytes (signed, big-endian)
+    bool bool_val = 11;
+    bytes uuid_val = 12;
+    bytes fixed_val = 13;
+    bytes binary_val = 14;
+  }
+  bool is_null = 15;
+}
+
+// Collection of partition values for a single partition
+message PartitionData {
+  repeated PartitionValue values = 1;
+}
+
 message IcebergScan {
   // Schema to read
   repeated SparkStructField required_schema = 1;
@@ -149,7 +175,7 @@ message IcebergScan {
   repeated string partition_spec_pool = 7;
   repeated string name_mapping_pool = 8;
   repeated ProjectFieldIdList project_field_ids_pool = 9;
-  repeated string partition_data_pool = 10;
+  repeated PartitionData partition_data_pool = 10;
   repeated DeleteFileList delete_files_pool = 11;
   repeated spark.spark_expression.Expr residual_pool = 12;
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index dc7df531f6..7238f8ae8c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -22,7 +22,6 @@ package org.apache.comet.serde.operator
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
-import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
@@ -71,81 +70,133 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
   }
 
   /**
-   * Converts an Iceberg partition value to JSON format expected by iceberg-rust.
-   *
-   * iceberg-rust's Literal::try_from_json() expects specific formats for certain types:
-   *   - Timestamps: ISO string format "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
-   *   - Dates: ISO string format "YYYY-MM-DD"
-   *   - Decimals: String representation
-   *
-   * See: iceberg-rust/crates/iceberg/src/spec/values/literal.rs
+   * Converts an Iceberg partition value to its protobuf representation. Protobuf is more
+   * compact than the previous JSON encoding: timestamps, dates, decimals, and field IDs
+   * are carried as numeric values (or raw bytes) rather than as formatted strings.
    */
-  private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = {
-    fieldTypeStr match {
-      case t if t.startsWith("timestamp") =>
-        val micros = value match {
-          case l: java.lang.Long => l.longValue()
-          case i: java.lang.Integer => i.longValue()
-          case _ => value.toString.toLong
-        }
-        val instant = java.time.Instant.ofEpochSecond(micros / 1000000, (micros % 1000000) * 1000)
-        val formatted = java.time.format.DateTimeFormatter
-          .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
-          .withZone(java.time.ZoneOffset.UTC)
-          .format(instant)
-        JString(formatted)
-
-      case "date" =>
-        val days = value.asInstanceOf[java.lang.Integer].intValue()
-        val localDate = java.time.LocalDate.ofEpochDay(days.toLong)
-        JString(localDate.toString)
-
-      case d if d.startsWith("decimal(") =>
-        JString(value.toString)
-
-      case "string" =>
-        JString(value.toString)
-
-      case "int" | "long" =>
-        value match {
-          case i: java.lang.Integer => JInt(BigInt(i.intValue()))
-          case l: java.lang.Long => JInt(BigInt(l.longValue()))
-          case _ => JDecimal(BigDecimal(value.toString))
-        }
+  private def partitionValueToProto(
+      fieldId: Int,
+      fieldTypeStr: String,
+      value: Any): OperatorOuterClass.PartitionValue = {
+    val builder = OperatorOuterClass.PartitionValue.newBuilder()
+    builder.setFieldId(fieldId)
+
+    if (value == null) {
+      builder.setIsNull(true)
+    } else {
+      builder.setIsNull(false)
+      fieldTypeStr match {
+        case t if t.startsWith("timestamp") =>
+          val micros = value match {
+            case l: java.lang.Long => l.longValue()
+            case i: java.lang.Integer => i.longValue()
+            case _ => value.toString.toLong
+          }
+          if (t.contains("tz")) {
+            builder.setTimestampTzVal(micros)
+          } else {
+            builder.setTimestampVal(micros)
+          }
-
-      case "float" | "double" =>
-        value match {
-          // NaN/Infinity are not valid JSON numbers - serialize as strings
-          case f: java.lang.Float if f.isNaN || f.isInfinite =>
-            JString(f.toString)
-          case d: java.lang.Double if d.isNaN || d.isInfinite =>
-            JString(d.toString)
-          case f: java.lang.Float => JDouble(f.doubleValue())
-          case d: java.lang.Double => JDouble(d.doubleValue())
-          case _ => JDecimal(BigDecimal(value.toString))
-        }
+
+        case "date" =>
+          val days = value.asInstanceOf[java.lang.Integer].intValue()
+          builder.setDateVal(days)
-
-      case "boolean" =>
-        value match {
-          case b: java.lang.Boolean => JBool(b.booleanValue())
-          case _ => JBool(value.toString.toBoolean)
-        }
+
+        case d if d.startsWith("decimal(") =>
+          // Serialize as unscaled BigInteger bytes
+          val bigDecimal = value match {
+            case bd: java.math.BigDecimal => bd
+            case _ => new java.math.BigDecimal(value.toString)
+          }
+          val unscaledBytes = bigDecimal.unscaledValue().toByteArray
+          builder.setDecimalVal(com.google.protobuf.ByteString.copyFrom(unscaledBytes))
-
-      case "uuid" =>
-        JString(value.toString)
-
-      // Fallback: infer JSON type from Java type
-      case _ =>
-        value match {
-          case s: String => JString(s)
-          case i: java.lang.Integer => JInt(BigInt(i.intValue()))
-          case l: java.lang.Long => JInt(BigInt(l.longValue()))
-          case d: java.lang.Double => JDouble(d.doubleValue())
-          case f: java.lang.Float => JDouble(f.doubleValue())
-          case b: java.lang.Boolean => JBool(b.booleanValue())
-          case other => JString(other.toString)
-        }
+
+        case "string" =>
+          builder.setStringVal(value.toString)
+
+        case "int" =>
+          val intVal = value match {
+            case i: java.lang.Integer => i.intValue()
+            case l: java.lang.Long => l.intValue()
+            case _ => value.toString.toInt
+          }
+          builder.setIntVal(intVal)
+
+        case "long" =>
+          val longVal = value match {
+            case l: java.lang.Long => l.longValue()
+            case i: java.lang.Integer => i.longValue()
+            case _ => value.toString.toLong
+          }
+          builder.setLongVal(longVal)
+
+        case "float" =>
+          val floatVal = value match {
+            case f: java.lang.Float => f.floatValue()
+            case d: java.lang.Double => d.floatValue()
+            case _ => value.toString.toFloat
+          }
+          builder.setFloatVal(floatVal)
+
+        case "double" =>
+          val doubleVal = value match {
+            case d: java.lang.Double => d.doubleValue()
+            case f: java.lang.Float => f.doubleValue()
+            case _ => value.toString.toDouble
+          }
+          builder.setDoubleVal(doubleVal)
+
+        case "boolean" =>
+          val boolVal = value match {
+            case b: java.lang.Boolean => b.booleanValue()
+            case _ => value.toString.toBoolean
+          }
+          builder.setBoolVal(boolVal)
+
+        case "uuid" =>
+          // UUID arrives either as a java.util.UUID or as its string form; encode as 16 bytes
+          val uuidBytes = value match {
+            case uuid: java.util.UUID =>
+              val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+              bb.putLong(uuid.getMostSignificantBits)
+              bb.putLong(uuid.getLeastSignificantBits)
+              bb.array()
+            case _ =>
+              // Parse the UUID string and convert it to bytes
+              val uuid = java.util.UUID.fromString(value.toString)
+              val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+              bb.putLong(uuid.getMostSignificantBits)
+              bb.putLong(uuid.getLeastSignificantBits)
+              bb.array()
+          }
+          builder.setUuidVal(com.google.protobuf.ByteString.copyFrom(uuidBytes))
+
+        case t if t.startsWith("fixed[") || t.startsWith("binary") =>
+          val bytes = value match {
+            case bytes: Array[Byte] => bytes
+            case _ => value.toString.getBytes("UTF-8")
+          }
+          if (t.startsWith("fixed")) {
+            builder.setFixedVal(com.google.protobuf.ByteString.copyFrom(bytes))
+          } else {
+            builder.setBinaryVal(com.google.protobuf.ByteString.copyFrom(bytes))
+          }
+
+        // Fallback: infer the protobuf value type from the runtime Java type
+        case _ =>
+          value match {
+            case s: String => builder.setStringVal(s)
+            case i: java.lang.Integer => builder.setIntVal(i.intValue())
+            case l: java.lang.Long => builder.setLongVal(l.longValue())
+            case d: java.lang.Double => builder.setDoubleVal(d.doubleValue())
+            case f: java.lang.Float => builder.setFloatVal(f.floatValue())
+            case b: java.lang.Boolean => builder.setBoolVal(b.booleanValue())
+            case other => builder.setStringVal(other.toString)
+          }
+      }
     }
+
+    builder.build()
   }
 
   /**
@@ -375,18 +426,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       }
     }
 
-    // Serialize partition data to JSON for iceberg-rust's constants_map.
-    // The native execution engine uses partition_data_json +
-    // partition_type_json to build a constants_map, which is the primary
-    // mechanism for providing partition values to identity-transformed
-    // partition columns. Non-identity transforms (bucket, truncate, days,
-    // etc.) read values from data files.
+    // Serialize partition data to protobuf for native execution.
+    // The native execution engine uses the partition_data protobuf messages to
+    // build a constants_map, which provides partition values to identity-
+    // transformed partition columns. Non-identity transforms (bucket, truncate,
+    // days, etc.) read values from data files.
    //
-    // IMPORTANT: Use the same field IDs as partition_type_json (partition field IDs,
-    // not source field IDs) so that JSON deserialization matches correctly.
+    // IMPORTANT: Use partition field IDs (not source field IDs) so that the values
+    // line up with the partition type on the native side.
     // Filter out fields with unknown type (same as partition type filtering)
-    val partitionDataMap: Map[String, JValue] =
+    val partitionValues: Seq[OperatorOuterClass.PartitionValue] =
       fields.asScala.zipWithIndex.flatMap { case (field, idx) =>
         val fieldTypeStr = getFieldType(field)
@@ -402,23 +452,25 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
            partitionData.getClass.getMethod("get", classOf[Int], classOf[Class[_]])
          val value =
            getMethod.invoke(partitionData, Integer.valueOf(idx), classOf[Object])
 
-          val jsonValue = if (value == null) {
-            JNull
-          } else {
-            partitionValueToJson(fieldTypeStr, value)
-          }
-          Some(fieldId.toString -> jsonValue)
+          Some(partitionValueToProto(fieldId, fieldTypeStr, value))
        }
-      }.toMap
+      }.toSeq
 
     // Only serialize partition data if we have non-unknown fields
-    if (partitionDataMap.nonEmpty) {
-      val partitionJson = compact(render(JObject(partitionDataMap.toList)))
+    if (partitionValues.nonEmpty) {
+      val partitionDataProto = OperatorOuterClass.PartitionData
+        .newBuilder()
+        .addAllValues(partitionValues.asJava)
+        .build()
+
+      // Deduplicate by protobuf bytes (use Base64 string as key)
+      val partitionDataBytes = partitionDataProto.toByteArray
+      val partitionDataKey = java.util.Base64.getEncoder.encodeToString(partitionDataBytes)
 
       val partitionDataIdx = partitionDataToPoolIndex.getOrElseUpdate(
-        partitionJson, {
+        partitionDataKey, {
          val idx = partitionDataToPoolIndex.size
-          icebergScanBuilder.addPartitionDataPool(partitionJson)
+          icebergScanBuilder.addPartitionDataPool(partitionDataProto)
          idx
        })
      taskBuilder.setPartitionDataIdx(partitionDataIdx)
@@ -637,7 +689,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
     val nameMappingToPoolIndex = mutable.HashMap[String, Int]()
     val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]()
-    val partitionDataToPoolIndex = mutable.HashMap[String, Int]()
+    val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 bytes -> pool index
     val deleteFilesToPoolIndex = mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]()
     val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]()
@@ -727,104 +779,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
               throw new RuntimeException(msg)
             }
 
-            // Extract partition values for Hive-style partitioning
-            var partitionJsonOpt: Option[String] = None
-            try {
-              val partitionMethod = contentFileClass.getMethod("partition")
-              val partitionStruct = partitionMethod.invoke(dataFile)
-
-              if (partitionStruct != null) {
-                // scalastyle:off classforname
-                val structLikeClass =
-                  Class.forName(IcebergReflection.ClassNames.STRUCT_LIKE)
-                // scalastyle:on classforname
-                val sizeMethod = structLikeClass.getMethod("size")
-                val getMethod =
-                  structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]])
-
-                val partitionSize =
-                  sizeMethod.invoke(partitionStruct).asInstanceOf[Int]
-
-                if (partitionSize > 0) {
-                  // Get the partition spec directly from the task
-                  // scalastyle:off classforname
-                  val partitionScanTaskClass =
-                    Class.forName(IcebergReflection.ClassNames.PARTITION_SCAN_TASK)
-                  // scalastyle:on classforname
-                  val specMethod = partitionScanTaskClass.getMethod("spec")
-                  val partitionSpec = specMethod.invoke(task)
-
-                  // Build JSON representation of partition values using json4s
-
-                  val partitionMap = scala.collection.mutable.Map[String, JValue]()
-
-                  if (partitionSpec != null) {
-                    // Get the list of partition fields from the spec
-                    val fieldsMethod = partitionSpec.getClass.getMethod("fields")
-                    val fields = fieldsMethod
-                      .invoke(partitionSpec)
-                      .asInstanceOf[java.util.List[_]]
-
-                    for (i <- 0 until partitionSize) {
-                      val value =
-                        getMethod.invoke(partitionStruct, Int.box(i), classOf[Object])
-
-                      // Get the partition field and check its transform type
-                      val partitionField = fields.get(i)
-
-                      // Only inject partition values for IDENTITY transforms
-                      val transformMethod =
-                        partitionField.getClass.getMethod("transform")
-                      val transform = transformMethod.invoke(partitionField)
-                      val isIdentity =
-                        transform.toString == IcebergReflection.Transforms.IDENTITY
-
-                      if (isIdentity) {
-                        // Get the source field ID
-                        val sourceIdMethod =
-                          partitionField.getClass.getMethod("sourceId")
-                        val sourceFieldId =
-                          sourceIdMethod.invoke(partitionField).asInstanceOf[Int]
-
-                        val jsonValue = if (value == null) {
-                          JNull
-                        } else {
-                          // Get field type from schema to serialize correctly
-                          val fieldTypeStr =
-                            try {
-                              val findFieldMethod =
-                                metadata.tableSchema.getClass
-                                  .getMethod("findField", classOf[Int])
-                              val field = findFieldMethod.invoke(
-                                metadata.tableSchema,
-                                sourceFieldId.asInstanceOf[Object])
-                              if (field != null) {
-                                val typeMethod = field.getClass.getMethod("type")
-                                typeMethod.invoke(field).toString
-                              } else {
-                                "unknown"
-                              }
-                            } catch {
-                              case _: Exception => "unknown"
-                            }
-
-                          partitionValueToJson(fieldTypeStr, value)
-                        }
-                        partitionMap(sourceFieldId.toString) = jsonValue
-                      }
-                    }
-                  }
-
-                  val partitionJson = compact(render(JObject(partitionMap.toList)))
-                  partitionJsonOpt = Some(partitionJson)
-                }
-              }
-            } catch {
-              case e: Exception =>
-                logWarning(
-                  s"Failed to extract partition values from DataFile: ${e.getMessage}")
-            }
-
             val startMethod = contentScanTaskClass.getMethod("start")
             val start = startMethod.invoke(task).asInstanceOf[Long]
             taskBuilder.setStart(start)
@@ -1056,7 +1010,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
        }
      }
 
+      // Calculate partition data pool size in bytes (protobuf format)
+      val partitionDataPoolBytes = icebergScanBuilder.getPartitionDataPoolList.asScala
+        .map(_.getSerializedSize)
+        .sum
+
      logInfo(s"IcebergScan: $totalTasks tasks, ${allPoolSizes.size} pools ($avgDedup% avg dedup)")
+      if (partitionDataToPoolIndex.nonEmpty) {
+        logInfo(
+          s"  Partition data pool: ${partitionDataToPoolIndex.size} unique values, " +
+            s"$partitionDataPoolBytes bytes (protobuf)")
+      }
 
      builder.clearChildren()
      Some(builder.setIcebergScan(icebergScanBuilder).build())
diff --git a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
index c8d360ae57..00955e6291 100644
--- a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
@@ -163,6 +163,43 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
     }
   }
 
+  test("large scale partitioned table - 100 partitions with many files") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withSQLConf(
+      "spark.sql.files.maxRecordsPerFile" -> "50",
+      "spark.sql.adaptive.enabled" -> "false") {
+      spark.sql("""
+        CREATE TABLE s3_catalog.db.large_partitioned_test (
+          id INT,
+          data STRING,
+          partition_id INT
+        ) USING iceberg
+        PARTITIONED BY (partition_id)
+      """)
+
+      spark.sql("""
+        INSERT INTO s3_catalog.db.large_partitioned_test
+        SELECT
+          id,
+          CONCAT('data_', CAST(id AS STRING)) as data,
+          (id % 100) as partition_id
+        FROM range(500000)
+      """)
+
+      checkIcebergNativeScan(
+        "SELECT COUNT(DISTINCT id) FROM s3_catalog.db.large_partitioned_test")
+      checkIcebergNativeScan(
+        "SELECT * FROM s3_catalog.db.large_partitioned_test WHERE id < 10 ORDER BY id")
+      checkIcebergNativeScan(
+        "SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE partition_id = 0")
+      checkIcebergNativeScan(
+        "SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE partition_id IN (0, 50, 99)")
+
+      spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test PURGE")
+    }
+  }
+
   test("MOR table with deletes in S3") {
     assume(icebergAvailable, "Iceberg not available in classpath")