Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 108 additions & 60 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2629,6 +2629,103 @@ fn convert_spark_types_to_arrow_schema(
arrow_schema
}

/// Converts a protobuf `PartitionValue` into an iceberg `Literal`.
///
/// Returns `Ok(None)` when the value is marked null, `Ok(Some(literal))` for a
/// recognized value variant, and an error when the payload is malformed
/// (date out of i32 range, oversized decimal, bad UUID length, or no variant
/// set while `is_null` is false).
fn partition_value_to_literal(
    proto_value: &spark_operator::PartitionValue,
) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
    use spark_operator::partition_value::Value;

    if proto_value.is_null {
        return Ok(None);
    }

    let literal = match &proto_value.value {
        Some(Value::IntVal(v)) => iceberg::spec::Literal::int(*v),
        Some(Value::LongVal(v)) => iceberg::spec::Literal::long(*v),
        Some(Value::DateVal(v)) => {
            // Dates travel as i64 on the wire, but iceberg stores days since
            // epoch as i32 — reject out-of-range values instead of truncating.
            let days = (*v)
                .try_into()
                .map_err(|_| GeneralError(format!("Date value out of range: {}", v)))?;
            iceberg::spec::Literal::date(days)
        }
        Some(Value::TimestampVal(v)) => iceberg::spec::Literal::timestamp(*v),
        Some(Value::TimestampTzVal(v)) => iceberg::spec::Literal::timestamptz(*v),
        Some(Value::StringVal(s)) => iceberg::spec::Literal::string(s.clone()),
        Some(Value::DoubleVal(v)) => iceberg::spec::Literal::double(*v),
        Some(Value::FloatVal(v)) => iceberg::spec::Literal::float(*v),
        Some(Value::DecimalVal(bytes)) => {
            // The unscaled decimal arrives as signed (two's-complement)
            // big-endian bytes; anything wider than 16 bytes cannot fit in i128.
            if bytes.len() > 16 {
                return Err(GeneralError(format!(
                    "Decimal bytes too large: {} bytes (max 16 for i128)",
                    bytes.len()
                )));
            }

            // Sign-extend into a fixed 16-byte buffer: negative inputs (MSB of
            // the leading byte set) pad with 0xFF, non-negative with 0x00.
            // Initializing the whole buffer with the fill byte and then
            // overwriting the tail avoids the duplicated from_be_bytes branches.
            let fill = if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
                0xFF
            } else {
                0x00
            };
            let mut buf = [fill; 16];
            buf[16 - bytes.len()..].copy_from_slice(bytes);
            let value = i128::from_be_bytes(buf);

            iceberg::spec::Literal::decimal(value)
        }
        Some(Value::BoolVal(v)) => iceberg::spec::Literal::bool(*v),
        Some(Value::UuidVal(bytes)) => {
            // Explicit length check first so the error message reports the
            // actual byte count rather than uuid's generic parse error.
            if bytes.len() != 16 {
                return Err(GeneralError(format!(
                    "Invalid UUID bytes length: {} (expected 16)",
                    bytes.len()
                )));
            }
            let uuid = uuid::Uuid::from_slice(bytes)
                .map_err(|e| GeneralError(format!("Failed to parse UUID: {}", e)))?;
            iceberg::spec::Literal::uuid(uuid)
        }
        Some(Value::FixedVal(bytes)) => iceberg::spec::Literal::fixed(bytes.to_vec()),
        Some(Value::BinaryVal(bytes)) => iceberg::spec::Literal::binary(bytes.to_vec()),
        None => {
            return Err(GeneralError(
                "PartitionValue has no value set and is_null is false".to_string(),
            ));
        }
    };

    Ok(Some(literal))
}

/// Converts a protobuf PartitionData to an iceberg Struct.
///
/// Uses the existing Struct::from_iter() API from iceberg-rust to construct the struct
/// from the list of partition values.
/// This can potentially be upstreamed to iceberg_rust
fn partition_data_to_struct(
proto_partition: &spark_operator::PartitionData,
) -> Result<iceberg::spec::Struct, ExecutionError> {
let literals: Vec<Option<iceberg::spec::Literal>> = proto_partition
.values
.iter()
.map(partition_value_to_literal)
.collect::<Result<Vec<_>, _>>()?;

Ok(iceberg::spec::Struct::from_iter(literals))
}

/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects.
///
/// Each task contains a residual predicate that is used for row-group level filtering
Expand All @@ -2655,19 +2752,6 @@ fn parse_file_scan_tasks(
})
.collect::<Result<Vec<_>, _>>()?;

