diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
index 8f3542165eb7..f58bf8d04dcd 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable

     virtual void modifyFormatSettings(FormatSettings &, const Context &) const {}

+    virtual bool supportsTruncate() const { return false; } /// Capability flag checked by StorageObjectStorage::truncate before calling truncate(); false unless overridden.
+    virtual void truncate(ContextPtr /*context*/, std::shared_ptr /*catalog*/, const StorageID & /*storage_id*/) /// Default implementation: ignores its arguments and always throws.
+    {
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName()); /// getName() identifies the metadata type in the message.
+    }
+
     static constexpr bool supportsTotalRows() { return false; }
     virtual std::optional totalRows(ContextPtr) const { return {}; }
     static constexpr bool supportsTotalBytes() { return false; }
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
index 3bc4747a5f18..a09125cfbeb2 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
@@ -111,7 +111,9 @@ DEFINE_ICEBERG_FIELD_ALIAS(partition_spec, partition-spec);
 DEFINE_ICEBERG_FIELD_ALIAS(partition_specs, partition-specs);
 DEFINE_ICEBERG_FIELD_ALIAS(spec_id, spec-id);
 DEFINE_ICEBERG_FIELD_ALIAS(added_records, added-records);
+DEFINE_ICEBERG_FIELD_ALIAS(deleted_records, deleted-records); /// Snapshot summary key set by MetadataGenerator for truncate snapshots.
 DEFINE_ICEBERG_FIELD_ALIAS(added_data_files, added-data-files);
+DEFINE_ICEBERG_FIELD_ALIAS(deleted_data_files, deleted-data-files); /// Snapshot summary key set by MetadataGenerator for truncate snapshots.
 DEFINE_ICEBERG_FIELD_ALIAS(added_delete_files, added-delete-files);
 DEFINE_ICEBERG_FIELD_ALIAS(added_position_delete_files, added-position-delete-files);
 DEFINE_ICEBERG_FIELD_ALIAS(added_position_deletes, added-position-deletes);
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index b78739deb826..36ac299adc4b 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -11,9 +11,9 @@
 #include
 #include
 #include
-#include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -104,6 +104,7 @@ extern const int NOT_IMPLEMENTED;
 extern const int ICEBERG_SPECIFICATION_VIOLATION;
 extern const int TABLE_ALREADY_EXISTS;
 extern const int SUPPORT_IS_DISABLED;
+extern const int INCORRECT_DATA;
 }

 namespace Setting
@@ -610,6 +611,108 @@ void IcebergMetadata::mutate(
     );
 }

