1 change: 1 addition & 0 deletions native-engine/auron-jni-bridge/src/conf.rs
@@ -57,6 +57,7 @@ define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE);
define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE);
define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION);
define_conf!(BooleanConf, ORC_TIMESTAMP_USE_MICROSECOND);
Copilot AI Dec 25, 2025

The configuration name "ORC_SCHEMA_CASE_SENSITIVE" with a default value of false is semantically confusing. The name suggests that when set to false (the default), case-insensitive matching is disabled, but the actual behavior is the opposite: false enables case-insensitive matching. This creates a double negative that makes the configuration harder to understand. Consider renaming to "ORC_SCHEMA_CASE_INSENSITIVE" (with default true for Hive compatibility) or improving the documentation to clearly state that false means "case-insensitive matching enabled" and true means "case-sensitive matching enabled".

Suggested change
define_conf!(BooleanConf, ORC_TIMESTAMP_USE_MICROSECOND);
// NOTE: The semantics of this flag are intentionally inverted for Hive compatibility:
// - Default: false => case-insensitive schema matching is ENABLED
// - true => case-sensitive schema matching is ENABLED
// The name ORC_SCHEMA_CASE_SENSITIVE is historical; do not change it without
// coordinating with the corresponding configuration on the JVM side.

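A minimal sketch of the flag's semantics as implemented in this PR; the names_match helper below is illustrative only and not part of the patch:

// Illustrative only: is_case_sensitive = false (the default) compares
// column names case-insensitively, matching Hive behavior; true requires
// an exact match. This mirrors the branch added in orc_exec.rs below.
fn names_match(file_name: &str, schema_name: &str, is_case_sensitive: bool) -> bool {
    if is_case_sensitive {
        file_name == schema_name
    } else {
        file_name.to_lowercase() == schema_name.to_lowercase()
    }
}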
define_conf!(BooleanConf, ORC_SCHEMA_CASE_SENSITIVE);
define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG);
define_conf!(BooleanConf, PARSE_JSON_ERROR_FALLBACK);
define_conf!(StringConf, NATIVE_LOG_LEVEL);
24 changes: 22 additions & 2 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -160,6 +160,7 @@ impl ExecutionPlan for OrcExec {

let force_positional_evolution = conf::ORC_FORCE_POSITIONAL_EVOLUTION.value()?;
let use_microsecond_precision = conf::ORC_TIMESTAMP_USE_MICROSECOND.value()?;
let is_case_sensitive = conf::ORC_SCHEMA_CASE_SENSITIVE.value()?;

let opener: Arc<dyn FileOpener> = Arc::new(OrcOpener {
projection,
@@ -170,6 +171,7 @@ impl ExecutionPlan for OrcExec {
metrics: self.metrics.clone(),
force_positional_evolution,
use_microsecond_precision,
is_case_sensitive,
});

let file_stream = Box::pin(FileStream::new(
@@ -217,6 +219,7 @@ struct OrcOpener {
metrics: ExecutionPlanMetricsSet,
force_positional_evolution: bool,
use_microsecond_precision: bool,
is_case_sensitive: bool,
}

impl FileOpener for OrcOpener {
@@ -245,6 +248,7 @@ impl FileOpener for OrcOpener {
self.force_positional_evolution,
);
let use_microsecond = self.use_microsecond_precision;
let is_case = self.is_case_sensitive;

Ok(Box::pin(async move {
let mut builder = ArrowReaderBuilder::try_new_async(reader)
Expand All @@ -259,7 +263,7 @@ impl FileOpener for OrcOpener {
}

let (schema_mapping, projection) =
schema_adapter.map_schema(builder.file_metadata())?;
schema_adapter.map_schema(builder.file_metadata(), is_case)?;

let projection_mask =
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
@@ -325,6 +329,7 @@ impl SchemaAdapter {
fn map_schema(
&self,
orc_file_meta: &FileMetadata,
is_case_sensitive: bool,
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(self.projected_schema.fields().len());
let mut field_mappings = vec![None; self.projected_schema.fields().len()];
@@ -363,7 +368,7 @@
}
}
}
} else {
} else if is_case_sensitive {
for named_column in file_named_columns {
if let Some((proj_idx, _)) =
self.projected_schema.fields().find(named_column.name())
@@ -372,6 +377,21 @@
projection.push(named_column.data_type().column_index());
}
}
} else {
for named_column in file_named_columns {
// Case-insensitive field name matching
let named_column_name_lower = named_column.name().to_lowercase();
if let Some((proj_idx, _)) = self
.projected_schema
.fields()
.iter()
.enumerate()
.find(|(_, f)| f.name().to_lowercase() == named_column_name_lower)
{
field_mappings[proj_idx] = Some(projection.len());
projection.push(named_column.data_type().column_index());
}
}
Comment on lines +381 to +394
Copilot AI Dec 25, 2025

The case-insensitive field matching performs string lowercase conversion on every iteration (line 383) and then again for each field comparison (line 389). For files with many columns, this could be inefficient. Consider pre-computing a HashMap of lowercase field names to their indices for O(1) lookup instead of O(n*m) where n is the number of file columns and m is the number of projected fields. Alternatively, at minimum, compute the lowercase versions of projected_schema field names once before the loop to avoid repeated conversions.

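A minimal sketch of the precomputed-map variant the reviewer describes, assuming the surrounding names from the diff above (projected_schema, file_named_columns, field_mappings, projection); it is not part of the patch:

use std::collections::HashMap;

// Build the lowercase-name -> projected-index map once, so each file column
// resolves in O(1) instead of rescanning the projected schema per column.
let lower_to_idx: HashMap<String, usize> = self
    .projected_schema
    .fields()
    .iter()
    .enumerate()
    .map(|(idx, field)| (field.name().to_lowercase(), idx))
    .collect();

for named_column in file_named_columns {
    if let Some(&proj_idx) = lower_to_idx.get(&named_column.name().to_lowercase()) {
        field_mappings[proj_idx] = Some(projection.len());
        projection.push(named_column.data_type().column_index());
    }
}

Note that if two projected fields differ only by case, the map silently keeps the last one; the linear scan in the patch has a similar ambiguity (it keeps the first match), so a stricter implementation might reject such schemas outright.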
}

Ok((
@@ -237,6 +237,11 @@ public class SparkAuronConfiguration extends AuronConfiguration {
.description("use microsecond precision when reading ORC timestamp columns. ")
.booleanType()
.defaultValue(false);
public static final ConfigOption<Boolean> ORC_SCHEMA_CASE_SENSITIVE = ConfigOptions.key(
"auron.orc.schema.caseSensitive.enable")
.description("whether ORC file schema matching distinguishes between uppercase and lowercase. ")
.booleanType()
.defaultValue(false);

private final SparkConf sparkConf;

@@ -139,6 +139,8 @@ public enum AuronConf {
// use microsecond precision when reading ORC timestamp columns
ORC_TIMESTAMP_USE_MICROSECOND("spark.auron.orc.timestamp.use.microsecond", false),

ORC_SCHEMA_CASE_SENSITIVE("spark.auron.orc.schema.caseSensitive.enable", false),

NATIVE_LOG_LEVEL("spark.auron.native.log.level", "info");

public final String key;