diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs
index 2f639e9f70..db70c05f95 100644
--- a/native/core/src/execution/operators/iceberg_scan.rs
+++ b/native/core/src/execution/operators/iceberg_scan.rs
@@ -62,6 +62,9 @@ pub struct IcebergScanExec {
     file_task_groups: Vec<Vec<FileScanTask>>,
     /// Metrics
    metrics: ExecutionPlanMetricsSet,
+    #[allow(dead_code)]
+    /// Optional master key ID for an encrypted Iceberg table
+    table_master_key: Option<String>,
 }
 
 impl IcebergScanExec {
@@ -70,6 +73,7 @@
         schema: SchemaRef,
         catalog_properties: HashMap<String, String>,
         file_task_groups: Vec<Vec<FileScanTask>>,
+        table_master_key: Option<String>,
     ) -> Result<Self> {
         let output_schema = schema;
         let num_partitions = file_task_groups.len();
@@ -84,6 +88,7 @@
             catalog_properties,
             file_task_groups,
             metrics,
+            table_master_key,
         })
     }
 
@@ -166,6 +171,7 @@
 
         let task_stream = futures::stream::iter(tasks.into_iter().map(Ok)).boxed();
 
+        // TODO: pass table master key to Iceberg Rust ArrowReaderBuilder
         let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
             .with_batch_size(batch_size)
             .with_data_file_concurrency_limit(context.session_config().target_partitions())
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index b13fafe458..8c3b36be2e 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1180,6 +1180,7 @@
                     required_schema,
                     catalog_properties,
                     file_task_groups,
+                    scan.table_master_key.clone(),
                 )?;
 
                 Ok((
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index a1a3c4bed9..8a4b9d3ed2 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -132,6 +132,7 @@ message IcebergScan {
   repeated string partition_data_pool = 10;
   repeated DeleteFileList delete_files_pool = 11;
   repeated spark.spark_expression.Expr residual_pool = 12;
+  optional string table_master_key = 13;
 }
 
 // Helper message for deduplicating field ID lists
diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index 2d772063e4..c2b763e6e0 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -729,6 +729,8 @@ object IcebergReflection extends Logging {
  *   Mapping from column names to Iceberg field IDs (built from scanSchema)
  * @param catalogProperties
  *   Catalog properties for FileIO (S3 credentials, regions, etc.)
+ * @param tableMasterKey
+ *   Optional table master key for encrypted table
  */
 case class CometIcebergNativeScanMetadata(
     table: Any,
@@ -739,7 +741,8 @@ case class CometIcebergNativeScanMetadata(
     tableSchema: Any,
     globalFieldIdMapping: Map[String, Int],
     catalogProperties: Map[String, String],
-    fileFormat: String)
+    fileFormat: String,
+    tableMasterKey: Option[String])
 
 object CometIcebergNativeScanMetadata extends Logging {
 
@@ -780,6 +783,16 @@ object CometIcebergNativeScanMetadata extends Logging {
       }
     }
 
+    // tableMasterKey is optional
+    val tableMasterKey = getTableProperties(table).flatMap { properties =>
+      val masterKey = "encryption.key-id"
+      if (properties.containsKey(masterKey)) {
+        Some(properties.get(masterKey))
+      } else {
+        None
+      }
+    }
+
     val globalFieldIdMapping = buildFieldIdMapping(scanSchema)
 
     // File format is always PARQUET,
@@ -796,7 +809,8 @@ object CometIcebergNativeScanMetadata extends Logging {
       tableSchema = tableSchema,
       globalFieldIdMapping = globalFieldIdMapping,
       catalogProperties = catalogProperties,
-      fileFormat = fileFormat)
+      fileFormat = fileFormat,
+      tableMasterKey = tableMasterKey)
     }
   }
 }