let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
.partition_type_pool
.iter()
.map(|json| {
serde_json::from_str(json).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to parse partition type JSON from pool: {}",
e
))
})
})
.collect::<Result<Vec<_>, _>>()?;

let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
.partition_spec_pool
.iter()
Expand Down Expand Up @@ -2721,19 +2805,7 @@ fn parse_file_scan_tasks(
})
.collect::<Result<Vec<_>, _>>()?;

let partition_data_cache: Vec<serde_json::Value> = proto_scan
.partition_data_pool
.iter()
.map(|json| {
serde_json::from_str(json).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to parse partition data JSON from pool: {}",
e
))
})
})
.collect::<Result<Vec<_>, _>>()?;

// Partition data pool is in protobuf messages
let results: Result<Vec<_>, _> = proto_tasks
.iter()
.map(|proto_task| {
Expand Down Expand Up @@ -2787,48 +2859,24 @@ fn parse_file_scan_tasks(
};

let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
ExecutionError::GeneralError(
"partition_type_idx is required when partition_data_idx is present"
.to_string(),
)
})?;

let partition_data_value = partition_data_cache
// Get partition data from protobuf pool
let partition_data_proto = proto_scan
.partition_data_pool
.get(partition_data_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid partition_data_idx: {} (cache size: {})",
"Invalid partition_data_idx: {} (pool size: {})",
partition_data_idx,
partition_data_cache.len()
proto_scan.partition_data_pool.len()
))
})?;

let partition_type = partition_type_cache
.get(partition_type_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid partition_type_idx: {} (cache size: {})",
partition_type_idx,
partition_type_cache.len()
))
})?;

match iceberg::spec::Literal::try_from_json(
partition_data_value.clone(),
&iceberg::spec::Type::Struct(partition_type.clone()),
) {
Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
Ok(None) => None,
Ok(other) => {
return Err(GeneralError(format!(
"Expected struct literal for partition data, got: {:?}",
other
)))
}
// Convert protobuf PartitionData to iceberg Struct
match partition_data_to_struct(partition_data_proto) {
Ok(s) => Some(s),
Err(e) => {
return Err(GeneralError(format!(
"Failed to deserialize partition data from JSON: {}",
return Err(ExecutionError::GeneralError(format!(
"Failed to deserialize partition data from protobuf: {}",
e
)))
}
Expand Down
28 changes: 27 additions & 1 deletion native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,32 @@ message CsvOptions {
bool truncated_rows = 8;
}

// Partition value for Iceberg partition data
message PartitionValue {
int32 field_id = 1;
oneof value {
int32 int_val = 2;
int64 long_val = 3;
int64 date_val = 4; // days since epoch
int64 timestamp_val = 5; // microseconds since epoch
int64 timestamp_tz_val = 6; // microseconds with timezone
string string_val = 7;
double double_val = 8;
float float_val = 9;
bytes decimal_val = 10; // unscaled BigInteger bytes
bool bool_val = 11;
bytes uuid_val = 12;
bytes fixed_val = 13;
bytes binary_val = 14;
Comment on lines +136 to +149
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to consider consolidating this with the existing Literal defined in protobuf. This does not need to happen for the current PR.

message Literal {
  oneof value {
    bool bool_val = 1;
    // Protobuf doesn't provide int8 and int16, we put them into int32 and convert
    // to int8 and int16 when deserializing.
    int32 byte_val = 2;
    int32 short_val = 3;
    int32 int_val = 4;
    int64 long_val = 5;
    float float_val = 6;
    double double_val = 7;
    string string_val = 8;
    bytes bytes_val = 9;
    bytes decimal_val = 10;
    ListLiteral list_val = 11;
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are Iceberg types though.

}
bool is_null = 15;
}

// Collection of partition values for a single partition
message PartitionData {
// One entry per partition field; presumably ordered to match the partition
// spec's fields — confirm against the Scala-side serializer.
repeated PartitionValue values = 1;
}

message IcebergScan {
// Schema to read
repeated SparkStructField required_schema = 1;
Expand All @@ -149,7 +175,7 @@ message IcebergScan {
repeated string partition_spec_pool = 7;
repeated string name_mapping_pool = 8;
repeated ProjectFieldIdList project_field_ids_pool = 9;
repeated string partition_data_pool = 10;
repeated PartitionData partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
}
Expand Down
Loading
Loading