From 86f6eb6b8fc28bda52fc2aa99a71273ba49801b8 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 12:24:25 -0700
Subject: [PATCH 1/3] perf: reduce GC pressure in protobuf serialization

Replace ByteArrayOutputStream with direct CodedOutputStream serialization
to eliminate unnecessary allocations during query plan serialization.

This optimization:
- Pre-allocates exact buffer size using getSerializedSize()
- Eliminates ByteArrayOutputStream's internal buffer resizing
- Removes defensive array copying from toByteArray()
- Applies to 5 hot paths called per-partition during query execution

For a query with 1000 partitions, this eliminates 5000+ unnecessary
allocations and array copies, significantly reducing GC pressure.

Changes:
- operators.scala: getCometIterator() and convertBlock()
- CometNativeWriteExec.scala: serializedPlanOpt() and doExecute()
- ParquetFilters.scala: createNativeFilters()
---
 .../apache/comet/parquet/ParquetFilters.scala | 12 +++++++----
 .../sql/comet/CometNativeWriteExec.scala      | 21 ++++++++++++-------
 .../apache/spark/sql/comet/operators.scala    | 20 +++++++++++-------
 3 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index dbc3e17f83..5e63199e41 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -43,6 +43,8 @@ import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.parquet.SourceFilterSerde.{createBinaryExpr, createNameExpr, createUnaryExpr, createValueExpr}
 import org.apache.comet.serde.ExprOuterClass
 import org.apache.comet.serde.QueryPlanSerde.scalarFunctionExprToProto
@@ -885,10 +887,12 @@ class ParquetFilters(
 
   def createNativeFilters(predicates: Seq[sources.Filter]): Option[Array[Byte]] = {
     predicates.reduceOption(sources.And).flatMap(createNativeFilter).map { expr =>
-      val outputStream = new ByteArrayOutputStream()
-      expr.writeTo(outputStream)
-      outputStream.close()
-      outputStream.toByteArray
+      val size = expr.getSerializedSize
+      val bytes = new Array[Byte](size)
+      val codedOutput = CodedOutputStream.newInstance(bytes)
+      expr.writeTo(codedOutput)
+      codedOutput.checkNoSpaceLeft()
+      bytes
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
index f153a691ef..04625b2b04 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
@@ -34,6 +34,8 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.CometExecIterator
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
@@ -75,10 +77,12 @@ case class CometNativeWriteExec(
     sparkContext.collectionAccumulator[FileCommitProtocol.TaskCommitMessage]("taskCommitMessages")
 
   override def serializedPlanOpt: SerializedPlan = {
-    val outputStream = new ByteArrayOutputStream()
-    nativeOp.writeTo(outputStream)
-    outputStream.close()
-    SerializedPlan(Some(outputStream.toByteArray))
+    val size = nativeOp.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
+    SerializedPlan(Some(bytes))
   }
 
   override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
@@ -196,10 +200,11 @@ case class CometNativeWriteExec(
 
     val nativeMetrics = CometMetricNode.fromCometPlan(this)
 
-    val outputStream = new ByteArrayOutputStream()
-    modifiedNativeOp.writeTo(outputStream)
-    outputStream.close()
-    val planBytes = outputStream.toByteArray
+    val size = modifiedNativeOp.getSerializedSize
+    val planBytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(planBytes)
+    modifiedNativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
 
     val execIterator = new CometExecIterator(
       CometExec.newIterId,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 0a435e5b7a..a19449a7ab 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -50,6 +50,7 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 import com.google.common.base.Objects
+import com.google.protobuf.CodedOutputStream
 
 import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
@@ -139,10 +140,11 @@
       partitionIdx: Int,
      broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
       encryptedFilePaths: Seq[String]): CometExecIterator = {
-    val outputStream = new ByteArrayOutputStream()
-    nativePlan.writeTo(outputStream)
-    outputStream.close()
-    val bytes = outputStream.toByteArray
+    val size = nativePlan.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativePlan.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
     new CometExecIterator(
       newIterId,
       inputs,
@@ -414,10 +416,12 @@ abstract class CometNativeExec extends CometExec {
   def convertBlock(): CometNativeExec = {
     def transform(arg: Any): AnyRef = arg match {
       case serializedPlan: SerializedPlan if serializedPlan.isEmpty =>
-        val out = new ByteArrayOutputStream()
-        nativeOp.writeTo(out)
-        out.close()
-        SerializedPlan(Some(out.toByteArray))
+        val size = nativeOp.getSerializedSize
+        val bytes = new Array[Byte](size)
+        val codedOutput = CodedOutputStream.newInstance(bytes)
+        nativeOp.writeTo(codedOutput)
+        codedOutput.checkNoSpaceLeft()
+        SerializedPlan(Some(bytes))
       case other: AnyRef => other
       case null => null
     }

From 0a6f97d73c98961a57efd3807544ae3342dba11b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 13:16:27 -0700
Subject: [PATCH 2/3] fix: remove unused ByteArrayOutputStream imports

---
 .../main/scala/org/apache/comet/parquet/ParquetFilters.scala    | 1 -
 .../scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala | 2 --
 spark/src/main/scala/org/apache/spark/sql/comet/operators.scala | 1 -
 3 files changed, 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index 5e63199e41..f8da68d59f 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -19,7 +19,6 @@
 
 package org.apache.comet.parquet
 
-import java.io.ByteArrayOutputStream
 import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
index 04625b2b04..39e7ac6eef 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import java.io.ByteArrayOutputStream
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index a19449a7ab..f4f97b8312 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -19,7 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import java.io.ByteArrayOutputStream
 import java.util.Locale
 
 import scala.collection.mutable

From 3ca7a7f737d8d8b67036707ce238204f748a154e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 22 Jan 2026 14:32:54 -0700
Subject: [PATCH 3/3] perf: cache serialized query plans to avoid per-partition serialization

Add caching for serialized protobuf query plans to avoid serializing the
same plan repeatedly for every partition in CometExecIterator.

Changes:
- Add serializeNativePlan() helper method in CometExec
- Add getCometIterator() overload accepting pre-serialized bytes
- Update CometExecUtils.getNativeLimitRDD to serialize once
- Update CometTakeOrderedAndProjectExec to serialize plans once

For a query with N partitions, this eliminates N-1 redundant protobuf
serializations per affected code path, reducing CPU overhead and GC
pressure during query execution.
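
A rough sketch of the call pattern this enables (childRDD, nativePlan,
numOutputCols and numParts below are illustrative placeholders, not
identifiers taken from this diff):

    // serialize the protobuf plan once, outside the per-partition closure
    val serializedPlan = CometExec.serializeNativePlan(nativePlan)
    childRDD.mapPartitionsWithIndexInternal { case (idx, iter) =>
      // every partition reuses the same pre-serialized bytes
      CometExec.getCometIterator(Seq(iter), numOutputCols, serializedPlan, numParts, idx)
    }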

Co-Authored-By: Claude Opus 4.5
---
 .../spark/sql/comet/CometExecUtils.scala      |  6 ++-
 .../CometTakeOrderedAndProjectExec.scala      | 29 +++++++++----
 .../apache/spark/sql/comet/operators.scala    | 41 ++++++++++++++++---
 3 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
index fd97fe3fa2..a2af60142b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
@@ -53,9 +53,11 @@ object CometExecUtils {
       limit: Int,
       offset: Int = 0): RDD[ColumnarBatch] = {
     val numParts = childPlan.getNumPartitions
+    // Serialize the plan once before mapping to avoid repeated serialization per partition
+    val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
+    val serializedPlan = CometExec.serializeNativePlan(limitOp)
     childPlan.mapPartitionsWithIndexInternal { case (idx, iter) =>
-      val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
-      CometExec.getCometIterator(Seq(iter), outputAttribute.length, limitOp, numParts, idx)
+      CometExec.getCometIterator(Seq(iter), outputAttribute.length, serializedPlan, numParts, idx)
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 2517c19f26..2abe783172 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -133,12 +133,15 @@ case class CometTakeOrderedAndProjectExec(
           CometExecUtils.getNativeLimitRDD(childRDD, child.output, limit)
         } else {
           val numParts = childRDD.getNumPartitions
+          // Serialize the plan once before mapping to avoid repeated serialization per partition
+          val topK =
+            CometExecUtils
+              .getTopKNativePlan(child.output, sortOrder, child, limit)
+              .get
+          val serializedTopK = CometExec.serializeNativePlan(topK)
+          val numOutputCols = child.output.length
           childRDD.mapPartitionsWithIndexInternal { case (idx, iter) =>
-            val topK =
-              CometExecUtils
-                .getTopKNativePlan(child.output, sortOrder, child, limit)
-                .get
-            CometExec.getCometIterator(Seq(iter), child.output.length, topK, numParts, idx)
+            CometExec.getCometIterator(Seq(iter), numOutputCols, serializedTopK, numParts, idx)
           }
         }
 
@@ -154,11 +157,19 @@ case class CometTakeOrderedAndProjectExec(
         new CometShuffledBatchRDD(dep, readMetrics)
       }
 
+      // Serialize the plan once before mapping to avoid repeated serialization per partition
+      val topKAndProjection = CometExecUtils
+        .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
+        .get
+      val serializedTopKAndProjection = CometExec.serializeNativePlan(topKAndProjection)
+      val finalOutputLength = output.length
       singlePartitionRDD.mapPartitionsInternal { iter =>
-        val topKAndProjection = CometExecUtils
-          .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
-          .get
-        val it = CometExec.getCometIterator(Seq(iter), output.length, topKAndProjection, 1, 0)
+        val it = CometExec.getCometIterator(
+          Seq(iter),
+          finalOutputLength,
+          serializedTopKAndProjection,
+          1,
+          0)
         setSubqueries(it.id, this)
 
         Option(TaskContext.get()).foreach { context =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index f4f97b8312..cb70986170 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -113,6 +113,19 @@ object CometExec {
 
   def newIterId: Long = curId.getAndIncrement()
 
+  /**
+   * Serialize a native plan to bytes. Use this method to serialize the plan once before calling
+   * getCometIterator for each partition, avoiding repeated serialization.
+   */
+  def serializeNativePlan(nativePlan: Operator): Array[Byte] = {
+    val size = nativePlan.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativePlan.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
+    bytes
+  }
+
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
       numOutputCols: Int,
@@ -130,6 +143,28 @@
       encryptedFilePaths = Seq.empty)
   }
 
+  /**
+   * Create a CometExecIterator with a pre-serialized native plan. Use this overload when
+   * executing the same plan across multiple partitions to avoid serializing the plan repeatedly.
+   */
+  def getCometIterator(
+      inputs: Seq[Iterator[ColumnarBatch]],
+      numOutputCols: Int,
+      serializedPlan: Array[Byte],
+      numParts: Int,
+      partitionIdx: Int): CometExecIterator = {
+    new CometExecIterator(
+      newIterId,
+      inputs,
+      numOutputCols,
+      serializedPlan,
+      CometMetricNode(Map.empty),
+      numParts,
+      partitionIdx,
+      broadcastedHadoopConfForEncryption = None,
+      encryptedFilePaths = Seq.empty)
+  }
+
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
       numOutputCols: Int,
@@ -139,10 +174,11 @@
       partitionIdx: Int,
       broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
       encryptedFilePaths: Seq[String]): CometExecIterator = {
-    val size = nativePlan.getSerializedSize
-    val bytes = new Array[Byte](size)
-    val codedOutput = CodedOutputStream.newInstance(bytes)
-    nativePlan.writeTo(codedOutput)
-    codedOutput.checkNoSpaceLeft()
+    val bytes = serializeNativePlan(nativePlan)
     new CometExecIterator(
       newIterId,
       inputs,