diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 656dbc9a58..c44806a9d5 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -725,13 +725,18 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.allowIncompatible")
+  val COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK: ConfigEntry[Boolean] =
+    conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
-      .doc("Some Comet scan implementations are not currently fully compatible with Spark for " +
-        s"all datatypes. Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+      .doc(
+        "Parquet files may contain unsigned 8-bit integers (UINT_8), which Spark maps to " +
+          "ShortType. When this config is true (the default), Comet falls back to Spark for " +
+          "ShortType columns because it cannot distinguish signed INT16 (safe) from unsigned " +
+          "UINT_8 (may produce different results). Set to false to allow native execution of " +
+          "ShortType columns if you know your data does not contain UINT_8 columns written " +
+          s"by non-Spark systems. $COMPAT_GUIDE.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
     conf("spark.comet.exec.strictFloatingPoint")
diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md
index 48fda7dc39..3dcb78e87f 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -42,12 +42,13 @@ implementation:
 
 The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:
 
-- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-  logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-  rather than signed. By default, Comet will fall back to Spark's native scan when scanning Parquet files containing
-  `byte` or `short` types (regardless of the logical type). This behavior can be disabled by setting
-  `spark.comet.scan.allowIncompatible=true`.
+- When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8`
+  (unsigned 8-bit integers), Comet may produce different results than Spark. Spark maps `UINT_8` to `ShortType`, but
+  Comet's Arrow-based readers respect the unsigned type and read the data as unsigned rather than signed. Since Comet
+  cannot distinguish `ShortType` columns that came from `UINT_8` from those that came from signed `INT16`, by default
+  Comet falls back to Spark when scanning Parquet files containing `ShortType` columns. This behavior can be disabled
+  by setting `spark.comet.scan.unsignedSmallIntSafetyCheck=false`. Note that `ByteType` columns are always safe:
+  Spark maps unsigned `UINT_8` to `ShortType`, so a `ByteType` column can only come from signed `INT8`.
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
 
 The `native_datafusion` scan has some additional limitations:
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bfcf250741..4291e3fb65 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -721,11 +721,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ByteType | ShortType
+      case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+        fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
+          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
+          "native execution if your data does not contain unsigned small integers. " +
+          CometConf.COMPAT_GUIDE
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
         false
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 74858ed614..5c5251b5e4 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
     super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
         testFun
       }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index c4856c3cc2..1d63b9d3ae 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -459,8 +459,8 @@ class CometParquetWriterSuite extends CometTestBase {
       CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
       // explicitly set scan impl to override CI defaults
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
-      // COMET_SCAN_ALLOW_INCOMPATIBLE is needed because input data contains byte/short types
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+      // the input data contains ShortType columns, so disable the unsigned small int safety check
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
       // use a different timezone to make sure that timezone handling works with nested types
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index b028a70dca..a05bb7c390 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       val rows = 1000
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         makeParquetFileAllPrimitiveTypes(
           path,
           dictionaryEnabled = false,
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 7f54d0b7ca..d0dfbbb09d 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -140,21 +140,21 @@ class CometScanRuleSuite extends CometTestBase {
     }
   }
 
-  test("CometScanRule should fallback to Spark for unsupported data types in v1 scan") {
+  test("CometScanRule should fall back to Spark for ShortType when safety check enabled") {
     withTempPath { path =>
-      // Create test data with unsupported types (e.g., BinaryType, CalendarIntervalType)
+      // Create test data with ShortType, which may have come from unsigned UINT_8
       import org.apache.spark.sql.types._
       val unsupportedSchema = new StructType(
         Array(
           StructField("id", DataTypes.IntegerType, nullable = true),
           StructField(
             "value",
-            DataTypes.ByteType,
+            DataTypes.ShortType,
             nullable = true
-          ), // Unsupported in some scan modes
+          ), // may have come from unsigned UINT_8
           StructField("name", DataTypes.StringType, nullable = true)))
 
-      val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))
+      val testData = Seq(Row(1, 1.toShort, "test1"), Row(2, -1.toShort, "test2"))
       val df =
         spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
       df.write.parquet(path.toString)
@@ -167,10 +167,10 @@
 
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         val transformedPlan = applyCometScanRule(sparkPlan)
 
-        // Should fallback to Spark due to unsupported ByteType in schema
+        // Should fall back to Spark due to ShortType (may have come from unsigned UINT_8)
         assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
         assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
       }
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 81ac72247f..89249240cf 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
+    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1113,7 +1113,7 @@ abstract class CometTestBase
    * |""".stripMargin,
    * "select arr from tbl",
    * sqlConf = Seq(
-   *   CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false",
+   *   CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true",
    * "spark.comet.explainFallback.enabled" -> "false"
    * ),
    * debugCometDF = df => {
@@ -1275,6 +1275,6 @@ abstract class CometTestBase
 
   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-    !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+    CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
  }
 }
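
A minimal usage sketch of the renamed flag, for trying the patch locally. The config key and its new default are taken from the diff above; the local session, file path, and sample data are illustrative assumptions, and Comet is presumed to be on the classpath and enabled in the usual way.

import org.apache.spark.sql.SparkSession

object UnsignedSmallIntCheckDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; without Comet enabled the config is inert.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical path; any Parquet file with a ShortType column will do.
    val path = "/tmp/short_cols.parquet"
    Seq(1.toShort, -1.toShort).toDF("value").write.mode("overwrite").parquet(path)

    // Default after this change: the safety check is on, so scans of
    // ShortType columns fall back to Spark (they may have come from UINT_8).
    spark.conf.set("spark.comet.scan.unsignedSmallIntSafetyCheck", "true")
    spark.read.parquet(path).show()

    // Opt out only when the data is known to contain no UINT_8 columns.
    spark.conf.set("spark.comet.scan.unsignedSmallIntSafetyCheck", "false")
    spark.read.parquet(path).show()

    // Why the check exists: the same stored bits read differently once the
    // unsigned logical type is dropped, e.g. 0xFF is 255 unsigned but -1 signed.
    val b: Byte = 0xFF.toByte
    println(java.lang.Byte.toUnsignedInt(b)) // 255 (unsigned reading)
    println(b.toInt) // -1 (signed reading)

    spark.stop()
  }
}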