From f5007cc0d566176116bbbe8a1949e748d8269a83 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 10 Jan 2026 23:54:46 +0800 Subject: [PATCH 1/4] feat: add SnapshotManager The test cases are derived from Java's TestSnapshotManager.java. Since MergeAppend is not supported yet, some tests are omitted for now and can be added later. --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table.cc | 5 + src/iceberg/table.h | 3 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/snapshot_manager_test.cc | 494 ++++++++++++++++++++++ src/iceberg/test/update_test_base.h | 26 ++ src/iceberg/transaction.cc | 8 + src/iceberg/transaction.h | 3 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/meson.build | 1 + src/iceberg/update/snapshot_manager.cc | 224 ++++++++++ src/iceberg/update/snapshot_manager.h | 211 +++++++++ src/iceberg/update/snapshot_update.h | 12 + 14 files changed, 991 insertions(+) create mode 100644 src/iceberg/test/snapshot_manager_test.cc create mode 100644 src/iceberg/update/snapshot_manager.cc create mode 100644 src/iceberg/update/snapshot_manager.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 5d3ff33b3..7cf867c04 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -89,6 +89,7 @@ set(ICEBERG_SOURCES update/fast_append.cc update/pending_update.cc update/set_snapshot.cc + update/snapshot_manager.cc update/snapshot_update.cc update/update_location.cc update/update_partition_spec.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index fd82a889b..7dd3409ad 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -107,6 +107,7 @@ iceberg_sources = files( 'update/fast_append.cc', 'update/pending_update.cc', 'update/set_snapshot.cc', + 'update/snapshot_manager.cc', 'update/snapshot_update.cc', 'update/update_location.cc', 'update/update_partition_spec.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index b6c26ea00..2a31eb3b1 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -32,6 +32,7 @@ #include "iceberg/table_scan.h" #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_partition_statistics.h" #include "iceberg/update/update_properties.h" @@ -222,6 +223,10 @@ Result> Table::NewUpdatePartitionStat return transaction->NewUpdatePartitionStatistics(); } +Result> Table::NewSnapshotManager() { + return SnapshotManager::Make(name().ToString(), shared_from_this()); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 1f3135dd7..423911c21 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -168,6 +168,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new FastAppend to append data files and commit the changes. virtual Result> NewFastAppend(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. + virtual Result> NewSnapshotManager(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index ae61d819f..a8f119fa4 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE) expire_snapshots_test.cc fast_append_test.cc set_snapshot_test.cc + snapshot_manager_test.cc transaction_test.cc update_location_test.cc update_partition_spec_test.cc diff --git a/src/iceberg/test/snapshot_manager_test.cc b/src/iceberg/test/snapshot_manager_test.cc new file mode 100644 index 000000000..f9bd044c7 --- /dev/null +++ b/src/iceberg/test/snapshot_manager_test.cc @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_manager.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" +#include "iceberg/update/fast_append.h" + +namespace iceberg { + +class SnapshotManagerTest : public UpdateTestBase { + protected: + // These snapshot IDs correspond to the snapshots in the TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotId = 3051729675574597004; + static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; +}; + +TEST_F(SnapshotManagerTest, CreateBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTable) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, + SnapshotManager::Make("minimal_table", minimal_table_)); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(minimal_table_ident_)); + EXPECT_FALSE( + reloaded->metadata()->refs.contains(std::string(SnapshotRef::kMainBranch))); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); +} + +TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, + SnapshotManager::Make("minimal_table", minimal_table_)); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manager2, + SnapshotManager::Make("minimal_table", minimal_table_)); + manager2->CreateBranch("branch1"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, CreateBranchFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + // Try to create a branch with an existing name + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", table_)); + manager2->CreateBranch("branch1", kCurrentSnapshotId); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, CreateTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kTag); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateTagFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + // Try to create a tag with an existing name + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", table_)); + manager2->CreateTag("tag1", kCurrentSnapshotId); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("tag 'tag1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, RemoveBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->RemoveBranch("branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE(reloaded->metadata()->refs.contains("branch1")); + } +} + +TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RemoveBranch("non-existing"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Branch does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, RemovingMainBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RemoveBranch(std::string(SnapshotRef::kMainBranch)); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot remove main branch")); +} + +TEST_F(SnapshotManagerTest, RemoveTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->RemoveTag("tag1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE(reloaded->metadata()->refs.contains("tag1")); + } +} + +TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RemoveTag("non-existing"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Tag does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kOldestSnapshotId); + manager->CreateBranch("branch2", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceBranch("branch1", "branch2"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_NE(ref, nullptr); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceBranch("branch1", "non-existing"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingFromBranchCreatesTheBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceBranch("new-branch", "branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("new-branch"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingFromBranchCreatesTheBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->FastForwardBranch("new-branch", "branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("new-branch"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->FastForwardBranch("branch1", "non-existing"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_NE(ref, nullptr); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, UpdatingBranchRetention) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMinSnapshotsToKeep("branch1", 10); + manager2->SetMaxSnapshotAgeMs("branch1", 20000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + const auto& branch = std::get(ref->retention); + EXPECT_EQ(branch.max_snapshot_age_ms, 20000); + EXPECT_EQ(branch.min_snapshots_to_keep, 10); + } +} + +TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMinSnapshotsToKeep("tag1", 10); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref 'tag1' is a tag not a branch")); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMaxSnapshotAgeMs("tag1", 10); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref 'tag1' is a tag not a branch")); + } +} + +TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMaxRefAgeMs("branch1", 10000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->max_ref_age_ms(), 10000); + } +} + +TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMaxRefAgeMs("tag1", 10000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->max_ref_age_ms(), 10000); + } +} + +TEST_F(SnapshotManagerTest, RenameBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->RenameBranch("branch1", "branch2"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it1 = reloaded->metadata()->refs.find("branch1"); + EXPECT_EQ(it1, reloaded->metadata()->refs.end()); + + auto it2 = reloaded->metadata()->refs.find("branch2"); + EXPECT_NE(it2, reloaded->metadata()->refs.end()); + auto ref2 = it2->second; + EXPECT_EQ(ref2->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, FailRenamingMainBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RenameBranch(std::string(SnapshotRef::kMainBranch), "some-branch"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot rename main branch")); +} + +TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RenameBranch("some-missing-branch", "some-branch"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Branch does not exist: some-missing-branch")); +} + +TEST_F(SnapshotManagerTest, RollbackTo) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RollbackTo(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SnapshotManagerTest, SetCurrentSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->SetCurrentSnapshot(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateReferencesAndRollback) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + manager->CreateTag("tag1", kCurrentSnapshotId); + manager->RollbackTo(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); + + auto branch_it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(branch_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(branch_it->second->snapshot_id, kCurrentSnapshotId); + + auto tag_it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(tag_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(tag_it->second->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn)); + + manager->RollbackTo(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + EXPECT_THAT(txn->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +} // namespace iceberg diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index c14cb76b9..8f6e08c25 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h @@ -43,6 +43,7 @@ class UpdateTestBase : public ::testing::Test { void SetUp() override { InitializeFileIO(); RegisterTableFromResource("TableMetadataV2Valid.json"); + RegisterMinimalTableFromResource("TableMetadataV2ValidMinimal.json"); } /// \brief Initialize file IO and create necessary directories. @@ -56,6 +57,7 @@ class UpdateTestBase : public ::testing::Test { static_cast(*file_io_).fs()); ASSERT_TRUE(arrow_fs != nullptr); ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + ASSERT_TRUE(arrow_fs->CreateDir(minimal_table_location_ + "/metadata").ok()); } /// \brief Register a table from a metadata resource file. @@ -78,11 +80,35 @@ class UpdateTestBase : public ::testing::Test { catalog_->RegisterTable(table_ident_, metadata_location)); } + /// \brief Register a minimal table from a metadata resource file. + /// + /// \param resource_name The name of the metadata resource file + void RegisterMinimalTableFromResource(const std::string& resource_name) { + // Drop existing table if it exists + std::ignore = catalog_->DropTable(minimal_table_ident_, /*purge=*/false); + + // Write table metadata to the table location. + auto metadata_location = + std::format("{}/metadata/00001-{}.metadata.json", minimal_table_location_, + Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name)); + metadata->location = minimal_table_location_; + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + + // Register the table in the catalog. + ICEBERG_UNWRAP_OR_FAIL( + minimal_table_, catalog_->RegisterTable(minimal_table_ident_, metadata_location)); + } + const TableIdentifier table_ident_{.name = "test_table"}; const std::string table_location_{"/warehouse/test_table"}; + const TableIdentifier minimal_table_ident_{.name = "minimal_table"}; + const std::string minimal_table_location_{"/warehouse/minimal_table"}; std::shared_ptr file_io_; std::shared_ptr catalog_; std::shared_ptr
table_; + std::shared_ptr
minimal_table_; }; } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index b24aa0da3..981823d04 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -36,6 +36,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/set_snapshot.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_location.h" #include "iceberg/update/update_partition_spec.h" @@ -428,4 +429,11 @@ Transaction::NewUpdateSnapshotReference() { return update_ref; } +Result> Transaction::NewSnapshotManager() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr snapshot_manager, + SnapshotManager::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(snapshot_manager)); + return snapshot_manager; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index e975be7ff..e694a48c9 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -105,6 +105,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateSnapshotReference(); + /// \brief Create a new SnapshotManager to manage snapshots. + Result> NewSnapshotManager(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index e97de0ac5..a712c2ede 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -184,6 +184,7 @@ class TableProperties; /// \brief Table update. class TableMetadataBuilder; class TableUpdate; +class SnapshotManager; class TableRequirement; class TableUpdateContext; class Transaction; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 102471c04..6acb007a1 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -21,6 +21,7 @@ install_headers( 'fast_append.h', 'pending_update.h', 'set_snapshot.h', + 'snapshot_manager.h', 'snapshot_update.h', 'update_location.h', 'update_partition_spec.h', diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc new file mode 100644 index 000000000..5afedb455 --- /dev/null +++ b/src/iceberg/update/snapshot_manager.cc @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_manager.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/update/set_snapshot.h" +#include "iceberg/update/update_snapshot_reference.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> SnapshotManager::Make( + const std::string& table_name, std::shared_ptr
table) { + if (table == nullptr) { + return InvalidArgument("Table cannot be null"); + } + if (table->metadata() == nullptr) { + return InvalidArgument("Cannot manage snapshots: table {} does not exist", + table_name); + } + // Create a transaction first + ICEBERG_ASSIGN_OR_RAISE(auto transaction, + Transaction::Make(table, Transaction::Kind::kUpdate, + /*auto_commit=*/false)); + auto manager = std::shared_ptr( + new SnapshotManager(std::move(transaction), /*is_external=*/false)); + return manager; +} + +Result> SnapshotManager::Make( + std::shared_ptr transaction) { + if (transaction == nullptr) { + return InvalidArgument("Invalid input transaction: null"); + } + return std::shared_ptr( + new SnapshotManager(std::move(transaction), /*is_external=*/true)); +} + +SnapshotManager::SnapshotManager(std::shared_ptr transaction, + bool is_external) + : PendingUpdate(transaction), is_external_transaction_(is_external) {} + +SnapshotManager::~SnapshotManager() = default; + +SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + // TODO(anyone): Implement cherrypick operation + ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented"); + return *this; +} + +SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); + set_snapshot->SetCurrentSnapshot(snapshot_id); + ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); + set_snapshot->RollbackToTime(UnixMsFromTimePointMs(timestamp_ms)); + ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); + set_snapshot->RollbackTo(snapshot_id); + ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) { + if (base().current_snapshot_id != kInvalidSnapshotId) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base().Snapshot()); + if (current_snapshot != nullptr) { + return CreateBranch(name, current_snapshot->snapshot_id); + } + } + const auto& current_refs = base().refs; + ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, transaction_->NewFastAppend()); + ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->ToBranch(name).Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::CreateBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->CreateBranch(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::CreateTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->CreateTag(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RemoveBranch(name); + return *this; +} + +SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RemoveTag(name); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceTag(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceBranch(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from, + const std::string& to) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceBranch(from, to); + return *this; +} + +SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from, + const std::string& to) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->FastForward(from, to); + return *this; +} + +SnapshotManager& SnapshotManager::RenameBranch(const std::string& name, + const std::string& new_name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RenameBranch(name, new_name); + return *this; +} + +SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& branch_name, + int32_t min_snapshots_to_keep) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep); + return *this; +} + +SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& branch_name, + int64_t max_snapshot_age_ms) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms); + return *this; +} + +SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMaxRefAgeMs(name, max_ref_age_ms); + return *this; +} + +Result> SnapshotManager::Apply() { return base().Snapshot(); } + +Status SnapshotManager::Commit() { + ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist()); + if (!is_external_transaction_) { + ICEBERG_RETURN_UNEXPECTED(transaction_->Commit()); + } + return {}; +} + +Result> +SnapshotManager::UpdateSnapshotReferencesOperation() { + if (update_snapshot_references_operation_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(update_snapshot_references_operation_, + transaction_->NewUpdateSnapshotReference()); + } + return update_snapshot_references_operation_; +} + +Status SnapshotManager::CommitIfRefUpdatesExist() { + if (update_snapshot_references_operation_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED(update_snapshot_references_operation_->Commit()); + update_snapshot_references_operation_ = nullptr; + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h new file mode 100644 index 000000000..6315ea701 --- /dev/null +++ b/src/iceberg/update/snapshot_manager.h @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +/// \brief API for managing snapshots and snapshot references. +/// +/// Allows rolling table data back to a state at an older snapshot, cherry-picking +/// snapshots, and managing branches and tags. +class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { + public: + /// \brief Create a SnapshotManager for a table. + /// + /// \param table_name The name of the table + /// \param table The table to manage snapshots for + /// \return A new SnapshotManager instance, or an error if the table doesn't exist + static Result> Make(const std::string& table_name, + std::shared_ptr
table); + + /// \brief Create a SnapshotManager from an existing transaction. + /// + /// \param transaction The transaction to use + /// \return A new SnapshotManager instance + static Result> Make( + std::shared_ptr transaction); + + ~SnapshotManager() override; + + // TODO(xxx): is this correct? + Kind kind() const final { return Kind::kUpdateSnapshotReference; } + + /// \brief Apply supported changes in given snapshot and create a new snapshot which + /// will be set as the current snapshot on commit. + /// + /// \param snapshot_id A snapshot ID whose changes to apply + /// \return Reference to this for method chaining + SnapshotManager& Cherrypick(int64_t snapshot_id); + + /// \brief Roll this table's data back to a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of the snapshot to roll back table data to + /// \return Reference to this for method chaining + SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id); + + /// \brief Roll this table's data back to the last Snapshot before the given timestamp. + /// + /// \param timestamp_ms A timestamp in milliseconds + /// \return Reference to this for method chaining + SnapshotManager& RollbackToTime(TimePointMs timestamp_ms); + + /// \brief Rollback table's state to a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of snapshot id to roll back table to. Must be an ancestor + /// of the current snapshot + /// \return Reference to this for method chaining + SnapshotManager& RollbackTo(int64_t snapshot_id); + + /// \brief Create a new branch. The branch will point to current snapshot if the current + /// snapshot is not NULL. Otherwise, the branch will point to a newly created empty + /// snapshot. + /// + /// \param name Branch name + /// \return Reference to this for method chaining + SnapshotManager& CreateBranch(const std::string& name); + + /// \brief Create a new branch pointing to the given snapshot id. + /// + /// \param name Branch name + /// \param snapshot_id ID of the snapshot which will be the head of the branch + /// \return Reference to this for method chaining + SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Create a new tag pointing to the given snapshot id. + /// + /// \param name Tag name + /// \param snapshot_id Snapshot ID for the head of the new tag + /// \return Reference to this for method chaining + SnapshotManager& CreateTag(const std::string& name, int64_t snapshot_id); + + /// \brief Remove a branch by name. + /// + /// \param name Branch name + /// \return Reference to this for method chaining + SnapshotManager& RemoveBranch(const std::string& name); + + /// \brief Remove the tag with the given name. + /// + /// \param name Tag name + /// \return Reference to this for method chaining + SnapshotManager& RemoveTag(const std::string& name); + + /// \brief Replaces the tag with the given name to point to the specified snapshot. + /// + /// \param name Tag to replace + /// \param snapshot_id New snapshot id for the given tag + /// \return Reference to this for method chaining + SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id); + + /// \brief Replaces the branch with the given name to point to the specified snapshot. + /// + /// \param name Branch to replace + /// \param snapshot_id New snapshot id for the given branch + /// \return Reference to this for method chaining + SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Replaces the from branch to point to the to snapshot. The to will remain + /// unchanged, and from branch will retain its retention properties. If the from branch + /// does not exist, it will be created with default retention properties. + /// + /// \param from Branch to replace + /// \param to The branch from should be replaced with + /// \return Reference to this for method chaining + SnapshotManager& ReplaceBranch(const std::string& from, const std::string& to); + + /// \brief Performs a fast-forward of from up to the to snapshot if from is an ancestor + /// of to. The to will remain unchanged, and from will retain its retention properties. + /// If the from branch does not exist, it will be created with default retention + /// properties. + /// + /// \param from Branch to fast-forward + /// \param to Ref for the from branch to be fast forwarded to + /// \return Reference to this for method chaining + SnapshotManager& FastForwardBranch(const std::string& from, const std::string& to); + + /// \brief Rename a branch. + /// + /// \param name Name of branch to rename + /// \param new_name The desired new name of the branch + /// \return Reference to this for method chaining + SnapshotManager& RenameBranch(const std::string& name, const std::string& new_name); + + /// \brief Updates the minimum number of snapshots to keep for a branch. + /// + /// \param branch_name Branch name + /// \param min_snapshots_to_keep Minimum number of snapshots to retain on the branch + /// \return Reference to this for method chaining + SnapshotManager& SetMinSnapshotsToKeep(const std::string& branch_name, + int32_t min_snapshots_to_keep); + + /// \brief Updates the max snapshot age for a branch. + /// + /// \param branch_name Branch name + /// \param max_snapshot_age_ms Maximum snapshot age in milliseconds to retain on branch + /// \return Reference to this for method chaining + SnapshotManager& SetMaxSnapshotAgeMs(const std::string& branch_name, + int64_t max_snapshot_age_ms); + + /// \brief Updates the retention policy for a reference. + /// + /// \param name Reference name + /// \param max_ref_age_ms Retention age in milliseconds of the tag reference itself + /// \return Reference to this for method chaining + SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t max_ref_age_ms); + + /// \brief Apply the pending changes and return the current snapshot. + /// + /// \return The current snapshot after applying changes, or an error + Result> Apply(); + + /// \brief Commit all pending changes. + /// + /// \return Status indicating success or failure + Status Commit() override; + + private: + /// \brief Constructor for creating a SnapshotManager with a transaction. + /// + /// \param transaction The transaction to use + /// \param is_external Whether this is an external transaction (true) or created + /// internally (false) + SnapshotManager(std::shared_ptr transaction, bool is_external); + + /// \brief Get or create the UpdateSnapshotReference operation. + Result> UpdateSnapshotReferencesOperation(); + + /// \brief Commit any pending reference updates if they exist. + Status CommitIfRefUpdatesExist(); + + bool is_external_transaction_; + std::shared_ptr update_snapshot_references_operation_; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 12c3b19dc..b26bc4c08 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -76,6 +76,18 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Perform operations on a particular branch + /// + /// \param branch Which is name of SnapshotRef of type branch + /// \return Reference to this for method chaining + auto& ToBranch(this auto& self, const std::string& branch) { + auto status = self.SetTargetBranch(branch); + if (!status.has_value()) { + return self.AddError(status.error()); + } + return self; + } + /// \brief Set a summary property. /// /// \param property The property name From 5caa472d211ed6c5b5cd75087bda56245a297095 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Mon, 2 Feb 2026 21:23:52 +0800 Subject: [PATCH 2/4] fix: review comments --- src/iceberg/table.cc | 5 +- src/iceberg/test/snapshot_manager_test.cc | 218 ++++++++++++---------- src/iceberg/transaction.cc | 25 ++- src/iceberg/type_fwd.h | 2 +- src/iceberg/update/pending_update.h | 1 + src/iceberg/update/snapshot_manager.cc | 46 +---- src/iceberg/update/snapshot_manager.h | 13 +- 7 files changed, 144 insertions(+), 166 deletions(-) diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 2a31eb3b1..cf081143c 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -224,7 +224,10 @@ Result> Table::NewUpdatePartitionStat } Result> Table::NewSnapshotManager() { - return SnapshotManager::Make(name().ToString(), shared_from_this()); + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return SnapshotManager::Make(std::move(transaction)); } Result> StagedTable::Make( diff --git a/src/iceberg/test/snapshot_manager_test.cc b/src/iceberg/test/snapshot_manager_test.cc index f9bd044c7..4c0b09cb9 100644 --- a/src/iceberg/test/snapshot_manager_test.cc +++ b/src/iceberg/test/snapshot_manager_test.cc @@ -33,14 +33,26 @@ namespace iceberg { class SnapshotManagerTest : public UpdateTestBase { protected: - // These snapshot IDs correspond to the snapshots in the TableMetadataV2Valid.json - static constexpr int64_t kOldestSnapshotId = 3051729675574597004; - static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; + void SetUp() override { + UpdateTestBase::SetUp(); + ExtractSnapshotIdsFromTable(); + } + + void ExtractSnapshotIdsFromTable() { + ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot()); + current_snapshot_id_ = current->snapshot_id; + ASSERT_FALSE(table_->snapshots().empty()); + oldest_snapshot_id_ = table_->snapshots().front()->snapshot_id; + } + + // Snapshot IDs from the main table (TableMetadataV2Valid.json) + int64_t current_snapshot_id_{}; + int64_t oldest_snapshot_id_{}; }; TEST_F(SnapshotManagerTest, CreateBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); @@ -48,11 +60,11 @@ TEST_F(SnapshotManagerTest, CreateBranch) { EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); manager->CreateBranch("branch1"); EXPECT_THAT(manager->Commit(), IsOk()); @@ -61,12 +73,11 @@ TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) { EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTable) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, - SnapshotManager::Make("minimal_table", minimal_table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, minimal_table_->NewSnapshotManager()); manager->CreateBranch("branch1"); EXPECT_THAT(manager->Commit(), IsOk()); @@ -80,35 +91,35 @@ TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTable) { } TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, - SnapshotManager::Make("minimal_table", minimal_table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, minimal_table_->NewSnapshotManager()); manager->CreateBranch("branch1"); EXPECT_THAT(manager->Commit(), IsOk()); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, - SnapshotManager::Make("minimal_table", minimal_table_)); + ICEBERG_UNWRAP_OR_FAIL(auto table_with_branch, + catalog_->LoadTable(minimal_table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_with_branch->NewSnapshotManager()); manager2->CreateBranch("branch1"); auto result = manager2->Commit(); - EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); - EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref branch1 already exists")); } TEST_F(SnapshotManagerTest, CreateBranchFailsWhenRefAlreadyExists) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); // Try to create a branch with an existing name - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", table_)); - manager2->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_->NewSnapshotManager()); + manager2->CreateBranch("branch1", current_snapshot_id_); auto result = manager2->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); } TEST_F(SnapshotManagerTest, CreateTag) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); @@ -116,30 +127,30 @@ TEST_F(SnapshotManagerTest, CreateTag) { EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_EQ(ref->type(), SnapshotRefType::kTag); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } TEST_F(SnapshotManagerTest, CreateTagFailsWhenRefAlreadyExists) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); // Try to create a tag with an existing name - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", table_)); - manager2->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_->NewSnapshotManager()); + manager2->CreateTag("tag1", current_snapshot_id_); auto result = manager2->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); EXPECT_THAT(result, HasErrorMessage("tag 'tag1' was created concurrently")); } TEST_F(SnapshotManagerTest, RemoveBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->RemoveBranch("branch1"); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -151,7 +162,7 @@ TEST_F(SnapshotManagerTest, RemoveBranch) { } TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); manager->RemoveBranch("non-existing"); auto result = manager->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -159,7 +170,7 @@ TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) { } TEST_F(SnapshotManagerTest, RemovingMainBranchFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); manager->RemoveBranch(std::string(SnapshotRef::kMainBranch)); auto result = manager->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -167,13 +178,13 @@ TEST_F(SnapshotManagerTest, RemovingMainBranchFails) { } TEST_F(SnapshotManagerTest, RemoveTag) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->RemoveTag("tag1"); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -185,7 +196,7 @@ TEST_F(SnapshotManagerTest, RemoveTag) { } TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); manager->RemoveTag("non-existing"); auto result = manager->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -193,14 +204,14 @@ TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) { } TEST_F(SnapshotManagerTest, ReplaceBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kOldestSnapshotId); - manager->CreateBranch("branch2", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", oldest_snapshot_id_); + manager->CreateBranch("branch2", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->ReplaceBranch("branch1", "branch2"); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -211,17 +222,17 @@ TEST_F(SnapshotManagerTest, ReplaceBranch) { EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_NE(ref, nullptr); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } } TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->ReplaceBranch("branch1", "non-existing"); auto result = manager2->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -229,13 +240,13 @@ TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) { } TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingFromBranchCreatesTheBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->ReplaceBranch("new-branch", "branch1"); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -246,18 +257,18 @@ TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingFromBranchCreatesTheBranch) EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } } TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingFromBranchCreatesTheBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->FastForwardBranch("new-branch", "branch1"); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -268,17 +279,17 @@ TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingFromBranchCreatesTheBran EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } } TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->FastForwardBranch("branch1", "non-existing"); auto result = manager2->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -286,14 +297,14 @@ TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) { } TEST_F(SnapshotManagerTest, ReplaceTag) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); - manager2->ReplaceTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->ReplaceTag("tag1", current_snapshot_id_); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -303,18 +314,18 @@ TEST_F(SnapshotManagerTest, ReplaceTag) { EXPECT_NE(it, reloaded->metadata()->refs.end()); auto ref = it->second; EXPECT_NE(ref, nullptr); - EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); } } TEST_F(SnapshotManagerTest, UpdatingBranchRetention) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->SetMinSnapshotsToKeep("branch1", 10); manager2->SetMaxSnapshotAgeMs("branch1", 20000); EXPECT_THAT(manager2->Commit(), IsOk()); @@ -333,13 +344,13 @@ TEST_F(SnapshotManagerTest, UpdatingBranchRetention) { } TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->SetMinSnapshotsToKeep("tag1", 10); auto result = manager2->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -348,7 +359,7 @@ TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) { { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->SetMaxSnapshotAgeMs("tag1", 10); auto result = manager2->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -357,13 +368,13 @@ TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) { } TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->SetMaxRefAgeMs("branch1", 10000); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -378,13 +389,13 @@ TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) { } TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateTag("tag1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->SetMaxRefAgeMs("tag1", 10000); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -399,13 +410,13 @@ TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) { } TEST_F(SnapshotManagerTest, RenameBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); { ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); manager2->RenameBranch("branch1", "branch2"); EXPECT_THAT(manager2->Commit(), IsOk()); } @@ -418,12 +429,12 @@ TEST_F(SnapshotManagerTest, RenameBranch) { auto it2 = reloaded->metadata()->refs.find("branch2"); EXPECT_NE(it2, reloaded->metadata()->refs.end()); auto ref2 = it2->second; - EXPECT_EQ(ref2->snapshot_id, kCurrentSnapshotId); + EXPECT_EQ(ref2->snapshot_id, current_snapshot_id_); } } TEST_F(SnapshotManagerTest, FailRenamingMainBranch) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); manager->RenameBranch(std::string(SnapshotRef::kMainBranch), "some-branch"); auto result = manager->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -431,7 +442,7 @@ TEST_F(SnapshotManagerTest, FailRenamingMainBranch) { } TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); manager->RenameBranch("some-missing-branch", "some-branch"); auto result = manager->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); @@ -439,56 +450,63 @@ TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) { } TEST_F(SnapshotManagerTest, RollbackTo) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->RollbackTo(kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RollbackTo(oldest_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); } TEST_F(SnapshotManagerTest, SetCurrentSnapshot) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->SetCurrentSnapshot(kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->SetCurrentSnapshot(oldest_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); } TEST_F(SnapshotManagerTest, CreateReferencesAndRollback) { - ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); - manager->CreateBranch("branch1", kCurrentSnapshotId); - manager->CreateTag("tag1", kCurrentSnapshotId); - manager->RollbackTo(kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + manager->CreateTag("tag1", current_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); - ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->RollbackTo(oldest_snapshot_id_); + } - auto branch_it = reloaded->metadata()->refs.find("branch1"); - EXPECT_NE(branch_it, reloaded->metadata()->refs.end()); - EXPECT_EQ(branch_it->second->snapshot_id, kCurrentSnapshotId); + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); - auto tag_it = reloaded->metadata()->refs.find("tag1"); - EXPECT_NE(tag_it, reloaded->metadata()->refs.end()); - EXPECT_EQ(tag_it->second->snapshot_id, kCurrentSnapshotId); + auto branch_it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(branch_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(branch_it->second->snapshot_id, current_snapshot_id_); + + auto tag_it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(tag_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(tag_it->second->snapshot_id, current_snapshot_id_); + } } TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) { ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn)); - manager->RollbackTo(kOldestSnapshotId); + manager->RollbackTo(oldest_snapshot_id_); EXPECT_THAT(manager->Commit(), IsOk()); EXPECT_THAT(txn->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); } } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 981823d04..83d4c02db 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -63,9 +63,7 @@ Transaction::~Transaction() = default; Result> Transaction::Make(std::shared_ptr
table, Kind kind, bool auto_commit) { - if (!table || !table->catalog()) [[unlikely]] { - return InvalidArgument("Table and catalog cannot be null"); - } + ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be null"); std::unique_ptr metadata_builder; if (kind == Kind::kCreate) { @@ -94,9 +92,11 @@ std::string Transaction::MetadataFileLocation(std::string_view filename) const { } Status Transaction::AddUpdate(const std::shared_ptr& update) { - if (!last_update_committed_) { - return InvalidArgument("Cannot add update when previous update is not committed"); - } + ICEBERG_PRECHECK(update->kind() != PendingUpdate::Kind::kSnapshotManager, + "SnapshotManager updates should not be added to the transaction"); + ICEBERG_CHECK(last_update_committed_, + "Cannot add update when previous update is not committed"); + pending_updates_.emplace_back(std::weak_ptr(update)); last_update_committed_ = false; return {}; @@ -302,13 +302,9 @@ Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& up } Result> Transaction::Commit() { - if (committed_) { - return Invalid("Transaction already committed"); - } - if (!last_update_committed_) { - return InvalidArgument( - "Cannot commit transaction when previous update is not committed"); - } + ICEBERG_CHECK(!committed_, "Transaction already committed"); + ICEBERG_CHECK(last_update_committed_, + "Cannot commit transaction when previous update is not committed"); const auto& updates = metadata_builder_->changes(); if (updates.empty()) { @@ -432,7 +428,8 @@ Transaction::NewUpdateSnapshotReference() { Result> Transaction::NewSnapshotManager() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr snapshot_manager, SnapshotManager::Make(shared_from_this())); - ICEBERG_RETURN_UNEXPECTED(AddUpdate(snapshot_manager)); + // SnapshotManager has its own commit logic, so it is not added to the pending updates. + // This differs from the Java implementation. return snapshot_manager; } diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index a712c2ede..7a3f50df2 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -184,7 +184,6 @@ class TableProperties; /// \brief Table update. class TableMetadataBuilder; class TableUpdate; -class SnapshotManager; class TableRequirement; class TableUpdateContext; class Transaction; @@ -194,6 +193,7 @@ class ExpireSnapshots; class FastAppend; class PendingUpdate; class SetSnapshot; +class SnapshotManager; class SnapshotUpdate; class UpdateLocation; class UpdatePartitionSpec; diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index f44812a85..558d60726 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -44,6 +44,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { enum class Kind : uint8_t { kExpireSnapshots, kSetSnapshot, + kSnapshotManager, kUpdateLocation, kUpdatePartitionSpec, kUpdatePartitionStatistics, diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc index 5afedb455..da2bf3b88 100644 --- a/src/iceberg/update/snapshot_manager.cc +++ b/src/iceberg/update/snapshot_manager.cc @@ -34,48 +34,24 @@ namespace iceberg { -Result> SnapshotManager::Make( - const std::string& table_name, std::shared_ptr
table) { - if (table == nullptr) { - return InvalidArgument("Table cannot be null"); - } - if (table->metadata() == nullptr) { - return InvalidArgument("Cannot manage snapshots: table {} does not exist", - table_name); - } - // Create a transaction first - ICEBERG_ASSIGN_OR_RAISE(auto transaction, - Transaction::Make(table, Transaction::Kind::kUpdate, - /*auto_commit=*/false)); - auto manager = std::shared_ptr( - new SnapshotManager(std::move(transaction), /*is_external=*/false)); - return manager; -} - Result> SnapshotManager::Make( std::shared_ptr transaction) { - if (transaction == nullptr) { - return InvalidArgument("Invalid input transaction: null"); - } - return std::shared_ptr( - new SnapshotManager(std::move(transaction), /*is_external=*/true)); + ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null"); + return std::shared_ptr(new SnapshotManager(std::move(transaction))); } -SnapshotManager::SnapshotManager(std::shared_ptr transaction, - bool is_external) - : PendingUpdate(transaction), is_external_transaction_(is_external) {} +SnapshotManager::SnapshotManager(std::shared_ptr transaction) + : PendingUpdate(transaction) {} SnapshotManager::~SnapshotManager() = default; SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) { - ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); // TODO(anyone): Implement cherrypick operation ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented"); return *this; } SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) { - ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); set_snapshot->SetCurrentSnapshot(snapshot_id); ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); @@ -83,7 +59,6 @@ SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) { } SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) { - ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); set_snapshot->RollbackToTime(UnixMsFromTimePointMs(timestamp_ms)); ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); @@ -91,7 +66,6 @@ SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) { } SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) { - ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); set_snapshot->RollbackTo(snapshot_id); ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); @@ -101,9 +75,8 @@ SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) { SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) { if (base().current_snapshot_id != kInvalidSnapshotId) { ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base().Snapshot()); - if (current_snapshot != nullptr) { - return CreateBranch(name, current_snapshot->snapshot_id); - } + ICEBERG_DCHECK(current_snapshot != nullptr, "Current snapshot should not be null"); + return CreateBranch(name, current_snapshot->snapshot_id); } const auto& current_refs = base().refs; ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name); @@ -197,11 +170,8 @@ SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name, Result> SnapshotManager::Apply() { return base().Snapshot(); } Status SnapshotManager::Commit() { - ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist()); - if (!is_external_transaction_) { - ICEBERG_RETURN_UNEXPECTED(transaction_->Commit()); - } - return {}; + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + return CommitIfRefUpdatesExist(); } Result> diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h index 6315ea701..344109283 100644 --- a/src/iceberg/update/snapshot_manager.h +++ b/src/iceberg/update/snapshot_manager.h @@ -37,14 +37,6 @@ namespace iceberg { /// snapshots, and managing branches and tags. class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { public: - /// \brief Create a SnapshotManager for a table. - /// - /// \param table_name The name of the table - /// \param table The table to manage snapshots for - /// \return A new SnapshotManager instance, or an error if the table doesn't exist - static Result> Make(const std::string& table_name, - std::shared_ptr
table); - /// \brief Create a SnapshotManager from an existing transaction. /// /// \param transaction The transaction to use @@ -194,9 +186,7 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \brief Constructor for creating a SnapshotManager with a transaction. /// /// \param transaction The transaction to use - /// \param is_external Whether this is an external transaction (true) or created - /// internally (false) - SnapshotManager(std::shared_ptr transaction, bool is_external); + explicit SnapshotManager(std::shared_ptr transaction); /// \brief Get or create the UpdateSnapshotReference operation. Result> UpdateSnapshotReferencesOperation(); @@ -204,7 +194,6 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \brief Commit any pending reference updates if they exist. Status CommitIfRefUpdatesExist(); - bool is_external_transaction_; std::shared_ptr update_snapshot_references_operation_; }; From 3fdb4611130c1e52342839e8c003399ec4de8613 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 3 Feb 2026 16:34:29 +0800 Subject: [PATCH 3/4] fix: SnapshotManager::kind() returns kSnapshotManager --- src/iceberg/update/snapshot_manager.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h index 344109283..822795cfc 100644 --- a/src/iceberg/update/snapshot_manager.h +++ b/src/iceberg/update/snapshot_manager.h @@ -46,8 +46,7 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { ~SnapshotManager() override; - // TODO(xxx): is this correct? - Kind kind() const final { return Kind::kUpdateSnapshotReference; } + Kind kind() const final { return Kind::kSnapshotManager; } /// \brief Apply supported changes in given snapshot and create a new snapshot which /// will be set as the current snapshot on commit. From 7d30c1186afc9e233d1f1c3193c7cf2f385347c2 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Thu, 5 Feb 2026 19:41:07 +0800 Subject: [PATCH 4/4] fix: review comments --- src/iceberg/transaction.cc | 2 +- src/iceberg/update/snapshot_manager.cc | 22 ++++++------ src/iceberg/update/snapshot_manager.h | 47 +++++++++++--------------- src/iceberg/update/snapshot_update.cc | 14 -------- src/iceberg/update/snapshot_update.h | 19 ++++++++--- 5 files changed, 45 insertions(+), 59 deletions(-) diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 83d4c02db..16aa0e31e 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -93,7 +93,7 @@ std::string Transaction::MetadataFileLocation(std::string_view filename) const { Status Transaction::AddUpdate(const std::shared_ptr& update) { ICEBERG_PRECHECK(update->kind() != PendingUpdate::Kind::kSnapshotManager, - "SnapshotManager updates should not be added to the transaction"); + "SnapshotManager should not be added to the transaction"); ICEBERG_CHECK(last_update_committed_, "Cannot add update when previous update is not committed"); diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc index da2bf3b88..bd7e05a6f 100644 --- a/src/iceberg/update/snapshot_manager.cc +++ b/src/iceberg/update/snapshot_manager.cc @@ -41,7 +41,7 @@ Result> SnapshotManager::Make( } SnapshotManager::SnapshotManager(std::shared_ptr transaction) - : PendingUpdate(transaction) {} + : PendingUpdate(std::move(transaction)) {} SnapshotManager::~SnapshotManager() = default; @@ -58,9 +58,9 @@ SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) { return *this; } -SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) { +SnapshotManager& SnapshotManager::RollbackToTime(int64_t timestamp_ms) { ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); - set_snapshot->RollbackToTime(UnixMsFromTimePointMs(timestamp_ms)); + set_snapshot->RollbackToTime(timestamp_ms); ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); return *this; } @@ -81,7 +81,7 @@ SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) { const auto& current_refs = base().refs; ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name); ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, transaction_->NewFastAppend()); - ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->ToBranch(name).Commit()); + ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->SetTargetBranch(name).Commit()); return *this; } @@ -167,8 +167,6 @@ SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name, return *this; } -Result> SnapshotManager::Apply() { return base().Snapshot(); } - Status SnapshotManager::Commit() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); return CommitIfRefUpdatesExist(); @@ -176,17 +174,17 @@ Status SnapshotManager::Commit() { Result> SnapshotManager::UpdateSnapshotReferencesOperation() { - if (update_snapshot_references_operation_ == nullptr) { - ICEBERG_ASSIGN_OR_RAISE(update_snapshot_references_operation_, + if (update_snap_refs_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(update_snap_refs_, transaction_->NewUpdateSnapshotReference()); } - return update_snapshot_references_operation_; + return update_snap_refs_; } Status SnapshotManager::CommitIfRefUpdatesExist() { - if (update_snapshot_references_operation_ != nullptr) { - ICEBERG_RETURN_UNEXPECTED(update_snapshot_references_operation_->Commit()); - update_snapshot_references_operation_ = nullptr; + if (update_snap_refs_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED(update_snap_refs_->Commit()); + update_snap_refs_ = nullptr; } return {}; } diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h index 822795cfc..d895be67b 100644 --- a/src/iceberg/update/snapshot_manager.h +++ b/src/iceberg/update/snapshot_manager.h @@ -24,10 +24,8 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" -#include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" -#include "iceberg/util/timepoint.h" namespace iceberg { @@ -51,25 +49,25 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \brief Apply supported changes in given snapshot and create a new snapshot which /// will be set as the current snapshot on commit. /// - /// \param snapshot_id A snapshot ID whose changes to apply + /// \param snapshot_id A Snapshot ID whose changes to apply /// \return Reference to this for method chaining SnapshotManager& Cherrypick(int64_t snapshot_id); - /// \brief Roll this table's data back to a specific Snapshot identified by id. + /// \brief Roll this table's data back to a specific Snapshot ID. /// - /// \param snapshot_id Long id of the snapshot to roll back table data to + /// \param snapshot_id Snapshot ID to roll back table data to /// \return Reference to this for method chaining SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id); - /// \brief Roll this table's data back to the last Snapshot before the given timestamp. + /// \brief Roll this table's data back to the last snapshot before the given timestamp. /// /// \param timestamp_ms A timestamp in milliseconds /// \return Reference to this for method chaining - SnapshotManager& RollbackToTime(TimePointMs timestamp_ms); + SnapshotManager& RollbackToTime(int64_t timestamp_ms); - /// \brief Rollback table's state to a specific Snapshot identified by id. + /// \brief Rollback table's state to a specific Snapshot ID. /// - /// \param snapshot_id Long id of snapshot id to roll back table to. Must be an ancestor + /// \param snapshot_id Snapshot ID to roll back table to. Must be an ancestor /// of the current snapshot /// \return Reference to this for method chaining SnapshotManager& RollbackTo(int64_t snapshot_id); @@ -82,14 +80,14 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \return Reference to this for method chaining SnapshotManager& CreateBranch(const std::string& name); - /// \brief Create a new branch pointing to the given snapshot id. + /// \brief Create a new branch pointing to the given Snapshot ID. /// /// \param name Branch name - /// \param snapshot_id ID of the snapshot which will be the head of the branch + /// \param snapshot_id Snapshot ID which will be the head of the branch /// \return Reference to this for method chaining SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id); - /// \brief Create a new tag pointing to the given snapshot id. + /// \brief Create a new tag pointing to the given Snapshot ID. /// /// \param name Tag name /// \param snapshot_id Snapshot ID for the head of the new tag @@ -111,30 +109,30 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \brief Replaces the tag with the given name to point to the specified snapshot. /// /// \param name Tag to replace - /// \param snapshot_id New snapshot id for the given tag + /// \param snapshot_id New Snapshot ID for the given tag /// \return Reference to this for method chaining SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id); /// \brief Replaces the branch with the given name to point to the specified snapshot. /// /// \param name Branch to replace - /// \param snapshot_id New snapshot id for the given branch + /// \param snapshot_id New Snapshot ID for the given branch /// \return Reference to this for method chaining SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id); - /// \brief Replaces the from branch to point to the to snapshot. The to will remain - /// unchanged, and from branch will retain its retention properties. If the from branch - /// does not exist, it will be created with default retention properties. + /// \brief Replaces the `from` branch to point to the `to` snapshot. The `to` will + /// remain unchanged, and `from` branch will retain its retention properties. If the + /// `from` branch does not exist, it will be created with default retention properties. /// /// \param from Branch to replace /// \param to The branch from should be replaced with /// \return Reference to this for method chaining SnapshotManager& ReplaceBranch(const std::string& from, const std::string& to); - /// \brief Performs a fast-forward of from up to the to snapshot if from is an ancestor - /// of to. The to will remain unchanged, and from will retain its retention properties. - /// If the from branch does not exist, it will be created with default retention - /// properties. + /// \brief Performs a fast-forward of `from` up to the `to` snapshot if `from` is an + /// ancestor of `to`. The `to` will remain unchanged, and `from` will retain its + /// retention properties. If the `from` branch does not exist, it will be created with + /// default retention properties. /// /// \param from Branch to fast-forward /// \param to Ref for the from branch to be fast forwarded to @@ -171,11 +169,6 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \return Reference to this for method chaining SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t max_ref_age_ms); - /// \brief Apply the pending changes and return the current snapshot. - /// - /// \return The current snapshot after applying changes, or an error - Result> Apply(); - /// \brief Commit all pending changes. /// /// \return Status indicating success or failure @@ -193,7 +186,7 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { /// \brief Commit any pending reference updates if they exist. Status CommitIfRefUpdatesExist(); - std::shared_ptr update_snapshot_references_operation_; + std::shared_ptr update_snap_refs_; }; } // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 38c5129f4..682d5f889 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -331,20 +331,6 @@ Status SnapshotUpdate::Finalize(std::optional commit_error) { return {}; } -Status SnapshotUpdate::SetTargetBranch(const std::string& branch) { - ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be empty"); - - if (auto ref_it = base().refs.find(branch); ref_it != base().refs.end()) { - ICEBERG_PRECHECK( - ref_it->second->type() == SnapshotRefType::kBranch, - "{} is a tag, not a branch. Tags cannot be targets for producing snapshots", - branch); - } - - target_branch_ = branch; - return {}; -} - Result> SnapshotUpdate::ComputeSummary( const TableMetadata& previous) { std::unordered_map summary = Summary(); diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index b26bc4c08..fdbb2660d 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -80,11 +80,21 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// /// \param branch Which is name of SnapshotRef of type branch /// \return Reference to this for method chaining - auto& ToBranch(this auto& self, const std::string& branch) { - auto status = self.SetTargetBranch(branch); - if (!status.has_value()) { - return self.AddError(status.error()); + auto& SetTargetBranch(this auto& self, const std::string& branch) { + if (branch.empty()) [[unlikely]] { + return self.AddError(ErrorKind::kInvalidArgument, "Branch name cannot be empty"); } + + if (auto ref_it = self.base().refs.find(branch); ref_it != self.base().refs.end()) { + if (ref_it->second->type() != SnapshotRefType::kBranch) { + return self.AddError(ErrorKind::kInvalidArgument, + "{} is a tag, not a branch. Tags cannot be targets for " + "producing snapshots", + branch); + } + } + + self.target_branch_ = branch; return self; } @@ -133,7 +143,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::span> files, const std::shared_ptr& spec); - Status SetTargetBranch(const std::string& branch); const std::string& target_branch() const { return target_branch_; } bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } const std::string& commit_uuid() const { return commit_uuid_; }