Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@
# include <azure/core/diagnostics/logger.hpp>
#endif

#if USE_PARQUET
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif


/// A minimal file used when the server is run without installation
constexpr unsigned char resource_embedded_xml[] =
Expand Down Expand Up @@ -415,6 +419,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 keeper_server_socket_send_timeout_sec;
extern const ServerSettingsString hdfs_libhdfs3_conf;
extern const ServerSettingsString config_file;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

namespace ErrorCodes
Expand Down Expand Up @@ -2739,6 +2744,10 @@ try

auto replicas_reconnector = ReplicasReconnector::init(global_context);

#if USE_PARQUET
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
#endif

/// Set current database name before loading tables and databases because
/// system logs may copy global context.
std::string default_database = server_settings[ServerSetting::default_database];
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM CLEAR SCHEMA CACHE, SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM CLEAR FORMAT SCHEMA CACHE, SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM CLEAR S3 CLIENT CACHE, SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
Expand Down
3 changes: 2 additions & 1 deletion src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,8 @@ The server successfully detected this situation and will download merged part fr
M(RuntimeFilterRowsChecked, "Number of rows checked by JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsPassed, "Number of rows that passed (not filtered out by) JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsSkipped, "Number of rows in blocks that were skipped by JOIN Runtime Filters", ValueType::Number) \

M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
Expand Down
3 changes: 1 addition & 2 deletions src/Core/FormatFactorySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -1527,8 +1527,7 @@ Allow to write information about geo columns in parquet metadata and encode colu
DECLARE(Bool, into_outfile_create_parent_directories, false, R"(
Automatically create parent directories when using INTO OUTFILE if they do not already exists.
)", 0) \


DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \
// End of FORMAT_FACTORY_SETTINGS

#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \
Expand Down
5 changes: 3 additions & 2 deletions src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,8 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
```xml
<skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings>
```
)", 0)
)", 0) \
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0)

/// Settings with a path are server settings with at least one layer of nesting that have a fixed structure (no lists, lists, enumerations, repetitions, ...).
#define LIST_OF_SERVER_SETTINGS_WITH_PATH(DECLARE, ALIAS) \
Expand Down Expand Up @@ -1542,7 +1543,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
DECLARE(UInt64, keeper_server_socket_receive_timeout_sec, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, R"(Keeper socket receive timeout.)", 0, "keeper_server.socket_receive_timeout_sec") \
DECLARE(UInt64, keeper_server_socket_send_timeout_sec, DBMS_DEFAULT_SEND_TIMEOUT_SEC, R"(Keeper socket send timeout.)", 0, "keeper_server.socket_send_timeout_sec") \
DECLARE(String, hdfs_libhdfs3_conf, "", R"(Points libhdfs3 to the right location for its config.)", 0, "hdfs.libhdfs3_conf") \
DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file")
DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file")

// clang-format on

