From 9786e4338d7205bcad394c594b8d82bf1dcad551 Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Sat, 17 Jan 2026 01:40:16 +0530
Subject: [PATCH 1/5] feat: implement map_sort native function

---
 native/spark-expr/src/comet_scalar_funcs.rs |  10 +-
 native/spark-expr/src/lib.rs                |   2 +
 native/spark-expr/src/map_funcs/map_sort.rs | 185 ++++++++++++++++++++
 native/spark-expr/src/map_funcs/mod.rs      |  20 +++
 4 files changed, 214 insertions(+), 3 deletions(-)
 create mode 100644 native/spark-expr/src/map_funcs/map_sort.rs
 create mode 100644 native/spark-expr/src/map_funcs/mod.rs

diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 8384a4646a..a71a7b143c 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -21,9 +21,9 @@ use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mu
 use crate::math_funcs::modulo_expr::spark_modulo;
 use crate::{
     spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
-    spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
-    spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc, SparkSizeFunc,
-    SparkStringSpace,
+    spark_isnan, spark_lpad, spark_make_decimal, spark_map_sort, spark_read_side_padding,
+    spark_round, spark_rpad, spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount,
+    SparkDateTrunc, SparkSizeFunc, SparkStringSpace,
 };
 use arrow::datatypes::DataType;
 use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -181,6 +181,10 @@ pub fn create_comet_physical_fun_with_eval_mode(
            let func = Arc::new(abs);
            make_comet_scalar_udf!("abs", func, without data_type)
        }
+        "map_sort" => {
+            let func = Arc::new(spark_map_sort);
+            make_comet_scalar_udf!("map_sort", func, without data_type)
+        }
        _ => registry.udf(fun_name).map_err(|e| {
            DataFusionError::Execution(format!(
                "Function {fun_name} not found in the registry: {e}",
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index f26fd911d8..eefae2e346 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -44,6 +44,7 @@
 mod bitwise_funcs;
 mod comet_scalar_funcs;
 pub mod hash_funcs;
+mod map_funcs;
 mod string_funcs;
 
 mod datetime_funcs;
@@ -63,6 +64,7 @@ pub use array_funcs::*;
 pub use bitwise_funcs::*;
 pub use conditional_funcs::*;
 pub use conversion_funcs::*;
+pub use map_funcs::*;
 pub use nondetermenistic_funcs::*;
 
 pub use comet_scalar_funcs::{
diff --git a/native/spark-expr/src/map_funcs/map_sort.rs b/native/spark-expr/src/map_funcs/map_sort.rs
new file mode 100644
index 0000000000..4841cf50a6
--- /dev/null
+++ b/native/spark-expr/src/map_funcs/map_sort.rs
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, ArrayRef, MapArray};
+use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
+use arrow::datatypes::{DataType, Field};
+use datafusion::common::{exec_err, DataFusionError, ScalarValue};
+use datafusion::logical_expr::ColumnarValue;
+use std::sync::Arc;
+
+pub fn spark_map_sort(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
+    if args.len() != 1 {
+        return exec_err!("map_sort function takes exactly one argument");
+    }
+
+    match &args[0] {
+        ColumnarValue::Array(array) => {
+            let result = spark_map_sort_array(array)?;
+            Ok(ColumnarValue::Array(result))
+        }
+        ColumnarValue::Scalar(scalar) => {
+            let result = spark_map_sort_scalar(scalar)?;
+            Ok(ColumnarValue::Scalar(result))
+        }
+    }
+}
+
+fn spark_map_sort_array(array: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
+    let map_array = array
+        .as_any()
+        .downcast_ref::<MapArray>()
+        .ok_or_else(|| DataFusionError::Internal("Expected MapArray".to_string()))?;
+
+    let entries = map_array.entries();
+    let struct_array = entries
+        .as_any()
+        .downcast_ref::<arrow::array::StructArray>()
+        .ok_or_else(|| DataFusionError::Internal("Expected StructArray for entries".to_string()))?;
+
+    if struct_array.num_columns() != 2 {
+        return exec_err!("Map entries must have exactly 2 columns (keys and values)");
+    }
+
+    let keys = struct_array.column(0);
+    let values = struct_array.column(1);
+    let offsets = map_array.offsets();
+
+    let mut sorted_keys_arrays = Vec::new();
+    let mut sorted_values_arrays = Vec::new();
+    let mut new_offsets = Vec::with_capacity(map_array.len() + 1);
+    new_offsets.push(0i32);
+
+    for row_idx in 0..map_array.len() {
+        let start = offsets[row_idx] as usize;
+        let end = offsets[row_idx + 1] as usize;
+        let len = end - start;
+
+        if len == 0 {
+            new_offsets.push(new_offsets[row_idx]);
+            continue;
+        }
+
+        let row_keys = keys.slice(start, len);
+        let row_values = values.slice(start, len);
+
+        if len == 1 {
+            sorted_keys_arrays.push(row_keys);
+            sorted_values_arrays.push(row_values);
+            new_offsets.push(new_offsets[row_idx] + len as i32);
+            continue;
+        }
+
+        let sort_columns = vec![SortColumn {
+            values: Arc::clone(&row_keys),
+            options: Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+        }];
+
+        let indices = lexsort_to_indices(&sort_columns, None)?;
+        let sorted_keys = take(&row_keys, &indices, None)?;
+        let sorted_values = take(&row_values, &indices, None)?;
+
+        sorted_keys_arrays.push(sorted_keys);
+        sorted_values_arrays.push(sorted_values);
+        new_offsets.push(new_offsets[row_idx] + len as i32);
+    }
+
+    if sorted_keys_arrays.is_empty() {
+        let key_field = Arc::new(Field::new(
+            "key",
+            keys.data_type().clone(),
+            keys.is_nullable(),
+        ));
+        let value_field = Arc::new(Field::new(
+            "value",
+            values.data_type().clone(),
+            values.is_nullable(),
+        ));
+        let entries_field = Arc::new(Field::new(
+            "entries",
+            DataType::Struct(vec![Arc::clone(&key_field), Arc::clone(&value_field)].into()),
+            false,
+        ));
+
+        let empty_keys = arrow::array::new_empty_array(keys.data_type());
+        let empty_values = arrow::array::new_empty_array(values.data_type());
+        let empty_entries = arrow::array::StructArray::new(
+            vec![key_field, value_field].into(),
+            vec![empty_keys, empty_values],
+            None,
+        );
+
+        return Ok(Arc::new(MapArray::new(
+            entries_field,
+            arrow::buffer::OffsetBuffer::new(vec![0i32; map_array.len() + 1].into()),
+            empty_entries,
+            map_array.nulls().cloned(),
+            false,
+        )));
+    }
+
+    let sorted_keys_refs: Vec<&dyn Array> =
+        sorted_keys_arrays.iter().map(|a| a.as_ref()).collect();
+    let sorted_values_refs: Vec<&dyn Array> =
+        sorted_values_arrays.iter().map(|a| a.as_ref()).collect();
+
+    let concatenated_keys = arrow::compute::concat(&sorted_keys_refs)?;
+    let concatenated_values = arrow::compute::concat(&sorted_values_refs)?;
+
+    let key_field = Arc::new(Field::new(
+        "key",
+        keys.data_type().clone(),
+        keys.is_nullable(),
+    ));
+    let value_field = Arc::new(Field::new(
+        "value",
+        values.data_type().clone(),
+        values.is_nullable(),
+    ));
+
+    let sorted_entries = arrow::array::StructArray::new(
+        vec![Arc::clone(&key_field), Arc::clone(&value_field)].into(),
+        vec![concatenated_keys, concatenated_values],
+        None,
+    );
+
+    let entries_field = Arc::new(Field::new(
+        "entries",
+        DataType::Struct(vec![key_field, value_field].into()),
+        false,
+    ));
+
+    Ok(Arc::new(MapArray::new(
+        entries_field,
+        arrow::buffer::OffsetBuffer::new(new_offsets.into()),
+        sorted_entries,
+        map_array.nulls().cloned(),
+        false,
+    )))
+}
+
+fn spark_map_sort_scalar(scalar: &ScalarValue) -> Result<ScalarValue, DataFusionError> {
+    match scalar {
+        ScalarValue::Null => Ok(ScalarValue::Null),
+        _ => exec_err!(
+            "map_sort scalar function only supports map types, got: {:?}",
+            scalar
+        ),
+    }
+}
diff --git a/native/spark-expr/src/map_funcs/mod.rs b/native/spark-expr/src/map_funcs/mod.rs
new file mode 100644
index 0000000000..a1bfd34923
--- /dev/null
+++ b/native/spark-expr/src/map_funcs/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod map_sort;
+
+pub use map_sort::spark_map_sort;

From f9e805649f1c9dd6c224f3e06d1cbbf8a333fd3b Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Sat, 17 Jan 2026 01:40:20 +0530
Subject: [PATCH 2/5] feat: add CometMapSort serializer

---
 .../apache/comet/serde/QueryPlanSerde.scala |  3 ++-
 .../scala/org/apache/comet/serde/maps.scala | 20 +++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index e50b1d80e6..9d7794b5a5 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -126,7 +126,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[MapKeys] -> CometMapKeys,
     classOf[MapEntries] -> CometMapEntries,
     classOf[MapValues] -> CometMapValues,
-    classOf[MapFromArrays] -> CometMapFromArrays)
+    classOf[MapFromArrays] -> CometMapFromArrays,
+    classOf[MapSort] -> CometMapSort)
 
   private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[CreateNamedStruct] -> CometCreateNamedStruct,
diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala
index 2e217f6af0..c15ab7e547 100644
--- a/spark/src/main/scala/org/apache/comet/serde/maps.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala
@@ -20,8 +20,10 @@
 package org.apache.comet.serde
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
 import org.apache.spark.sql.types.{ArrayType, MapType}
 
+import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType}
 
 object CometMapKeys extends CometExpressionSerde[MapKeys] {
@@ -89,3 +91,21 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] {
     optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*)
   }
 }
+
+object CometMapSort extends CometExpressionSerde[MapSort] {
+
+  override def convert(
+      expr: MapSort,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val keyType = expr.base.dataType.asInstanceOf[MapType].keyType
+    if (!RowOrdering.isOrderable(keyType)) {
+      withInfo(expr, s"map_sort requires orderable key type, got: $keyType")
+      return None
+    }
+
+    val childExpr = exprToProtoInternal(expr.base, inputs, binding)
+    val mapSortScalarExpr = scalarFunctionExprToProto("map_sort", childExpr)
+    optExprWithInfo(mapSortScalarExpr, expr, expr.children: _*)
+  }
+}

From 3e9336a2984750dc3b58a532d67464bfea884f2a Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Sat, 17 Jan 2026 01:40:26 +0530
Subject: [PATCH 3/5] test: add comprehensive map_sort tests

---
 .../comet/CometMapExpressionSuite.scala | 161 ++++++++++++++++++
 1 file changed, 161 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 9276a20348..9b91d17ba5 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -157,4 +157,165 @@ class CometMapExpressionSuite extends CometTestBase {
       }
     }
   }
+
+  test("map_sort with integer keys") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(5)
+            .select(map(lit(3), lit("c"), lit(1), lit("a"), lit(2), lit("b")).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort with string keys") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(5)
+            .select(map(lit("z"), lit(1), lit("a"), lit(2), lit("m"), lit(3)).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort with double keys") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(5)
+            .select(map(lit(3.5), lit("c"), lit(1.2), lit("a"), lit(2.8), lit("b")).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort with null and empty maps") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(5)
+            .select(
+              when(col("id") === 0, lit(null))
+                .when(col("id") === 1, map())
+                .when(col("id") === 2, map(lit(1), lit("a")))
+                .otherwise(map(lit(3), lit("c"), lit(2), lit("b")))
+                .alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort with struct keys") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(3)
+            .select(
+              map(
+                struct(lit(2), lit("b")),
+                lit("second"),
+                struct(lit(1), lit("a")),
+                lit("first"),
+                struct(lit(3), lit("c")),
+                lit("third")).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort with array keys") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(3)
+            .select(
+              map(
+                array(lit(2), lit(3)),
+                lit("array2"),
+                array(lit(1), lit(2)),
+                lit("array1"),
+                array(lit(3), lit(4)),
+                lit("array3")).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort with complex values") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(3)
+            .select(
+              map(
+                lit(3),
+                struct(lit("c"), array(lit(30), lit(31))),
+                lit(1),
+                struct(lit("a"), array(lit(10), lit(11))),
+                lit(2),
+                struct(lit("b"), array(lit(20), lit(21)))).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(sql("SELECT map_sort(m) FROM t1"))
+      }
+    }
+  }
+
+  test("map_sort fallback for non-orderable keys") {
+    withTempDir { dir =>
+      withTempView("t1") {
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+          val df = spark
+            .range(3)
+            .select(
+              map(
+                map(lit(1), lit("inner1")),
+                lit("outer1"),
+                map(lit(2), lit("inner2")),
+                lit("outer2")).alias("m"))
+          df.write.parquet(path.toString)
+        }
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndFallbackReason(
+          sql("SELECT map_sort(m) FROM t1"),
+          "map_sort requires orderable key type")
+      }
+    }
+  }
 }

From afd298596b28e6850ada3f14c304a28ee6d9b98f Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Wed, 21 Jan 2026 01:22:24 +0530
Subject: [PATCH 4/5] refactor: remove MapSort from shared code

MapSort only exists in Spark 4.0+, not in 3.4/3.5
---
 .../apache/comet/serde/QueryPlanSerde.scala |  3 +--
 .../scala/org/apache/comet/serde/maps.scala | 19 -------------------
 2 files changed, 1 insertion(+), 21 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9d7794b5a5..e50b1d80e6 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -126,8 +126,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[MapKeys] -> CometMapKeys,
     classOf[MapEntries] -> CometMapEntries,
     classOf[MapValues] -> CometMapValues,
-    classOf[MapFromArrays] -> CometMapFromArrays,
-    classOf[MapSort] -> CometMapSort)
+    classOf[MapFromArrays] -> CometMapFromArrays)
 
   private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[CreateNamedStruct] -> CometCreateNamedStruct,
diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala
index c15ab7e547..9557f3655a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/maps.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala
@@ -20,7 +20,6 @@
 package org.apache.comet.serde
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.RowOrdering
 import org.apache.spark.sql.types.{ArrayType, MapType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -91,21 +90,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] {
     optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*)
   }
 }
-
-object CometMapSort extends CometExpressionSerde[MapSort] {
-
-  override def convert(
-      expr: MapSort,
-      inputs: Seq[Attribute],
-      binding: Boolean): Option[ExprOuterClass.Expr] = {
-    val keyType = expr.base.dataType.asInstanceOf[MapType].keyType
-    if (!RowOrdering.isOrderable(keyType)) {
-      withInfo(expr, s"map_sort requires orderable key type, got: $keyType")
-      return None
-    }
-
-    val childExpr = exprToProtoInternal(expr.base, inputs, binding)
-    val mapSortScalarExpr = scalarFunctionExprToProto("map_sort", childExpr)
-    optExprWithInfo(mapSortScalarExpr, expr, expr.children: _*)
-  }
-}

From c2cbf778bbb4c1cebebddf53a149a058e398bd22 Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Wed, 21 Jan 2026 01:22:31 +0530
Subject: [PATCH 5/5] feat: add MapSort support for Spark 4.0 via shim

Adds MapSort serialization to Spark 4.0 version shim
---
 .../org/apache/comet/shims/CometExprShim.scala | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
index fc3db183b3..1b06c79d24 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType}
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, MapType, StringType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
 import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
 import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
-import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
+import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
 
 /**
  * `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
@@ -103,6 +103,17 @@
           None
         }
 
+      case expr: MapSort =>
+        val keyType = expr.base.dataType.asInstanceOf[MapType].keyType
+        if (!RowOrdering.isOrderable(keyType)) {
+          withInfo(expr, s"map_sort requires orderable key type, got: $keyType")
+          return None
+        }
+
+        val childExpr = exprToProtoInternal(expr.base, inputs, binding)
+        val mapSortScalarExpr = scalarFunctionExprToProto("map_sort", childExpr)
+        optExprWithInfo(mapSortScalarExpr, expr, expr.children: _*)
+
       case _ => None
     }
   }
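
---
Reviewer note, not part of the series: the Spark-side suite in PATCH 3 covers
the end-to-end path, but spark_map_sort itself could also get a Rust unit test
in map_sort.rs. The sketch below is only a suggestion: the test name is
invented here, and it assumes arrow-rs's MapArray::new_from_strings helper and
a conventional #[cfg(test)] module at the bottom of the file.

    #[cfg(test)]
    mod tests {
        use super::*;
        use arrow::array::{Int32Array, StringArray};

        #[test]
        fn spark_map_sort_orders_entries_by_key() -> Result<(), DataFusionError> {
            // Build a single map row with out-of-order keys: {"b": 2, "a": 1, "c": 3}.
            let values = Int32Array::from(vec![2, 1, 3]);
            let map = MapArray::new_from_strings(["b", "a", "c"].into_iter(), &values, &[0, 3])?;

            let result = spark_map_sort(&[ColumnarValue::Array(Arc::new(map))])?;
            let ColumnarValue::Array(sorted) = result else {
                panic!("expected an array result for an array input")
            };

            // Keys come back in ascending order, with values permuted alongside them.
            let sorted = sorted.as_any().downcast_ref::<MapArray>().unwrap();
            let keys = sorted.entries().column(0);
            let keys = keys.as_any().downcast_ref::<StringArray>().unwrap();
            assert_eq!(keys.value(0), "a");
            assert_eq!(keys.value(1), "b");
            assert_eq!(keys.value(2), "c");
            Ok(())
        }
    }

Since the implementation sorts each map row independently (a lexsort per row
rather than one global sort of the flattened entries), this per-row ordering
is the main behavior worth pinning down in a native test.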