perf: cache serialized query plans to avoid per-partition serialization #3246
Replace ByteArrayOutputStream with direct CodedOutputStream serialization to eliminate unnecessary allocations during query plan serialization.

This optimization:
- Pre-allocates exact buffer size using getSerializedSize()
- Eliminates ByteArrayOutputStream's internal buffer resizing
- Removes defensive array copying from toByteArray()
- Applies to 5 hot paths called per-partition during query execution

For a query with 1000 partitions, this eliminates 5000+ unnecessary allocations and array copies, significantly reducing GC pressure.

Changes:
- operators.scala: getCometIterator() and convertBlock()
- CometNativeWriteExec.scala: serializedPlanOpt() and doExecute()
- ParquetFilters.scala: createNativeFilters()
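The difference between the two serialization paths described in this commit can be sketched as follows. This is a minimal illustration, not the code from the PR: the object and method names (`DirectSerialization`, `viaByteArrayOutputStream`, `viaCodedOutputStream`) are hypothetical, and it assumes protobuf-java is on the classpath.

```scala
import com.google.protobuf.{CodedOutputStream, MessageLite}
import java.io.ByteArrayOutputStream

// Hypothetical helper object; names are illustrative, not taken from the PR.
object DirectSerialization {

  // Baseline: the stream grows its internal buffer as bytes arrive,
  // and toByteArray() returns a copy of that buffer.
  def viaByteArrayOutputStream(msg: MessageLite): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    msg.writeTo(out)
    out.toByteArray
  }

  // Direct: allocate exactly getSerializedSize() bytes and write into them.
  def viaCodedOutputStream(msg: MessageLite): Array[Byte] = {
    val bytes = new Array[Byte](msg.getSerializedSize)
    val coded = CodedOutputStream.newInstance(bytes)
    msg.writeTo(coded)
    // Throws if the message did not fill the buffer exactly.
    coded.checkNoSpaceLeft()
    bytes
  }
}
```

Both methods should produce identical bytes for the same message; only the number of intermediate allocations and copies differs.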
Add caching for serialized protobuf query plans to avoid serializing the same plan repeatedly for every partition in CometExecIterator.

Changes:
- Add serializeNativePlan() helper method in CometExec
- Add getCometIterator() overload accepting pre-serialized bytes
- Update CometExecUtils.getNativeLimitRDD to serialize once
- Update CometTakeOrderedAndProjectExec to serialize plans once

For a query with N partitions, this eliminates N-1 redundant protobuf serializations per affected code path, reducing CPU overhead and GC pressure during query execution.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
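The serialize-once pattern this commit describes can be sketched in plain Scala without Spark or protobuf. Everything here is a hypothetical stand-in: `Plan` takes the place of the protobuf `Operator`, `serializePlan` the place of `serializeNativePlan`, and `runPartition` the place of the per-partition iterator built from pre-serialized bytes. A counter records how often serialization runs.

```scala
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger

object SerializeOnce {
  // Hypothetical stand-in for the protobuf Operator plan.
  final case class Plan(description: String)

  // Counts serializations so the two strategies can be compared.
  val serializations = new AtomicInteger(0)

  // Stand-in for serializeNativePlan: turns a plan into bytes.
  def serializePlan(plan: Plan): Array[Byte] = {
    serializations.incrementAndGet()
    plan.description.getBytes(StandardCharsets.UTF_8)
  }

  // Stand-in for the work done per partition with pre-serialized bytes.
  def runPartition(index: Int, planBytes: Array[Byte]): Int =
    planBytes.length + index

  // Before: serialization happens inside the per-partition function.
  def perPartition(plan: Plan, numPartitions: Int): Seq[Int] =
    (0 until numPartitions).map(i => runPartition(i, serializePlan(plan)))

  // After: serialize once up front; the per-partition function reuses the bytes.
  def once(plan: Plan, numPartitions: Int): Seq[Int] = {
    val planBytes = serializePlan(plan)
    (0 until numPartitions).map(i => runPartition(i, planBytes))
  }
}
```

With 4 partitions, `perPartition` serializes the plan 4 times and `once` serializes it a single time, which is the N-1 saving the commit message refers to. In Spark, the hoisted byte array would be captured by the closure passed to the partition-mapping call.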
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #3246 +/- ##
============================================
+ Coverage 56.12% 60.11% +3.98%
- Complexity 976 1428 +452
============================================
Files 119 170 +51
Lines 11743 15799 +4056
Branches 2251 2604 +353
============================================
+ Hits 6591 9497 +2906
- Misses 4012 4983 +971
- Partials 1140 1319 +179
☔ View full report in Codecov by Sentry.
hsiang-c
left a comment
Moving serialization out of mapPartitionsWithIndexInternal LGTM
wForget
left a comment
Thanks @andygrove
Which issue does this PR close?
N/A - Performance optimization
Rationale for this change
The `CometExec.getCometIterator()` method serializes the protobuf query plan to bytes every time it's called. Since this method is called inside `mapPartitions*` for each partition, the same plan was being serialized N times for a query with N partitions. This causes unnecessary CPU overhead and GC pressure.
What changes are included in this PR?
operators.scala - Added caching infrastructure:
- `serializeNativePlan(nativePlan: Operator): Array[Byte]` - helper to serialize a plan once
- `getCometIterator` overload accepting pre-serialized `Array[Byte]` instead of `Operator`
CometExecUtils.scala - Updated `getNativeLimitRDD`:
- Serializes the plan once, outside `mapPartitionsWithIndexInternal`
CometTakeOrderedAndProjectExec.scala - Updated `doExecuteColumnar()`:
- Serializes plans once
How are these changes tested?
Existing tests pass:
🤖 Generated with Claude Code