Conversation

@andygrove
Member

andygrove commented Jan 22, 2026

Closes #1204

Summary

  • Serialize native query plans once and broadcast to all executors
  • Avoid repeated protobuf serialization for each partition

Changes

  • Add serializePlan() method to serialize an Operator once
  • Add getCometIterator() overload accepting pre-serialized bytes
  • Update getNativeLimitRDD to broadcast the serialized plan
  • Update CometTakeOrderedAndProjectExec to broadcast the topK plan (pattern sketched below)
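
To make this concrete, here is a minimal, hypothetical sketch of the serialize-once-and-broadcast pattern these changes describe. The helper planToBytes and the RDD wiring are illustrative assumptions; the actual Comet methods (serializePlan and the new getCometIterator overload) may have different signatures.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical sketch, not the actual Comet code: serialize the native plan
// once on the driver, broadcast the bytes, and reuse them in every partition.
def runWithBroadcastPlan(
    sc: SparkContext,
    input: RDD[Array[Byte]],
    planToBytes: () => Array[Byte]): RDD[Array[Byte]] = {
  // Protobuf serialization happens exactly once, on the driver.
  val serializedPlan: Array[Byte] = planToBytes()
  // Broadcast so each executor fetches the bytes at most once, instead of
  // shipping a copy inside every task.
  val broadcastPlan = sc.broadcast(serializedPlan)
  input.mapPartitions { iter =>
    // Every partition reads the same broadcast value; no per-partition
    // serialization or per-task plan transfer happens here.
    val planBytes: Array[Byte] = broadcastPlan.value
    // A real implementation would hand planBytes to the native iterator
    // (the getCometIterator overload added in this PR).
    iter
  }
}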

Impact

For a query with 1000 partitions across 10 executors:

  • Plan serialization: 1000x → 1x
  • Plan transfer: 1000x → 10x (once per executor via broadcast)

Test plan

  • Existing tests pass
  • Benchmark with high partition count queries

🤖 Generated with Claude Code

andygrove and others added 2 commits January 22, 2026 12:24

Replace ByteArrayOutputStream with direct CodedOutputStream serialization
to eliminate unnecessary allocations during query plan serialization.

This optimization:
- Pre-allocates exact buffer size using getSerializedSize()
- Eliminates ByteArrayOutputStream's internal buffer resizing
- Removes defensive array copying from toByteArray()
- Applies to 5 hot paths called per-partition during query execution

For a query with 1000 partitions, this eliminates 5000+ unnecessary
allocations and array copies, significantly reducing GC pressure.

Changes:
- operators.scala: getCometIterator() and convertBlock()
- CometNativeWriteExec.scala: serializedPlanOpt() and doExecute()
- ParquetFilters.scala: createNativeFilters()

Serialize native query plans once and broadcast to all executors,
avoiding repeated protobuf serialization for each partition.

This optimization:
- Adds serializePlan() method to serialize an Operator once
- Adds getCometIterator() overload accepting pre-serialized bytes
- Updates getNativeLimitRDD to broadcast the serialized plan
- Updates CometTakeOrderedAndProjectExec to broadcast the topK plan

For a query with 1000 partitions across 10 executors, this reduces
plan serialization from 1000x to 1x, and plan transfer from 1000x
to 10x (once per executor via broadcast).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
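
For context on the first commit above, the sketch below illustrates the general ByteArrayOutputStream-to-CodedOutputStream change it describes. It is an illustration under assumptions: the plan parameter stands in for the Operator protobuf message, and the surrounding Comet code is not shown; getSerializedSize, CodedOutputStream.newInstance, writeTo, and checkNoSpaceLeft are standard protobuf-java APIs.

import com.google.protobuf.{CodedOutputStream, Message}

// Before (per the commit message): stream-based serialization.
//   val baos = new java.io.ByteArrayOutputStream()
//   plan.writeTo(baos)            // may resize the internal buffer repeatedly
//   val bytes = baos.toByteArray  // defensive copy of that buffer
//
// After: write directly into an exactly sized array.
def serializeDirect(plan: Message): Array[Byte] = {
  val bytes = new Array[Byte](plan.getSerializedSize) // exact size, one allocation
  val out = CodedOutputStream.newInstance(bytes)
  plan.writeTo(out)        // serialize straight into the array
  out.checkNoSpaceLeft()   // sanity check: the buffer was filled exactly
  bytes
}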
@andygrove changed the title from "[EXPERIMENTAL] perf: cache and broadcast serialized plans across partitions" to "perf: [EXPERIMENTAL] cache and broadcast serialized plans across partitions" on Jan 22, 2026
val serializedTopK = CometExecUtils.serializePlan(
  CometExecUtils
    .getTopKNativePlan(child.output, sortOrder, child, limit)
    .get)
val broadcastTopK = sparkContext.broadcast(serializedTopK)
Contributor

Do we need to unpersist() it after use?

Member Author

I think we can rely on Spark to GC?
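
(For reference: Spark's ContextCleaner does remove broadcast blocks once the driver-side Broadcast reference is garbage collected, so an explicit release is optional. If one were wanted, the standard Broadcast API looks like the hedged sketch below; the helper name is hypothetical.)

import org.apache.spark.broadcast.Broadcast

// Hypothetical helper: release a broadcast once no further tasks will read it.
def releaseBroadcast[T](b: Broadcast[T]): Unit = {
  // Drop cached copies on the executors; Spark can lazily re-broadcast the
  // value if a later task still needs it.
  b.unpersist(blocking = false)
  // Or, if the broadcast will never be used again anywhere:
  // b.destroy()
}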

@codecov-commenter

codecov-commenter commented Jan 22, 2026

Codecov Report

❌ Patch coverage is 89.28571% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.11%. Comparing base (f09f8af) to head (17870c2).
⚠️ Report is 866 commits behind head on main.

Files with missing lines                                  Patch %    Lines
.../apache/spark/sql/comet/CometNativeWriteExec.scala     45.45%     6 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3244      +/-   ##
============================================
+ Coverage     56.12%   60.11%   +3.98%     
- Complexity      976     1429     +453     
============================================
  Files           119      170      +51     
  Lines         11743    15805    +4062     
  Branches       2251     2606     +355     
============================================
+ Hits           6591     9501    +2910     
- Misses         4012     4983     +971     
- Partials       1140     1321     +181     


@andygrove
Member Author

I'm not sure if this change is worth it, now that we only do the serde once per stage.

andygrove closed this Jan 23, 2026

Development

Successfully merging this pull request may close these issues.

Only create one native plan for a query on an executor
