From f49f65421c1311f8cff603b54dfa18034597c7fa Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Fri, 26 Dec 2025 15:35:36 +0800 Subject: [PATCH] feat(rest): implement table operations part1 --- src/iceberg/catalog.h | 3 +- .../catalog/memory/in_memory_catalog.cc | 5 +- .../catalog/memory/in_memory_catalog.h | 3 +- src/iceberg/catalog/rest/http_client.cc | 18 +-- src/iceberg/catalog/rest/http_client.h | 7 +- src/iceberg/catalog/rest/rest_catalog.cc | 72 +++++++-- src/iceberg/catalog/rest/rest_catalog.h | 3 +- src/iceberg/test/mock_catalog.h | 2 +- src/iceberg/test/rest_catalog_test.cc | 137 +++++++++++++++--- 9 files changed, 200 insertions(+), 50 deletions(-) diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index 7be56bae1..bded0d264 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -179,7 +179,8 @@ class ICEBERG_EXPORT Catalog { /// \param identifier a table identifier /// \return instance of Table implementation referred to by identifier or /// ErrorKind::kNoSuchTable if the table does not exist - virtual Result> LoadTable(const TableIdentifier& identifier) = 0; + virtual Result> LoadTable( + const TableIdentifier& identifier) const = 0; /// \brief Register a table with the catalog if it does not exist /// diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index b3fd0060a..94563a088 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -466,7 +466,7 @@ Status InMemoryCatalog::RenameTable(const TableIdentifier& from, } Result> InMemoryCatalog::LoadTable( - const TableIdentifier& identifier) { + const TableIdentifier& identifier) const { if (!file_io_) [[unlikely]] { return InvalidArgument("file_io is not set for catalog {}", catalog_name_); } @@ -480,8 +480,9 @@ Result> InMemoryCatalog::LoadTable( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); + auto non_const_catalog = std::const_pointer_cast(shared_from_this()); return Table::Make(identifier, std::move(metadata), std::move(metadata_location), - file_io_, shared_from_this()); + file_io_, non_const_catalog); } Result> InMemoryCatalog::RegisterTable( diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index 22a596c10..95b4e870b 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -93,7 +93,8 @@ class ICEBERG_EXPORT InMemoryCatalog Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override; - Result> LoadTable(const TableIdentifier& identifier) override; + Result> LoadTable( + const TableIdentifier& identifier) const override; Result> RegisterTable( const TableIdentifier& identifier, diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc index d1138b787..ce26c19e1 100644 --- a/src/iceberg/catalog/rest/http_client.cc +++ b/src/iceberg/catalog/rest/http_client.cc @@ -135,9 +135,8 @@ Status HandleFailureResponse(const cpr::Response& response, } // namespace void HttpClient::PrepareSession( - const std::string& path, - const std::unordered_map& request_headers, - const std::unordered_map& params) { + const std::string& path, const std::unordered_map& params, + const std::unordered_map& request_headers) { session_->SetUrl(cpr::Url{path}); session_->SetParameters(GetParameters(params)); session_->RemoveContent(); @@ -164,7 +163,7 @@ Result HttpClient::Get( cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers, params); + PrepareSession(path, params, headers); response = session_->Get(); } @@ -181,7 +180,7 @@ Result HttpClient::Post( cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers); + PrepareSession(path, /*params=*/{}, headers); session_->SetBody(cpr::Body{body}); response = session_->Post(); } @@ -206,7 +205,7 @@ Result HttpClient::PostForm( auto form_headers = headers; form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded; - PrepareSession(path, form_headers); + PrepareSession(path, /*params=*/{}, form_headers); std::vector pair_list; pair_list.reserve(form_data.size()); for (const auto& [key, val] : form_data) { @@ -229,7 +228,7 @@ Result HttpClient::Head( cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers); + PrepareSession(path, /*params=*/{}, headers); response = session_->Head(); } @@ -240,12 +239,13 @@ Result HttpClient::Head( } Result HttpClient::Delete( - const std::string& path, const std::unordered_map& headers, + const std::string& path, const std::unordered_map& params, + const std::unordered_map& headers, const ErrorHandler& error_handler) { cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers); + PrepareSession(path, params, headers); response = session_->Delete(); } diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h index 56c9f2902..eddd73ac1 100644 --- a/src/iceberg/catalog/rest/http_client.h +++ b/src/iceberg/catalog/rest/http_client.h @@ -104,13 +104,14 @@ class ICEBERG_REST_EXPORT HttpClient { /// \brief Sends a DELETE request. Result Delete(const std::string& path, + const std::unordered_map& params, const std::unordered_map& headers, const ErrorHandler& error_handler); private: - void PrepareSession(const std::string& path, - const std::unordered_map& request_headers, - const std::unordered_map& params = {}); + void PrepareSession( + const std::string& path, const std::unordered_map& params, + const std::unordered_map& request_headers); std::unordered_map default_headers_; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index eeffffd26..abc9a2303 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -189,9 +189,10 @@ Status RestCatalog::DropNamespace(const Namespace& ns) { ICEBERG_RETURN_UNEXPECTED( CheckEndpoint(supported_endpoints_, Endpoint::DropNamespace())); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns)); - ICEBERG_ASSIGN_OR_RAISE( - const auto response, - client_->Delete(path, /*headers=*/{}, *DropNamespaceErrorHandler::Instance())); + + ICEBERG_ASSIGN_OR_RAISE(const auto response, + client_->Delete(path, /*params=*/{}, /*headers=*/{}, + *DropNamespaceErrorHandler::Instance())); return {}; } @@ -204,7 +205,7 @@ Result RestCatalog::NamespaceExists(const Namespace& ns) const { return false; } ICEBERG_RETURN_UNEXPECTED(result); - // GET succeeded, namespace exists + // GetNamespaceProperties succeeded, namespace exists return true; } @@ -212,9 +213,8 @@ Result RestCatalog::NamespaceExists(const Namespace& ns) const { auto response_or_error = client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()); if (!response_or_error.has_value()) { - const auto& error = response_or_error.error(); // catch NoSuchNamespaceException/404 and return false - if (error.kind == ErrorKind::kNoSuchNamespace) { + if (response_or_error.error().kind == ErrorKind::kNoSuchNamespace) { return false; } ICEBERG_RETURN_UNEXPECTED(response_or_error); @@ -294,14 +294,44 @@ Result> RestCatalog::StageCreateTable( return NotImplemented("Not implemented"); } -Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier, - [[maybe_unused]] bool purge) { - return NotImplemented("Not implemented"); +Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) { + ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::DeleteTable())); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + + std::unordered_map params; + if (purge) { + params["purgeRequested"] = "true"; + } + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance())); + return {}; } -Result RestCatalog::TableExists( - [[maybe_unused]] const TableIdentifier& identifier) const { - return NotImplemented("Not implemented"); +Result RestCatalog::TableExists(const TableIdentifier& identifier) const { + auto check = CheckEndpoint(supported_endpoints_, Endpoint::TableExists()); + if (!check.has_value()) { + // Fall back to LoadTable endpoint (GET) + auto result = LoadTable(identifier); + if (!result.has_value() && result.error().kind == ErrorKind::kNoSuchTable) { + return false; + } + ICEBERG_RETURN_UNEXPECTED(result); + // LoadTable succeeded, table exists + return true; + } + + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + auto response_or_error = + client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()); + if (!response_or_error.has_value()) { + // catch NoSuchTableException/404 and return false + if (response_or_error.error().kind == ErrorKind::kNoSuchTable) { + return false; + } + ICEBERG_RETURN_UNEXPECTED(response_or_error); + } + return true; } Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from, @@ -310,8 +340,22 @@ Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from, } Result> RestCatalog::LoadTable( - [[maybe_unused]] const TableIdentifier& identifier) { - return NotImplemented("Not implemented"); + const TableIdentifier& identifier) const { + ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::LoadTable())); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance())); + + // TODO(Feiyang Li): support load metadata table + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); + // Cast away const since Table needs non-const Catalog pointer for mutations + auto non_const_catalog = std::const_pointer_cast(shared_from_this()); + return Table::Make(identifier, load_result.metadata, + std::move(load_result.metadata_location), file_io_, + non_const_catalog); } Result> RestCatalog::RegisterTable( diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index a80965211..5b9985041 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -97,7 +97,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, Status DropTable(const TableIdentifier& identifier, bool purge) override; - Result> LoadTable(const TableIdentifier& identifier) override; + Result> LoadTable( + const TableIdentifier& identifier) const override; Result> RegisterTable( const TableIdentifier& identifier, diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h index 7873e6fe3..4564c1b22 100644 --- a/src/iceberg/test/mock_catalog.h +++ b/src/iceberg/test/mock_catalog.h @@ -81,7 +81,7 @@ class MockCatalog : public Catalog { (override)); MOCK_METHOD((Result>), LoadTable, (const TableIdentifier&), - (override)); + (const, override)); MOCK_METHOD((Result>), RegisterTable, (const TableIdentifier&, const std::string&), (override)); diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc index fb1f70610..d4afef615 100644 --- a/src/iceberg/test/rest_catalog_test.cc +++ b/src/iceberg/test/rest_catalog_test.cc @@ -136,6 +136,30 @@ class RestCatalogIntegrationTest : public ::testing::Test { return RestCatalog::Make(*config, std::make_shared()); } + // Helper function to create a default schema for testing + std::shared_ptr CreateDefaultSchema() { + return std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "data", string())}, + /*schema_id=*/1); + } + + // Helper function to create a default unpartitioned partition spec + std::shared_ptr CreateDefaultPartitionSpec() { + auto partition_spec_result = + PartitionSpec::Make(PartitionSpec::kInitialSpecId, {}, 0); + EXPECT_THAT(partition_spec_result, IsOk()); + return {std::move(*partition_spec_result)}; + } + + // Helper function to create a default unsorted sort order + std::shared_ptr CreateDefaultSortOrder() { + auto sort_order_result = + SortOrder::Make(SortOrder::kUnsortedOrderId, std::vector{}); + EXPECT_THAT(sort_order_result, IsOk()); + return {std::move(*sort_order_result)}; + } + static inline std::unique_ptr docker_compose_; }; @@ -288,7 +312,7 @@ TEST_F(RestCatalogIntegrationTest, NamespaceExists) { // Check it now exists exists_result = catalog->NamespaceExists(ns); ASSERT_THAT(exists_result, IsOk()); - EXPECT_TRUE(*exists_result); + EXPECT_TRUE(exists_result.value()); } TEST_F(RestCatalogIntegrationTest, UpdateNamespaceProperties) { @@ -340,7 +364,19 @@ TEST_F(RestCatalogIntegrationTest, DropNamespace) { // Verify it no longer exists exists_result = catalog->NamespaceExists(ns); ASSERT_THAT(exists_result, IsOk()); - EXPECT_FALSE(*exists_result); + EXPECT_FALSE(exists_result.value()); +} + +TEST_F(RestCatalogIntegrationTest, DropNonExistentNamespace) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + Namespace ns{.levels = {"nonexistent_namespace"}}; + auto status = catalog->DropNamespace(ns); + + // Should return NoSuchNamespace error + EXPECT_THAT(status, IsError(ErrorKind::kNoSuchNamespace)); } TEST_F(RestCatalogIntegrationTest, CreateTable) { @@ -362,22 +398,9 @@ TEST_F(RestCatalogIntegrationTest, CreateTable) { status = catalog->CreateNamespace(ns, ns_properties); EXPECT_THAT(status, IsOk()); - // Create schema - auto schema = std::make_shared( - std::vector{SchemaField::MakeOptional(1, "foo", string()), - SchemaField::MakeRequired(2, "bar", int32()), - SchemaField::MakeOptional(3, "baz", boolean())}, - /*schema_id=*/1); - - // Create partition spec and sort order (unpartitioned and unsorted) - auto partition_spec_result = PartitionSpec::Make(PartitionSpec::kInitialSpecId, {}, 0); - ASSERT_THAT(partition_spec_result, IsOk()); - auto partition_spec = std::shared_ptr(std::move(*partition_spec_result)); - - auto sort_order_result = - SortOrder::Make(SortOrder::kUnsortedOrderId, std::vector{}); - ASSERT_THAT(sort_order_result, IsOk()); - auto sort_order = std::shared_ptr(std::move(*sort_order_result)); + auto schema = CreateDefaultSchema(); + auto partition_spec = CreateDefaultPartitionSpec(); + auto sort_order = CreateDefaultSortOrder(); // Create table TableIdentifier table_id{.ns = ns, .name = "t1"}; @@ -400,4 +423,82 @@ TEST_F(RestCatalogIntegrationTest, CreateTable) { HasErrorMessage("Table already exists: test_create_table.apple.ios.t1")); } +TEST_F(RestCatalogIntegrationTest, LoadTable) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + // Create namespace and table first + Namespace ns{.levels = {"test_load_table"}}; + auto status = catalog->CreateNamespace(ns, {}); + EXPECT_THAT(status, IsOk()); + + // Create schema, partition spec, and sort order using helper functions + auto schema = CreateDefaultSchema(); + auto partition_spec = CreateDefaultPartitionSpec(); + auto sort_order = CreateDefaultSortOrder(); + + // Create table + TableIdentifier table_id{.ns = ns, .name = "test_table"}; + std::unordered_map table_properties{{"key1", "value1"}}; + auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, + "", table_properties); + ASSERT_THAT(create_result, IsOk()); + + // Load the table + auto load_result = catalog->LoadTable(table_id); + ASSERT_THAT(load_result, IsOk()); + auto& loaded_table = load_result.value(); + + // Verify loaded table properties + EXPECT_EQ(loaded_table->name().ns.levels, std::vector{"test_load_table"}); + EXPECT_EQ(loaded_table->name().name, "test_table"); + EXPECT_NE(loaded_table->metadata(), nullptr); + + // Verify schema + auto loaded_schema_result = loaded_table->schema(); + ASSERT_THAT(loaded_schema_result, IsOk()); + auto loaded_schema = loaded_schema_result.value(); + EXPECT_TRUE(loaded_schema->schema_id().has_value()); // Server assigns schema_id + EXPECT_EQ(loaded_schema->fields().size(), 2); + EXPECT_EQ(loaded_schema->fields()[0].name(), "id"); + EXPECT_EQ(loaded_schema->fields()[1].name(), "data"); +} + +TEST_F(RestCatalogIntegrationTest, DropTable) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + // Create namespace and table first + Namespace ns{.levels = {"test_drop_table"}}; + auto status = catalog->CreateNamespace(ns, {}); + EXPECT_THAT(status, IsOk()); + + // Create table + auto schema = CreateDefaultSchema(); + auto partition_spec = CreateDefaultPartitionSpec(); + auto sort_order = CreateDefaultSortOrder(); + + TableIdentifier table_id{.ns = ns, .name = "table_to_drop"}; + std::unordered_map table_properties; + auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, + "", table_properties); + ASSERT_THAT(create_result, IsOk()); + + // Verify table exists + auto load_result = catalog->TableExists(table_id); + ASSERT_THAT(load_result, IsOk()); + EXPECT_TRUE(load_result.value()); + + // Drop the table + status = catalog->DropTable(table_id, /*purge=*/false); + ASSERT_THAT(status, IsOk()); + + // Verify table no longer exists + load_result = catalog->TableExists(table_id); + ASSERT_THAT(load_result, IsOk()); + EXPECT_FALSE(load_result.value()); +} + } // namespace iceberg::rest