From b9ffb30c1926aaa465c74129730cd18eb66618bc Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Wed, 24 Dec 2025 20:19:17 +0800 Subject: [PATCH 1/9] feat: add schema update to table metadata builder --- src/iceberg/partition_spec.cc | 36 +++ src/iceberg/partition_spec.h | 4 + src/iceberg/schema.cc | 40 +++ src/iceberg/schema.h | 19 ++ src/iceberg/table_metadata.cc | 234 +++++++++++++++++- src/iceberg/table_metadata.h | 9 +- src/iceberg/table_update.h | 6 +- .../test/table_metadata_builder_test.cc | 169 ++++++++++++- src/iceberg/test/table_requirements_test.cc | 15 +- src/iceberg/test/table_update_test.cc | 6 +- 10 files changed, 512 insertions(+), 26 deletions(-) diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 7d0dab40a..fd50a0226 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -177,6 +177,42 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) return {}; } +Status PartitionSpec::ValidatePartitionName(const Schema& schema) const { + std::unordered_set partition_names; + for (const auto& partition_field : fields_) { + auto name = std::string(partition_field.name()); + if (name.empty()) { + return InvalidArgument("Cannot use empty partition name: {}", name); + } + if (partition_names.contains(name)) { + return InvalidArgument("Cannot use partition name more than once: {}", name); + } + partition_names.insert(name); + + ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name)); + auto transform_type = partition_field.transform()->transform_type(); + if (transform_type == TransformType::kIdentity) { + // for identity transform case we allow conflicts between partition and schema field + // name as long as they are sourced from the same schema field + if (schema_field.has_value() && + schema_field.value().get().field_id() != partition_field.source_id()) { + return InvalidArgument( + "Cannot create identity partition sourced from different field in schema: {}", + name); + } + } else { + // for all other transforms we don't allow conflicts between partition name and + // schema field name + if (schema_field.has_value()) { + return InvalidArgument( + "Cannot create partition from name that exists in schema: {}", name); + } + } + } + + return {}; +} + Result>> PartitionSpec::GetFieldsBySourceId(int32_t source_id) const { ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this)); diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 5fab59526..a7a5c19a3 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -84,6 +84,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \return Error status if the partition spec is invalid. Status Validate(const Schema& schema, bool allow_missing_fields) const; + // \brief Validates the partition field names are unique within the partition spec and + // schema. + Status ValidatePartitionName(const Schema& schema) const; + /// \brief Get the partition fields by source ID. /// \param source_id The id of the source field. /// \return The partition fields by source ID, or NotFound if the source field is not diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 414219f86..2a86bdac3 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -30,6 +30,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/type_util.h" #include "iceberg/util/visit_type.h" +#include "table_metadata.h" namespace iceberg { @@ -228,4 +229,43 @@ Result> Schema::IdentifierFieldNames() const { return names; } +Result Schema::HighestFieldId() const { + ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this)); + + if (id_to_field.get().empty()) { + return kInitialColumnId; + } + + auto max_it = std::ranges::max_element( + id_to_field.get(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + + return max_it->first; +} + +bool Schema::SameSchema(const Schema& other) const { return fields_ == other.fields_; } + +Status Schema::Validate(int32_t format_version) const { + // Get all fields including nested ones + ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this)); + + // Check each field's type and defaults + for (const auto& [field_id, field_ref] : id_to_field.get()) { + const auto& field = field_ref.get(); + + // Check if the field's type requires a minimum format version + if (auto it = TableMetadata::kMinFormatVersions.find(field.type()->type_id()); + it != TableMetadata::kMinFormatVersions.end()) { + if (int32_t min_format_version = it->second; format_version < min_format_version) { + return InvalidSchema("Invalid type for {}: {} is not supported until v{}", + field.name(), *field.type(), min_format_version); + } + } + + // TODO(GuoTao.yu): Check default values when they are supported + } + + return {}; +} + } // namespace iceberg diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index bb9839625..a2442e73e 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -46,6 +46,7 @@ namespace iceberg { class ICEBERG_EXPORT Schema : public StructType { public: static constexpr int32_t kInitialSchemaId = 0; + static constexpr int32_t kInitialColumnId = 0; static constexpr int32_t kInvalidColumnId = -1; explicit Schema(std::vector fields, @@ -121,12 +122,30 @@ class ICEBERG_EXPORT Schema : public StructType { Result> Project( const std::unordered_set& field_ids) const; + /// \brief Return the field IDs of the identifier fields. const std::vector& IdentifierFieldIds() const; /// \brief Return the canonical field names of the identifier fields. Result> IdentifierFieldNames() const; + /// \brief Get the highest field ID in the schema. + /// \return The highest field ID. + Result HighestFieldId() const; + + /// \brief Checks whether this schema is equivalent to another schema while ignoring the + /// schema id. + bool SameSchema(const Schema& other) const; + + /// \brief Validate the schema for a given format version. + /// + /// This validates that the schema does not contain types that were released in later + /// format versions. + /// + /// \param format_version The format version to validate against. + /// \return Error status if the schema is invalid. + Status Validate(int32_t format_version) const; + friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); } private: diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index ba5b8f328..89c2625fc 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -432,7 +432,11 @@ class TableMetadataBuilder::Impl { Status RemoveProperties(const std::unordered_set& removed); Status SetDefaultPartitionSpec(int32_t spec_id); Result AddPartitionSpec(const PartitionSpec& spec); - std::unique_ptr Build(); + Status SetCurrentSchema(int32_t schema_id); + Status RemoveSchemas(const std::vector& schema_ids); + Result AddSchema(const Schema& schema, int32_t new_last_column_id); + + Result> Build(); private: /// \brief Internal method to check for existing sort order and reuse its ID or create a @@ -447,6 +451,26 @@ class TableMetadataBuilder::Impl { /// \return The ID to use for this partition spec (reused if exists, new otherwise) int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec); + /// \brief Internal method to check for existing schema and reuse its ID or create a new + /// one + /// \param new_schema The schema to check + /// \return The ID to use for this schema (reused if exists, new otherwise + int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const; + + /// \brief Update partition spec to use a new schema + /// \param schema The new schema to bind to + /// \param partition_spec The partition spec to update + /// \return The updated partition spec bound to the new schema + static Result> UpdateSpecSchema( + const Schema& schema, const PartitionSpec& partition_spec); + + /// \brief Update sort order to use a new schema + /// \param schema The new schema to bind to + /// \param sort_order The sort order to update + /// \return The updated sort order bound to the new schema + static Result> UpdateSortOrderSchema( + const Schema& schema, const SortOrder& sort_order); + private: // Base metadata (nullptr for new tables) const TableMetadata* base_; @@ -681,7 +705,175 @@ Status TableMetadataBuilder::Impl::RemoveProperties( return {}; } -std::unique_ptr TableMetadataBuilder::Impl::Build() { +Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) { + if (schema_id == kLastAdded) { + if (!last_added_schema_id_.has_value()) { + return InvalidArgument("Cannot set last added schema: no schema has been added"); + } + return SetCurrentSchema(last_added_schema_id_.value()); + } + + if (metadata_.current_schema_id == schema_id) { + return {}; + } + + auto it = schemas_by_id_.find(schema_id); + if (it == schemas_by_id_.end()) { + return InvalidArgument("Cannot set current schema to unknown schema: {}", schema_id); + } + const auto& schema = it->second; + + // Rebuild all partition specs for the new current schema + std::vector> updated_specs; + for (const auto& spec : metadata_.partition_specs) { + ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, *spec)); + updated_specs.push_back(std::move(updated_spec)); + } + metadata_.partition_specs = std::move(updated_specs); + specs_by_id_.clear(); + for (const auto& spec : metadata_.partition_specs) { + specs_by_id_.emplace(spec->spec_id(), spec); + } + + // Rebuild all sort orders for the new current schema + std::vector> updated_orders; + for (const auto& order : metadata_.sort_orders) { + ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, *order)); + updated_orders.push_back(std::move(updated_order)); + } + metadata_.sort_orders = std::move(updated_orders); + sort_orders_by_id_.clear(); + for (const auto& order : metadata_.sort_orders) { + sort_orders_by_id_.emplace(order->order_id(), order); + } + + // Set the current schema ID + metadata_.current_schema_id = schema_id; + + // Record the change + if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == schema_id) { + changes_.push_back(std::make_unique(kLastAdded)); + } else { + changes_.push_back(std::make_unique(schema_id)); + } + + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector& schema_ids) { + std::unordered_set schema_ids_to_remove(schema_ids.begin(), schema_ids.end()); + auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + if (!schema_ids_to_remove.contains(current_schema_id)) { + return InvalidArgument("Cannot remove current schema: {}", current_schema_id); + } + + if (!schema_ids_to_remove.empty()) { + metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) { + return !schema_ids_to_remove.contains( + schema->schema_id().value_or(Schema::kInitialSchemaId)); + }) | + std::ranges::to>>(); + changes_.push_back( + std::make_unique(std::move(schema_ids_to_remove))); + } + + return {}; +} + +Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, + int32_t new_last_column_id) { + if (new_last_column_id < metadata_.last_column_id) { + return InvalidArgument("Invalid last column ID: {} < {} (previous last column ID)", + new_last_column_id, metadata_.last_column_id); + } + + ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version)); + + auto new_schema_id = ReuseOrCreateNewSchemaId(schema); + if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end()) { + // update last_added_schema_id if the schema was added in this set of changes (since + // it is now the last) + bool is_new_schema = + last_added_schema_id_.has_value() && + std::ranges::find_if(changes_, [new_schema_id](const auto& change) { + if (change->kind() != TableUpdate::Kind::kAddSchema) { + return false; + } + auto* add_schema = dynamic_cast(change.get()); + return add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) == + new_schema_id; + }) != changes_.cend(); + last_added_schema_id_ = + is_new_schema ? std::make_optional(new_schema_id) : std::nullopt; + return new_schema_id; + } + + auto new_schema = std::make_shared( + std::vector(schema.fields().begin(), schema.fields().end()), + new_schema_id); + + metadata_.schemas.push_back(new_schema); + schemas_by_id_.emplace(new_schema_id, new_schema); + + changes_.push_back(std::make_unique(new_schema, new_last_column_id)); + metadata_.last_column_id = new_last_column_id; + last_added_schema_id_ = new_schema_id; + + return new_schema_id; +} + +int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( + const Schema& new_schema) const { + // if the schema already exists, use its id; otherwise use the highest id + 1 + auto new_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + for (auto& schema : metadata_.schemas) { + auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId); + if (schema->SameSchema(new_schema)) { + return schema_id; + } else if (new_schema_id <= schema_id) { + new_schema_id = schema_id + 1; + } + } + return new_schema_id; +} + +Result> TableMetadataBuilder::Impl::UpdateSpecSchema( + const Schema& schema, const PartitionSpec& partition_spec) { + // UpdateSpecSchema: Update partition spec to use the new schema + // This preserves the partition spec structure but rebinds it to the new schema + + // Copy all fields from the partition spec. IDs should not change. + std::vector fields; + fields.reserve(partition_spec.fields().size()); + int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart; + for (const auto& field : partition_spec.fields()) { + fields.push_back(field); + last_assigned_field_id = std::max(last_assigned_field_id, field.field_id()); + } + + // Build without validation because the schema may have changed in a way that makes + // this spec invalid. The spec should still be preserved so that older metadata can + // be interpreted. + ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec, + PartitionSpec::Make(partition_spec.spec_id(), std::move(fields), + last_assigned_field_id)); + + // Validate the new partition name against the new schema + ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema)); + return new_partition_spec; +} + +Result> TableMetadataBuilder::Impl::UpdateSortOrderSchema( + const Schema& schema, const SortOrder& sort_order) { + // Build without validation because the schema may have changed in a way that makes + // this order invalid. The order should still be preserved so that older metadata can + // be interpreted. + auto fields = sort_order.fields(); + std::vector new_fields{fields.begin(), fields.end()}; + return SortOrder::Make(sort_order.order_id(), std::move(new_fields)); +} + +Result> TableMetadataBuilder::Impl::Build() { // 1. Validate metadata consistency through TableMetadata#Validate // 2. Update last_updated_ms if there are changes @@ -691,6 +883,25 @@ std::unique_ptr TableMetadataBuilder::Impl::Build() { std::chrono::system_clock::now().time_since_epoch())}; } + auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + auto schema_it = schemas_by_id_.find(current_schema_id); + ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(), + "Current schema ID {} not found in schemas", current_schema_id); + const auto& current_schema = schema_it->second; + { + auto spec_it = specs_by_id_.find(metadata_.default_spec_id); + // FIXME(GuoTao.yu): Default spec must exist after we support update partition spec + if (spec_it != specs_by_id_.end()) { + ICEBERG_RETURN_UNEXPECTED( + spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false)); + } + auto sort_order_it = sort_orders_by_id_.find(metadata_.default_sort_order_id); + ICEBERG_PRECHECK(sort_order_it != sort_orders_by_id_.end(), + "Default sort order ID {} not found in sort orders", + metadata_.default_sort_order_id); + ICEBERG_RETURN_UNEXPECTED(sort_order_it->second->Validate(*current_schema)); + } + // 3. Buildup metadata_log from base metadata int32_t max_metadata_log_size = metadata_.properties.Get(TableProperties::kMetadataPreviousVersionsMax); @@ -796,16 +1007,22 @@ TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion( } TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema( - std::shared_ptr schema, int32_t new_last_column_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + std::shared_ptr const& schema, int32_t new_last_column_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id, + impl_->AddSchema(*schema, new_last_column_id)); + return SetCurrentSchema(schema_id); } TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t schema_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetCurrentSchema(schema_id)); + return *this; } -TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr schema) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); +TableMetadataBuilder& TableMetadataBuilder::AddSchema( + std::shared_ptr const& schema) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id, schema->HighestFieldId()); + impl_->AddSchema(*schema, std::max(impl_->metadata().last_column_id, highest_field_id)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec( @@ -832,7 +1049,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs( TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas( const std::vector& schema_ids) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSchemas(schema_ids)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder( diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index daaada6e6..ddb97322f 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -73,10 +73,13 @@ struct ICEBERG_EXPORT TableMetadata { static constexpr int8_t kDefaultTableFormatVersion = 2; static constexpr int8_t kSupportedTableFormatVersion = 3; static constexpr int8_t kMinFormatVersionRowLineage = 3; + static constexpr int8_t kMinFormatVersionDefaultValues = 3; static constexpr int64_t kInitialSequenceNumber = 0; static constexpr int64_t kInvalidSequenceNumber = -1; static constexpr int64_t kInitialRowId = 0; + static inline const std::unordered_map kMinFormatVersions = {}; + /// An integer version number for the format int8_t format_version; /// A UUID that identifies the table @@ -187,7 +190,7 @@ ICEBERG_EXPORT std::string ToString(const MetadataLogEntry& entry); /// This builder provides a fluent interface for creating and modifying table metadata. /// It supports both creating new tables and building from existing metadata. /// -/// Each modification method generates a corresponding MetadataUpdate that is tracked +/// Each modification method generates a corresponding TableUpdate that is tracked /// in a changes list. This allows the builder to maintain a complete history of all /// modifications made to the table metadata, which is important for tracking table /// evolution and for serialization purposes. @@ -246,7 +249,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \param schema The schema to set as current /// \param new_last_column_id The highest column ID in the schema /// \return Reference to this builder for method chaining - TableMetadataBuilder& SetCurrentSchema(std::shared_ptr schema, + TableMetadataBuilder& SetCurrentSchema(std::shared_ptr const& schema, int32_t new_last_column_id); /// \brief Set the current schema by schema ID @@ -259,7 +262,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param schema The schema to add /// \return Reference to this builder for method chaining - TableMetadataBuilder& AddSchema(std::shared_ptr schema); + TableMetadataBuilder& AddSchema(std::shared_ptr const& schema); /// \brief Set the default partition spec for the table /// diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 71db517b8..a6bdb9e5d 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -216,10 +216,10 @@ class ICEBERG_EXPORT RemovePartitionSpecs : public TableUpdate { /// \brief Represents removing schemas from the table class ICEBERG_EXPORT RemoveSchemas : public TableUpdate { public: - explicit RemoveSchemas(std::vector schema_ids) + explicit RemoveSchemas(std::unordered_set schema_ids) : schema_ids_(std::move(schema_ids)) {} - const std::vector& schema_ids() const { return schema_ids_; } + const std::unordered_set& schema_ids() const { return schema_ids_; } void ApplyTo(TableMetadataBuilder& builder) const override; @@ -228,7 +228,7 @@ class ICEBERG_EXPORT RemoveSchemas : public TableUpdate { Kind kind() const override { return Kind::kRemoveSchemas; } private: - std::vector schema_ids_; + std::unordered_set schema_ids_; }; /// \brief Represents adding a new sort order to the table diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index bb9ce8c01..edbf08900 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -60,10 +60,11 @@ std::unique_ptr CreateBaseMetadata() { metadata->last_column_id = 3; metadata->current_schema_id = 0; metadata->schemas.push_back(CreateTestSchema()); + metadata->partition_specs.push_back(PartitionSpec::Unpartitioned()); metadata->default_spec_id = PartitionSpec::kInitialSpecId; metadata->last_partition_id = 0; metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; metadata->sort_orders.push_back(SortOrder::Unsorted()); metadata->next_row_id = TableMetadata::kInitialRowId; metadata->properties = TableProperties::default_properties(); @@ -77,6 +78,9 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) { auto builder = TableMetadataBuilder::BuildFromEmpty(2); ASSERT_NE(builder, nullptr); + auto schema = CreateTestSchema(); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->AssignUUID("new-uuid-5678"); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -85,7 +89,7 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) { EXPECT_EQ(metadata->format_version, 2); EXPECT_EQ(metadata->last_sequence_number, TableMetadata::kInitialSequenceNumber); EXPECT_EQ(metadata->default_spec_id, PartitionSpec::kInitialSpecId); - EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kInitialSortOrderId); + EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kUnsortedOrderId); EXPECT_EQ(metadata->current_snapshot_id, Snapshot::kInvalidSnapshotId); EXPECT_TRUE(metadata->metadata_log.empty()); } @@ -153,6 +157,9 @@ TEST(TableMetadataBuilderTest, BuildupMetadataLog) { TEST(TableMetadataBuilderTest, AssignUUID) { // Assign UUID for new table auto builder = TableMetadataBuilder::BuildFromEmpty(2); + auto schema = CreateTestSchema(); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->AssignUUID("new-uuid-5678"); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); EXPECT_EQ(metadata->table_uuid, "new-uuid-5678"); @@ -178,6 +185,8 @@ TEST(TableMetadataBuilderTest, AssignUUID) { // Auto-generate UUID builder = TableMetadataBuilder::BuildFromEmpty(2); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->AssignUUID(); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); EXPECT_FALSE(metadata->table_uuid.empty()); @@ -192,7 +201,8 @@ TEST(TableMetadataBuilderTest, AssignUUID) { } TEST(TableMetadataBuilderTest, SetProperties) { - auto builder = TableMetadataBuilder::BuildFromEmpty(2); + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}}); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -201,7 +211,7 @@ TEST(TableMetadataBuilderTest, SetProperties) { EXPECT_EQ(metadata->properties.configs().at("key2"), "value2"); // Update existing property and add new one - builder = TableMetadataBuilder::BuildFromEmpty(2); + builder = TableMetadataBuilder::BuildFrom(base.get()); builder->SetProperties({{"key1", "value1"}}); builder->SetProperties({{"key1", "new_value1"}, {"key3", "value3"}}); @@ -212,7 +222,8 @@ TEST(TableMetadataBuilderTest, SetProperties) { } TEST(TableMetadataBuilderTest, RemoveProperties) { - auto builder = TableMetadataBuilder::BuildFromEmpty(2); + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}}); builder->RemoveProperties({"key2", "key4"}); // key4 does not exist @@ -224,6 +235,9 @@ TEST(TableMetadataBuilderTest, RemoveProperties) { TEST(TableMetadataBuilderTest, UpgradeFormatVersion) { auto builder = TableMetadataBuilder::BuildFromEmpty(1); + auto schema = CreateTestSchema(); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->UpgradeFormatVersion(2); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -386,4 +400,149 @@ TEST(TableMetadataBuilderTest, SetDefaultSortOrderInvalid) { ASSERT_THAT(builder->Build(), HasErrorMessage("no sort order has been added")); } +// Test AddSchema +TEST(TableMetadataBuilderTest, AddSchemaBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // 1. Add a new schema + auto field1 = SchemaField::MakeRequired(4, "new_field1", int64()); + auto field2 = SchemaField::MakeRequired(5, "new_field2", float64()); + auto new_schema = std::make_shared(std::vector{field1, field2}, 1); + builder->AddSchema(new_schema); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + EXPECT_EQ(metadata->last_column_id, 5); + + // 2. Add duplicate schema - should be idempotent + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto schema1 = std::make_shared(std::vector{field1, field2}, 1); + auto schema2 = std::make_shared(std::vector{field1, field2}, 2); + builder->AddSchema(schema1); + builder->AddSchema(schema2); // Same fields, should reuse ID + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); // Only one new schema added + + // 3. Add multiple different schemas + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto field3 = SchemaField::MakeRequired(6, "field3", string()); + auto schema3 = std::make_shared(std::vector{field1, field2}, 1); + auto schema4 = std::make_shared(std::vector{field1, field3}, 2); + builder->AddSchema(schema3); + builder->AddSchema(schema4); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 3); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + EXPECT_EQ(metadata->schemas[2]->schema_id().value(), 2); + EXPECT_EQ(metadata->last_column_id, 6); +} + +TEST(TableMetadataBuilderTest, AddSchemaInvalid) { + auto base = CreateBaseMetadata(); + + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + auto field_low_id = SchemaField::MakeRequired(1, "low_id", int32()); + auto schema_low_id = + std::make_shared(std::vector{field_low_id}, 1); + // Manually try to set a lower last_column_id via SetCurrentSchema + // This is tested indirectly through AddSchemaInternal validation +} + +// Test SetCurrentSchema +TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // 1. Set current schema by Schema object + auto field1 = SchemaField::MakeRequired(4, "new_field", int64()); + auto new_schema = std::make_shared(std::vector{field1}, 1); + builder->SetCurrentSchema(new_schema, 4); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + + // 2. Set current schema by schema ID + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + builder->AddSchema(schema1); + builder->SetCurrentSchema(1); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // 3. Set current schema using -1 (last added) + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto field2 = SchemaField::MakeRequired(5, "another_field", float64()); + auto schema2 = std::make_shared(std::vector{field2}, 2); + builder->AddSchema(schema2); + builder->SetCurrentSchema(-1); // Use last added + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // 4. Setting same schema is no-op + builder = TableMetadataBuilder::BuildFrom(base.get()); + builder->SetCurrentSchema(0); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 0); +} + +TEST(TableMetadataBuilderTest, SetCurrentSchemaInvalid) { + auto base = CreateBaseMetadata(); + + // 1. Try to use -1 (last added) when no schema has been added + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + builder->SetCurrentSchema(-1); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("no schema has been added")); + + // 2. Try to set non-existent schema ID + builder = TableMetadataBuilder::BuildFrom(base.get()); + builder->SetCurrentSchema(999); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("unknown schema: 999")); +} + +// Test schema evolution: SetCurrentSchema should rebuild partition specs and sort orders +TEST(TableMetadataBuilderTest, SetCurrentSchemaRebuildsSpecsAndOrders) { + auto base = CreateBaseMetadata(); + + // Add a partition spec to the base metadata + auto schema = CreateTestSchema(); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, + PartitionSpec::Make(PartitionSpec::kInitialSpecId, + {PartitionField(1, 1000, "id_bucket", Transform::Bucket(16))}, + 1000)); + base->partition_specs.push_back(std::move(spec)); + + // Add a sort order to the base metadata + SortField sort_field(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ICEBERG_UNWRAP_OR_FAIL(auto order, + SortOrder::Make(*schema, 1, std::vector{sort_field})); + base->sort_orders.push_back(std::move(order)); + base->default_sort_order_id = 1; + + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add and set a new schema + std::vector new_fields{schema->fields().begin(), schema->fields().end()}; + new_fields.push_back(SchemaField::MakeRequired(4, "new_id", int64())); + new_fields.push_back(SchemaField::MakeRequired(5, "new_data", string())); + auto new_schema = std::make_shared(std::move(new_fields), 1); + builder->SetCurrentSchema(new_schema, 5); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + + // Verify schema was set + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // Verify partition specs were rebuilt (they should still exist) + ASSERT_EQ(metadata->partition_specs.size(), 2); + + // Verify sort orders were rebuilt (they should still exist) + ASSERT_EQ(metadata->sort_orders.size(), 2); +} + } // namespace iceberg diff --git a/src/iceberg/test/table_requirements_test.cc b/src/iceberg/test/table_requirements_test.cc index 3278eb883..bbbcc681e 100644 --- a/src/iceberg/test/table_requirements_test.cc +++ b/src/iceberg/test/table_requirements_test.cc @@ -623,7 +623,8 @@ TEST(TableRequirementsTest, RemoveSchemas) { metadata->current_schema_id = 3; std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -652,7 +653,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithBranch) { AddBranch(*metadata, "branch", 42); std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -675,7 +677,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithSchemaChangedFailure) { metadata->current_schema_id = 3; std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -703,7 +706,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithBranchChangedFailure) { AddBranch(*metadata, "test", 42); std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -1074,7 +1078,8 @@ TEST(TableRequirementsTest, ReplaceTableDoesNotAddBranchRequirements) { AddBranch(*metadata, "branch", 42); std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForReplaceTable(*metadata, updates); ASSERT_THAT(result, IsOk()); diff --git a/src/iceberg/test/table_update_test.cc b/src/iceberg/test/table_update_test.cc index 041cfcd23..44a37f98d 100644 --- a/src/iceberg/test/table_update_test.cc +++ b/src/iceberg/test/table_update_test.cc @@ -71,11 +71,12 @@ std::unique_ptr CreateBaseMetadata() { metadata->last_column_id = 3; metadata->current_schema_id = 0; metadata->schemas.push_back(CreateTestSchema()); + metadata->partition_specs.push_back(PartitionSpec::Unpartitioned()); metadata->default_spec_id = PartitionSpec::kInitialSpecId; metadata->last_partition_id = 0; metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; metadata->sort_orders.push_back(SortOrder::Unsorted()); + metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; metadata->next_row_id = TableMetadata::kInitialRowId; return metadata; } @@ -286,7 +287,8 @@ INSTANTIATE_TEST_SUITE_P( .test_name = "RemoveSchemas", .update_factory = [] { - return std::make_unique(std::vector{1}); + return std::make_unique( + std::unordered_set{1}); }, .expected_existing_table_count = 1, .validator = From 059f346df8971846e013ace107d22b6b14cfd959 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Wed, 24 Dec 2025 21:02:38 +0800 Subject: [PATCH 2/9] Place the implementation of private functions together --- src/iceberg/table_metadata.cc | 108 +++++++++++++++------------------- 1 file changed, 47 insertions(+), 61 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 89c2625fc..8543caeca 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -822,57 +822,6 @@ Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, return new_schema_id; } -int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( - const Schema& new_schema) const { - // if the schema already exists, use its id; otherwise use the highest id + 1 - auto new_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); - for (auto& schema : metadata_.schemas) { - auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId); - if (schema->SameSchema(new_schema)) { - return schema_id; - } else if (new_schema_id <= schema_id) { - new_schema_id = schema_id + 1; - } - } - return new_schema_id; -} - -Result> TableMetadataBuilder::Impl::UpdateSpecSchema( - const Schema& schema, const PartitionSpec& partition_spec) { - // UpdateSpecSchema: Update partition spec to use the new schema - // This preserves the partition spec structure but rebinds it to the new schema - - // Copy all fields from the partition spec. IDs should not change. - std::vector fields; - fields.reserve(partition_spec.fields().size()); - int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart; - for (const auto& field : partition_spec.fields()) { - fields.push_back(field); - last_assigned_field_id = std::max(last_assigned_field_id, field.field_id()); - } - - // Build without validation because the schema may have changed in a way that makes - // this spec invalid. The spec should still be preserved so that older metadata can - // be interpreted. - ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec, - PartitionSpec::Make(partition_spec.spec_id(), std::move(fields), - last_assigned_field_id)); - - // Validate the new partition name against the new schema - ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema)); - return new_partition_spec; -} - -Result> TableMetadataBuilder::Impl::UpdateSortOrderSchema( - const Schema& schema, const SortOrder& sort_order) { - // Build without validation because the schema may have changed in a way that makes - // this order invalid. The order should still be preserved so that older metadata can - // be interpreted. - auto fields = sort_order.fields(); - std::vector new_fields{fields.begin(), fields.end()}; - return SortOrder::Make(sort_order.order_id(), std::move(new_fields)); -} - Result> TableMetadataBuilder::Impl::Build() { // 1. Validate metadata consistency through TableMetadata#Validate @@ -937,18 +886,55 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId( return new_order_id; } -int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId( - const PartitionSpec& new_spec) { - // if the spec already exists, use the same ID. otherwise, use the highest ID + 1. - int32_t new_spec_id = PartitionSpec::kInitialSpecId; - for (const auto& spec : metadata_.partition_specs) { - if (new_spec.CompatibleWith(*spec)) { - return spec->spec_id(); - } else if (new_spec_id <= spec->spec_id()) { - new_spec_id = spec->spec_id() + 1; +int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( + const Schema& new_schema) const { + // if the schema already exists, use its id; otherwise use the highest id + 1 + auto new_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + for (auto& schema : metadata_.schemas) { + auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId); + if (schema->SameSchema(new_schema)) { + return schema_id; + } else if (new_schema_id <= schema_id) { + new_schema_id = schema_id + 1; } } - return new_spec_id; + return new_schema_id; +} + +Result> TableMetadataBuilder::Impl::UpdateSpecSchema( + const Schema& schema, const PartitionSpec& partition_spec) { + // UpdateSpecSchema: Update partition spec to use the new schema + // This preserves the partition spec structure but rebinds it to the new schema + + // Copy all fields from the partition spec. IDs should not change. + std::vector fields; + fields.reserve(partition_spec.fields().size()); + int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart; + for (const auto& field : partition_spec.fields()) { + fields.push_back(field); + last_assigned_field_id = std::max(last_assigned_field_id, field.field_id()); + } + + // Build without validation because the schema may have changed in a way that makes + // this spec invalid. The spec should still be preserved so that older metadata can + // be interpreted. + ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec, + PartitionSpec::Make(partition_spec.spec_id(), std::move(fields), + last_assigned_field_id)); + + // Validate the new partition name against the new schema + ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema)); + return new_partition_spec; +} + +Result> TableMetadataBuilder::Impl::UpdateSortOrderSchema( + const Schema& schema, const SortOrder& sort_order) { + // Build without validation because the schema may have changed in a way that makes + // this order invalid. The order should still be preserved so that older metadata can + // be interpreted. + auto fields = sort_order.fields(); + std::vector new_fields{fields.begin(), fields.end()}; + return SortOrder::Make(sort_order.order_id(), std::move(new_fields)); } TableMetadataBuilder::TableMetadataBuilder(int8_t format_version) From 0ef9e1e647907e40d13ee867edb4859fddaa81fd Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Wed, 24 Dec 2025 21:14:24 +0800 Subject: [PATCH 3/9] fix clang format --- src/iceberg/schema.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index a2442e73e..bdc3e880a 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -122,7 +122,6 @@ class ICEBERG_EXPORT Schema : public StructType { Result> Project( const std::unordered_set& field_ids) const; - /// \brief Return the field IDs of the identifier fields. const std::vector& IdentifierFieldIds() const; From bf436620477378ed27979e4a4b6ff548b4ab8ed5 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 25 Dec 2025 10:01:04 +0800 Subject: [PATCH 4/9] fix test failure in arrow test --- src/iceberg/table_metadata.cc | 4 +++- src/iceberg/test/metadata_io_test.cc | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 8543caeca..f66e2183d 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -1007,7 +1007,9 @@ TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t schema_id) TableMetadataBuilder& TableMetadataBuilder::AddSchema( std::shared_ptr const& schema) { ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id, schema->HighestFieldId()); - impl_->AddSchema(*schema, std::max(impl_->metadata().last_column_id, highest_field_id)); + auto new_last_column_id = std::max(impl_->metadata().last_column_id, highest_field_id); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id, + impl_->AddSchema(*schema, new_last_column_id)); return *this; } diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc index ea4555d3d..accaa6b6a 100644 --- a/src/iceberg/test/metadata_io_test.cc +++ b/src/iceberg/test/metadata_io_test.cc @@ -33,6 +33,7 @@ #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/test/matchers.h" @@ -75,6 +76,7 @@ class MetadataIOTest : public TempFileTestBase { .manifest_list = "s3://a/b/1.avro", .summary = {{"operation", "append"}}, })}, + .sort_orders = {SortOrder::Unsorted()}, .default_sort_order_id = 0, .next_row_id = 0}; } From 14c3b0cd3d4b837547f08f5b08c1e323a9cd5511 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 25 Dec 2025 16:21:17 +0800 Subject: [PATCH 5/9] resolve some comments --- src/iceberg/partition_spec.cc | 5 ++-- src/iceberg/schema.cc | 32 ++++++++++++++----------- src/iceberg/schema.h | 3 +++ src/iceberg/table_metadata.cc | 45 ++++++++++++++++------------------- src/iceberg/table_metadata.h | 2 +- src/iceberg/table_update.cc | 6 ++--- 6 files changed, 48 insertions(+), 45 deletions(-) diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index fd50a0226..080ed4b6a 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -181,9 +181,8 @@ Status PartitionSpec::ValidatePartitionName(const Schema& schema) const { std::unordered_set partition_names; for (const auto& partition_field : fields_) { auto name = std::string(partition_field.name()); - if (name.empty()) { - return InvalidArgument("Cannot use empty partition name: {}", name); - } + ICEBERG_PRECHECK(!name.empty(), "Cannot use empty partition name: {}", name); + if (partition_names.contains(name)) { return InvalidArgument("Cannot use partition name more than once: {}", name); } diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 2a86bdac3..bb888633e 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -25,12 +25,12 @@ #include "iceberg/result.h" #include "iceberg/row/struct_like.h" #include "iceberg/schema_internal.h" +#include "iceberg/table_metadata.h" #include "iceberg/type.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep #include "iceberg/util/macros.h" #include "iceberg/util/type_util.h" #include "iceberg/util/visit_type.h" -#include "table_metadata.h" namespace iceberg { @@ -148,6 +148,20 @@ Result>> Schema::InitIdToPositio return visitor.Finish(); } +Result Schema::InitHighestFieldId(const Schema& self) { + ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, self.id_to_field_.Get(self)); + + if (id_to_field.get().empty()) { + return kInitialColumnId; + } + + auto max_it = std::ranges::max_element( + id_to_field.get(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + + return max_it->first; +} + Result> Schema::GetAccessorById( int32_t field_id) const { ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path, id_to_position_path_.Get(*this)); @@ -229,22 +243,12 @@ Result> Schema::IdentifierFieldNames() const { return names; } -Result Schema::HighestFieldId() const { - ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this)); - - if (id_to_field.get().empty()) { - return kInitialColumnId; - } +Result Schema::HighestFieldId() const { return highest_field_id_.Get(*this); } - auto max_it = std::ranges::max_element( - id_to_field.get(), - [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); - - return max_it->first; +bool Schema::SameSchema(const Schema& other) const { + return fields_ == other.fields_ && identifier_field_ids_ == other.identifier_field_ids_; } -bool Schema::SameSchema(const Schema& other) const { return fields_ == other.fields_; } - Status Schema::Validate(int32_t format_version) const { // Get all fields including nested ones ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this)); diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index bdc3e880a..08ad5bc61 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -173,6 +173,7 @@ class ICEBERG_EXPORT Schema : public StructType { InitLowerCaseNameToIdMap(const Schema&); static Result>> InitIdToPositionPath( const Schema&); + static Result InitHighestFieldId(const Schema&); const std::optional schema_id_; /// Field IDs that uniquely identify rows in the table. @@ -185,6 +186,8 @@ class ICEBERG_EXPORT Schema : public StructType { Lazy lowercase_name_to_id_; /// Mapping from field id to (nested) position path to access the field. Lazy id_to_position_path_; + /// Highest field ID in the schema. + Lazy highest_field_id_; }; } // namespace iceberg diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index f66e2183d..1b0469292 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -433,7 +433,7 @@ class TableMetadataBuilder::Impl { Status SetDefaultPartitionSpec(int32_t spec_id); Result AddPartitionSpec(const PartitionSpec& spec); Status SetCurrentSchema(int32_t schema_id); - Status RemoveSchemas(const std::vector& schema_ids); + Status RemoveSchemas(const std::unordered_set& schema_ids); Result AddSchema(const Schema& schema, int32_t new_last_column_id); Result> Build(); @@ -708,7 +708,7 @@ Status TableMetadataBuilder::Impl::RemoveProperties( Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) { if (schema_id == kLastAdded) { if (!last_added_schema_id_.has_value()) { - return InvalidArgument("Cannot set last added schema: no schema has been added"); + return ValidationFailed("Cannot set last added schema: no schema has been added"); } return SetCurrentSchema(last_added_schema_id_.value()); } @@ -760,21 +760,19 @@ Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) { return {}; } -Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector& schema_ids) { - std::unordered_set schema_ids_to_remove(schema_ids.begin(), schema_ids.end()); +Status TableMetadataBuilder::Impl::RemoveSchemas( + const std::unordered_set& schema_ids) { auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); - if (!schema_ids_to_remove.contains(current_schema_id)) { - return InvalidArgument("Cannot remove current schema: {}", current_schema_id); - } + ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id), + "Cannot remove current schema: {}", current_schema_id); - if (!schema_ids_to_remove.empty()) { + if (!schema_ids.empty()) { metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) { - return !schema_ids_to_remove.contains( + return !schema_ids.contains( schema->schema_id().value_or(Schema::kInitialSchemaId)); }) | std::ranges::to>>(); - changes_.push_back( - std::make_unique(std::move(schema_ids_to_remove))); + changes_.push_back(std::make_unique(schema_ids)); } return {}; @@ -782,35 +780,35 @@ Status TableMetadataBuilder::Impl::RemoveSchemas(const std::vector& sch Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, int32_t new_last_column_id) { - if (new_last_column_id < metadata_.last_column_id) { - return InvalidArgument("Invalid last column ID: {} < {} (previous last column ID)", - new_last_column_id, metadata_.last_column_id); - } + ICEBERG_PRECHECK(new_last_column_id >= metadata_.last_column_id, + "Invalid last column ID: {} < {} (previous last column ID)", + new_last_column_id, metadata_.last_column_id); ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version)); auto new_schema_id = ReuseOrCreateNewSchemaId(schema); - if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end()) { + if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() && + new_last_column_id == metadata_.last_column_id) { // update last_added_schema_id if the schema was added in this set of changes (since // it is now the last) bool is_new_schema = last_added_schema_id_.has_value() && - std::ranges::find_if(changes_, [new_schema_id](const auto& change) { + std::ranges::any_of(changes_, [new_schema_id](const auto& change) { if (change->kind() != TableUpdate::Kind::kAddSchema) { return false; } auto* add_schema = dynamic_cast(change.get()); return add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) == new_schema_id; - }) != changes_.cend(); + }); last_added_schema_id_ = is_new_schema ? std::make_optional(new_schema_id) : std::nullopt; return new_schema_id; } - auto new_schema = std::make_shared( - std::vector(schema.fields().begin(), schema.fields().end()), - new_schema_id); + auto new_schema = + std::make_shared(schema.fields() | std::ranges::to(), + new_schema_id, schema.IdentifierFieldIds()); metadata_.schemas.push_back(new_schema); schemas_by_id_.emplace(new_schema_id, new_schema); @@ -1008,8 +1006,7 @@ TableMetadataBuilder& TableMetadataBuilder::AddSchema( std::shared_ptr const& schema) { ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id, schema->HighestFieldId()); auto new_last_column_id = std::max(impl_->metadata().last_column_id, highest_field_id); - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id, - impl_->AddSchema(*schema, new_last_column_id)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSchema(*schema, new_last_column_id)); return *this; } @@ -1036,7 +1033,7 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs( } TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas( - const std::vector& schema_ids) { + const std::unordered_set& schema_ids) { ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSchemas(schema_ids)); return *this; } diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index ddb97322f..474d792ed 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -292,7 +292,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param schema_ids The IDs of schemas to remove /// \return Reference to this builder for method chaining - TableMetadataBuilder& RemoveSchemas(const std::vector& schema_ids); + TableMetadataBuilder& RemoveSchemas(const std::unordered_set& schema_ids); /// \brief Set the default sort order for the table /// diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 0fc275322..9d92a16ea 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -52,7 +52,7 @@ void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) con // AddSchema void AddSchema::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.AddSchema(schema_); } void AddSchema::GenerateRequirements(TableUpdateContext& context) const { @@ -62,7 +62,7 @@ void AddSchema::GenerateRequirements(TableUpdateContext& context) const { // SetCurrentSchema void SetCurrentSchema::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetCurrentSchema(schema_id_); } void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const { @@ -103,7 +103,7 @@ void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) con // RemoveSchemas void RemoveSchemas::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.RemoveSchemas(schema_ids_); } void RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const { From 16e797e0be3bbe40155b7302c1de3666280d5f05 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 25 Dec 2025 17:28:16 +0800 Subject: [PATCH 6/9] resolve conflicts --- src/iceberg/table_metadata.cc | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 1b0469292..2e03b585e 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -884,6 +884,20 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId( return new_order_id; } +int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId( + const PartitionSpec& new_spec) { + // if the spec already exists, use the same ID. otherwise, use the highest ID + 1. + int32_t new_spec_id = PartitionSpec::kInitialSpecId; + for (const auto& spec : metadata_.partition_specs) { + if (new_spec.CompatibleWith(*spec)) { + return spec->spec_id(); + } else if (new_spec_id <= spec->spec_id()) { + new_spec_id = spec->spec_id() + 1; + } + } + return new_spec_id; +} + int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( const Schema& new_schema) const { // if the schema already exists, use its id; otherwise use the highest id + 1 @@ -913,9 +927,9 @@ Result> TableMetadataBuilder::Impl::UpdateSpecSch last_assigned_field_id = std::max(last_assigned_field_id, field.field_id()); } - // Build without validation because the schema may have changed in a way that makes - // this spec invalid. The spec should still be preserved so that older metadata can - // be interpreted. + // Build without validation because the schema may have changed in a way that + // makes this spec invalid. The spec should still be preserved so that older + // metadata can be interpreted. ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec, PartitionSpec::Make(partition_spec.spec_id(), std::move(fields), last_assigned_field_id)); @@ -927,9 +941,9 @@ Result> TableMetadataBuilder::Impl::UpdateSpecSch Result> TableMetadataBuilder::Impl::UpdateSortOrderSchema( const Schema& schema, const SortOrder& sort_order) { - // Build without validation because the schema may have changed in a way that makes - // this order invalid. The order should still be preserved so that older metadata can - // be interpreted. + // Build without validation because the schema may have changed in a way that + // makes this order invalid. The order should still be preserved so that older + // metadata can be interpreted. auto fields = sort_order.fields(); std::vector new_fields{fields.begin(), fields.end()}; return SortOrder::Make(sort_order.order_id(), std::move(new_fields)); From 482261350f69fd6a0a69fb825206823982afbd0e Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 25 Dec 2025 17:53:38 +0800 Subject: [PATCH 7/9] resolve conflicts --- src/iceberg/test/update_partition_spec_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc index 85d9cc52a..31dbbae64 100644 --- a/src/iceberg/test/update_partition_spec_test.cc +++ b/src/iceberg/test/update_partition_spec_test.cc @@ -131,7 +131,7 @@ class UpdatePartitionSpecTest : public ::testing::TestWithParam { metadata->default_spec_id = spec->spec_id(); metadata->last_partition_id = spec->last_assigned_field_id(); metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; metadata->sort_orders.push_back(SortOrder::Unsorted()); metadata->next_row_id = TableMetadata::kInitialRowId; metadata->properties = TableProperties::default_properties(); From 38cc6930a546ad9c86768ba9e1e7ed2243623e83 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Fri, 26 Dec 2025 11:18:33 +0800 Subject: [PATCH 8/9] resolve comments --- src/iceberg/partition_spec.cc | 14 ++- src/iceberg/partition_spec.h | 4 +- src/iceberg/table_metadata.cc | 119 +++++++----------- src/iceberg/table_metadata.h | 4 +- src/iceberg/test/metadata_io_test.cc | 2 + .../test/table_metadata_builder_test.cc | 15 +-- 6 files changed, 61 insertions(+), 97 deletions(-) diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 080ed4b6a..af77d1fe0 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -131,6 +131,8 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const { } Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const { + ICEBERG_RETURN_UNEXPECTED(ValidatePartitionName(schema, *this)); + std::unordered_map parents = IndexParents(schema); for (const auto& partition_field : fields_) { ICEBERG_ASSIGN_OR_RAISE(auto source_field, @@ -177,9 +179,10 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) return {}; } -Status PartitionSpec::ValidatePartitionName(const Schema& schema) const { +Status PartitionSpec::ValidatePartitionName(const Schema& schema, + const PartitionSpec& spec) { std::unordered_set partition_names; - for (const auto& partition_field : fields_) { + for (const auto& partition_field : spec.fields()) { auto name = std::string(partition_field.name()); ICEBERG_PRECHECK(!name.empty(), "Cannot use empty partition name: {}", name); @@ -190,9 +193,10 @@ Status PartitionSpec::ValidatePartitionName(const Schema& schema) const { ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name)); auto transform_type = partition_field.transform()->transform_type(); - if (transform_type == TransformType::kIdentity) { - // for identity transform case we allow conflicts between partition and schema field - // name as long as they are sourced from the same schema field + if (transform_type == TransformType::kIdentity || + transform_type == TransformType::kVoid) { + // for identity/nulls transform case we allow conflicts between partition and schema + // field name as long as they are sourced from the same schema field if (schema_field.has_value() && schema_field.value().get().field_id() != partition_field.source_id()) { return InvalidArgument( diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index a7a5c19a3..9aa1954ab 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -79,14 +79,14 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Validates the partition spec against a schema. /// \param schema The schema to validate against. - /// \param allowMissingFields Whether to skip validation for partition fields whose + /// \param allow_missing_fields Whether to skip validation for partition fields whose /// source columns have been dropped from the schema. /// \return Error status if the partition spec is invalid. Status Validate(const Schema& schema, bool allow_missing_fields) const; // \brief Validates the partition field names are unique within the partition spec and // schema. - Status ValidatePartitionName(const Schema& schema) const; + static Status ValidatePartitionName(const Schema& schema, const PartitionSpec& spec); /// \brief Get the partition fields by source ID. /// \param source_id The id of the source field. diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 2e03b585e..59518b0d5 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -457,20 +457,6 @@ class TableMetadataBuilder::Impl { /// \return The ID to use for this schema (reused if exists, new otherwise int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const; - /// \brief Update partition spec to use a new schema - /// \param schema The new schema to bind to - /// \param partition_spec The partition spec to update - /// \return The updated partition spec bound to the new schema - static Result> UpdateSpecSchema( - const Schema& schema, const PartitionSpec& partition_spec); - - /// \brief Update sort order to use a new schema - /// \param schema The new schema to bind to - /// \param sort_order The sort order to update - /// \return The updated sort order bound to the new schema - static Result> UpdateSortOrderSchema( - const Schema& schema, const SortOrder& sort_order); - private: // Base metadata (nullptr for new tables) const TableMetadata* base_; @@ -542,7 +528,7 @@ Status TableMetadataBuilder::Impl::UpgradeFormatVersion(int8_t new_format_versio } Status TableMetadataBuilder::Impl::SetDefaultSortOrder(int32_t order_id) { - if (order_id == -1) { + if (order_id == kLastAdded) { if (!last_added_order_id_.has_value()) { return InvalidArgument( "Cannot set last added sort order: no sort order has been added"); @@ -607,7 +593,7 @@ Result TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order) } Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) { - if (spec_id == -1) { + if (spec_id == kLastAdded) { if (!last_added_spec_id_.has_value()) { return ValidationFailed( "Cannot set last added partition spec: no partition spec has been added"); @@ -656,8 +642,7 @@ Result TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec ICEBERG_ASSIGN_OR_RAISE( std::shared_ptr new_spec, - PartitionSpec::Make(new_spec_id, std::vector(spec.fields().begin(), - spec.fields().end()))); + PartitionSpec::Make(new_spec_id, spec.fields() | std::ranges::to())); metadata_.last_partition_id = std::max(metadata_.last_partition_id, new_spec->last_assigned_field_id()); metadata_.partition_specs.push_back(new_spec); @@ -718,18 +703,25 @@ Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) { } auto it = schemas_by_id_.find(schema_id); - if (it == schemas_by_id_.end()) { - return InvalidArgument("Cannot set current schema to unknown schema: {}", schema_id); - } + ICEBERG_PRECHECK(it != schemas_by_id_.end(), + "Cannot set current schema to unknown schema: {}", schema_id); const auto& schema = it->second; // Rebuild all partition specs for the new current schema std::vector> updated_specs; - for (const auto& spec : metadata_.partition_specs) { - ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, *spec)); + for (const auto& partition_spec : metadata_.partition_specs) { + ICEBERG_ASSIGN_OR_RAISE( + auto updated_spec, + PartitionSpec::Make(partition_spec->spec_id(), + partition_spec->fields() | std::ranges::to())); + + ICEBERG_RETURN_UNEXPECTED( + PartitionSpec::ValidatePartitionName(*schema, *updated_spec)); + updated_specs.push_back(std::move(updated_spec)); } metadata_.partition_specs = std::move(updated_specs); + specs_by_id_.clear(); for (const auto& spec : metadata_.partition_specs) { specs_by_id_.emplace(spec->spec_id(), spec); @@ -737,11 +729,15 @@ Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) { // Rebuild all sort orders for the new current schema std::vector> updated_orders; - for (const auto& order : metadata_.sort_orders) { - ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, *order)); + for (const auto& sort_order : metadata_.sort_orders) { + ICEBERG_ASSIGN_OR_RAISE( + auto updated_order, + SortOrder::Make(sort_order->order_id(), + sort_order->fields() | std::ranges::to())); updated_orders.push_back(std::move(updated_order)); } metadata_.sort_orders = std::move(updated_orders); + sort_orders_by_id_.clear(); for (const auto& order : metadata_.sort_orders) { sort_orders_by_id_.emplace(order->order_id(), order); @@ -768,8 +764,8 @@ Status TableMetadataBuilder::Impl::RemoveSchemas( if (!schema_ids.empty()) { metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) { - return !schema_ids.contains( - schema->schema_id().value_or(Schema::kInitialSchemaId)); + return schema->schema_id().has_value() && + !schema_ids.contains(schema->schema_id().value()); }) | std::ranges::to>>(); changes_.push_back(std::make_unique(schema_ids)); @@ -787,8 +783,8 @@ Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version)); auto new_schema_id = ReuseOrCreateNewSchemaId(schema); - if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() && - new_last_column_id == metadata_.last_column_id) { + auto schema_found = schemas_by_id_.contains(new_schema_id); + if (schema_found && new_last_column_id == metadata_.last_column_id) { // update last_added_schema_id if the schema was added in this set of changes (since // it is now the last) bool is_new_schema = @@ -797,24 +793,27 @@ Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, if (change->kind() != TableUpdate::Kind::kAddSchema) { return false; } - auto* add_schema = dynamic_cast(change.get()); - return add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) == - new_schema_id; + auto* add_schema = internal::checked_cast(change.get()); + return add_schema->schema()->schema_id() == std::make_optional(new_schema_id); }); last_added_schema_id_ = is_new_schema ? std::make_optional(new_schema_id) : std::nullopt; return new_schema_id; } + metadata_.last_column_id = new_last_column_id; + auto new_schema = std::make_shared(schema.fields() | std::ranges::to(), new_schema_id, schema.IdentifierFieldIds()); - metadata_.schemas.push_back(new_schema); - schemas_by_id_.emplace(new_schema_id, new_schema); + if (!schema_found) { + metadata_.schemas.push_back(new_schema); + schemas_by_id_.emplace(new_schema_id, new_schema); + } changes_.push_back(std::make_unique(new_schema, new_last_column_id)); - metadata_.last_column_id = new_last_column_id; + last_added_schema_id_ = new_schema_id; return new_schema_id; @@ -834,14 +833,16 @@ Result> TableMetadataBuilder::Impl::Build() { auto schema_it = schemas_by_id_.find(current_schema_id); ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(), "Current schema ID {} not found in schemas", current_schema_id); - const auto& current_schema = schema_it->second; { + const auto& current_schema = schema_it->second; + auto spec_it = specs_by_id_.find(metadata_.default_spec_id); - // FIXME(GuoTao.yu): Default spec must exist after we support update partition spec - if (spec_it != specs_by_id_.end()) { - ICEBERG_RETURN_UNEXPECTED( - spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false)); - } + ICEBERG_PRECHECK(spec_it != specs_by_id_.end(), + "Default spec ID {} not found in partition specs", + metadata_.default_spec_id); + ICEBERG_RETURN_UNEXPECTED( + spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false)); + auto sort_order_it = sort_orders_by_id_.find(metadata_.default_sort_order_id); ICEBERG_PRECHECK(sort_order_it != sort_orders_by_id_.end(), "Default sort order ID {} not found in sort orders", @@ -913,42 +914,6 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( return new_schema_id; } -Result> TableMetadataBuilder::Impl::UpdateSpecSchema( - const Schema& schema, const PartitionSpec& partition_spec) { - // UpdateSpecSchema: Update partition spec to use the new schema - // This preserves the partition spec structure but rebinds it to the new schema - - // Copy all fields from the partition spec. IDs should not change. - std::vector fields; - fields.reserve(partition_spec.fields().size()); - int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart; - for (const auto& field : partition_spec.fields()) { - fields.push_back(field); - last_assigned_field_id = std::max(last_assigned_field_id, field.field_id()); - } - - // Build without validation because the schema may have changed in a way that - // makes this spec invalid. The spec should still be preserved so that older - // metadata can be interpreted. - ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec, - PartitionSpec::Make(partition_spec.spec_id(), std::move(fields), - last_assigned_field_id)); - - // Validate the new partition name against the new schema - ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema)); - return new_partition_spec; -} - -Result> TableMetadataBuilder::Impl::UpdateSortOrderSchema( - const Schema& schema, const SortOrder& sort_order) { - // Build without validation because the schema may have changed in a way that - // makes this order invalid. The order should still be preserved so that older - // metadata can be interpreted. - auto fields = sort_order.fields(); - std::vector new_fields{fields.begin(), fields.end()}; - return SortOrder::Make(sort_order.order_id(), std::move(new_fields)); -} - TableMetadataBuilder::TableMetadataBuilder(int8_t format_version) : impl_(std::make_unique(format_version)) {} diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 474d792ed..3885a4582 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -249,7 +249,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \param schema The schema to set as current /// \param new_last_column_id The highest column ID in the schema /// \return Reference to this builder for method chaining - TableMetadataBuilder& SetCurrentSchema(std::shared_ptr const& schema, + TableMetadataBuilder& SetCurrentSchema(const std::shared_ptr& schema, int32_t new_last_column_id); /// \brief Set the current schema by schema ID @@ -262,7 +262,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param schema The schema to add /// \return Reference to this builder for method chaining - TableMetadataBuilder& AddSchema(std::shared_ptr const& schema); + TableMetadataBuilder& AddSchema(const std::shared_ptr& schema); /// \brief Set the default partition spec for the table /// diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc index accaa6b6a..7b11cf47e 100644 --- a/src/iceberg/test/metadata_io_test.cc +++ b/src/iceberg/test/metadata_io_test.cc @@ -30,6 +30,7 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/file_io.h" #include "iceberg/json_internal.h" +#include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" @@ -65,6 +66,7 @@ class MetadataIOTest : public TempFileTestBase { .last_sequence_number = 0, .schemas = {schema}, .current_schema_id = 1, + .partition_specs = {PartitionSpec::Unpartitioned()}, .default_spec_id = 0, .last_partition_id = 0, .properties = TableProperties::FromMap({{"key", "value"}}), diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index edbf08900..feeba8d28 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -81,6 +81,7 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) { auto schema = CreateTestSchema(); builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); builder->SetDefaultSortOrder(SortOrder::Unsorted()); + builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned()); builder->AssignUUID("new-uuid-5678"); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -160,6 +161,7 @@ TEST(TableMetadataBuilderTest, AssignUUID) { auto schema = CreateTestSchema(); builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); builder->SetDefaultSortOrder(SortOrder::Unsorted()); + builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned()); builder->AssignUUID("new-uuid-5678"); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); EXPECT_EQ(metadata->table_uuid, "new-uuid-5678"); @@ -187,6 +189,7 @@ TEST(TableMetadataBuilderTest, AssignUUID) { builder = TableMetadataBuilder::BuildFromEmpty(2); builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); builder->SetDefaultSortOrder(SortOrder::Unsorted()); + builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned()); builder->AssignUUID(); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); EXPECT_FALSE(metadata->table_uuid.empty()); @@ -238,6 +241,7 @@ TEST(TableMetadataBuilderTest, UpgradeFormatVersion) { auto schema = CreateTestSchema(); builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); builder->SetDefaultSortOrder(SortOrder::Unsorted()); + builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned()); builder->UpgradeFormatVersion(2); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -438,17 +442,6 @@ TEST(TableMetadataBuilderTest, AddSchemaBasic) { EXPECT_EQ(metadata->last_column_id, 6); } -TEST(TableMetadataBuilderTest, AddSchemaInvalid) { - auto base = CreateBaseMetadata(); - - auto builder = TableMetadataBuilder::BuildFrom(base.get()); - auto field_low_id = SchemaField::MakeRequired(1, "low_id", int32()); - auto schema_low_id = - std::make_shared(std::vector{field_low_id}, 1); - // Manually try to set a lower last_column_id via SetCurrentSchema - // This is tested indirectly through AddSchemaInternal validation -} - // Test SetCurrentSchema TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) { auto base = CreateBaseMetadata(); From d0e70546e9b4c8a73eae31ccb62da8f42a1bcc37 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Fri, 26 Dec 2025 12:12:59 +0800 Subject: [PATCH 9/9] add tests for remove schemas --- src/iceberg/test/partition_spec_test.cc | 12 +- src/iceberg/test/residual_evaluator_test.cc | 4 +- .../test/table_metadata_builder_test.cc | 212 +++++++++++++++++- 3 files changed, 219 insertions(+), 9 deletions(-) diff --git a/src/iceberg/test/partition_spec_test.cc b/src/iceberg/test/partition_spec_test.cc index b7a9fedd2..f9953a308 100644 --- a/src/iceberg/test/partition_spec_test.cc +++ b/src/iceberg/test/partition_spec_test.cc @@ -109,17 +109,17 @@ TEST(PartitionSpecTest, PartitionTypeTest) { "fields": [ { "source-id": 4, "field-id": 1000, - "name": "ts_day", + "name": "__ts_day", "transform": "day" }, { "source-id": 1, "field-id": 1001, - "name": "id_bucket", + "name": "__id_bucket", "transform": "bucket[16]" }, { "source-id": 2, "field-id": 1002, - "name": "id_truncate", + "name": "__id_truncate", "transform": "truncate[4]" } ] })"_json; @@ -137,9 +137,9 @@ TEST(PartitionSpecTest, PartitionTypeTest) { ICEBERG_UNWRAP_OR_FAIL(auto parsed_spec, PartitionSpecFromJson(schema, json, 1)); ICEBERG_UNWRAP_OR_FAIL(auto partition_type, parsed_spec->PartitionType(*schema)); - SchemaField pt_field1(1000, "ts_day", date(), true); - SchemaField pt_field2(1001, "id_bucket", int32(), true); - SchemaField pt_field3(1002, "id_truncate", string(), true); + SchemaField pt_field1(1000, "__ts_day", date(), true); + SchemaField pt_field2(1001, "__id_bucket", int32(), true); + SchemaField pt_field3(1002, "__id_truncate", string(), true); ASSERT_EQ(3, partition_type->fields().size()); diff --git a/src/iceberg/test/residual_evaluator_test.cc b/src/iceberg/test/residual_evaluator_test.cc index bef17d2bc..04c82e25c 100644 --- a/src/iceberg/test/residual_evaluator_test.cc +++ b/src/iceberg/test/residual_evaluator_test.cc @@ -406,7 +406,7 @@ TEST_F(ResidualEvaluatorTest, IntegerTruncateTransformResiduals) { // Valid partitions would be 0, 10, 20...90, 100 etc. auto truncate_transform = Transform::Truncate(10); - PartitionField pt_field(50, 1000, "value", truncate_transform); + PartitionField pt_field(50, 1000, "__value", truncate_transform); ICEBERG_UNWRAP_OR_FAIL(auto spec_unique, PartitionSpec::Make(*schema, 0, {pt_field}, false)); auto spec = std::shared_ptr(spec_unique.release()); @@ -523,7 +523,7 @@ TEST_F(ResidualEvaluatorTest, StringTruncateTransformResiduals) { // Valid partitions would be two letter strings for eg: ab, bc etc auto truncate_transform = Transform::Truncate(2); - PartitionField pt_field(50, 1000, "value", truncate_transform); + PartitionField pt_field(50, 1000, "__value", truncate_transform); ICEBERG_UNWRAP_OR_FAIL(auto spec_unique, PartitionSpec::Make(*schema, 0, {pt_field}, false)); auto spec = std::shared_ptr(spec_unique.release()); diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index feeba8d28..a912a08f5 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -442,12 +442,53 @@ TEST(TableMetadataBuilderTest, AddSchemaBasic) { EXPECT_EQ(metadata->last_column_id, 6); } +TEST(TableMetadataBuilderTest, AddSchemaInvalidColumnIds) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Try to add schema with column ID lower than existing last_column_id + auto field1 = + SchemaField::MakeRequired(2, "duplicate_id", int64()); // ID 2 already exists + auto invalid_schema = std::make_shared(std::vector{field1}, 1); + builder->AddSchema(invalid_schema); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + // Should still work - AddSchema automatically uses max(existing, highest_in_schema) + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->last_column_id, 3); // Should remain 3 (from base metadata) +} + +TEST(TableMetadataBuilderTest, AddSchemaWithHigherColumnIds) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add schema with higher column IDs + auto field1 = SchemaField::MakeRequired(10, "high_id1", int64()); + auto field2 = SchemaField::MakeRequired(15, "high_id2", string()); + auto new_schema = std::make_shared(std::vector{field1, field2}, 1); + builder->AddSchema(new_schema); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->last_column_id, 15); // Should be updated to highest field ID +} + +TEST(TableMetadataBuilderTest, AddSchemaEmptyFields) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add schema with no fields + auto empty_schema = std::make_shared(std::vector{}, 1); + builder->AddSchema(empty_schema); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->last_column_id, 3); // Should remain unchanged +} + // Test SetCurrentSchema TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) { auto base = CreateBaseMetadata(); auto builder = TableMetadataBuilder::BuildFrom(base.get()); - // 1. Set current schema by Schema object + // 1. Set current schema by Schema object with explicit last_column_id auto field1 = SchemaField::MakeRequired(4, "new_field", int64()); auto new_schema = std::make_shared(std::vector{field1}, 1); builder->SetCurrentSchema(new_schema, 4); @@ -455,6 +496,7 @@ TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) { ASSERT_EQ(metadata->schemas.size(), 2); EXPECT_EQ(metadata->current_schema_id.value(), 1); EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + EXPECT_EQ(metadata->last_column_id, 4); // 2. Set current schema by schema ID builder = TableMetadataBuilder::BuildFrom(base.get()); @@ -480,6 +522,32 @@ TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) { EXPECT_EQ(metadata->current_schema_id.value(), 0); } +TEST(TableMetadataBuilderTest, SetCurrentSchemaWithInvalidLastColumnId) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Try to set current schema with last_column_id lower than existing + auto field1 = SchemaField::MakeRequired(4, "new_field", int64()); + auto new_schema = std::make_shared(std::vector{field1}, 1); + builder->SetCurrentSchema(new_schema, 2); // 2 < 3 (existing last_column_id) + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("Invalid last column ID")); +} + +TEST(TableMetadataBuilderTest, SetCurrentSchemaUpdatesLastColumnId) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Set current schema with higher last_column_id + auto field1 = SchemaField::MakeRequired(4, "new_field1", int64()); + auto field2 = SchemaField::MakeRequired(8, "new_field2", string()); + auto new_schema = std::make_shared(std::vector{field1, field2}, 1); + builder->SetCurrentSchema(new_schema, 10); // Higher than field IDs + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + EXPECT_EQ(metadata->last_column_id, 10); +} + TEST(TableMetadataBuilderTest, SetCurrentSchemaInvalid) { auto base = CreateBaseMetadata(); @@ -538,4 +606,146 @@ TEST(TableMetadataBuilderTest, SetCurrentSchemaRebuildsSpecsAndOrders) { ASSERT_EQ(metadata->sort_orders.size(), 2); } +// Test RemoveSchemas +TEST(TableMetadataBuilderTest, RemoveSchemasBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add multiple schemas + auto field1 = SchemaField::MakeRequired(4, "field1", int64()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + auto field2 = SchemaField::MakeRequired(5, "field2", float64()); + auto schema2 = std::make_shared(std::vector{field2}, 2); + auto field3 = SchemaField::MakeRequired(6, "field3", string()); + auto schema3 = std::make_shared(std::vector{field3}, 3); + + builder->AddSchema(schema1); + builder->AddSchema(schema2); + builder->AddSchema(schema3); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 4); // Original + 3 new + + // Remove one schema + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({1}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 3); + EXPECT_EQ(metadata->schemas[0]->schema_id().value(), 0); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 2); + EXPECT_EQ(metadata->schemas[2]->schema_id().value(), 3); + + // Remove multiple schemas + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({2, 3}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 1); + EXPECT_EQ(metadata->schemas[0]->schema_id().value(), Schema::kInitialSchemaId); +} + +TEST(TableMetadataBuilderTest, RemoveSchemasCannotRemoveCurrent) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add a new schema + auto field1 = SchemaField::MakeRequired(4, "field1", int64()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + builder->AddSchema(schema1); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->current_schema_id.value(), 0); + + // Try to remove current schema (ID 0) + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({0}); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema: 0")); + + // Try to remove current schema along with others + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({0, 1}); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema: 0")); +} + +TEST(TableMetadataBuilderTest, RemoveSchemasNonExistent) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add one schema + auto field1 = SchemaField::MakeRequired(4, "field1", int64()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + builder->AddSchema(schema1); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + + // Try to remove non-existent schema - should be no-op + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({999}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + + // Remove mix of existent and non-existent + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({1, 999, 888}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 1); + EXPECT_EQ(metadata->schemas[0]->schema_id().value(), Schema::kInitialSchemaId); +} + +TEST(TableMetadataBuilderTest, RemoveSchemasEmptySet) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add a schema + auto field1 = SchemaField::MakeRequired(4, "field1", int64()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + builder->AddSchema(schema1); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + + // Remove empty set - should be no-op + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); +} + +TEST(TableMetadataBuilderTest, RemoveSchemasAfterSchemaChange) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add multiple schemas + auto field1 = SchemaField::MakeRequired(4, "field1", int64()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + auto field2 = SchemaField::MakeRequired(5, "field2", float64()); + auto schema2 = std::make_shared(std::vector{field2}, 2); + + builder->AddSchema(schema1); + builder->AddSchema(schema2); + builder->SetCurrentSchema(1); // Set schema1 as current + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 3); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // Now remove the old current schema (ID 0) + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({0}); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + EXPECT_EQ(metadata->schemas[0]->schema_id().value(), 1); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 2); + + // Cannot remove the new current schema + builder = TableMetadataBuilder::BuildFrom(metadata.get()); + builder->RemoveSchemas({1}); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema: 1")); +} + } // namespace iceberg