common/src/main/scala/org/apache/comet/CometConf.scala (15 changes: 10 additions & 5 deletions)

@@ -725,13 +725,18 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.allowIncompatible")
+  val COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK: ConfigEntry[Boolean] =
+    conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
-      .doc("Some Comet scan implementations are not currently fully compatible with Spark for " +
-        s"all datatypes. Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+      .doc(
+        "Parquet files may contain unsigned 8-bit integers (UINT_8), which Spark maps to " +
+          "ShortType. When this config is true (the default), Comet falls back to Spark for " +
+          "ShortType columns because it cannot distinguish signed INT16 (safe) from UINT_8 " +
+          "(which may produce different results). Set this to false to allow native execution " +
+          "of ShortType columns if you know your data contains no UINT_8 columns written by " +
+          s"systems other than Spark. $COMPAT_GUIDE.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
     conf("spark.comet.exec.strictFloatingPoint")
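A minimal usage sketch of the renamed flag (assuming a live `SparkSession` named `spark` with Comet enabled); the check is on by default, so only sessions whose Parquet inputs are known to be free of `UINT_8` need to opt out:

```scala
// Opt out of the UINT_8 safety check for this session only, re-enabling
// native scans of ShortType columns. Do this only when the input files are
// known to contain no UINT_8 logical types.
spark.conf.set("spark.comet.scan.unsignedSmallIntSafetyCheck", "false")
```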
docs/source/contributor-guide/parquet_scans.md (13 changes: 7 additions & 6 deletions)

@@ -42,12 +42,13 @@ implementation:

 The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:
 
-- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-  logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-  rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short`
-  types (regardless of the logical type). This behavior can be disabled by setting
-  `spark.comet.scan.allowIncompatible=true`.
+- When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8`
+  (unsigned 8-bit integers), Comet may produce different results than Spark. Spark maps `UINT_8` to `ShortType`, but
+  Comet's Arrow-based readers respect the unsigned type and read the data as unsigned rather than signed. Since Comet
+  cannot distinguish `ShortType` columns that came from `UINT_8` versus signed `INT16`, by default Comet falls back to
+  `native_comet` when scanning Parquet files containing `ShortType` columns. This behavior can be disabled by setting
+  `spark.comet.scan.unsignedSmallIntSafetyCheck=false`. Note that `ByteType` columns are always safe because they can
+  only come from signed `INT8`, where truncation preserves the signed value.
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
 
 The `native_datafusion` scan has some additional limitations:
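The signedness ambiguity motivating this fallback can be shown with plain Scala (an illustrative sketch, not Comet reader code): the same stored byte decodes differently depending on whether the reader treats it as signed or unsigned, which is why `ShortType` (possibly `UINT_8`) is risky while `ByteType` (always signed `INT8`) is not:

```scala
// The raw byte 0xFF is -1 under a signed interpretation, 255 under an unsigned one.
val raw = 0xFF
val asSigned: Byte = raw.toByte               // -1  (signed INT8/INT16 reading)
val asUnsigned: Short = (raw & 0xFF).toShort  // 255 (unsigned UINT_8 reading)
assert(asSigned == -1 && asUnsigned == 255)
// Spark's schema only says ShortType, so a reader cannot tell which
// interpretation the writer intended; Comet therefore falls back by default.
```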
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala (10 changes: 6 additions & 4 deletions)

@@ -721,11 +721,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ByteType | ShortType
+      case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+        fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
+          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
+          "native execution if your data does not contain unsigned small integers. " +
+          CometConf.COMPAT_GUIDE
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
         false
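Note the polarity flip relative to the old flag: opt-in `allowIncompatible=true` corresponds to opt-out `unsignedSmallIntSafetyCheck=false`, and the default moved from `false` to `true`, so the out-of-the-box behavior is unchanged. A sketch of the equivalence (hypothetical helper names, not Comet code):

```scala
// Old rule: fall back when incompatible scans are NOT allowed.
def fellBackOld(allowIncompatible: Boolean): Boolean = !allowIncompatible
// New rule: fall back when the safety check is enabled.
def fallsBackNew(safetyCheckEnabled: Boolean): Boolean = safetyCheckEnabled
// The defaults changed from false to true, so default behavior matches:
assert(fellBackOld(false) == fallsBackNew(true))
```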
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
     super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
         testFun
       }
@@ -459,8 +459,8 @@ class CometParquetWriterSuite extends CometTestBase {
       CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
       // explicitly set scan impl to override CI defaults
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
-      // COMET_SCAN_ALLOW_INCOMPATIBLE is needed because input data contains byte/short types
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+      // Disable unsigned small int safety check for ShortType columns
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
       // use a different timezone to make sure that timezone handling works with nested types
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {
 
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
     val rows = 1000
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
       makeParquetFileAllPrimitiveTypes(
         path,
         dictionaryEnabled = false,
@@ -140,21 +140,21 @@ class CometScanRuleSuite extends CometTestBase {
     }
   }
 
-  test("CometScanRule should fallback to Spark for unsupported data types in v1 scan") {
+  test("CometScanRule should fallback to Spark for ShortType when safety check enabled") {
     withTempPath { path =>
-      // Create test data with unsupported types (e.g., BinaryType, CalendarIntervalType)
+      // Create test data with ShortType which may be from unsigned UINT_8
       import org.apache.spark.sql.types._
       val unsupportedSchema = new StructType(
         Array(
           StructField("id", DataTypes.IntegerType, nullable = true),
           StructField(
             "value",
-            DataTypes.ByteType,
+            DataTypes.ShortType,
             nullable = true
-          ), // Unsupported in some scan modes
+          ), // May be from unsigned UINT_8
           StructField("name", DataTypes.StringType, nullable = true)))
 
-      val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))
+      val testData = Seq(Row(1, 1.toShort, "test1"), Row(2, -1.toShort, "test2"))
 
       val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
       df.write.parquet(path.toString)
@@ -167,10 +167,10 @@

       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         val transformedPlan = applyCometScanRule(sparkPlan)
 
-        // Should fallback to Spark due to unsupported ByteType in schema
+        // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
         assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
         assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
       }
spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala (6 changes: 3 additions & 3 deletions)

@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
+    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1113,7 +1113,7 @@ abstract class CometTestBase
* |""".stripMargin,
* "select arr from tbl",
* sqlConf = Seq(
* CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false",
* CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true",
* "spark.comet.explainFallback.enabled" -> "false"
* ),
* debugCometDF = df => {
@@ -1275,6 +1275,6 @@

   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-    !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+    CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }