Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_ME
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 6;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH;

static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1;

Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,9 @@ Use multiple threads for azure multipart upload.
)", 0) \
DECLARE(Bool, s3_throw_on_zero_files_match, false, R"(
Throw an error, when ListObjects request cannot match any files
)", 0) \
DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"(
Allow copying base storage credentials to secondary object storages with a different endpoint. Default: 0 (credentials only copied when endpoint matches base).
)", 0) \
DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"(
Throw an error if matched zero files according to glob expansion rules.
Expand Down
40 changes: 40 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,46 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// addSettingsChanges(settings_changes_history, "26.1.3.20001",
// {
// });
addSettingsChanges(settings_changes_history, "26.3",
{
{"allow_calculating_subcolumns_sizes_for_merge_tree_reading", false, true, "Allow calculating subcolumns sizes for merge tree reading to improve read tasks splitting"},
{"delta_lake_reload_schema_for_consistency", false, false, "New setting to control whether DeltaLake reloads schema before each query for consistency."},
{"use_partition_pruning", true, true, "New setting controlling whether MergeTree uses partition key for pruning. 'use_partition_key' is an alias for this setting."},
{"use_partition_key", true, true, "Alias for setting 'use_partition_pruning'."},
{"mysql_datatypes_support_level", "", "decimal,datetime64,date2Date32", "Enable modern MySQL type mappings by default."},
});
addSettingsChanges(settings_changes_history, "26.2",
{
{"allow_fuzz_query_functions", false, false, "New setting to enable the fuzzQuery function."},
{"ast_fuzzer_runs", 0, 0, "New setting to enable server-side AST fuzzer."},
{"ast_fuzzer_any_query", false, false, "New setting to allow fuzzing all query types, not just read-only."},
{"check_named_collection_dependencies", true, true, "New setting to check if dropping a named collection would break dependent tables."},
{"deduplicate_blocks_in_dependent_materialized_views", false, true, "Enable deduplication for dependent materialized views by default."},
{"deduplicate_insert", "backward_compatible_choice", "enable", "Enable deduplication for all sync and async inserts by default."},
{"deduplicate_insert", "backward_compatible_choice", "backward_compatible_choice", "New setting to control deduplication for INSERT queries."},
{"enable_join_runtime_filters", false, true, "Enabled this optimization"},
{"parallel_replicas_filter_pushdown", false, false, "New setting"},
{"optimize_dry_run_check_part", true, true, "New setting"},
{"parallel_non_joined_rows_processing", true, true, "New setting to enable parallel processing of non-joined rows in RIGHT/FULL parallel_hash joins."},
{"enable_automatic_decision_for_merging_across_partitions_for_final", true, true, "New setting"},
{"enable_full_text_index", false, true, "The text index is now GA"},
{"allow_experimental_full_text_index", false, true, "The text index is now GA"},
{"use_page_cache_for_local_disks", false, false, "New setting to use userspace page cache for local disks"},
{"use_page_cache_for_object_storage", false, false, "New setting to use userspace page cache for object storage table functions"},
{"use_statistics_cache", false, true, "Enable statistics cache"},
{"apply_row_policy_after_final", false, true, "Enabling apply_row_policy_after_final by default, as it was in 25.8 before #87303"},
{"ignore_format_null_for_explain", false, true, "FORMAT Null is now ignored for EXPLAIN queries by default"},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"input_format_connection_handling", false, false, "New setting to allow parsing and processing remaining data in the buffer if the connection closes unexpectedly"},
{"input_format_max_block_wait_ms", 0, 0, "New setting to limit maximum wait time in milliseconds before a block is emitted by input format"},
{"allow_insert_into_iceberg", false, false, "Insert into iceberg was moved to Beta"},
{"allow_experimental_insert_into_iceberg", false, false, "Insert into iceberg was moved to Beta"},
{"output_format_arrow_date_as_uint16", true, false, "Write Date as Arrow DATE32 instead of plain UInt16 by default."},
{"jemalloc_profile_text_output_format", "collapsed", "collapsed", "New setting to control output format for system.jemalloc_profile_text table. Possible values: 'raw', 'symbolized', 'collapsed'"},
{"jemalloc_profile_text_symbolize_with_inline", true, true, "New setting to control whether to include inline frames when symbolizing jemalloc heap profile. When enabled, inline frames are included at the cost of slower symbolization; when disabled, they are skipped for faster output"},
{"jemalloc_profile_text_collapsed_use_count", false, false, "New setting to aggregate by allocation count instead of bytes in the collapsed jemalloc heap profile format"},
{"opentelemetry_start_keeper_trace_probability", "auto", "auto", "New setting"},
});
addSettingsChanges(settings_changes_history, "26.1",
{
{"parallel_replicas_filter_pushdown", false, false, "New setting"},
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
LOG_DEBUG(log, "Has no credentials");
}
}
else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end())
else if (!lightweight && table_metadata.requiresCredentials()
&& std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()
&& table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
Expand Down
6 changes: 2 additions & 4 deletions src/Interpreters/ClusterFunctionReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
data_lake_metadata = object->data_lake_metadata.value();

#if USE_AVRO
if (std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
{
iceberg_info = dynamic_cast<IcebergDataObjectInfo &>(*object).info;
}
if (auto iceberg_object = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
iceberg_info = iceberg_object->info;
#endif

const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
Expand Down
70 changes: 46 additions & 24 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Plan getPlan(
const DataLakeStorageSettings & data_lake_settings,
const PersistentTableComponents & persistent_table_components,
ObjectStoragePtr object_storage,
SecondaryStorages & secondary_storages,
const String & write_format,
ContextPtr context,
CompressionMethod compression_method)
Expand Down Expand Up @@ -155,27 +156,30 @@ Plan getPlan(
std::unordered_map<String, std::shared_ptr<ManifestFilePlan>> manifest_files;
for (const auto & snapshot : snapshots_info)
{
auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log);
auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_absolute_path, log, secondary_storages);
for (const auto & manifest_file : manifest_list)
{
plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path);
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path))
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id;
plan.manifest_list_to_manifest_files[snapshot.manifest_list_absolute_path].push_back(manifest_file.manifest_file_absolute_path);
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_absolute_path))
{
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_absolute_path] = snapshot.snapshot_id;
}
auto manifest_file_content = getManifestFile(
object_storage,
persistent_table_components,
context,
log,
manifest_file.manifest_file_path,
manifest_file.manifest_file_absolute_path,
manifest_file.added_sequence_number,
manifest_file.added_snapshot_id);
manifest_file.added_snapshot_id,
secondary_storages);

if (!manifest_files.contains(manifest_file.manifest_file_path))
if (!manifest_files.contains(manifest_file.manifest_file_absolute_path))
{
manifest_files[manifest_file.manifest_file_path] = std::make_shared<ManifestFilePlan>(current_schema);
manifest_files[manifest_file.manifest_file_path]->path = manifest_file.manifest_file_path;
manifest_files[manifest_file.manifest_file_absolute_path] = std::make_shared<ManifestFilePlan>(current_schema);
manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path;
}
manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_absolute_path);
auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA);
auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE);
for (const auto & pos_delete_file : positional_delete_files)
Expand All @@ -187,19 +191,23 @@ Plan getPlan(
if (plan.partitions.size() <= partition_index)
plan.partitions.push_back({});

IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file, 0);
auto [resolved_storage, resolved_key] = resolveObjectStorageForPath(
persistent_table_components.table_location, data_file->file_path, object_storage, secondary_storages, context);

IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(*data_file, 0, resolved_storage, resolved_key);
std::shared_ptr<DataFilePlan> data_file_ptr;
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path))
std::string path_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace() + "|" + resolved_key;
if (!plan.path_to_data_file.contains(path_identifier))
{
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
.data_object_info = data_object_info,
.manifest_list = manifest_files[manifest_file.manifest_file_path],
.manifest_list = manifest_files[manifest_file.manifest_file_absolute_path],
.patched_path = plan.generator.generateDataFileName()});
plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr;
plan.path_to_data_file[path_identifier] = data_file_ptr;
}
else
{
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path];
data_file_ptr = plan.path_to_data_file[path_identifier];
}
plan.partitions[partition_index].push_back(data_file_ptr);
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());
Expand Down Expand Up @@ -232,7 +240,9 @@ static void writeDataFiles(
const std::optional<FormatSettings> & format_settings,
ContextPtr context,
const String & write_format,
CompressionMethod write_compression_method)
CompressionMethod write_compression_method,
const String & table_location,
SecondaryStorages & secondary_storages)
{
for (auto & [_, data_file] : initial_plan.path_to_data_file)
{
Expand All @@ -243,10 +253,15 @@ static void writeDataFiles(
format_settings,
// todo make compaction using same FormatParserSharedResources
std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), 1),
context);
context,
table_location,
secondary_storages);

RelativePathWithMetadata relative_path(data_file->data_object_info->getPath());
auto read_buffer = createReadBuffer(relative_path, object_storage, context, getLogger("IcebergCompaction"));
ObjectStoragePtr storage_to_use = data_file->data_object_info->getResolvedStorage();
if (!storage_to_use)
storage_to_use = object_storage;
RelativePathWithMetadata object_info(data_file->data_object_info->getPath());
auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction"));

const Settings & settings = context->getSettingsRef();
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(
Expand Down Expand Up @@ -400,6 +415,9 @@ void writeMetadataFiles(
{
manifest_entry->patched_path = plan.generator.generateManifestEntryName();
manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata;

std::vector<String> manifest_data_filenames(data_filenames.begin(), data_filenames.end());

auto buffer_manifest_entry = object_storage->writeObject(
StoredObject(manifest_entry->patched_path.path_in_storage),
WriteMode::Rewrite,
Expand All @@ -417,7 +435,7 @@ void writeMetadataFiles(
partition_columns,
plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]),
ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(),
std::vector(data_filenames.begin(), data_filenames.end()),
manifest_data_filenames,
manifest_entry->statistics,
sample_block_,
snapshot,
Expand All @@ -438,17 +456,17 @@ void writeMetadataFiles(
if (plan.history[i].added_files == 0)
continue;

manifest_list_renamings[plan.history[i].manifest_list_path] = new_snapshots[i].metadata_path;
manifest_list_renamings[plan.history[i].manifest_list_absolute_path] = new_snapshots[i].metadata_path;
}

for (size_t i = 0; i < plan.history.size(); ++i)
{
if (plan.history[i].added_files == 0)
continue;

auto initial_manifest_list_name = plan.history[i].manifest_list_path;
auto initial_manifest_list_name = plan.history[i].manifest_list_absolute_path;
auto initial_manifest_entries = plan.manifest_list_to_manifest_files[initial_manifest_list_name];
auto renamed_manifest_list = manifest_list_renamings[initial_manifest_list_name];
auto renamed_manifest_list = manifest_list_renamings[plan.history[i].manifest_list_absolute_path];
std::vector<String> renamed_manifest_entries;
Int32 total_manifest_file_sizes = 0;
for (const auto & initial_manifest_entry : initial_manifest_entries)
Expand Down Expand Up @@ -516,6 +534,7 @@ void compactIcebergTable(
IcebergHistory snapshots_info,
const PersistentTableComponents & persistent_table_components,
ObjectStoragePtr object_storage_,
SecondaryStorages & secondary_storages_,
const DataLakeStorageSettings & data_lake_settings,
const std::optional<FormatSettings> & format_settings_,
SharedHeader sample_block_,
Expand All @@ -527,6 +546,7 @@ void compactIcebergTable(
data_lake_settings,
persistent_table_components,
object_storage_,
secondary_storages_,
write_format,
context_,
persistent_table_components.metadata_compression_method);
Expand All @@ -540,7 +560,9 @@ void compactIcebergTable(
format_settings_,
context_,
write_format,
persistent_table_components.metadata_compression_method);
persistent_table_components.metadata_compression_method,
persistent_table_components.table_location,
secondary_storages_);
writeMetadataFiles(plan, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path);
clearOldFiles(object_storage_, old_files);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/PersistentTableComponents.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/Utils.h>


namespace DB::Iceberg
Expand All @@ -15,6 +16,7 @@ void compactIcebergTable(
IcebergHistory snapshots_info,
const PersistentTableComponents & persistent_table_components,
DB::ObjectStoragePtr object_storage_,
SecondaryStorages & secondary_storages_,
const DataLakeStorageSettings & data_lake_settings,
const std::optional<DB::FormatSettings> & format_settings_,
DB::SharedHeader sample_block_,
Expand Down
Loading
Loading