From b7cda692b66371e1e6bb0ac305b085b919633d86 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 21 Jan 2026 18:53:53 -0700
Subject: [PATCH 1/5] refactor: rename scan.allowIncompatible to
 scan.unsignedSmallIntSafetyCheck

This change renames `spark.comet.scan.allowIncompatible` to
`spark.comet.scan.unsignedSmallIntSafetyCheck` and changes its default to
`true` (enabled by default). The key change is that ByteType is removed from
the safety check entirely, leaving only ShortType subject to fallback
behavior.

## Why ByteType is Safe

ByteType columns are always safe for native execution because:

1. **Parquet type mapping**: Spark's ByteType can only originate from signed
   INT8 in Parquet. There is no unsigned 8-bit Parquet type (UINT_8) that
   maps to ByteType.
2. **UINT_8 maps to ShortType**: When Parquet files contain unsigned UINT_8
   columns, Spark maps them to ShortType (16-bit), not ByteType. This is
   because UINT_8 values (0-255) exceed the signed byte range (-128 to 127).
3. **Truncation preserves signed values**: When storing signed INT8 in 8
   bits, truncation from any wider representation preserves the correct
   signed value due to two's complement representation.

## Why ShortType Needs the Safety Check

ShortType columns may be problematic because:

1. **Ambiguous origin**: ShortType can come from either signed INT16 (safe)
   or unsigned UINT_8 (potentially incompatible).
2. **Different reader behavior**: Arrow-based readers like DataFusion respect
   the unsigned UINT_8 logical type and read the data as unsigned, while
   Spark ignores the logical type and reads the data as signed. This can
   produce different results for values 128-255.
3. **No metadata available**: At scan time, Comet cannot determine whether a
   ShortType column originated from INT16 or UINT_8, so the safety check
   conservatively falls back to Spark for all ShortType columns.

Users who know their data does not contain unsigned UINT_8 columns can
disable the safety check with
`spark.comet.scan.unsignedSmallIntSafetyCheck=false`.

Co-Authored-By: Claude Opus 4.5
---
 .../main/scala/org/apache/comet/CometConf.scala  | 15 ++++++++++-----
 docs/source/contributor-guide/parquet_scans.md   | 13 +++++++------
 .../org/apache/comet/rules/CometScanRule.scala   | 10 ++++++----
 .../org/apache/comet/CometExpressionSuite.scala  | 11 ++++++-----
 .../org/apache/comet/CometFuzzTestBase.scala     |  2 +-
 .../comet/parquet/CometParquetWriterSuite.scala  |  4 ++--
 .../apache/comet/parquet/ParquetReadSuite.scala  |  2 +-
 .../apache/comet/rules/CometScanRuleSuite.scala  | 14 +++++++-------
 .../org/apache/spark/sql/CometTestBase.scala     |  6 +++---
 9 files changed, 43 insertions(+), 34 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 89dbb6468d..8107d33fb6 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -715,13 +715,18 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.allowIncompatible")
+  val COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK: ConfigEntry[Boolean] =
+    conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
-      .doc("Some Comet scan implementations are not currently fully compatible with Spark for " +
-        s"all datatypes. Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+      .doc(
+        "Parquet files may contain unsigned 8-bit integers (UINT_8), which Spark maps to " +
+          "ShortType. When this config is true (default), Comet falls back to Spark for " +
+          "ShortType columns because we cannot distinguish signed INT16 (safe) from unsigned " +
+          "UINT_8 (may produce different results). Set to false to allow native execution of " +
+          "ShortType columns if you know your data does not contain unsigned UINT_8 columns " +
+          s"from Parquet files written by other systems. $COMPAT_GUIDE.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
     conf("spark.comet.exec.strictFloatingPoint")
diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md
index 8c2fbfd8f6..7924e12ee1 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -42,12 +42,13 @@ implementation:

 The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:

-- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-  logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-  rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short`
-  types (regardless of the logical type). This behavior can be disabled by setting
-  `spark.comet.scan.allowIncompatible=true`.
+- When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8`
+  (unsigned 8-bit integers), Comet may produce different results than Spark. Spark maps `UINT_8` to `ShortType`, but
+  Comet's Arrow-based readers respect the unsigned type and read the data as unsigned rather than signed. Since Comet
+  cannot distinguish `ShortType` columns that came from `UINT_8` versus signed `INT16`, by default Comet falls back to
+  `native_comet` when scanning Parquet files containing `ShortType` columns. This behavior can be disabled by setting
+  `spark.comet.scan.unsignedSmallIntSafetyCheck=false`. Note that `ByteType` columns are always safe because they can
+  only come from signed `INT8`, where truncation preserves the signed value.
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.

 The `native_datafusion` scan has some additional limitations:
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 8ba2c5845e..468503eafc 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -680,11 +680,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ByteType | ShortType
+      case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+            CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.get() =>
+        fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
+          s"Set ${CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key}=false to allow " +
+          s"native execution if your data does not contain unsigned small integers. " +
+          CometConf.COMPAT_GUIDE
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
         false
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 0352da7850..3b721435f0 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -191,7 +191,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, "test.parquet")
       makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-      withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+      withSQLConf(CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true") {
         withParquetTable(path.toString, "tbl") {
           checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
         }
@@ -203,10 +203,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("uint data type support") {
     Seq(true, false).foreach { dictionaryEnabled =>
       // TODO: Once the question of what to get back from uint_8, uint_16 types is resolved,
-      // we can also update this test to check for COMET_SCAN_ALLOW_INCOMPATIBLE=true
-      Seq(false).foreach { allowIncompatible =>
+      // we can also update this test to check for COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK=false
+      Seq(true).foreach { safetyCheck =>
         {
-          withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> allowIncompatible.toString) {
+          withSQLConf(
+            CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> safetyCheck.toString) {
             withTempDir { dir =>
               val path = new Path(dir.toURI.toString, "testuint.parquet")
               makeParquetFileAllPrimitiveTypes(
@@ -217,7 +218,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           withParquetTable(path.toString, "tbl") {
             val qry = "select _9 from tbl order by _11"
             if (usingDataSourceExec(conf)) {
-              if (!allowIncompatible) {
+              if (safetyCheck) {
                 checkSparkAnswerAndOperator(qry)
               } else {
                 // need to convert the values to unsigned values
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 74858ed614..025488b945 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
     super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+        CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
         testFun
       }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index c4856c3cc2..0eb3cfa0f2 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -459,8 +459,8 @@ class CometParquetWriterSuite extends CometTestBase {
       CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
       // explicitly set scan impl to override CI defaults
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
-      // COMET_SCAN_ALLOW_INCOMPATIBLE is needed because input data contains byte/short types
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+      // Disable unsigned small int safety check for ShortType columns
+      CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "false",
       // use a different timezone to make sure that timezone handling works with nested types
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index b028a70dca..e658979e12 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
     val rows = 1000
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+      CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true") {
       makeParquetFileAllPrimitiveTypes(
         path,
         dictionaryEnabled = false,
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 7f54d0b7ca..8b420d2204 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -140,21 +140,21 @@ class CometScanRuleSuite extends CometTestBase {
     }
   }

-  test("CometScanRule should fallback to Spark for unsupported data types in v1 scan") {
+  test("CometScanRule should fallback to Spark for ShortType when safety check enabled") {
     withTempPath { path =>
-      // Create test data with unsupported types (e.g., BinaryType, CalendarIntervalType)
+      // Create test data with ShortType which may be from unsigned UINT_8
       import org.apache.spark.sql.types._
       val unsupportedSchema = new StructType(
         Array(
           StructField("id", DataTypes.IntegerType, nullable = true),
           StructField(
             "value",
-            DataTypes.ByteType,
+            DataTypes.ShortType,
             nullable = true
-          ), // Unsupported in some scan modes
+          ), // May be from unsigned UINT_8
           StructField("name", DataTypes.StringType, nullable = true)))

-      val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))
+      val testData = Seq(Row(1, 1.toShort, "test1"), Row(2, -1.toShort, "test2"))
       val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
       df.write.parquet(path.toString)

@@ -167,10 +167,10 @@ class CometScanRuleSuite extends CometTestBase {

       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+        CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true") {
         val transformedPlan = applyCometScanRule(sparkPlan)

-        // Should fallback to Spark due to unsupported ByteType in schema
+        // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
         assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
         assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
       }
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 81ac72247f..7959ef5093 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
+    conf.set(CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1113,7 +1113,7 @@ abstract class CometTestBase
    * |""".stripMargin,
    *   "select arr from tbl",
    *   sqlConf = Seq(
-   *     CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false",
+   *     CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true",
    *     "spark.comet.explainFallback.enabled" -> "false"
    *   ),
    *   debugCometDF = df => {
@@ -1275,6 +1275,6 @@ abstract class CometTestBase

   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-      !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+      CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.get(conf)
   }
 }

From 8185dae48b78cd61cd1cb14ed3608deeebb55544 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 07:34:02 -0700
Subject: [PATCH 2/5] format

---
 spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 468503eafc..f20b8211f5 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -685,7 +685,7 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
             CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.get() =>
         fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
           s"Set ${CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key}=false to allow " +
-          s"native execution if your data does not contain unsigned small integers. " +
+          "native execution if your data does not contain unsigned small integers. " +
           CometConf.COMPAT_GUIDE
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>

From b0305d2588a948c3f864f76332071feac844d8d7 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 07:34:42 -0700
Subject: [PATCH 3/5] rename

---
 common/src/main/scala/org/apache/comet/CometConf.scala         | 2 +-
 .../src/main/scala/org/apache/comet/rules/CometScanRule.scala  | 4 ++--
 .../test/scala/org/apache/comet/CometExpressionSuite.scala     | 4 ++--
 spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala  | 2 +-
 .../org/apache/comet/parquet/CometParquetWriterSuite.scala     | 2 +-
 .../scala/org/apache/comet/parquet/ParquetReadSuite.scala      | 2 +-
 .../scala/org/apache/comet/rules/CometScanRuleSuite.scala      | 2 +-
 spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala  | 4 ++--
 8 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 8107d33fb6..a57458b5d3 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -715,7 +715,7 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK: ConfigEntry[Boolean] =
+  val COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK: ConfigEntry[Boolean] =
     conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
       .doc(
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index f20b8211f5..0fa8ed6f4c 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -682,9 +682,9 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
     dt match {
       case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.get() =>
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.get() =>
         fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
-          s"Set ${CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key}=false to allow " +
+          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key}=false to allow " +
           "native execution if your data does not contain unsigned small integers. " +
           CometConf.COMPAT_GUIDE
         false
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 3b721435f0..2dbd08c491 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -191,7 +191,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, "test.parquet")
       makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-      withSQLConf(CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true") {
+      withSQLConf(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "true") {
         withParquetTable(path.toString, "tbl") {
           checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
         }
@@ -207,7 +207,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Seq(true).foreach { safetyCheck =>
         {
           withSQLConf(
-            CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> safetyCheck.toString) {
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> safetyCheck.toString) {
             withTempDir { dir =>
               val path = new Path(dir.toURI.toString, "testuint.parquet")
               makeParquetFileAllPrimitiveTypes(
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 025488b945..12802a64bb 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
     super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-        CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "false",
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
         testFun
       }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index 0eb3cfa0f2..bdcd093e15 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -460,7 +460,7 @@ class CometParquetWriterSuite extends CometTestBase {
       // explicitly set scan impl to override CI defaults
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
       // Disable unsigned small int safety check for ShortType columns
-      CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "false",
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "false",
       // use a different timezone to make sure that timezone handling works with nested types
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index e658979e12..348f3ba9aa 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
     val rows = 1000
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-      CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true") {
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "true") {
       makeParquetFileAllPrimitiveTypes(
         path,
         dictionaryEnabled = false,
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 8b420d2204..8737696439 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -167,7 +167,7 @@ class CometScanRuleSuite extends CometTestBase {

       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "true") {
         val transformedPlan = applyCometScanRule(sparkPlan)

         // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 7959ef5093..8ce3a296a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key, "false")
+    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1275,6 +1275,6 @@ abstract class CometTestBase

   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-      CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.get(conf)
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.get(conf)
   }
 }

From 0bae43cd0ace9ab0276af626c45d2908e7469fe6 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 07:36:09 -0700
Subject: [PATCH 4/5] rename

---
 common/src/main/scala/org/apache/comet/CometConf.scala         | 2 +-
 .../src/main/scala/org/apache/comet/rules/CometScanRule.scala  | 4 ++--
 .../test/scala/org/apache/comet/CometExpressionSuite.scala     | 4 ++--
 spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala  | 2 +-
 .../org/apache/comet/parquet/CometParquetWriterSuite.scala     | 2 +-
 .../scala/org/apache/comet/parquet/ParquetReadSuite.scala      | 2 +-
 .../scala/org/apache/comet/rules/CometScanRuleSuite.scala      | 2 +-
 spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala  | 4 ++--
 8 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index a57458b5d3..7b0c865861 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -715,7 +715,7 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK: ConfigEntry[Boolean] =
+  val COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK: ConfigEntry[Boolean] =
     conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
       .doc(
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 0fa8ed6f4c..16a3575abd 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -682,9 +682,9 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
     dt match {
       case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.get() =>
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
         fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
-          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key}=false to allow " +
+          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
           "native execution if your data does not contain unsigned small integers. " +
           CometConf.COMPAT_GUIDE
         false
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 2dbd08c491..b0784e713c 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -191,7 +191,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, "test.parquet")
       makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-      withSQLConf(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "true") {
+      withSQLConf(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         withParquetTable(path.toString, "tbl") {
           checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
         }
@@ -207,7 +207,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Seq(true).foreach { safetyCheck =>
         {
           withSQLConf(
-            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> safetyCheck.toString) {
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> safetyCheck.toString) {
             withTempDir { dir =>
               val path = new Path(dir.toURI.toString, "testuint.parquet")
               makeParquetFileAllPrimitiveTypes(
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 12802a64bb..5c5251b5e4 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
     super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "false",
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
         testFun
       }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index bdcd093e15..1d63b9d3ae 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -460,7 +460,7 @@ class CometParquetWriterSuite extends CometTestBase {
       // explicitly set scan impl to override CI defaults
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
       // Disable unsigned small int safety check for ShortType columns
-      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "false",
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
       // use a different timezone to make sure that timezone handling works with nested types
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 348f3ba9aa..a05bb7c390 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
     val rows = 1000
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "true") {
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
       makeParquetFileAllPrimitiveTypes(
         path,
         dictionaryEnabled = false,
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 8737696439..d0dfbbb09d 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -167,7 +167,7 @@ class CometScanRuleSuite extends CometTestBase {

       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key -> "true") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         val transformedPlan = applyCometScanRule(sparkPlan)

         // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 8ce3a296a7..89249240cf 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.key, "false")
+    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1275,6 +1275,6 @@ abstract class CometTestBase

   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT__CHECK.get(conf)
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }

From 537d62e8274a95248b7b946e1a80a087b07ba8f8 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 08:54:31 -0700
Subject: [PATCH 5/5] Fix clippy warnings for Rust 1.93

- Use local `root_op` variable instead of unwrapping `exec_context.root_op`
- Replace `is_some()` + `unwrap()` pattern with `if let Some(...)`

Co-Authored-By: Claude Opus 4.5
---
 native/core/src/execution/jni_api.rs      | 10 ++--------
 native/core/src/parquet/schema_adapter.rs |  5 ++---
 2 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 56569bc69c..680cf80c75 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -503,12 +503,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
             let task_ctx = exec_context.session_ctx.task_ctx();
             // Each Comet native execution corresponds to a single Spark partition,
             // so we should always execute partition 0.
-            let stream = exec_context
-                .root_op
-                .as_ref()
-                .unwrap()
-                .native_plan
-                .execute(0, task_ctx)?;
+            let stream = root_op.native_plan.execute(0, task_ctx)?;
             exec_context.stream = Some(stream);
         } else {
             // Pull input batches
@@ -619,8 +614,7 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan(

 /// Updates the metrics of the query plan.
 fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> {
-    if exec_context.root_op.is_some() {
-        let native_query = exec_context.root_op.as_ref().unwrap();
+    if let Some(native_query) = &exec_context.root_op {
         let metrics = exec_context.metrics.as_obj();
         update_comet_metric(env, metrics, native_query)
     } else {
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index b321d902a9..1e0d30c835 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -209,10 +209,9 @@ impl SchemaMapper for SchemaMapping {
             // If this field only exists in the table, and not in the file, then we need to
             // populate a default value for it.
             || {
-                if self.default_values.is_some() {
+                if let Some(default_values) = &self.default_values {
                     // We have a map of default values, see if this field is in there.
-                    if let Some(value) =
-                        self.default_values.as_ref().unwrap().get(&field_idx)
+                    if let Some(value) = default_values.get(&field_idx)
                     // Default value exists, construct a column from it.
                     {
                         let cv = if field.data_type() == &value.data_type() {
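
The signed-versus-unsigned divergence that motivates the safety check in patch 1 can be demonstrated in a few lines of plain Scala, with no Comet or Spark dependency. The sketch below is illustrative only (the object name is invented for this example); it relies solely on the JVM's two's-complement arithmetic. A Parquet UINT_8 value occupies one byte; a reader that sign-extends that byte (as Spark does when it ignores the logical type) sees a different number than a reader that zero-extends it (as Arrow-based readers such as DataFusion do).

```scala
// Standalone sketch: signed vs. unsigned interpretation of a stored UINT_8 byte.
object UnsignedSmallIntSketch {
  def main(args: Array[String]): Unit = {
    val stored: Byte = 0xAB.toByte // Parquet UINT_8 value 171, stored as the byte 0xAB
    val signedRead: Short = stored.toShort // sign-extended, Spark-style read: -85
    val unsignedRead: Short = (stored & 0xFF).toShort // zero-extended, Arrow-style read: 171
    // Values 0-127 agree under both interpretations; only 128-255 diverge.
    println(s"signed=$signedRead, unsigned=$unsignedRead")
  }
}
```

Users who know their ShortType columns originate from signed INT16 rather than UINT_8 can opt back into native execution by setting the config key introduced above, `spark.comet.scan.unsignedSmallIntSafetyCheck`, to `false`.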