6 changes: 6 additions & 0 deletions native/core/src/execution/operators/iceberg_scan.rs
@@ -62,6 +62,9 @@ pub struct IcebergScanExec {
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
/// Metrics
metrics: ExecutionPlanMetricsSet,
#[allow(dead_code)]
/// Optional master key ID for encrypted Iceberg table
table_master_key: Option<String>,
}

impl IcebergScanExec {
@@ -70,6 +73,7 @@ impl IcebergScanExec {
schema: SchemaRef,
catalog_properties: HashMap<String, String>,
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
table_master_key: Option<String>,
) -> Result<Self, ExecutionError> {
let output_schema = schema;
let num_partitions = file_task_groups.len();
@@ -84,6 +88,7 @@ impl IcebergScanExec {
catalog_properties,
file_task_groups,
metrics,
table_master_key,
})
}

@@ -166,6 +171,7 @@ impl IcebergScanExec {

let task_stream = futures::stream::iter(tasks.into_iter().map(Ok)).boxed();

// TODO: pass table master key to Iceberg Rust ArrowReaderBuilder
let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
.with_batch_size(batch_size)
.with_data_file_concurrency_limit(context.session_config().target_partitions())
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
@@ -1180,6 +1180,7 @@ impl PhysicalPlanner {
required_schema,
catalog_properties,
file_task_groups,
scan.table_master_key.clone(),
)?;

Ok((
1 change: 1 addition & 0 deletions native/proto/src/proto/operator.proto
@@ -132,6 +132,7 @@ message IcebergScan {
repeated string partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
optional string table_master_key = 13;
}

// Helper message for deduplicating field ID lists
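On the JVM side, this new optional field would be populated from the scan metadata when the planner serializes the IcebergScan operator. A minimal sketch of what that could look like, assuming the proto compiles to the usual `OperatorOuterClass.IcebergScan.Builder` under `org.apache.comet.serde` with a generated `setTableMasterKey` method (the class and method names here are inferred from standard protobuf Java codegen, not taken from this PR):

```scala
import org.apache.comet.serde.OperatorOuterClass

// Hedged sketch: copy the optional key from the Scala-side metadata into the
// protobuf builder. If the table is not encrypted, the field is left unset,
// so the native side sees table_master_key as None.
def setMasterKeyIfPresent(
    builder: OperatorOuterClass.IcebergScan.Builder,
    metadata: CometIcebergNativeScanMetadata): Unit = {
  metadata.tableMasterKey.foreach(builder.setTableMasterKey)
}
```

Leaving the field unset for unencrypted tables keeps the common case unchanged on the wire and maps directly onto the `Option<String>` field added to `IcebergScanExec`.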
@@ -729,6 +729,8 @@ object IcebergReflection extends Logging {
* Mapping from column names to Iceberg field IDs (built from scanSchema)
* @param catalogProperties
* Catalog properties for FileIO (S3 credentials, regions, etc.)
* @param tableMasterKey
* Optional table master key for encrypted table
*/
case class CometIcebergNativeScanMetadata(
table: Any,
@@ -739,7 +741,8 @@
tableSchema: Any,
globalFieldIdMapping: Map[String, Int],
catalogProperties: Map[String, String],
fileFormat: String)
fileFormat: String,
tableMasterKey: Option[String])

object CometIcebergNativeScanMetadata extends Logging {

@@ -780,6 +783,16 @@
}
}

// tableMasterKey is optional
val tableMasterKey = getTableProperties(table).flatMap { properties =>
Contributor comment: Can we make it so that we only call getTableProperties once and get all of the fields we want to avoid reflection overhead?
val masterKey = "encryption.key-id"
if (properties.containsKey(masterKey)) {
Some(properties.get(masterKey))
} else {
None
}
}
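
One way to address the reviewer's comment above is to fetch the table properties a single time and derive every property-backed field from that one map. A minimal sketch, assuming `getTableProperties(table)` returns an `Option` over a `java.util.Map[String, String]` as the `containsKey`/`get` calls in this hunk imply (any additional property name below is a placeholder, not part of this PR):

```scala
// Hedged sketch: one reflection-backed call, then plain map lookups.
val tablePropsOpt = getTableProperties(table)

// Option(...) turns a missing key (null from java.util.Map#get) into None.
val tableMasterKey =
  tablePropsOpt.flatMap(props => Option(props.get("encryption.key-id")))

// Any future property-backed fields can reuse the same map instead of going
// through reflection again, e.g. (placeholder key name):
// val otherProp = tablePropsOpt.flatMap(props => Option(props.get("some.other.property")))
```

Whether this is worth restructuring depends on how expensive `getTableProperties` actually is; if it is a single cached reflective invoke, the saving is small, but it does keep all property reads in one place.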

val globalFieldIdMapping = buildFieldIdMapping(scanSchema)

// File format is always PARQUET,
@@ -796,7 +809,8 @@
tableSchema = tableSchema,
globalFieldIdMapping = globalFieldIdMapping,
catalogProperties = catalogProperties,
fileFormat = fileFormat)
fileFormat = fileFormat,
tableMasterKey = tableMasterKey)
}
}
}