+/// Truncates the Iceberg table by committing a new snapshot whose manifest list is empty.
+/// Writes the empty manifest list and a new metadata file (version = current + 1) to object
+/// storage and, when a catalog is given, commits the new snapshot through it.
+/// Throws SUPPORT_IS_DISABLED unless allow_experimental_insert_into_iceberg is enabled, and
+/// INCORRECT_DATA if the catalog rejects the update. This function does not remove data files.
+void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr catalog, const StorageID & storage_id)
+{
+    if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
+        throw Exception(
+            ErrorCodes::SUPPORT_IS_DISABLED,
+            "Iceberg truncate is experimental. "
+            "To allow its usage, enable setting allow_experimental_insert_into_iceberg");
+
+    auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
+    auto metadata_object = getMetadataJSONObject(
+        actual_table_state_snapshot.metadata_file_path,
+        object_storage,
+        persistent_components.metadata_cache,
+        context,
+        log,
+        persistent_components.metadata_compression_method,
+        persistent_components.table_uuid);
+
+    // Use -1 as the Iceberg spec sentinel for "no parent snapshot"
+    // (distinct from snapshot ID 0 which is a valid snapshot).
+    Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(-1);
+
+    auto config_path = persistent_components.table_path;
+    if (!config_path.starts_with('/')) config_path = '/' + config_path;
+    if (!config_path.ends_with('/')) config_path += "/";
+
+    bool is_transactional = (catalog != nullptr && catalog->isTransactional());
+
+    // Transactional catalogs (e.g. REST) require a fully-qualified blob URI
+    // (scheme://bucket/path) so the catalog can resolve the metadata location
+    // independently of any local path configuration. Non-transactional catalogs
+    // use bare paths relative to the object storage root.
+    FileNamesGenerator filename_generator;
+    if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
+    {
+        String location = metadata_object->getValue(Iceberg::f_location);
+        if (!location.ends_with("/")) location += "/";
+        filename_generator = FileNamesGenerator(
+            location, config_path, is_transactional,
+            persistent_components.metadata_compression_method, write_format);
+    }
+    else
+    {
+        filename_generator = FileNamesGenerator(
+            config_path, config_path, false,
+            persistent_components.metadata_compression_method, write_format);
+    }
+
+    Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1;
+    filename_generator.setVersion(new_metadata_version);
+
+    auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();
+
+    auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata(
+        filename_generator, metadata_name, parent_snapshot_id,
+        /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0,
+        /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0,
+        std::nullopt, std::nullopt, /*is_truncate=*/true);
+
+    auto write_settings = context->getWriteSettings();
+    auto buf = object_storage->writeObject(
+        StoredObject(storage_manifest_list_name),
+        WriteMode::Rewrite, std::nullopt,
+        DBMS_DEFAULT_BUFFER_SIZE, write_settings);
+
+    generateManifestList(filename_generator, metadata_object, object_storage,
+        context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false);
+    buf->finalize();
+
+    String metadata_content = dumpMetadataObjectToString(metadata_object);
+    writeMessageToFile(metadata_content, storage_metadata_name, object_storage,
+        context, "*", "", persistent_components.metadata_compression_method);
+
+    if (catalog)
+    {
+        // Transactional catalogs require a fully-qualified blob URI so the catalog
+        // can resolve the metadata location independently of local path configuration.
+        String catalog_filename = metadata_name;
+        if (is_transactional)
+        {
+            // The generator above is seeded with the table location for transactional catalogs,
+            // so metadata_name may already start with it (NOTE(review): confirm against
+            // FileNamesGenerator). Prefix the location only when it is missing, to avoid
+            // handing the catalog a name of the form "<location><location>metadata/...".
+            String location = metadata_object->getValue(Iceberg::f_location);
+            if (!location.ends_with("/")) location += "/";
+            if (!catalog_filename.starts_with(location))
+                catalog_filename = location + metadata_name;
+        }
+
+        const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
+        if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
+            throw Exception(ErrorCodes::INCORRECT_DATA,
+                "Failed to commit Iceberg truncate update to catalog.");
+    }
+}
+
 void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
 {
     for (const auto & command : commands)
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
index 169e93a11d4a..c672f36de014 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
@@ -85,6 +85,9 @@ class IcebergMetadata : public IDataLakeMetadata
     bool supportsUpdate() const override { return true; }
     bool supportsWrites() const override { return true; }
     bool supportsParallelInsert() const override { return true; }
+    bool supportsTruncate() const override { return true; }
+
+    void truncate(ContextPtr context, std::shared_ptr catalog, const StorageID & storage_id) override;

     IcebergHistory getHistory(ContextPtr local_context) const;

diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
index fe30deb08ba7..c2d5a0c0c5bc 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
@@ -430,6 +430,31 @@ void generateManifestFile(
     writer.close();
 }

+/// Writes `val` in Avro binary `long` encoding: zigzag-mapped (non-negative n -> 2n,
+/// negative n -> 2(-n)-1, so small magnitudes stay compact regardless of sign) and then
+/// emitted as a little-endian base-128 varint, where the high bit of each byte signals
+/// whether more bytes follow.
+/// See: https://avro.apache.org/docs/1.11.1/specification/#binary-encoding
+static void writeAvroLong(WriteBuffer & out, int64_t val)
+{
+    uint64_t n = (static_cast<uint64_t>(val) << 1) ^ static_cast<uint64_t>(val >> 63);
+    while (n & ~0x7fULL)
+    {
+        char c = static_cast<char>((n & 0x7f) | 0x80);
+        out.write(&c, 1);
+        n >>= 7;
+    }
+    char c = static_cast<char>(n);
+    out.write(&c, 1);
+}
+
+/// Writes `s` in Avro binary `bytes`/`string` encoding: a long byte count followed by the raw bytes.
+static void writeAvroBytes(WriteBuffer & out, const String & s)
+{
+    writeAvroLong(out, static_cast<int64_t>(s.size()));
+    out.write(s.data(), s.size());
+}
+
 void generateManifestList(
     const FileNamesGenerator & filename_generator,
     Poco::JSON::Object::Ptr metadata,
@@ -451,6 +476,37 @@ void generateManifestList(
     else
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown iceberg version {}", version);

+    if (manifest_entry_names.empty() && !use_previous_snapshots)
+    {
+        // Empty manifest list (e.g. after TRUNCATE): write a minimal valid Avro Object
+        // Container File by hand instead of going through avro::DataFileWriter.
+        // The reason: DataFileWriter calls writeHeader() eagerly in its constructor,
+        // committing the binary encoder state. Post-construction setMetadata() calls
+        // corrupt StreamWriter::next_ causing a NULL dereference on close(). Writing
+        // the OCF header directly ensures the full schema JSON (with Iceberg field-ids)
+        // is embedded intact — the Avro C++ library strips unknown field properties
+        // like field-id during schema node serialization.
+        // Avro OCF format: [magic(4)] [metadata_map] [sync_marker(16)] [no data blocks]
+        buf.write("Obj\x01", 4);
+
+        writeAvroLong(buf, 2); // 2 metadata entries
+        writeAvroBytes(buf, "avro.codec");
+        writeAvroBytes(buf, "null");
+        writeAvroBytes(buf, "avro.schema");
+        writeAvroBytes(buf, schema_representation); // full JSON with field-ids intact
+
+        writeAvroLong(buf, 0); // end of metadata map
+
+        // The sync marker only delimits data blocks; no blocks follow, so an all-zero marker is used.
+        static const char sync_marker[16] = {};
+        buf.write(sync_marker, 16);
+
+        // NOTE(review): IcebergMetadata::truncate also finalizes this buffer after the call;
+        // confirm WriteBuffer::finalize() is idempotent or drop one of the two calls.
+        buf.finalize();
+        return;
+    }
+
     auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT

     auto adapter = std::make_unique(buf);
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
index c7bce897ea6c..184c6a7f9359 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
@@ -113,7 +113,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
     Int32 added_delete_files,
     Int32 num_deleted_rows,
     std::optional user_defined_snapshot_id,
-    std::optional user_defined_timestamp)
+    std::optional user_defined_timestamp,
+    bool is_truncate)
 {
     int format_version = metadata_object->getValue(Iceberg::f_format_version);
     Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object;
@@ -137,7 +138,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
     auto parent_snapshot =
getParentSnapshot(parent_snapshot_id);
     Poco::JSON::Object::Ptr summary = new Poco::JSON::Object;
-    if (num_deleted_rows == 0)
+    if (is_truncate)
+    {
+        /// Truncate drops everything the parent snapshot held. Totals are parsed as 64-bit: std::stoi throws std::out_of_range above Int32.
+        summary->set(Iceberg::f_operation, Iceberg::f_overwrite);
+        Int64 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoll(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_records)) : 0;
+        Int64 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoll(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_data_files)) : 0;
+        summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records));
+        summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files));
+    }
+    else if (num_deleted_rows == 0)
     {
         summary->set(Iceberg::f_operation, Iceberg::f_append);
         summary->set(Iceberg::f_added_data_files, std::to_string(added_files));
@@ -157,7 +167,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

     auto sum_with_parent_snapshot = [&](const char * field_name, Int32 snapshot_value)
     {
-        Int32 prev_value = parent_snapshot ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue(field_name)) : 0;
+        if (is_truncate)
+        {
+            summary->set(field_name, std::to_string(0));
+            return;
+        }
+        Int64 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? std::stoll(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
         summary->set(field_name, std::to_string(prev_value + snapshot_value));
     };

diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h
index a4cbbbc4434e..035747dafa14 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h
@@ -30,7 +30,8 @@ class MetadataGenerator
         Int32 added_delete_files,
         Int32 num_deleted_rows,
         std::optional user_defined_snapshot_id = std::nullopt,
-        std::optional user_defined_timestamp = std::nullopt);
+        std::optional user_defined_timestamp = std::nullopt,
+        bool is_truncate = false);

     void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
     void generateDropColumnMetadata(const String & column_name);
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index f04be5d5f946..7fe3ce25debe 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans
 void StorageObjectStorage::truncate(
     const ASTPtr & /* query */,
     const StorageMetadataPtr & /* metadata_snapshot */,
-    ContextPtr /* context */,
+    ContextPtr context,
     TableExclusiveLockHolder & /* table_holder */)
 {
     const auto path = configuration->getRawPath();
@@ -639,8 +639,12 @@ void StorageObjectStorage::truncate(

     if (configuration->isDataLakeConfiguration())
     {
-        throw Exception(ErrorCodes::NOT_IMPLEMENTED,
-                        "Truncate is not supported for data lake engine");
+        auto * data_lake_metadata = getExternalMetadata(context);
+        if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");
+
+
data_lake_metadata->truncate(context, catalog, getStorageID());
+        return;
     }

     if (path.hasGlobs())
diff --git a/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py
new file mode 100644
index 000000000000..5d775db70ce0
--- /dev/null
+++ b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python3
+
+from pyiceberg.catalog import load_catalog
+from helpers.config_cluster import minio_secret_key, minio_access_key
+import uuid
+import pyarrow as pa
+from pyiceberg.schema import Schema, NestedField
+from pyiceberg.types import LongType, StringType
+from pyiceberg.partitioning import PartitionSpec
+
+BASE_URL_LOCAL_RAW = "http://localhost:8182"
+CATALOG_NAME = "demo"
+
+def load_catalog_impl(started_cluster):
+    """Return a pyiceberg REST catalog client whose S3 endpoint is the cluster's MinIO instance."""
+    return load_catalog(
+        CATALOG_NAME,
+        **{
+            "uri": BASE_URL_LOCAL_RAW,
+            "type": "rest",
+            "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000",
+            "s3.access-key-id": minio_access_key,
+            "s3.secret-access-key": minio_secret_key,
+        },
+    )
+
+
+def test_iceberg_truncate_restart(started_cluster_iceberg_no_spark):
+    """TRUNCATE an Iceberg table through a REST catalog, then check that it stays empty across
+    a server restart and that rows appended via pyiceberg afterwards are readable."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    catalog = load_catalog_impl(started_cluster_iceberg_no_spark)
+
+    namespace = f"clickhouse_truncate_restart_{uuid.uuid4().hex}"
+    catalog.create_namespace(namespace)
+
+    schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+        NestedField(field_id=2, name="val", field_type=StringType(), required=False),
+    )
+    table_name = "test_truncate_restart"
+    catalog.create_table(
+        identifier=f"{namespace}.{table_name}",
+        schema=schema,
+        location=f"s3://warehouse-rest/{namespace}.{table_name}",
+        partition_spec=PartitionSpec(),
+    )
+
+    ch_table_identifier = f"`{namespace}.{table_name}`"
+
+    def count_rows():
+        # Row count as seen by ClickHouse through the DataLakeCatalog database.
+        return int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip())
+
+    instance.query(f"DROP DATABASE IF EXISTS {namespace}")
+    instance.query(
+        f"""
+        CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}')
+        SETTINGS
+            catalog_type='rest',
+            warehouse='demo',
+            storage_endpoint='http://minio:9000/warehouse-rest';
+        """,
+        settings={"allow_database_iceberg": 1}
+    )
+
+    try:
+        # 1. Insert initial data and truncate
+        df = pa.Table.from_pylist([{"id": 1, "val": "A"}, {"id": 2, "val": "B"}])
+        catalog.load_table(f"{namespace}.{table_name}").append(df)
+
+        assert count_rows() == 2
+
+        instance.query(
+            f"TRUNCATE TABLE {namespace}.{ch_table_identifier}",
+            settings={"allow_experimental_insert_into_iceberg": 1}
+        )
+        assert count_rows() == 0
+
+        # 2. Restart ClickHouse and verify table is still readable (count = 0)
+        instance.restart_clickhouse()
+        assert count_rows() == 0
+
+        # 3. Insert new data after restart and verify it's readable
+        new_df = pa.Table.from_pylist([{"id": 3, "val": "C"}])
+        catalog.load_table(f"{namespace}.{table_name}").append(new_df)
+        assert count_rows() == 1
+    finally:
+        # Drop the database even if an assertion above fails, so it does not outlive this test.
+        instance.query(f"DROP DATABASE IF EXISTS {namespace}")