perf: [EXPERIMENTAL] cache and broadcast serialized plans across partitions #3244
Conversation
Replace ByteArrayOutputStream with direct CodedOutputStream serialization to eliminate unnecessary allocations during query plan serialization. This optimization:
- Pre-allocates exact buffer size using getSerializedSize()
- Eliminates ByteArrayOutputStream's internal buffer resizing
- Removes defensive array copying from toByteArray()
- Applies to 5 hot paths called per-partition during query execution

For a query with 1000 partitions, this eliminates 5000+ unnecessary allocations and array copies, significantly reducing GC pressure.

Changes:
- operators.scala: getCometIterator() and convertBlock()
- CometNativeWriteExec.scala: serializedPlanOpt() and doExecute()
- ParquetFilters.scala: createNativeFilters()
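As a rough illustration of the pattern described above (a minimal sketch, not the exact Comet code; the helper name and the use of a generic protobuf Message are assumptions), serializing directly into a pre-sized array avoids both the intermediate stream and the final copy:

```scala
import com.google.protobuf.{CodedOutputStream, Message}

// Sketch: serialize a protobuf message into an exactly-sized byte array.
// getSerializedSize() gives the precise length up front, so there is a single
// allocation, no internal buffer resizing, and no defensive copy via toByteArray().
def serializeExact(msg: Message): Array[Byte] = {
  val bytes = new Array[Byte](msg.getSerializedSize)
  val out = CodedOutputStream.newInstance(bytes)
  msg.writeTo(out)
  out.flush() // make sure all buffered bytes land in the array
  bytes
}
```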
Serialize native query plans once and broadcast to all executors, avoiding repeated protobuf serialization for each partition. This optimization: - Adds serializePlan() method to serialize an Operator once - Adds getCometIterator() overload accepting pre-serialized bytes - Updates getNativeLimitRDD to broadcast the serialized plan - Updates CometTakeOrderedAndProjectExec to broadcast the topK plan For a query with 1000 partitions across 10 executors, this reduces plan serialization from 1000x to 1x, and plan transfer from 1000x to 10x (once per executor via broadcast). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
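A minimal sketch of the serialize-once / broadcast pattern (the helper and its signature are illustrative assumptions, not Comet's actual API): the plan is turned into bytes once on the driver, each executor fetches those bytes a single time through Spark's broadcast mechanism, and tasks only read `broadcastPlan.value`.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch: broadcast pre-serialized plan bytes instead of capturing the plan
// in every task closure, so serialization happens once on the driver and the
// bytes travel once per executor rather than once per partition.
def mapPartitionsWithPlan[T, U: ClassTag](
    sc: SparkContext,
    planBytes: Array[Byte],
    rdd: RDD[T])(run: (Array[Byte], Iterator[T]) => Iterator[U]): RDD[U] = {
  val broadcastPlan = sc.broadcast(planBytes)
  rdd.mapPartitions { iter =>
    // Each task reuses the already-serialized plan from the broadcast variable.
    run(broadcastPlan.value, iter)
  }
}
```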
CometExecUtils
  .getTopKNativePlan(child.output, sortOrder, child, limit)
  .get)
val broadcastTopK = sparkContext.broadcast(serializedTopK)
Do we need to unpersist() it after use?
I think we can rely on Spark to GC?
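For reference, a hedged sketch of what explicit cleanup could look like if relying on Spark's GC-driven ContextCleaner were not enough (the name serializedTopK follows the diff above; the surrounding method is hypothetical):

```scala
import org.apache.spark.SparkContext

// Sketch: Spark's ContextCleaner unpersists a broadcast once the driver-side
// reference is garbage collected, but the executor-side blocks can also be
// released eagerly after the job that uses them has finished.
def runWithBroadcastTopK(sc: SparkContext, serializedTopK: Array[Byte]): Unit = {
  val broadcastTopK = sc.broadcast(serializedTopK)
  try {
    // ... run the job whose tasks read broadcastTopK.value ...
  } finally {
    broadcastTopK.unpersist(blocking = false) // drop cached copies on executors
    // broadcastTopK.destroy()                // or remove it entirely (no reuse afterwards)
  }
}
```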
Codecov Report: ❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##              main    #3244      +/-   ##
============================================
+ Coverage     56.12%   60.11%    +3.98%
- Complexity      976     1429      +453
============================================
  Files           119      170       +51
  Lines         11743    15805     +4062
  Branches       2251     2606      +355
============================================
+ Hits           6591     9501     +2910
- Misses         4012     4983      +971
- Partials       1140     1321      +181

☔ View full report in Codecov by Sentry.
I'm not sure if this change is worth it, now that we only do the serde once per stage.
Closes #1204
Summary
Changes
- Adds serializePlan() method to serialize an Operator once
- Adds getCometIterator() overload accepting pre-serialized bytes
- Updates getNativeLimitRDD to broadcast the serialized plan
- Updates CometTakeOrderedAndProjectExec to broadcast the topK plan

Impact
For a query with 1000 partitions across 10 executors:
- Plan serialization is reduced from 1000x to 1x
- Plan transfer is reduced from 1000x to 10x (once per executor via broadcast)
Test plan
🤖 Generated with Claude Code