Expand Down
6 changes: 6 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", true, false, "It becomes obsolete."},
{"database_datalake_require_metadata_access", true, true, "New setting."},
{"automatic_parallel_replicas_min_bytes_per_replica", 0, 1_MiB, "Better default value derived from testing results"},
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"},
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Baselines generated with v25.12.1 (pre-release)

});
addSettingsChanges(settings_changes_history, "25.12",
{
Expand Down Expand Up @@ -434,6 +435,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"parallel_hash_join_threshold", 0, 0, "New setting"},
/// Release closed. Please use 25.4
});
addSettingsChanges(settings_changes_history, "24.12.2.20000",
{
// Altinity Antalya modifications atop of 24.12
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
});
addSettingsChanges(settings_changes_history, "25.2",
{
/// Release closed. Please use 25.3
Expand Down
15 changes: 15 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
#include <Formats/ProtobufSchemas.h>
#endif

#if USE_PARQUET
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif

#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
Expand Down Expand Up @@ -453,6 +457,16 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->clearQueryResultCache(query.query_result_cache_tag);
break;
}
case Type::DROP_PARQUET_METADATA_CACHE:
{
#if USE_PARQUET
getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
ParquetFileMetaDataCache::instance()->clear();
break;
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
#endif
}
case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
Expand Down Expand Up @@ -1997,6 +2011,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::CLEAR_PAGE_CACHE:
case Type::CLEAR_SCHEMA_CACHE:
case Type::CLEAR_FORMAT_SCHEMA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::CLEAR_S3_CLIENT_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::CLEAR_TEXT_INDEX_CACHES:
case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
case Type::CLEAR_S3_CLIENT_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::CLEAR_ICEBERG_METADATA_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
CLEAR_QUERY_CONDITION_CACHE,
CLEAR_QUERY_CACHE,
CLEAR_COMPILED_EXPRESSION_CACHE,
DROP_PARQUET_METADATA_CACHE,
CLEAR_ICEBERG_METADATA_CACHE,
CLEAR_FILESYSTEM_CACHE,
CLEAR_DISTRIBUTED_CACHE,
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <base/types.h>
#include <Core/BlockMissingValues.h>
#include <Processors/ISource.h>
#include <Core/Settings.h>


namespace DB
Expand Down Expand Up @@ -128,6 +129,9 @@ class IInputFormat : public ISource

void needOnlyCount() { need_only_count = true; }

/// Set additional info/key/id related to underlying storage of the ReadBuffer
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

Expand Down
76 changes: 75 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

#if USE_PARQUET

#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
Expand All @@ -34,6 +37,7 @@
#include <Common/FieldAccurateComparison.h>
#include <Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#include <Interpreters/convertFieldToType.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Processors/Formats/Impl/ParquetV3BlockInputFormat.h>
Expand All @@ -45,6 +49,8 @@ namespace ProfileEvents
extern const Event ParquetFetchWaitTimeMicroseconds;
extern const Event ParquetReadRowGroups;
extern const Event ParquetPrunedRowGroups;
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace CurrentMetrics
Expand All @@ -61,6 +67,16 @@ namespace CurrentMetrics
namespace DB
{

namespace Setting
{
extern const SettingsBool input_format_parquet_use_metadata_cache;
}

namespace ServerSetting
{
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
Expand Down Expand Up @@ -545,6 +561,49 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
{
createArrowFileIfNotCreated();
return parquet::ReadMetaData(arrow_file);
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
// in-memory cache is not implemented for local file operations, only for remote files
// there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
// and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
if (!metadata_cache.use_cache || metadata_cache.key.empty())
{
return readMetadataFromFile();
}

auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
metadata_cache.key,
[&]()
{
return readMetadataFromFile();
}
);
if (loaded)
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
else
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
return parquet_file_metadata;
}

void ParquetBlockInputFormat::createArrowFileIfNotCreated()
{
if (arrow_file)
{
return;
}

// Create arrow file adapter.
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
// we'll need to read (which we know in advance). Use max_download_threads for that.
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
}

std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
{
std::unordered_set<std::size_t> column_keys;
Expand Down Expand Up @@ -691,7 +750,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
metadata = getFileMetaData();
if (buckets_to_read)
{
std::unordered_set<size_t> set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end());
Expand Down Expand Up @@ -807,6 +866,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
}
}

bool has_row_groups_to_read = false;

auto skip_row_group_based_on_filters = [&](int row_group)
{
if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
Expand Down Expand Up @@ -865,7 +926,20 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().total_bytes_compressed += row_group_size;
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;

has_row_groups_to_read = true;
}

if (has_row_groups_to_read)
{
createArrowFileIfNotCreated();
}
}

void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
}

void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
Expand Down
15 changes: 15 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ParquetBlockInputFormat : public IInputFormat
size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override;
void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

private:
Chunk read() override;
Expand All @@ -113,6 +114,13 @@ class ParquetBlockInputFormat : public IInputFormat

void threadFunction(size_t row_group_batch_idx);

void createArrowFileIfNotCreated();
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();

std::shared_ptr<parquet::FileMetaData> getFileMetaData();

inline bool supportPrefetch() const;

// Data layout in the file:
//
// row group 0
Expand Down Expand Up @@ -361,6 +369,13 @@ class ParquetBlockInputFormat : public IInputFormat
bool is_initialized = false;
std::optional<std::unordered_map<String, String>> parquet_names_to_clickhouse;
std::optional<std::unordered_map<String, String>> clickhouse_names_to_parquet;
struct Cache
{
String key;
bool use_cache = false;
};

Cache metadata_cache;
};

class ArrowParquetSchemaReader : public ISchemaReader
Expand Down
20 changes: 20 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>

#if USE_PARQUET

namespace DB
{

ParquetFileMetaDataCache::ParquetFileMetaDataCache()
: CacheBase<String, parquet::FileMetaData>(CurrentMetrics::end(), CurrentMetrics::end(), 0)
{}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance()
{
static ParquetFileMetaDataCache instance;
return &instance;
}

}

#endif
30 changes: 30 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "config.h"

#if USE_PARQUET

namespace parquet
{

class FileMetaData;

}

#include <Common/CacheBase.h>

namespace DB
{

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance();

private:
ParquetFileMetaDataCache();
};

}

#endif
Loading
Loading