Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,25 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
}

test("Parquet temporal types written as INT96") {
testParquetTemporalTypes(ParquetOutputTimestampType.INT96)
// INT96 is the only format where int96AsTimestamp and int96TimestampConversion matter
testParquetTemporalTypes(ParquetOutputTimestampType.INT96, testInt96Settings = true)
}

test("Parquet temporal types written as TIMESTAMP_MICROS") {
testParquetTemporalTypes(ParquetOutputTimestampType.TIMESTAMP_MICROS)
testParquetTemporalTypes(
ParquetOutputTimestampType.TIMESTAMP_MICROS,
testInt96Settings = false)
}

test("Parquet temporal types written as TIMESTAMP_MILLIS") {
testParquetTemporalTypes(ParquetOutputTimestampType.TIMESTAMP_MILLIS)
testParquetTemporalTypes(
ParquetOutputTimestampType.TIMESTAMP_MILLIS,
testInt96Settings = false)
}

private def testParquetTemporalTypes(
outputTimestampType: ParquetOutputTimestampType.Value): Unit = {
outputTimestampType: ParquetOutputTimestampType.Value,
testInt96Settings: Boolean): Unit = {

val dataGenOptions = DataGenOptions(generateNegativeZero = false)

Expand All @@ -294,15 +300,17 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {

// Include both DateType and TimestampType to properly test timestamp configurations
// TODO test with MapType
// https://github.com/apache/datafusion-comet/issues/2945
val schema = StructType(
Seq(
StructField("c0", DataTypes.DateType),
StructField("c1", DataTypes.createArrayType(DataTypes.DateType)),
StructField("c1", DataTypes.TimestampType),
StructField("c2", DataTypes.createArrayType(DataTypes.DateType)),
StructField(
"c2",
DataTypes.createStructType(Array(StructField("c3", DataTypes.DateType))))))
"c3",
DataTypes.createStructType(Array(StructField("c4", DataTypes.DateType))))))

ParquetGenerator.makeParquetFile(
random,
Expand All @@ -313,28 +321,33 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
dataGenOptions)
}

Seq(defaultTimezone, "UTC", "America/Denver").foreach { tz =>
Seq(true, false).foreach { inferTimestampNtzEnabled =>
Seq(true, false).foreach { int96TimestampConversion =>
Seq(true, false).foreach { int96AsTimestamp =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz,
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> int96AsTimestamp.toString,
SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> int96TimestampConversion.toString,
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> inferTimestampNtzEnabled.toString) {
// Test with local timezone and UTC (2 timezones instead of 3)
Seq(defaultTimezone, "UTC").foreach { tz =>
// INT96-specific settings only matter for INT96 format
val int96Combinations = if (testInt96Settings) {
// Test key combinations for INT96: both settings true, both false, and mixed
Seq((true, true), (false, false), (true, false))
} else {
// For non-INT96 formats, these settings don't matter, so just use defaults
Seq((true, false))
}

int96Combinations.foreach { case (int96AsTimestamp, int96TimestampConversion) =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz,
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> int96AsTimestamp.toString,
SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> int96TimestampConversion.toString) {

val df = spark.read.parquet(filename.toString)
df.createOrReplaceTempView("t1")
val columns =
df.schema.fields
.filter(f => DataTypeSupport.hasTemporalType(f.dataType))
.map(_.name)
val df = spark.read.parquet(filename.toString)
df.createOrReplaceTempView("t1")
val columns =
df.schema.fields
.filter(f => DataTypeSupport.hasTemporalType(f.dataType))
.map(_.name)

for (col <- columns) {
checkSparkAnswer(s"SELECT $col FROM t1 ORDER BY $col")
}
}
for (col <- columns) {
checkSparkAnswer(s"SELECT $col FROM t1 ORDER BY $col")
}
}
}
Expand Down
Loading