diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 3e0c66f90..23d2e4cdf 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -82,6 +82,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc + update/snapshot_update.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index ba38c1f93..b19ffce7d 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -17,7 +17,6 @@ * under the License. */ -#include #include #include #include @@ -40,6 +39,7 @@ #include "iceberg/schema_util_internal.h" #include "iceberg/util/formatter.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" #include "iceberg/util/visit_type.h" namespace iceberg::avro { @@ -471,13 +471,7 @@ Result GetId(const ::avro::NodePtr& node, const std::string& attr_name, return InvalidSchema("Missing avro attribute: {}", attr_name); } - int32_t id; - const auto& id_value = id_str.value(); - auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + id_value.size(), id); - if (ec != std::errc()) { - return InvalidSchema("Invalid {}: {}", attr_name, id_value); - } - return id; + return StringUtils::ParseInt(id_str.value()); } Result GetElementId(const ::avro::NodePtr& node) { diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 669d0b50b..4cd2f03fc 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -399,7 +399,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) { json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms); json[kManifestList] = snapshot.manifest_list; // If there is an operation, write the summary map - if (snapshot.operation().has_value()) { + if (snapshot.Operation().has_value()) { json[kSummary] = snapshot.summary; } SetOptionalField(json, kSchemaId, snapshot.schema_id); @@ -1553,9 +1553,17 @@ Result> TableUpdateFromJson(const nlohmann::json& j GetJsonValueOptional(json, kMaxSnapshotAgeMs)); ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age, GetJsonValueOptional(json, kMaxRefAgeMs)); - return std::make_unique(std::move(ref_name), snapshot_id, type, - min_snapshots, max_snapshot_age, - max_ref_age); + if (type == SnapshotRefType::kTag) { + ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, max_ref_age)); + return std::make_unique(std::move(ref_name), *tag); + } else { + ICEBERG_CHECK(type == SnapshotRefType::kBranch, + "Expected branch type for snapshot ref"); + ICEBERG_ASSIGN_OR_RAISE(auto branch, + SnapshotRef::MakeBranch(snapshot_id, min_snapshots, + max_snapshot_age, max_ref_age)); + return std::make_unique(std::move(ref_name), *branch); + } } if (action == kActionSetProperties) { using StringMap = std::unordered_map; diff --git a/src/iceberg/manifest/manifest_writer.cc b/src/iceberg/manifest/manifest_writer.cc index 0899869ad..0045e2c06 100644 --- a/src/iceberg/manifest/manifest_writer.cc +++ b/src/iceberg/manifest/manifest_writer.cc @@ -369,23 +369,18 @@ Result> ManifestWriter::MakeWriter( int8_t format_version, std::optional snapshot_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_spec, std::shared_ptr current_schema, - std::optional content, std::optional first_row_id) { + ManifestContent content, std::optional first_row_id) { switch (format_version) { case 1: return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io), 
std::move(partition_spec), std::move(current_schema)); case 2: - ICEBERG_PRECHECK(content.has_value(), - "ManifestContent is required for format version 2"); return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io), - std::move(partition_spec), std::move(current_schema), - content.value()); + std::move(partition_spec), std::move(current_schema), content); case 3: - ICEBERG_PRECHECK(content.has_value(), - "ManifestContent is required for format version 3"); return MakeV3Writer(snapshot_id, first_row_id, manifest_location, std::move(file_io), std::move(partition_spec), - std::move(current_schema), content.value()); + std::move(current_schema), content); default: return NotSupported("Format version {} is not supported", format_version); } diff --git a/src/iceberg/manifest/manifest_writer.h b/src/iceberg/manifest/manifest_writer.h index 5a095b28b..288bda31a 100644 --- a/src/iceberg/manifest/manifest_writer.h +++ b/src/iceberg/manifest/manifest_writer.h @@ -28,6 +28,7 @@ #include "iceberg/file_writer.h" #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" #include "iceberg/metrics.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -175,7 +176,7 @@ class ICEBERG_EXPORT ManifestWriter { std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_spec, std::shared_ptr current_schema, - std::optional content = std::nullopt, + ManifestContent content = ManifestContent::kData, std::optional first_row_id = std::nullopt); private: diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 3164f390a..ead2ef2c8 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -103,6 +103,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/pending_update.cc', + 'update/snapshot_update.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', 'update/update_schema.cc', diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index ba85e0491..dfa9a340e 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -19,10 +19,13 @@ #include "iceberg/snapshot.h" +#include + #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg { @@ -49,6 +52,55 @@ SnapshotRefType SnapshotRef::type() const noexcept { retention); } +Status SnapshotRef::Validate() const { + if (type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(this->retention); + ICEBERG_CHECK(!branch.min_snapshots_to_keep.has_value() || + branch.min_snapshots_to_keep.value() > 0, + "Min snapshots to keep must be greater than 0"); + ICEBERG_CHECK( + !branch.max_snapshot_age_ms.has_value() || branch.max_snapshot_age_ms.value() > 0, + "Max snapshot age must be greater than 0 ms"); + ICEBERG_CHECK(!branch.max_ref_age_ms.has_value() || branch.max_ref_age_ms.value() > 0, + "Max reference age must be greater than 0"); + } else { + const auto& tag = std::get(this->retention); + ICEBERG_CHECK(!tag.max_ref_age_ms.has_value() || tag.max_ref_age_ms.value() > 0, + "Max reference age must be greater than 0"); + } + return {}; +} + +Result> SnapshotRef::MakeBranch( + int64_t snapshot_id, std::optional min_snapshots_to_keep, + std::optional max_snapshot_age_ms, std::optional max_ref_age_ms) { + auto ref = std::make_unique( + SnapshotRef{.snapshot_id = snapshot_id, + .retention = Branch{ + .min_snapshots_to_keep = min_snapshots_to_keep, + .max_snapshot_age_ms = 
max_snapshot_age_ms, + .max_ref_age_ms = max_ref_age_ms, + }}); + ICEBERG_RETURN_UNEXPECTED(ref->Validate()); + return ref; +} + +Result> SnapshotRef::MakeTag( + int64_t snapshot_id, std::optional max_ref_age_ms) { + auto ref = std::make_unique(SnapshotRef{ + .snapshot_id = snapshot_id, .retention = Tag{.max_ref_age_ms = max_ref_age_ms}}); + ICEBERG_RETURN_UNEXPECTED(ref->Validate()); + return ref; +} + +std::unique_ptr SnapshotRef::Clone( + std::optional new_snapshot_id) const { + auto ref = std::make_unique(); + ref->snapshot_id = new_snapshot_id.value_or(snapshot_id); + ref->retention = retention; + return ref; +} + bool SnapshotRef::Equals(const SnapshotRef& other) const { if (this == &other) { return true; @@ -67,7 +119,7 @@ bool SnapshotRef::Equals(const SnapshotRef& other) const { } } -std::optional Snapshot::operation() const { +std::optional Snapshot::Operation() const { auto it = summary.find(SnapshotSummaryFields::kOperation); if (it != summary.end()) { return it->second; @@ -75,6 +127,24 @@ std::optional Snapshot::operation() const { return std::nullopt; } +Result> Snapshot::FirstRowId() const { + auto it = summary.find(SnapshotSummaryFields::kFirstRowId); + if (it == summary.end()) { + return std::nullopt; + } + + return StringUtils::ParseInt(it->second); +} + +Result> Snapshot::AddedRows() const { + auto it = summary.find(SnapshotSummaryFields::kAddedRows); + if (it == summary.end()) { + return std::nullopt; + } + + return StringUtils::ParseInt(it->second); +} + bool Snapshot::Equals(const Snapshot& other) const { if (this == &other) { return true; @@ -85,6 +155,37 @@ bool Snapshot::Equals(const Snapshot& other) const { schema_id == other.schema_id; } +Result> Snapshot::Make( + int64_t sequence_number, int64_t snapshot_id, + std::optional parent_snapshot_id, TimePointMs timestamp_ms, + std::string operation, std::unordered_map summary, + std::optional schema_id, std::string manifest_list, + std::optional first_row_id, std::optional added_rows) { + ICEBERG_PRECHECK(!operation.empty(), "Operation cannot be empty"); + ICEBERG_PRECHECK(!first_row_id.has_value() || first_row_id.value() >= 0, + "Invalid first-row-id (cannot be negative): {}", first_row_id.value()); + ICEBERG_PRECHECK(!added_rows.has_value() || added_rows.value() >= 0, + "Invalid added-rows (cannot be negative): {}", added_rows.value()); + ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(), + "Missing added-rows when first-row-id is set"); + summary[SnapshotSummaryFields::kOperation] = operation; + if (first_row_id.has_value()) { + summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(first_row_id.value()); + } + if (added_rows.has_value()) { + summary[SnapshotSummaryFields::kAddedRows] = std::to_string(added_rows.value()); + } + return std::make_unique(Snapshot{ + .snapshot_id = snapshot_id, + .parent_snapshot_id = parent_snapshot_id, + .sequence_number = sequence_number, + .timestamp_ms = timestamp_ms, + .manifest_list = std::move(manifest_list), + .summary = std::move(summary), + .schema_id = schema_id, + }); +} + Result SnapshotCache::InitManifestsCache( const Snapshot* snapshot, std::shared_ptr file_io) { if (file_io == nullptr) { diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 86a7fe20d..e2ec0ccb0 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -25,7 +25,6 @@ #include #include #include -#include #include #include "iceberg/iceberg_export.h" @@ -114,6 +113,39 @@ struct ICEBERG_EXPORT SnapshotRef { SnapshotRefType type() const noexcept; + /// \brief 
Create a branch reference + /// + /// \param snapshot_id The snapshot ID for the branch + /// \param min_snapshots_to_keep Optional minimum number of snapshots to keep + /// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds + /// \param max_ref_age_ms Optional maximum reference age in milliseconds + /// \return A Result containing a unique_ptr to the SnapshotRef, or an error if + /// validation failed + static Result> MakeBranch( + int64_t snapshot_id, std::optional min_snapshots_to_keep = std::nullopt, + std::optional max_snapshot_age_ms = std::nullopt, + std::optional max_ref_age_ms = std::nullopt); + + /// \brief Create a tag reference + /// + /// \param snapshot_id The snapshot ID for the tag + /// \param max_ref_age_ms Optional maximum reference age in milliseconds + /// \return A Result containing a unique_ptr to the SnapshotRef, or an error if + /// validation failed + static Result> MakeTag( + int64_t snapshot_id, std::optional max_ref_age_ms = std::nullopt); + + /// \brief Clone this SnapshotRef with an optional new snapshot ID + /// + /// \param new_snapshot_id Optional new snapshot ID. If not provided, uses the current + /// snapshot_id + /// \return A unique_ptr to the cloned SnapshotRef + std::unique_ptr Clone( + std::optional new_snapshot_id = std::nullopt) const; + + /// \brief Validate the SnapshotRef + Status Validate() const; + /// \brief Compare two snapshot refs for equality friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) { return lhs.Equals(rhs); @@ -125,9 +157,13 @@ struct ICEBERG_EXPORT SnapshotRef { }; /// \brief Optional Snapshot Summary Fields -struct SnapshotSummaryFields { +struct ICEBERG_EXPORT SnapshotSummaryFields { /// \brief The operation field key inline static const std::string kOperation = "operation"; + /// \brief The first row id field key + inline static const std::string kFirstRowId = "first-row-id"; + /// \brief The added rows field key + inline static const std::string kAddedRows = "added-rows"; /// Metrics, see https://iceberg.apache.org/spec/#metrics @@ -246,12 +282,44 @@ struct ICEBERG_EXPORT Snapshot { /// ID of the table's current schema when the snapshot was created. std::optional schema_id; + /// \brief Create a new Snapshot instance with validation on the inputs. + static Result> Make( + int64_t sequence_number, int64_t snapshot_id, + std::optional parent_snapshot_id, TimePointMs timestamp_ms, + std::string operation, std::unordered_map summary, + std::optional schema_id, std::string manifest_list, + std::optional first_row_id = std::nullopt, + std::optional added_rows = std::nullopt); + /// \brief Return the name of the DataOperations data operation that produced this /// snapshot. /// /// \return the operation that produced this snapshot, or nullopt if the operation is /// unknown. - std::optional operation() const; + std::optional Operation() const; + + /// \brief The row-id of the first newly added row in this snapshot. + /// + /// All rows added in this snapshot will have a row-id assigned to them greater than + /// this value. All rows with a row-id less than this value were created in a snapshot + /// that was added to the table (but not necessarily committed to this branch) in the + /// past. + /// + /// \return the first row-id to be used in this snapshot or nullopt when row lineage + /// is not supported + Result> FirstRowId() const; + + /// \brief The upper bound of number of rows with assigned row IDs in this snapshot. 
+ /// + /// It can be used safely to increment the table's `next-row-id` during a commit. It + /// can be more than the number of rows added in this snapshot and include some + /// existing rows. + /// + /// This field is optional but is required when the table version supports row lineage. + /// + /// \return the upper bound of number of rows with assigned row IDs in this snapshot + /// or nullopt if the value was not stored. + Result> AddedRows() const; /// \brief Compare two snapshots for equality. friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) { diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 311395856..77fe763ff 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { virtual ~Table(); - /// \brief Return the identifier of this table + /// \brief Returns the identifier of this table const TableIdentifier& name() const { return identifier_; } /// \brief Returns the UUID of the table @@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Return the schema for this table, return NotFoundError if not found Result> schema() const; - /// \brief Return a map of schema for this table + /// \brief Returns a map of schema for this table Result< std::reference_wrapper>>> schemas() const; - /// \brief Return the partition spec for this table, return NotFoundError if not found + /// \brief Returns the partition spec for this table, return NotFoundError if not found Result> spec() const; - /// \brief Return a map of partition specs for this table + /// \brief Returns a map of partition specs for this table Result>>> specs() const; - /// \brief Return the sort order for this table, return NotFoundError if not found + /// \brief Returns the sort order for this table, return NotFoundError if not found Result> sort_order() const; - /// \brief Return a map of sort order IDs to sort orders for this table + /// \brief Returns a map of sort order IDs to sort orders for this table Result>>> sort_orders() const; - /// \brief Return a map of string properties for this table + /// \brief Returns the properties of this table const TableProperties& properties() const; - /// \brief Return the table's metadata file location + /// \brief Returns the table's metadata file location std::string_view metadata_file_location() const; - /// \brief Return the table's base location + /// \brief Returns the table's base location std::string_view location() const; /// \brief Returns the time when this table was last updated TimePointMs last_updated_ms() const; - /// \brief Return the table's current snapshot, return NotFoundError if not found + /// \brief Returns the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; /// \brief Get the snapshot of this table with the given id diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 851048b30..22eb739bd 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -34,6 +34,7 @@ #include +#include "iceberg/constants.h" #include "iceberg/exception.h" #include "iceberg/file_io.h" #include "iceberg/json_internal.h" @@ -52,6 +53,7 @@ #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" #include "iceberg/util/property_util.h" +#include "iceberg/util/timepoint.h" #include "iceberg/util/type_util.h" #include "iceberg/util/uuid.h" @@ -244,26 +246,38 @@ Result> TableMetadata::SchemaById(int32_t schema_id) con } Result> TableMetadata::PartitionSpec() const { - auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) { - return spec != nullptr && spec->spec_id() == default_spec_id; + return PartitionSpecById(default_spec_id); +} + +Result> TableMetadata::PartitionSpecById( + int32_t spec_id) const { + auto iter = std::ranges::find_if(partition_specs, [spec_id](const auto& spec) { + return spec != nullptr && spec->spec_id() == spec_id; }); if (iter == partition_specs.end()) { - return NotFound("Default partition spec is not found"); + return NotFound("Partition spec with ID {} is not found", spec_id); } return *iter; } Result> TableMetadata::SortOrder() const { - auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) { - return order != nullptr && order->order_id() == default_sort_order_id; + return SortOrderById(default_sort_order_id); +} + +Result> TableMetadata::SortOrderById(int32_t order_id) const { + auto iter = std::ranges::find_if(sort_orders, [order_id](const auto& order) { + return order != nullptr && order->order_id() == order_id; }); if (iter == 
sort_orders.end()) { - return NotFound("Default sort order is not found"); + return NotFound("Sort order with ID {} is not found", order_id); } return *iter; } Result> TableMetadata::Snapshot() const { + if (current_snapshot_id == kInvalidSnapshotId) { + return NotFound("No current snapshot"); + } return SnapshotById(current_snapshot_id); } @@ -277,6 +291,10 @@ Result> TableMetadata::SnapshotById(int64_t snapshot_i return *iter; } +int64_t TableMetadata::NextSequenceNumber() const { + return format_version > 1 ? last_sequence_number + 1 : kInitialSequenceNumber; +} + namespace { template @@ -555,6 +573,10 @@ class TableMetadataBuilder::Impl { sort_orders_by_id_.emplace(order->order_id(), order); } + for (const auto& snapshot : metadata_.snapshots) { + snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot); + } + metadata_.last_updated_ms = kInvalidLastUpdatedMs; } @@ -591,6 +613,10 @@ class TableMetadataBuilder::Impl { Status RemoveSchemas(const std::unordered_set& schema_ids); Result AddSchema(const Schema& schema, int32_t new_last_column_id); void SetLocation(std::string_view location); + Status AddSnapshot(std::shared_ptr snapshot); + Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch); + Status SetBranchSnapshot(std::shared_ptr snapshot, const std::string& branch); + Status SetRef(const std::string& name, std::shared_ptr ref); Result> Build(); @@ -613,6 +639,34 @@ class TableMetadataBuilder::Impl { /// \return The ID to use for this schema (reused if exists, new otherwise int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const; + /// \brief Finds intermediate snapshots that have not been committed as the current + /// snapshot. + /// + /// Transactions can create snapshots that are never the current snapshot because + /// several changes are combined by the transaction into one table metadata update. When + /// each intermediate snapshot is added to table metadata, it is added to the snapshot + /// log, assuming that it will be the current snapshot. When there are multiple snapshot + /// updates, the log must be corrected by suppressing the intermediate snapshot entries. + /// + /// A snapshot is an intermediate snapshot if it was added but is not the current + /// snapshot. + /// + /// \param current_snapshot_id The current snapshot ID + /// \return A set of snapshot IDs for all added snapshots that were later replaced as + /// the current snapshot in changes + std::unordered_set IntermediateSnapshotIdSet( + int64_t current_snapshot_id) const; + + /// \brief Updates the snapshot log by removing intermediate snapshots and handling + /// removed snapshots. 
+ /// + /// \param current_snapshot_id The current snapshot ID + /// \return Updated snapshot log or error + Result> UpdateSnapshotLog( + int64_t current_snapshot_id) const; + + Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch); + private: // Base metadata (nullptr for new tables) const TableMetadata* base_; @@ -634,6 +688,7 @@ class TableMetadataBuilder::Impl { std::unordered_map> schemas_by_id_; std::unordered_map> specs_by_id_; std::unordered_map> sort_orders_by_id_; + std::unordered_map> snapshots_by_id_; }; Status TableMetadataBuilder::Impl::AssignUUID(std::string_view uuid) { @@ -982,6 +1037,206 @@ void TableMetadataBuilder::Impl::SetLocation(std::string_view location) { changes_.push_back(std::make_unique(std::string(location))); } +Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { + if (snapshot == nullptr) { + // change is a noop + return {}; + } + ICEBERG_CHECK(!metadata_.schemas.empty(), + "Attempting to add a snapshot before a schema is added"); + ICEBERG_CHECK(!metadata_.partition_specs.empty(), + "Attempting to add a snapshot before a partition spec is added"); + ICEBERG_CHECK(!metadata_.sort_orders.empty(), + "Attempting to add a snapshot before a sort order is added"); + ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id), + "Snapshot already exists for id: {}", snapshot->snapshot_id); + ICEBERG_CHECK( + metadata_.format_version == 1 || + snapshot->sequence_number > metadata_.last_sequence_number || + !snapshot->parent_snapshot_id.has_value(), + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot->sequence_number, metadata_.last_sequence_number); + + metadata_.last_updated_ms = snapshot->timestamp_ms; + metadata_.last_sequence_number = snapshot->sequence_number; + metadata_.snapshots.push_back(snapshot); + snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot); + changes_.push_back(std::make_unique(snapshot)); + + if (metadata_.format_version >= TableMetadata::kMinFormatVersionRowLineage) { + ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_CHECK(first_row_id.has_value(), + "Cannot add a snapshot: first-row-id is null"); + ICEBERG_CHECK( + first_row_id.value() >= metadata_.next_row_id, + "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}", + first_row_id.value(), metadata_.next_row_id); + + ICEBERG_ASSIGN_OR_RAISE(auto add_rows, snapshot->AddedRows()); + ICEBERG_CHECK(add_rows.has_value(), "Cannot add a snapshot: added-rows is null"); + metadata_.next_row_id += add_rows.value(); + } + + return {}; +} + +Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id, + const std::string& branch) { + auto ref_it = metadata_.refs.find(branch); + if (ref_it != metadata_.refs.end() && ref_it->second->snapshot_id == snapshot_id) { + // change is a noop + return {}; + } + + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", branch, snapshot_id); + return SetBranchSnapshotInternal(*snapshot_it->second, branch); +} + +Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr snapshot, + const std::string& branch) { + if (snapshot == nullptr) { + // change is a noop + return {}; + } + const Snapshot& snapshot_ref = *snapshot; + ICEBERG_RETURN_UNEXPECTED(AddSnapshot(std::move(snapshot))); + return SetBranchSnapshotInternal(snapshot_ref, branch); +} + +Status 
TableMetadataBuilder::Impl::SetBranchSnapshotInternal(const Snapshot& snapshot, + const std::string& branch) { + const int64_t replacement_snapshot_id = snapshot.snapshot_id; + auto ref_it = metadata_.refs.find(branch); + if (ref_it != metadata_.refs.end()) { + ICEBERG_CHECK(ref_it->second->type() == SnapshotRefType::kBranch, + "Cannot update branch: {} is a tag", branch); + if (ref_it->second->snapshot_id == replacement_snapshot_id) { + return {}; + } + } + + ICEBERG_CHECK( + metadata_.format_version == 1 || + snapshot.sequence_number <= metadata_.last_sequence_number, + "Last sequence number {} is less than existing snapshot sequence number {}", + metadata_.last_sequence_number, snapshot.sequence_number); + + std::shared_ptr new_ref; + if (ref_it != metadata_.refs.end()) { + new_ref = ref_it->second->Clone(replacement_snapshot_id); + } else { + ICEBERG_ASSIGN_OR_RAISE(new_ref, SnapshotRef::MakeBranch(replacement_snapshot_id)); + } + + return SetRef(branch, std::move(new_ref)); +} + +Status TableMetadataBuilder::Impl::SetRef(const std::string& name, + std::shared_ptr ref) { + auto existing_ref_it = metadata_.refs.find(name); + if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second == *ref) { + return {}; + } + + int64_t snapshot_id = ref->snapshot_id; + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", name, snapshot_id); + const auto& snapshot = snapshot_it->second; + + // If snapshot was added in this set of changes, update last_updated_ms + if (std::ranges::any_of(changes_, [snapshot_id](const auto& change) { + return change->kind() == TableUpdate::Kind::kAddSnapshot && + internal::checked_cast(*change) + .snapshot() + ->snapshot_id == snapshot_id; + })) { + metadata_.last_updated_ms = snapshot->timestamp_ms; + } + + if (name == SnapshotRef::kMainBranch) { + metadata_.current_snapshot_id = ref->snapshot_id; + if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) { + metadata_.last_updated_ms = CurrentTimePointMs(); + } + metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms, ref->snapshot_id); + } + + changes_.push_back(std::make_unique(name, *ref)); + metadata_.refs[name] = std::move(ref); + + return {}; +} + +std::unordered_set TableMetadataBuilder::Impl::IntermediateSnapshotIdSet( + int64_t current_snapshot_id) const { + std::unordered_set added_snapshot_ids; + std::unordered_set intermediate_snapshot_ids; + + std::ranges::for_each(changes_, [&](const auto& change) { + if (change->kind() == TableUpdate::Kind::kAddSnapshot) { + // Adds must always come before set current snapshot + const auto& added_snapshot = + internal::checked_cast(*change); + added_snapshot_ids.insert(added_snapshot.snapshot()->snapshot_id); + } else if (change->kind() == TableUpdate::Kind::kSetSnapshotRef) { + const auto& set_ref = internal::checked_cast(*change); + int64_t snapshot_id = set_ref.snapshot_id(); + if (added_snapshot_ids.contains(snapshot_id) && + set_ref.ref_name() == SnapshotRef::kMainBranch && + snapshot_id != current_snapshot_id) { + intermediate_snapshot_ids.insert(snapshot_id); + } + } + }); + + return intermediate_snapshot_ids; +} + +Result> TableMetadataBuilder::Impl::UpdateSnapshotLog( + int64_t current_snapshot_id) const { + std::unordered_set intermediate_snapshot_ids = + IntermediateSnapshotIdSet(current_snapshot_id); + const bool has_removed_snapshots = + std::ranges::any_of(changes_, [](const auto& change) { + return change->kind() == 
TableUpdate::Kind::kRemoveSnapshots; + }); + if (intermediate_snapshot_ids.empty() && !has_removed_snapshots) { + return metadata_.snapshot_log; + } + + // Update the snapshot log + std::vector new_snapshot_log; + for (const auto& log_entry : metadata_.snapshot_log) { + int64_t snapshot_id = log_entry.snapshot_id; + if (snapshots_by_id_.contains(snapshot_id)) { + if (!intermediate_snapshot_ids.contains(snapshot_id)) { + // Copy the log entries that are still valid + new_snapshot_log.push_back(log_entry); + } + } else if (has_removed_snapshots) { + // Any invalid entry causes the history before it to be removed. Otherwise, there + // could be history gaps that cause time-travel queries to produce incorrect + // results. For example, if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is + // removed, the history cannot be [(t1, s1), (t3, s3)] because it appears that s3 + // was current during the time between t2 and t3 when in fact s2 was the current + // snapshot. + new_snapshot_log.clear(); + } + } + + if (snapshots_by_id_.contains(current_snapshot_id)) { + ICEBERG_CHECK( + !new_snapshot_log.empty() && + new_snapshot_log.back().snapshot_id == current_snapshot_id, + "Cannot set invalid snapshot log: latest entry is not the current snapshot"); + } + + return new_snapshot_log; +} + Result> TableMetadataBuilder::Impl::Build() { // 1. Validate metadata consistency through TableMetadata#Validate @@ -1025,7 +1280,9 @@ Result> TableMetadataBuilder::Impl::Build() { metadata_.metadata_log.end() - max_metadata_log_size); } - // TODO(anyone): 4. update snapshot_log + // 4. Update snapshot_log + ICEBERG_ASSIGN_OR_RAISE(metadata_.snapshot_log, + UpdateSnapshotLog(metadata_.current_snapshot_id)); // 5. Create and return the TableMetadata return std::make_unique(std::move(metadata_)); @@ -1207,17 +1464,26 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder( TableMetadataBuilder& TableMetadataBuilder::AddSnapshot( std::shared_ptr snapshot) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id, const std::string& branch) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id, branch)); + return *this; +} + +TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot( + std::shared_ptr snapshot, const std::string& branch) { + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(std::move(snapshot), branch)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name, std::shared_ptr ref) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref))); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) { diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 3e3eb9c70..a4165b814 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -134,17 +134,32 @@ struct ICEBERG_EXPORT TableMetadata { int format_version = kDefaultTableFormatVersion); /// \brief Get the current schema, return NotFoundError if not found + /// \note The returned schema is guaranteed to be not null Result> Schema() const; /// \brief Get the current schema by ID, return NotFoundError if not found + /// \note The returned schema is guaranteed to be not null 
Result> SchemaById(int32_t schema_id) const; /// \brief Get the current partition spec, return NotFoundError if not found + /// \note The returned partition spec is guaranteed to be not null Result> PartitionSpec() const; + /// \brief Get the current partition spec by ID, return NotFoundError if not found + /// \note The returned partition spec is guaranteed to be not null + Result> PartitionSpecById( + int32_t spec_id) const; /// \brief Get the current sort order, return NotFoundError if not found + /// \note The returned sort order is guaranteed to be not null Result> SortOrder() const; + /// \brief Get the current sort order by ID, return NotFoundError if not found + /// \note The returned sort order is guaranteed to be not null + Result> SortOrderById(int32_t sort_order_id) const; /// \brief Get the current snapshot, return NotFoundError if not found + /// \note The returned snapshot is guaranteed to be not null Result> Snapshot() const; - /// \brief Get the snapshot of this table with the given id + /// \brief Get the snapshot by ID, return NotFoundError if not found + /// \note The returned snapshot is guaranteed to be not null Result> SnapshotById(int64_t snapshot_id) const; + /// \brief Get the next sequence number + int64_t NextSequenceNumber() const; ICEBERG_EXPORT friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs); @@ -337,6 +352,14 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \return Reference to this builder for method chaining TableMetadataBuilder& SetBranchSnapshot(int64_t snapshot_id, const std::string& branch); + /// \brief Set a branch to point to a specific snapshot + /// + /// \param snapshot The snapshot the branch should reference + /// \param branch The name of the branch + /// \return Reference to this builder for method chaining + TableMetadataBuilder& SetBranchSnapshot(std::shared_ptr snapshot, + const std::string& branch); + /// \brief Set a snapshot reference /// /// \param name The name of the reference diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h index feb4a2001..5d5c17db0 100644 --- a/src/iceberg/table_properties.h +++ b/src/iceberg/table_properties.h @@ -20,7 +20,6 @@ #pragma once #include -#include #include #include #include @@ -244,6 +243,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase { inline static Entry kDeleteTargetFileSizeBytes{ "write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024}; // 64 MB + inline static Entry kSnapshotIdInheritanceEnabled{ + "compatibility.snapshot-id-inheritance.enabled", false}; + // Garbage collection properties inline static Entry kGcEnabled{"gc.enabled", true}; diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 38ce0fbc9..29388d47c 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -274,7 +274,7 @@ std::unique_ptr SetDefaultSortOrder::Clone() const { // AddSnapshot void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.AddSnapshot(snapshot_); } void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const { @@ -344,7 +344,7 @@ std::unique_ptr RemoveSnapshotRef::Clone() const { // SetSnapshotRef void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetBranchSnapshot(snapshot_id_, ref_name_); } void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const 
{ diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 875243195..3c9c9dbbc 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -401,6 +401,19 @@ class ICEBERG_EXPORT SetSnapshotRef : public TableUpdate { max_snapshot_age_ms_(max_snapshot_age_ms), max_ref_age_ms_(max_ref_age_ms) {} + SetSnapshotRef(std::string ref_name, const SnapshotRef& ref) + : ref_name_(std::move(ref_name)), snapshot_id_(ref.snapshot_id), type_(ref.type()) { + if (type_ == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref.retention); + min_snapshots_to_keep_ = branch.min_snapshots_to_keep; + max_snapshot_age_ms_ = branch.max_snapshot_age_ms; + max_ref_age_ms_ = branch.max_ref_age_ms; + } else { + const auto& tag = std::get(ref.retention); + max_ref_age_ms_ = tag.max_ref_age_ms; + } + } + const std::string& ref_name() const { return ref_name_; } int64_t snapshot_id() const { return snapshot_id_; } SnapshotRefType type() const { return type_; } diff --git a/src/iceberg/test/json_internal_test.cc b/src/iceberg/test/json_internal_test.cc index f88527ff4..33ed03b9f 100644 --- a/src/iceberg/test/json_internal_test.cc +++ b/src/iceberg/test/json_internal_test.cc @@ -269,7 +269,7 @@ TEST(JsonInternalTest, SnapshotFromJsonSummaryWithNoOperation) { auto result = SnapshotFromJson(snapshot_json); ASSERT_TRUE(result.has_value()); - ASSERT_EQ(result.value()->operation(), DataOperation::kOverwrite); + ASSERT_EQ(result.value()->Operation(), DataOperation::kOverwrite); } TEST(JsonInternalTest, NameMapping) { diff --git a/src/iceberg/test/snapshot_test.cc b/src/iceberg/test/snapshot_test.cc index a3c28f89f..e7a657fb9 100644 --- a/src/iceberg/test/snapshot_test.cc +++ b/src/iceberg/test/snapshot_test.cc @@ -96,7 +96,7 @@ TEST_F(SnapshotTest, ConstructionAndFieldAccess) { EXPECT_EQ(snapshot.sequence_number, 1); EXPECT_EQ(snapshot.timestamp_ms.time_since_epoch().count(), 1615569200000); EXPECT_EQ(snapshot.manifest_list, "s3://example/manifest_list.avro"); - EXPECT_EQ(snapshot.operation().value(), DataOperation::kAppend); + EXPECT_EQ(snapshot.Operation(), std::make_optional(DataOperation::kAppend)); EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kAddedDataFiles)), "101"); EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kOperation)), diff --git a/src/iceberg/test/table_requirements_test.cc b/src/iceberg/test/table_requirements_test.cc index 80f836367..dd0aa8f63 100644 --- a/src/iceberg/test/table_requirements_test.cc +++ b/src/iceberg/test/table_requirements_test.cc @@ -883,12 +883,12 @@ TEST(TableRequirementsTest, SetSnapshotRef) { // Multiple updates to same ref should deduplicate std::vector> updates; - updates.push_back(std::make_unique(kRefName, kSnapshotId, - SnapshotRefType::kBranch)); - updates.push_back(std::make_unique(kRefName, kSnapshotId + 1, - SnapshotRefType::kBranch)); - updates.push_back(std::make_unique(kRefName, kSnapshotId + 2, - SnapshotRefType::kBranch)); + ICEBERG_UNWRAP_OR_FAIL(auto ref1, SnapshotRef::MakeBranch(kSnapshotId)); + updates.push_back(std::make_unique(kRefName, *ref1)); + ICEBERG_UNWRAP_OR_FAIL(auto ref2, SnapshotRef::MakeBranch(kSnapshotId + 1)); + updates.push_back(std::make_unique(kRefName, *ref2)); + ICEBERG_UNWRAP_OR_FAIL(auto ref3, SnapshotRef::MakeBranch(kSnapshotId + 2)); + updates.push_back(std::make_unique(kRefName, *ref3)); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); diff --git a/src/iceberg/test/update_properties_test.cc 
b/src/iceberg/test/update_properties_test.cc
index 8ac8e5eb2..59fa1d8d6 100644
--- a/src/iceberg/test/update_properties_test.cc
+++ b/src/iceberg/test/update_properties_test.cc
@@ -107,7 +107,7 @@ TEST_F(UpdatePropertiesTest, UpgradeFormatVersionInvalidString) {
   auto result = update->Apply();
   EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
-  EXPECT_THAT(result, HasErrorMessage("Invalid format version"));
+  EXPECT_THAT(result, HasErrorMessage("Failed to parse integer from string"));
 }
 TEST_F(UpdatePropertiesTest, UpgradeFormatVersionOutOfRange) {
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 6641a1afd..6ef942dbb 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -20,20 +20,24 @@
 #include "iceberg/transaction.h"
 #include
+#include
 #include "iceberg/catalog.h"
 #include "iceberg/schema.h"
 #include "iceberg/table.h"
 #include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
 #include "iceberg/table_requirement.h"
 #include "iceberg/table_requirements.h"
 #include "iceberg/table_update.h"
 #include "iceberg/update/pending_update.h"
+#include "iceberg/update/snapshot_update.h"
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
 #include "iceberg/update/update_sort_order.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/location_util.h"
 #include "iceberg/util/macros.h"
 namespace iceberg {
@@ -69,6 +73,16 @@ const TableMetadata* Transaction::base() const { return metadata_builder_->base(
 const TableMetadata& Transaction::current() const { return metadata_builder_->current(); }
+std::string Transaction::MetadataFileLocation(std::string_view filename) const {
+  const auto metadata_location =
+      current().properties.Get(TableProperties::kWriteMetadataLocation);
+  if (!metadata_location.empty()) {
+    return std::format("{}/{}", LocationUtil::StripTrailingSlash(metadata_location),
+                       filename);
+  }
+  return std::format("{}/metadata/{}", current().location, filename);
+}
+
 Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
   if (!last_update_committed_) {
     return InvalidArgument("Cannot add update when previous update is not committed");
   }
@@ -113,6 +127,42 @@ Status Transaction::Apply(PendingUpdate& update) {
       metadata_builder_->SetCurrentSchema(std::move(result.schema),
                                           result.new_last_column_id);
     } break;
+    case PendingUpdate::Kind::kUpdateSnapshot: {
+      const auto& base = metadata_builder_->current();
+
+      auto& update_snapshot = internal::checked_cast<SnapshotUpdate&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
+
+      // Create a temp builder to check if this is an empty update
+      auto temp_update = TableMetadataBuilder::BuildFrom(&base);
+      if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
+        // This is a rollback operation
+        temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
+                                       result.target_branch);
+      } else if (result.stage_only) {
+        temp_update->AddSnapshot(result.snapshot);
+      } else {
+        temp_update->SetBranchSnapshot(std::move(result.snapshot), result.target_branch);
+      }
+
+      if (temp_update->changes().empty()) {
+        // Do not commit if the metadata has not changed. For example, this may happen
+        // when setting the current snapshot to an ID that is already current. Note that
+        // this check uses identity.
+        return {};
+      }
+
+      for (const auto& change : temp_update->changes()) {
+        change->ApplyTo(*metadata_builder_);
+      }
+
+      // If the table UUID is missing, add it here.
the UUID will be re-created each time + // this operation retries to ensure that if a concurrent operation assigns the UUID, + // this operation will not fail. + if (base.table_uuid.empty()) { + metadata_builder_->AssignUUID(); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -155,12 +205,22 @@ Result> Transaction::Commit() { } // XXX: we should handle commit failure and retry here. - ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog()->UpdateTable( - table_->name(), requirements, updates)); + auto commit_result = + table_->catalog()->UpdateTable(table_->name(), requirements, updates); + + for (const auto& update : pending_updates_) { + if (auto update_ptr = update.lock()) { + std::ignore = update_ptr->Finalize(commit_result.has_value() + ? std::nullopt + : std::make_optional(commit_result.error())); + } + } + + ICEBERG_RETURN_UNEXPECTED(commit_result); // Mark as committed and update table reference committed_ = true; - table_ = std::move(updated_table); + table_ = std::move(commit_result.value()); return table_; } diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ea918a173..3c2395c2c 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -49,6 +49,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this table_; // The kind of this transaction. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 2daf39e63..ff49e1ed6 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -188,6 +188,7 @@ class Transaction; /// \brief Update family. class PendingUpdate; +class SnapshotUpdate; class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index e4c786f40..4238e0222 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'pending_update.h', + 'snapshot_update.h', 'update_partition_spec.h', 'update_schema.h', 'update_sort_order.h', diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc index 535e7b41c..e55a93dfd 100644 --- a/src/iceberg/update/pending_update.cc +++ b/src/iceberg/update/pending_update.cc @@ -30,4 +30,10 @@ PendingUpdate::~PendingUpdate() = default; Status PendingUpdate::Commit() { return transaction_->Apply(*this); } +Status PendingUpdate::Finalize([[maybe_unused]] std::optional commit_error) { + return {}; +} + +const TableMetadata& PendingUpdate::base() const { return transaction_->current(); } + } // namespace iceberg diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 90723987c..2124d7e12 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -23,7 +23,7 @@ /// API for table changes using builder pattern #include -#include +#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" @@ -45,6 +45,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdatePartitionSpec, kUpdateProperties, kUpdateSchema, + kUpdateSnapshot, kUpdateSortOrder, }; @@ -59,6 +60,15 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { /// - CommitStateUnknown: unknown status, no cleanup should be done. virtual Status Commit(); + /// \brief Finalize the pending update. + /// + /// This method is called after the update is committed. + /// Implementations should override this method to clean up any resources. 
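+  /// The transaction calls Finalize() on every pending update it still holds once the
+  /// catalog commit returns, passing std::nullopt on success and the commit error on
+  /// failure.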
+ /// + /// \param commit_error An optional error indicating whether the commit was successful + /// \return Status indicating success or failure + virtual Status Finalize(std::optional commit_error); + // Non-copyable, movable PendingUpdate(const PendingUpdate&) = delete; PendingUpdate& operator=(const PendingUpdate&) = delete; @@ -70,6 +80,8 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { protected: explicit PendingUpdate(std::shared_ptr transaction); + const TableMetadata& base() const; + std::shared_ptr transaction_; }; diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc new file mode 100644 index 000000000..2bbb2d506 --- /dev/null +++ b/src/iceberg/update/snapshot_update.cc @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_update.h" + +#include +#include + +#include "iceberg/constants.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/manifest/rolling_manifest_writer.h" +#include "iceberg/partition_summary_internal.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/string_util.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +namespace { + +// The Java impl skips updating total if parsing fails. Here we choose to be strict. +Status UpdateTotal(std::unordered_map& summary, + const std::unordered_map& previous_summary, + const std::string& total_property, const std::string& added_property, + const std::string& deleted_property) { + auto total_it = previous_summary.find(total_property); + if (total_it != previous_summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto new_total, + StringUtils::ParseInt(total_it->second)); + + auto added_it = summary.find(added_property); + if (new_total >= 0 && added_it != summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto added_value, + StringUtils::ParseInt(added_it->second)); + new_total += added_value; + } + + auto deleted_it = summary.find(deleted_property); + if (new_total >= 0 && deleted_it != summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto deleted_value, + StringUtils::ParseInt(deleted_it->second)); + new_total -= deleted_value; + } + + if (new_total >= 0) { + summary[total_property] = std::to_string(new_total); + } + } + return {}; +} + +// Add metadata to a manifest file by reading it and extracting statistics. 
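+// It tallies added, existing, and deleted file and row counts per manifest entry
+// status, collects partition summaries, and falls back to the largest snapshot id seen
+// in the entries when no added or deleted entry carries one.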
+Result<ManifestFile> AddMetadata(const ManifestFile& manifest, std::shared_ptr<FileIO> io,
+                                 const TableMetadata& metadata) {
+  ICEBERG_PRECHECK(manifest.added_snapshot_id == kInvalidSnapshotId,
+                   "Manifest {} already has an assigned snapshot id: {}",
+                   manifest.manifest_path, manifest.added_snapshot_id);
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          metadata.PartitionSpecById(manifest.partition_spec_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                          ManifestReader::Make(manifest, std::move(io), schema, spec));
+  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+
+  PartitionSummary stats(*partition_type);
+  int32_t added_files = 0;
+  int64_t added_rows = 0;
+  int32_t existing_files = 0;
+  int64_t existing_rows = 0;
+  int32_t deleted_files = 0;
+  int64_t deleted_rows = 0;
+
+  std::optional<int64_t> snapshot_id;
+  int64_t max_snapshot_id = std::numeric_limits<int64_t>::min();
+  for (const auto& entry : entries) {
+    ICEBERG_PRECHECK(entry.data_file != nullptr,
+                     "Manifest entry in {} is missing data_file", manifest.manifest_path);
+
+    if (entry.snapshot_id.has_value() && entry.snapshot_id.value() > max_snapshot_id) {
+      max_snapshot_id = entry.snapshot_id.value();
+    }
+
+    switch (entry.status) {
+      case ManifestStatus::kAdded: {
+        added_files += 1;
+        added_rows += entry.data_file->record_count;
+        if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) {
+          snapshot_id = entry.snapshot_id;
+        }
+      } break;
+      case ManifestStatus::kExisting: {
+        existing_files += 1;
+        existing_rows += entry.data_file->record_count;
+      } break;
+      case ManifestStatus::kDeleted: {
+        deleted_files += 1;
+        deleted_rows += entry.data_file->record_count;
+        if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) {
+          snapshot_id = entry.snapshot_id;
+        }
+      } break;
+    }
+
+    ICEBERG_RETURN_UNEXPECTED(stats.Update(entry.data_file->partition));
+  }
+
+  if (!snapshot_id.has_value()) {
+    // If no files were added or deleted, use the largest snapshot ID in the manifest
+    snapshot_id = max_snapshot_id;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_summaries, stats.Summaries());
+
+  ManifestFile enriched = manifest;
+  enriched.added_snapshot_id = snapshot_id.value();
+  enriched.added_files_count = added_files;
+  enriched.existing_files_count = existing_files;
+  enriched.deleted_files_count = deleted_files;
+  enriched.added_rows_count = added_rows;
+  enriched.existing_rows_count = existing_rows;
+  enriched.deleted_rows_count = deleted_rows;
+  enriched.partitions = std::move(partition_summaries);
+  enriched.first_row_id = std::nullopt;
+  return enriched;
+}
+
+} // anonymous namespace
+
+SnapshotUpdate::~SnapshotUpdate() = default;
+
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)),
+      can_inherit_snapshot_id_(
+          base().format_version > 1 ||
+          base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled)),
+      commit_uuid_(Uuid::GenerateV7().ToString()),
+      target_manifest_size_bytes_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {}
+
+// TODO(xxx): write manifests in parallel
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
+    const std::vector<std::shared_ptr<DataFile>>& data_files,
+    const std::shared_ptr<PartitionSpec>& spec,
+    std::optional<int64_t> data_sequence_number) {
+  if (data_files.empty()) {
+    return std::vector<ManifestFile>{};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
+  RollingManifestWriter rolling_writer(
+      [this, spec, schema = std::move(current_schema),
+       snapshot_id
= SnapshotId()]() -> Result> { + return ManifestWriter::MakeWriter(base().format_version, snapshot_id, + ManifestPath(), transaction_->table()->io(), + std::move(spec), std::move(schema), + ManifestContent::kData, + /*first_row_id=*/base().next_row_id); + }, + target_manifest_size_bytes_); + + for (const auto& file : data_files) { + ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); +} + +// TODO(xxx): write manifests in parallel +Result> SnapshotUpdate::WriteDeleteManifests( + const std::vector>& delete_files, + const std::shared_ptr& spec) { + if (delete_files.empty()) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + RollingManifestWriter rolling_writer( + [this, spec, schema = std::move(current_schema), + snapshot_id = SnapshotId()]() -> Result> { + return ManifestWriter::MakeWriter(base().format_version, snapshot_id, + ManifestPath(), transaction_->table()->io(), + std::move(spec), std::move(schema), + ManifestContent::kDeletes); + }, + target_manifest_size_bytes_); + + for (const auto& file : delete_files) { + /// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with + /// file->data_sequenece_number + ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); +} + +int64_t SnapshotUpdate::SnapshotId() { + if (!snapshot_id_.has_value()) { + snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base()); + } + return snapshot_id_.value(); +} + +Result SnapshotUpdate::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot, + SnapshotUtil::OptionalLatestSnapshot(base(), target_branch_)); + + int64_t sequence_number = base().NextSequenceNumber(); + std::optional parent_snapshot_id = + parent_snapshot ? 
std::make_optional(parent_snapshot->snapshot_id) : std::nullopt; + + if (parent_snapshot) { + ICEBERG_RETURN_UNEXPECTED(Validate(base(), parent_snapshot)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot)); + for (auto& manifest : manifests) { + if (manifest.added_snapshot_id != kInvalidSnapshotId) { + continue; + } + // TODO(xxx): read in parallel and cache enriched manifests for retries + ICEBERG_ASSIGN_OR_RAISE(manifest, + AddMetadata(manifest, transaction_->table()->io(), base())); + } + + std::string manifest_list_path = ManifestListPath(); + manifest_lists_.push_back(manifest_list_path); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestListWriter::MakeWriter(base().format_version, SnapshotId(), + parent_snapshot_id, manifest_list_path, + transaction_->table()->io(), + sequence_number, base().next_row_id)); + ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + std::optional next_row_id; + std::optional assigned_rows; + if (base().format_version >= 3) { + ICEBERG_CHECK(writer->next_row_id().has_value(), + "row id is required by format version >= 3"); + next_row_id = base().next_row_id; + assigned_rows = writer->next_row_id().value() - base().next_row_id; + } + + std::string op = operation(); + ICEBERG_CHECK(!op.empty(), "Snapshot operation cannot be empty"); + + if (op == DataOperation::kReplace) { + const auto summary = Summary(); + auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords); + auto replaced_records_it = summary.find(SnapshotSummaryFields::kDeletedRecords); + if (added_records_it != summary.cend() && replaced_records_it != summary.cend()) { + ICEBERG_ASSIGN_OR_RAISE(auto added_records, + StringUtils::ParseInt(added_records_it->second)); + ICEBERG_ASSIGN_OR_RAISE(auto replaced_records, StringUtils::ParseInt( + replaced_records_it->second)); + ICEBERG_PRECHECK( + added_records <= replaced_records, + "Invalid REPLACE operation: {} added records > {} replaced records", + added_records, replaced_records); + } + } + + ICEBERG_ASSIGN_OR_RAISE(auto summary, ComputeSummary(base())); + ICEBERG_ASSIGN_OR_RAISE( + staged_snapshot_, + Snapshot::Make(sequence_number, SnapshotId(), parent_snapshot_id, + CurrentTimePointMs(), std::move(op), std::move(summary), + base().current_schema_id, std::move(manifest_list_path), next_row_id, + assigned_rows)); + + return ApplyResult{.snapshot = staged_snapshot_, + .target_branch = target_branch_, + .stage_only = stage_only_}; +} + +Status SnapshotUpdate::Finalize(std::optional commit_error) { + if (commit_error.has_value()) { + if (commit_error->kind == ErrorKind::kCommitStateUnknown) { + return {}; + } + CleanAll(); + return {}; + } + + if (CleanupAfterCommit()) { + ICEBERG_CHECK(staged_snapshot_ != nullptr, + "Staged snapshot is null during finalize after commit"); + auto cached_snapshot = SnapshotCache(staged_snapshot_.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, + cached_snapshot.Manifests(transaction_->table()->io())); + CleanUncommitted(manifests | std::views::transform([](const auto& manifest) { + return manifest.manifest_path; + }) | + std::ranges::to>()); + } + + // Also clean up unused manifest lists created by multiple attempts + for (const auto& manifest_list : manifest_lists_) { + if (manifest_list != staged_snapshot_->manifest_list) { + std::ignore = DeleteFile(manifest_list); + } + } + + return {}; +} + +Status SnapshotUpdate::SetTargetBranch(const std::string& branch) { + ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be 
empty"); + + if (auto ref_it = base().refs.find(branch); ref_it != base().refs.end()) { + ICEBERG_PRECHECK( + ref_it->second->type() == SnapshotRefType::kBranch, + "{} is a tag, not a branch. Tags cannot be targets for producing snapshots", + branch); + } + + target_branch_ = branch; + return {}; +} + +Result> SnapshotUpdate::ComputeSummary( + const TableMetadata& previous) { + std::unordered_map summary = Summary(); + if (summary.empty()) { + return summary; + } + + // Get previous summary from the target branch + std::unordered_map previous_summary; + if (auto ref_it = previous.refs.find(target_branch_); ref_it != previous.refs.end()) { + if (auto snap_it = previous.SnapshotById(ref_it->second->snapshot_id); + snap_it.has_value()) { + previous_summary = snap_it.value()->summary; + } + } else { + // if there was no previous snapshot, default the summary to start totals at 0 + previous_summary[SnapshotSummaryFields::kTotalRecords] = "0"; + previous_summary[SnapshotSummaryFields::kTotalFileSize] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDataFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDeleteFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalPosDeletes] = "0"; + previous_summary[SnapshotSummaryFields::kTotalEqDeletes] = "0"; + } + + // Update totals + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalRecords, + SnapshotSummaryFields::kAddedRecords, SnapshotSummaryFields::kDeletedRecords)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalFileSize, + SnapshotSummaryFields::kAddedFileSize, SnapshotSummaryFields::kRemovedFileSize)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalDataFiles, + SnapshotSummaryFields::kAddedDataFiles, SnapshotSummaryFields::kDeletedDataFiles)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary, + SnapshotSummaryFields::kTotalDeleteFiles, + SnapshotSummaryFields::kAddedDeleteFiles, + SnapshotSummaryFields::kRemovedDeleteFiles)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary, + SnapshotSummaryFields::kTotalPosDeletes, + SnapshotSummaryFields::kAddedPosDeletes, + SnapshotSummaryFields::kRemovedPosDeletes)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalEqDeletes, + SnapshotSummaryFields::kAddedEqDeletes, SnapshotSummaryFields::kRemovedEqDeletes)); + + // TODO(xxx): add custom summary fields like engine info + return summary; +} + +void SnapshotUpdate::CleanAll() { + for (const auto& manifest_list : manifest_lists_) { + std::ignore = DeleteFile(manifest_list); + } + manifest_lists_.clear(); + CleanUncommitted(std::unordered_set{}); +} + +Status SnapshotUpdate::DeleteFile(const std::string& path) { + static const auto kDefaultDeleteFunc = [this](const std::string& path) { + return this->transaction_->table()->io()->DeleteFile(path); + }; + if (delete_func_) { + return delete_func_(path); + } else { + return kDefaultDeleteFunc(path); + } +} + +std::string SnapshotUpdate::ManifestListPath() { + // Generate manifest list path + // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro + int64_t snapshot_id = SnapshotId(); + std::string filename = + std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_); + return transaction_->MetadataFileLocation(filename); +} + +std::string SnapshotUpdate::ManifestPath() { + // Generate manifest path + // Format: 
{metadata_location}/{uuid}-m{manifest_count}.avro + std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count_++); + return transaction_->MetadataFileLocation(filename); +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h new file mode 100644 index 000000000..48ef1676f --- /dev/null +++ b/src/iceberg/update/snapshot_update.h @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +namespace iceberg { + +/// \brief Base class for operations that produce snapshots. +/// +/// This class provides common functionality for creating new snapshots, +/// including manifest list writing and cleanup. +class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { + public: + /// \brief Result of applying a snapshot update + struct ApplyResult { + std::shared_ptr snapshot; + std::string target_branch; + bool stage_only = false; + }; + + ~SnapshotUpdate() override; + + /// \brief Set a callback to delete files instead of the table's default. + /// + /// \param delete_func A function used to delete file locations + /// \return Reference to this for method chaining + /// \note Cannot be called more than once + auto& DeleteWith(this auto& self, + std::function delete_func) { + if (self.delete_func_) { + return self.AddError(ErrorKind::kInvalidArgument, + "Cannot set delete callback more than once"); + } + self.delete_func_ = std::move(delete_func); + return self; + } + + /// \brief Stage a snapshot in table metadata, but not update the current snapshot id. + /// + /// \return Reference to this for method chaining + auto& StageOnly(this auto& self) { + self.stage_only_ = true; + return self; + } + + /// \brief Apply the update's changes to create a new snapshot. + /// + /// This method validates the changes, applies them to the metadata, + /// and creates a new snapshot without committing it. The snapshot + /// is stored internally and can be accessed after Apply() succeeds. + /// + /// \return A result containing the new snapshot, or an error + Result Apply(); + + /// \brief Finalize the snapshot update, cleaning up any uncommitted files. 
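+ /// + /// \param commit_error The error from the failed commit attempt, or std::nullopt if the commit succeeded + /// \return OK on success; an error only if post-commit cleanup cannot read the staged snapshot's manifests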
+ Status Finalize(std::optional commit_error) override; + + protected: + explicit SnapshotUpdate(std::shared_ptr transaction); + + /// \brief Write data manifests for the given data files + /// + /// \param data_files The data files to write + /// \param spec The partition spec to use + /// \param data_sequence_number Optional data sequence number for the files + /// \return A vector of manifest files + Result> WriteDataManifests( + const std::vector>& data_files, + const std::shared_ptr& spec, + std::optional data_sequence_number = std::nullopt); + + /// \brief Write delete manifests for the given delete files + /// + /// \param delete_files The delete files to write + /// \param spec The partition spec to use + /// \return A vector of manifest files + Result> WriteDeleteManifests( + const std::vector>& delete_files, + const std::shared_ptr& spec); + + Status SetTargetBranch(const std::string& branch); + const std::string& target_branch() const { return target_branch_; } + bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } + const std::string& commit_uuid() const { return commit_uuid_; } + int32_t manifest_count() const { return manifest_count_; } + int32_t attempt() const { return attempt_; } + int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } + + /// \brief Clean up any uncommitted manifests that were created. + /// + /// Manifests may not be committed if Apply is called multiple times + /// because a commit conflict has occurred. Implementations may keep + /// around manifests because the same changes will be made by both + /// Apply calls. This method instructs the implementation to clean up + /// those manifests and passes the paths of the manifests that were + /// actually committed. + /// + /// \param committed A set of manifest paths that were actually committed + virtual void CleanUncommitted(const std::unordered_set& committed) = 0; + + /// \brief A string that describes the action that produced the new snapshot. + /// + /// \return A string operation name + virtual std::string operation() = 0; + + /// \brief Validate the current metadata. + /// + /// Child operations can override this to add custom validation. + /// + /// \param current_metadata Current table metadata to validate + /// \param snapshot Ending snapshot on the lineage which is being validated + virtual Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + return {}; + }; + + /// \brief Apply the update's changes to the given metadata and snapshot. + /// + /// \param metadata_to_update The base table metadata to apply changes to + /// \param snapshot Snapshot to apply the changes to + /// \return A vector of manifest files for the new snapshot + virtual Result> Apply( + const TableMetadata& metadata_to_update, + const std::shared_ptr& snapshot) = 0; + + /// \brief Get the summary map for this operation. + /// + /// \return A map of summary properties + virtual std::unordered_map Summary() = 0; + + /// \brief Check if cleanup should happen after commit + /// + /// \return True if cleanup should happen after commit + virtual bool CleanupAfterCommit() const { return true; } + + /// \brief Get or generate the snapshot ID for the new snapshot. + int64_t SnapshotId(); + + private: + /// \brief Returns the snapshot summary from the implementation and updates totals. 
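+ /// + /// \param previous The base table metadata; totals from the head of the target branch are used as starting values, or zero if the branch has no snapshot yet + /// \return The merged summary map, or an error if a running total cannot be updated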
+ Result> ComputeSummary( + const TableMetadata& previous); + + /// \brief Clean up all uncommitted files + void CleanAll(); + + Status DeleteFile(const std::string& path); + std::string ManifestListPath(); + std::string ManifestPath(); + + private: + const bool can_inherit_snapshot_id_{true}; + const std::string commit_uuid_; + int32_t manifest_count_{0}; + int32_t attempt_{0}; + std::vector manifest_lists_; + const int64_t target_manifest_size_bytes_; + std::optional snapshot_id_; + bool stage_only_{false}; + std::function delete_func_; + std::string target_branch_{SnapshotRef::kMainBranch}; + std::shared_ptr staged_snapshot_; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc index ffea1e099..54c3dc60a 100644 --- a/src/iceberg/update/update_partition_spec.cc +++ b/src/iceberg/update/update_partition_spec.cc @@ -47,11 +47,10 @@ Result> UpdatePartitionSpec::Make( UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transaction) : PendingUpdate(std::move(transaction)) { - const TableMetadata& base_metadata = transaction_->current(); - format_version_ = base_metadata.format_version; + format_version_ = base().format_version; // Get the current/default partition spec - auto spec_result = base_metadata.PartitionSpec(); + auto spec_result = base().PartitionSpec(); if (!spec_result.has_value()) { AddError(spec_result.error()); return; @@ -59,15 +58,15 @@ UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transactio spec_ = std::move(spec_result.value()); // Get the current schema - auto schema_result = base_metadata.Schema(); + auto schema_result = base().Schema(); if (!schema_result.has_value()) { AddError(schema_result.error()); return; } schema_ = std::move(schema_result.value()); - last_assigned_partition_id_ = std::max(base_metadata.last_partition_id, - PartitionSpec::kLegacyPartitionDataIdStart - 1); + last_assigned_partition_id_ = + std::max(base().last_partition_id, PartitionSpec::kLegacyPartitionDataIdStart - 1); name_to_field_ = IndexSpecByName(*spec_); transform_to_field_ = IndexSpecByTransform(*spec_); @@ -433,18 +432,16 @@ UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) { } void UpdatePartitionSpec::BuildHistoricalFieldsIndex() { - const TableMetadata& base_metadata = transaction_->current(); - // Count total fields across all specs to reserve capacity size_t total_fields = 0; - for (const auto& partition_spec : base_metadata.partition_specs) { + for (const auto& partition_spec : base().partition_specs) { total_fields += partition_spec->fields().size(); } historical_fields_.reserve(total_fields); // Index all fields from all historical partition specs // Later specs override earlier ones for the same (source_id, transform) key - for (const auto& partition_spec : base_metadata.partition_specs) { + for (const auto& partition_spec : base().partition_specs) { for (const auto& field : partition_spec->fields()) { TransformKey key{field.source_id(), field.transform()->ToString()}; historical_fields_.emplace(key, field); diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index ce809c437..fe49df81c 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -19,10 +19,8 @@ #include "iceberg/update/update_properties.h" -#include #include #include -#include #include "iceberg/metrics_config.h" #include "iceberg/result.h" @@ -31,6 +29,7 @@ #include "iceberg/transaction.h" #include 
"iceberg/util/error_collector.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg { @@ -70,7 +69,7 @@ UpdateProperties& UpdateProperties::Remove(const std::string& key) { Result UpdateProperties::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - const auto& current_props = transaction_->current().properties.configs(); + const auto& current_props = base().properties.configs(); std::unordered_map new_properties; std::vector removals; for (const auto& [key, value] : current_props) { @@ -85,15 +84,8 @@ Result UpdateProperties::Apply() { auto iter = new_properties.find(TableProperties::kFormatVersion.key()); if (iter != new_properties.end()) { - int parsed_version = 0; - const auto& val = iter->second; - auto [ptr, ec] = std::from_chars(val.data(), val.data() + val.size(), parsed_version); - - if (ec == std::errc::invalid_argument) { - return InvalidArgument("Invalid format version '{}': not a valid integer", val); - } else if (ec == std::errc::result_out_of_range) { - return InvalidArgument("Format version '{}' is out of range", val); - } + ICEBERG_ASSIGN_OR_RAISE(auto parsed_version, + StringUtils::ParseInt(iter->second)); if (parsed_version > TableMetadata::kSupportedTableFormatVersion) { return InvalidArgument( @@ -105,7 +97,7 @@ Result UpdateProperties::Apply() { updates_.erase(TableProperties::kFormatVersion.key()); } - if (auto schema = transaction_->current().Schema(); schema.has_value()) { + if (auto schema = base().Schema(); schema.has_value()) { ICEBERG_RETURN_UNEXPECTED( MetricsConfig::VerifyReferencedColumns(new_properties, *schema.value())); } diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 0e81c4ad7..327ee0ac6 100644 --- a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -250,10 +250,8 @@ Result> UpdateSchema::Make( UpdateSchema::UpdateSchema(std::shared_ptr transaction) : PendingUpdate(std::move(transaction)) { - const TableMetadata& base_metadata = transaction_->current(); - // Get the current schema - auto schema_result = base_metadata.Schema(); + auto schema_result = base().Schema(); if (!schema_result.has_value()) { AddError(schema_result.error()); return; @@ -261,7 +259,7 @@ UpdateSchema::UpdateSchema(std::shared_ptr transaction) schema_ = std::move(schema_result.value()); // Initialize last_column_id from base metadata - last_column_id_ = base_metadata.last_column_id; + last_column_id_ = base().last_column_id; // Initialize identifier field names from the current schema auto identifier_names_result = schema_->IdentifierFieldNames(); diff --git a/src/iceberg/update/update_sort_order.cc b/src/iceberg/update/update_sort_order.cc index e3e651d50..c5c7be322 100644 --- a/src/iceberg/update/update_sort_order.cc +++ b/src/iceberg/update/update_sort_order.cc @@ -19,7 +19,6 @@ #include "iceberg/update/update_sort_order.h" -#include #include #include @@ -52,7 +51,7 @@ UpdateSortOrder& UpdateSortOrder::AddSortField(const std::shared_ptr& term ICEBERG_BUILDER_CHECK(term != nullptr, "Term cannot be null"); ICEBERG_BUILDER_CHECK(term->is_unbound(), "Term must be unbound"); - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, transaction_->current().Schema()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, base().Schema()); if (term->kind() == Term::Kind::kReference) { // kReference is treated as identity transform auto named_ref = internal::checked_pointer_cast(term); @@ -99,7 +98,7 @@ Result> UpdateSortOrder::Apply() { // The actual sort order ID will be assigned by 
TableMetadataBuilder when // the AddSortOrder update is applied. ICEBERG_ASSIGN_OR_RAISE(order, SortOrder::Make(/*sort_id=*/-1, sort_fields_)); - ICEBERG_ASSIGN_OR_RAISE(auto schema, transaction_->current().Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema()); ICEBERG_RETURN_UNEXPECTED(order->Validate(*schema)); } return order; diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 2ec36478a..4395dc274 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -26,6 +26,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" +#include "iceberg/util/uuid.h" namespace iceberg { @@ -335,4 +336,28 @@ Result> SnapshotUtil::LatestSnapshot( return metadata.SnapshotById(it->second->snapshot_id); } +Result> SnapshotUtil::OptionalLatestSnapshot( + const TableMetadata& metadata, const std::string& branch) { + return LatestSnapshot(metadata, branch) + .or_else([](const auto& error) -> Result> { + if (error.kind == ErrorKind::kNotFound) { + return nullptr; + } + return std::unexpected(error); + }); +} + +int64_t SnapshotUtil::GenerateSnapshotId() { + auto uuid = Uuid::GenerateV7(); + return (uuid.high_bits() ^ uuid.low_bits()) & std::numeric_limits::max(); +} + +int64_t SnapshotUtil::GenerateSnapshotId(const TableMetadata& metadata) { + auto snapshot_id = GenerateSnapshotId(); + while (metadata.SnapshotById(snapshot_id).has_value()) { + snapshot_id = GenerateSnapshotId(); + } + return snapshot_id; +} + } // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index 2b11168ed..befa96fe6 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include #include @@ -229,13 +230,13 @@ class ICEBERG_EXPORT SnapshotUtil { /// \brief Fetch the snapshot at the head of the given branch in the given table. /// - /// This method calls Table::current_snapshot() instead of using branch API for the main + /// This method calls TableMetadata::Snapshot() instead of using branch API for the main /// branch so that existing code still goes through the old code path to ensure /// backwards compatibility. /// /// \param table The table /// \param branch Branch name of the table (empty string means main branch) - /// \return The latest snapshot for the given branch + /// \return The latest snapshot for the given branch, or NotFoundError. static Result> LatestSnapshot(const Table& table, const std::string& branch); @@ -251,10 +252,31 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param metadata The table metadata /// \param branch Branch name of the table metadata (empty string means main /// branch) - /// \return The latest snapshot for the given branch + /// \return The latest snapshot for the given branch, or NotFoundError. static Result> LatestSnapshot(const TableMetadata& metadata, const std::string& branch); + /// \brief Fetch the snapshot at the head of the given branch in the given table. + /// + /// Like LatestSnapshot above except that nullptr is returned if snapshot does not + /// exist. + /// + /// \param metadata The table metadata + /// \param branch Branch name of the table metadata (empty string means main + /// branch) + /// \return The latest snapshot for the given branch, or nullptr if not found. 
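+ /// \note Errors other than NotFound are propagated unchanged.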
+ static Result> OptionalLatestSnapshot( + const TableMetadata& metadata, const std::string& branch); + + /// \brief Generate a new snapshot ID. + static int64_t GenerateSnapshotId(); + + /// \brief Generate a new snapshot ID for the given metadata. + /// + /// \param metadata The table metadata + /// \return A new snapshot ID + static int64_t GenerateSnapshotId(const TableMetadata& metadata); + private: /// \brief Helper function to traverse ancestors of a snapshot. /// diff --git a/src/iceberg/util/string_util.h b/src/iceberg/util/string_util.h index 0c9e89bc7..dfedb4a72 100644 --- a/src/iceberg/util/string_util.h +++ b/src/iceberg/util/string_util.h @@ -20,10 +20,14 @@ #pragma once #include +#include #include #include +#include +#include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" namespace iceberg { @@ -61,6 +65,20 @@ class ICEBERG_EXPORT StringUtils { } return count; } + + template + static Result ParseInt(std::string_view str) { + T value = 0; + auto [ptr, ec] = std::from_chars(str.data(), str.data() + str.size(), value); + if (ec == std::errc::invalid_argument) [[unlikely]] { + return InvalidArgument("Failed to parse integer from string '{}': invalid argument", + str); + } else if (ec == std::errc::result_out_of_range) [[unlikely]] { + return InvalidArgument("Failed to parse {} from string '{}': value out of range", + typeid(T).name(), str); + } + return value; + } }; /// \brief Transparent hash function that supports std::string_view as lookup key diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc index 0381e90a6..ed52dddf8 100644 --- a/src/iceberg/util/timepoint.cc +++ b/src/iceberg/util/timepoint.cc @@ -60,4 +60,11 @@ std::string FormatTimePointMs(TimePointMs time_point_ms) { return oss.str(); } +TimePointMs CurrentTimePointMs() { + auto now = std::chrono::system_clock::now(); + auto duration_since_epoch = now.time_since_epoch(); + return TimePointMs{ + std::chrono::duration_cast(duration_since_epoch)}; +} + } // namespace iceberg diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h index 6052c94ae..ed303e1fb 100644 --- a/src/iceberg/util/timepoint.h +++ b/src/iceberg/util/timepoint.h @@ -49,4 +49,7 @@ ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns); /// \brief Returns a human-readable string representation of a TimePointMs ICEBERG_EXPORT std::string FormatTimePointMs(TimePointMs time_point_ms); +/// \brief Returns a time point in milliseconds that represents the current system time +ICEBERG_EXPORT TimePointMs CurrentTimePointMs(); + } // namespace iceberg diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc index 9322deb93..cc76095a9 100644 --- a/src/iceberg/util/uuid.cc +++ b/src/iceberg/util/uuid.cc @@ -217,4 +217,16 @@ std::string Uuid::ToString() const { data_[15]); } +int64_t Uuid::high_bits() const { + int64_t result; + std::memcpy(&result, data_.data(), 8); + return result; +} + +int64_t Uuid::low_bits() const { + int64_t result; + std::memcpy(&result, data_.data() + 8, 8); + return result; +} + } // namespace iceberg diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h index 64db7c5d6..69ac10fc7 100644 --- a/src/iceberg/util/uuid.h +++ b/src/iceberg/util/uuid.h @@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable { return lhs.data_ == rhs.data_; } + int64_t high_bits() const; + int64_t low_bits() const; + private: std::array data_; };
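For reviewers, here is a minimal sketch of how a concrete operation might plug into the new SnapshotUpdate base class. It is illustrative only: ExampleAppend is a hypothetical subclass, DataOperation::kAppend is assumed to exist alongside the kReplace constant used above, and the helper types and template arguments (Transaction, DataFile, PartitionSpec, the Result payloads) are inferred from surrounding usage rather than taken from the committed API.

#include "iceberg/update/snapshot_update.h"
#include "iceberg/util/macros.h"
// ...plus whichever headers define DataFile, DataOperation, and SnapshotSummaryFields.

namespace iceberg {

// Hypothetical append operation, shown only to illustrate the SnapshotUpdate hooks.
class ExampleAppend : public SnapshotUpdate {
 public:
  explicit ExampleAppend(std::shared_ptr<Transaction> transaction)
      : SnapshotUpdate(std::move(transaction)) {}

  // Re-expose the one-argument Apply() that the two-argument override below would hide.
  using SnapshotUpdate::Apply;

  // Data files staged by the caller before the commit machinery runs Apply().
  void AppendFile(std::shared_ptr<DataFile> file) { new_files_.push_back(std::move(file)); }

 protected:
  // Assumed constant; the diff itself only shows DataOperation::kReplace.
  std::string operation() override { return DataOperation::kAppend; }

  std::unordered_map<std::string, std::string> Summary() override {
    // Only per-operation counters; ComputeSummary() merges in the branch totals.
    return {{std::string(SnapshotSummaryFields::kAddedDataFiles),
             std::to_string(new_files_.size())}};
  }

  Result<std::vector<ManifestFile>> Apply(const TableMetadata& metadata,
                                          const std::shared_ptr<Snapshot>& parent) override {
    // Write new data manifests against the current spec; a real producer would also
    // carry forward the parent snapshot's manifests here.
    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpec());
    ICEBERG_ASSIGN_OR_RAISE(written_manifests_, WriteDataManifests(new_files_, spec));
    return written_manifests_;
  }

  void CleanUncommitted(const std::unordered_set<std::string>& committed) override {
    // Delete any manifest written by this operation that did not end up committed.
    for (const auto& manifest : written_manifests_) {
      if (!committed.contains(manifest.manifest_path)) {
        std::ignore = transaction_->table()->io()->DeleteFile(manifest.manifest_path);
      }
    }
  }

 private:
  std::vector<std::shared_ptr<DataFile>> new_files_;
  std::vector<ManifestFile> written_manifests_;
};

}  // namespace iceberg

// Typical call sequence, driven by the surrounding transaction/commit machinery:
//   ExampleAppend append(transaction);
//   append.AppendFile(data_file);
//   auto result = append.Apply();                // stages the snapshot and manifest list
//   /* attempt the catalog commit */
//   auto status = append.Finalize(commit_error); // cleans up after success or failure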