diff --git a/native/Cargo.lock b/native/Cargo.lock index ce0eb0f2b3..2e53b3c274 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -418,6 +418,23 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compression" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c" +dependencies = [ + "bzip2 0.5.2", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd", + "zstd-safe", +] + [[package]] name = "async-executor" version = "1.13.3" @@ -1189,6 +1206,34 @@ dependencies = [ "either", ] +[[package]] +name = "bzip2" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49ecfb22d906f800d4fe833b6282cf4dc1c298f5057ca0b5445e5c209735ca47" +dependencies = [ + "bzip2-sys", +] + +[[package]] +name = "bzip2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a53fac24f34a81bc9954b5d6cfce0c21e18ec6959f44f56e8e90e4bb7c346c" +dependencies = [ + "libbz2-rs-sys", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "cast" version = "0.3.0" @@ -1784,6 +1829,7 @@ dependencies = [ "datafusion-comet-objectstore-hdfs", "datafusion-comet-proto", "datafusion-comet-spark-expr", + "datafusion-datasource", "datafusion-functions-nested", "datafusion-spark", "futures", @@ -1925,8 +1971,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde13794244bc7581cd82f6fff217068ed79cdc344cafe4ab2c3a1c3510b38d6" dependencies = [ "arrow", + "async-compression", "async-trait", "bytes", + "bzip2 0.6.1", "chrono", "datafusion-common", "datafusion-common-runtime", @@ -1937,6 +1985,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-session", + "flate2", "futures", "glob", "itertools 0.14.0", @@ -1944,7 +1993,10 @@ dependencies = [ "object_store", "rand 0.9.2", "tokio", + "tokio-util", "url", + "xz2", + "zstd", ] [[package]] @@ -3625,6 +3677,12 @@ dependencies = [ "lexical-util", ] +[[package]] +name = "libbz2-rs-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" + [[package]] name = "libc" version = "0.2.180" @@ -3754,6 +3812,17 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "md-5" version = "0.10.6" @@ -6573,6 +6642,15 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 5e30883e35..b13d6d54fd 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -60,6 +60,7 @@ tempfile = "3.24.0" itertools = "0.14.0" paste = "1.0.14" datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } +datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true } once_cell = "1.18.0" regex = { workspace = true } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 4310605f22..bfcf250741 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -167,7 +167,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com case SCAN_AUTO => // TODO add support for native_datafusion in the future nativeIcebergCompatScan(session, scanExec, r, hadoopConf) - .orElse(nativeCometScan(session, scanExec, r, hadoopConf)) .getOrElse(scanExec) case SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 250d2f91c4..e0a5c43aef 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -187,53 +187,69 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("basic data type support") { + // this test requires native_comet scan due to unsigned u8/u16 issue + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "tbl") { + checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100") + } + } + } + } + } + + test("basic data type support - excluding u8/u16") { + // variant that skips _9 (UINT_8) and _10 (UINT_16) for default scan impl Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") { + withParquetTable(path.toString, "tbl") { + // select all columns except _9 (UINT_8) and _10 (UINT_16) + checkSparkAnswerAndOperator( + """select _1, _2, _3, _4, _5, _6, _7, _8, _11, _12, _13, _14, _15, _16, _17, + |_18, _19, _20, _21, _id FROM tbl WHERE _2 > 100""".stripMargin) + } + } + } + } + + test("uint data type support") { + // this test requires native_comet scan due to unsigned u8/u16 issue + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "testuint.parquet") + makeParquetFileAllPrimitiveTypes( + path, + dictionaryEnabled = dictionaryEnabled, + Byte.MinValue, + Byte.MaxValue) withParquetTable(path.toString, "tbl") { - checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100") + val qry = "select _9 from tbl order by _11" + checkSparkAnswerAndOperator(qry) } } } } } - test("uint data type support") { + test("uint data type support - excluding u8/u16") { + // variant that tests UINT_32 and UINT_64, skipping _9 (UINT_8) and _10 (UINT_16) Seq(true, false).foreach { dictionaryEnabled => - // TODO: Once the question of what to get back from uint_8, uint_16 types is resolved, - // we can also update this test to check for COMET_SCAN_ALLOW_INCOMPATIBLE=true - Seq(false).foreach { allowIncompatible => - { - withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> allowIncompatible.toString) { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "testuint.parquet") - makeParquetFileAllPrimitiveTypes( - path, - dictionaryEnabled = dictionaryEnabled, - Byte.MinValue, - Byte.MaxValue) - withParquetTable(path.toString, "tbl") { - val qry = "select _9 from tbl order by _11" - if (usingDataSourceExec(conf)) { - if (!allowIncompatible) { - checkSparkAnswerAndOperator(qry) - } else { - // need to convert the values to unsigned values - val expected = (Byte.MinValue to Byte.MaxValue) - .map(v => { - if (v < 0) Byte.MaxValue.toShort - v else v - }) - .toDF("a") - checkAnswer(sql(qry), expected) - } - } else { - checkSparkAnswerAndOperator(qry) - } - } - } - } + withTempDir { dir => + val path = new Path(dir.toURI.toString, "testuint.parquet") + makeParquetFileAllPrimitiveTypes( + path, + dictionaryEnabled = dictionaryEnabled, + Byte.MinValue, + Byte.MaxValue) + withParquetTable(path.toString, "tbl") { + // test UINT_32 (_11) and UINT_64 (_12) only + checkSparkAnswerAndOperator("select _11, _12 from tbl order by _11") } } }