diff --git a/native-engine/auron-jni-bridge/src/conf.rs b/native-engine/auron-jni-bridge/src/conf.rs index e99037ad2..383596d60 100644 --- a/native-engine/auron-jni-bridge/src/conf.rs +++ b/native-engine/auron-jni-bridge/src/conf.rs @@ -57,6 +57,7 @@ define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE); define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE); define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION); define_conf!(BooleanConf, ORC_TIMESTAMP_USE_MICROSECOND); +define_conf!(BooleanConf, ORC_SCHEMA_CASE_SENSITIVE); define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG); define_conf!(BooleanConf, PARSE_JSON_ERROR_FALLBACK); define_conf!(StringConf, NATIVE_LOG_LEVEL); diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index d07e4794e..c53cb6b5f 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -160,6 +160,7 @@ impl ExecutionPlan for OrcExec { let force_positional_evolution = conf::ORC_FORCE_POSITIONAL_EVOLUTION.value()?; let use_microsecond_precision = conf::ORC_TIMESTAMP_USE_MICROSECOND.value()?; + let is_case_sensitive = conf::ORC_SCHEMA_CASE_SENSITIVE.value()?; let opener: Arc = Arc::new(OrcOpener { projection, @@ -170,6 +171,7 @@ impl ExecutionPlan for OrcExec { metrics: self.metrics.clone(), force_positional_evolution, use_microsecond_precision, + is_case_sensitive, }); let file_stream = Box::pin(FileStream::new( @@ -217,6 +219,7 @@ struct OrcOpener { metrics: ExecutionPlanMetricsSet, force_positional_evolution: bool, use_microsecond_precision: bool, + is_case_sensitive: bool, } impl FileOpener for OrcOpener { @@ -245,6 +248,7 @@ impl FileOpener for OrcOpener { self.force_positional_evolution, ); let use_microsecond = self.use_microsecond_precision; + let is_case = self.is_case_sensitive; Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) @@ -259,7 +263,7 @@ impl FileOpener for OrcOpener { } let (schema_mapping, projection) = - schema_adapter.map_schema(builder.file_metadata())?; + schema_adapter.map_schema(builder.file_metadata(), is_case)?; let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); @@ -325,6 +329,7 @@ impl SchemaAdapter { fn map_schema( &self, orc_file_meta: &FileMetadata, + is_case_sensitive: bool, ) -> Result<(Arc, Vec)> { let mut projection = Vec::with_capacity(self.projected_schema.fields().len()); let mut field_mappings = vec![None; self.projected_schema.fields().len()]; @@ -363,7 +368,7 @@ impl SchemaAdapter { } } } - } else { + } else if is_case_sensitive { for named_column in file_named_columns { if let Some((proj_idx, _)) = self.projected_schema.fields().find(named_column.name()) @@ -372,6 +377,21 @@ impl SchemaAdapter { projection.push(named_column.data_type().column_index()); } } + } else { + for named_column in file_named_columns { + // Case-insensitive field name matching + let named_column_name_lower = named_column.name().to_lowercase(); + if let Some((proj_idx, _)) = self + .projected_schema + .fields() + .iter() + .enumerate() + .find(|(_, f)| f.name().to_lowercase() == named_column_name_lower) + { + field_mappings[proj_idx] = Some(projection.len()); + projection.push(named_column.data_type().column_index()); + } + } } Ok(( diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 8d29a0bc3..88f494142 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -237,6 +237,11 @@ public class SparkAuronConfiguration extends AuronConfiguration { .description("use microsecond precision when reading ORC timestamp columns. ") .booleanType() .defaultValue(false); + public static final ConfigOption ORC_SCHEMA_CASE_SENSITIVE = ConfigOptions.key( + "auron.orc.schema.caseSensitive.enable") + .description("whether ORC file schema matching distinguishes between uppercase and lowercase. ") + .booleanType() + .defaultValue(false); private final SparkConf sparkConf; diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java index b3cbca1a5..2943d4b2a 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java @@ -139,6 +139,8 @@ public enum AuronConf { // use microsecond precision when reading ORC timestamp columns ORC_TIMESTAMP_USE_MICROSECOND("spark.auron.orc.timestamp.use.microsecond", false), + ORC_SCHEMA_CASE_SENSITIVE("spark.auron.orc.schema.caseSensitive.enable", false), + NATIVE_LOG_LEVEL("spark.auron.native.log.level", "info"); public final String key;