-
Notifications
You must be signed in to change notification settings - Fork 272
perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition values #3247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition values #3247
Conversation
hsiang-c
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace JSON with Protocol Buffer LGTM
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3247 +/- ##
============================================
+ Coverage 56.12% 60.07% +3.94%
- Complexity 976 1438 +462
============================================
Files 119 172 +53
Lines 11743 15927 +4184
Branches 2251 2631 +380
============================================
+ Hits 6591 9568 +2977
- Misses 4012 5031 +1019
- Partials 1140 1328 +188 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
native/core/src/execution/planner.rs
Outdated
| /// This replaces JSON parsing with direct protobuf deserialization with a more compact | ||
| /// representation (e.g., timestamps as integers vs strings). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can remove the part of the comment that explains how this used to work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| checkIcebergNativeScan( | ||
| "SELECT COUNT(*) FROM s3_catalog.db.large_partitioned_test WHERE partition_id IN (0, 50, 99)") | ||
|
|
||
| spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) You can try DROP TABLE s3_catalog.db.large_partitioned_test PURGE to remove files on disk.
| oneof value { | ||
| int32 int_val = 2; | ||
| int64 long_val = 3; | ||
| int64 date_val = 4; // days since epoch | ||
| int64 timestamp_val = 5; // microseconds since epoch | ||
| int64 timestamp_tz_val = 6; // microseconds with timezone | ||
| string string_val = 7; | ||
| double double_val = 8; | ||
| float float_val = 9; | ||
| bytes decimal_val = 10; // unscaled BigInteger bytes | ||
| bool bool_val = 11; | ||
| bytes uuid_val = 12; | ||
| bytes fixed_val = 13; | ||
| bytes binary_val = 14; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to consider consolidating this with the existing Literal defined in protobuf. This does not need to happen for the current PR.
message Literal {
oneof value {
bool bool_val = 1;
// Protobuf doesn't provide int8 and int16, we put them into int32 and convert
// to int8 and int16 when deserializing.
int32 byte_val = 2;
int32 short_val = 3;
int32 int_val = 4;
int64 long_val = 5;
float float_val = 6;
double double_val = 7;
string string_val = 8;
bytes bytes_val = 9;
bytes decimal_val = 10;
ListLiteral list_val = 11;
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are Iceberg types though.
andygrove
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM pending CI
| /** | ||
| * Legacy JSON serialization function - removed in favor of protobuf. Kept as reference for | ||
| * conversion logic. | ||
| */ | ||
| private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just remove this now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found some unused code as a result of removing this. Thanks!
|
@parthchandra there is a clippy failure |
8319470 to
e8b87e7
Compare
|
@parthchandra do you have benchmarks showing the performance improvement? |
|
Merged. Thanks @andygrove @hsiang-c |
Rationale for this change
We see increased GC collection times in jobs with Iceberg scans with a large number (10K-100K) of partitions
What changes are included in this PR?
Partition values are currently serialized to native by constructing a JSON string. This PR changes that to use Protobuf instead.
How are these changes tested?
Added a new unit test for a large number of partitions.
AI note: large parts were generated using Claude Code.