diff --git a/contrib/arrow b/contrib/arrow index 68dd24ec070e..82192eef72dc 160000 --- a/contrib/arrow +++ b/contrib/arrow @@ -1 +1 @@ -Subproject commit 68dd24ec070e18633a1b78112a463cfa333c6ada +Subproject commit 82192eef72dc8f1288fc4a87ee1337d24d627622 diff --git a/contrib/arrow-cmake/CMakeLists.txt b/contrib/arrow-cmake/CMakeLists.txt index d578b8aa29e2..ac38dd56402e 100644 --- a/contrib/arrow-cmake/CMakeLists.txt +++ b/contrib/arrow-cmake/CMakeLists.txt @@ -24,17 +24,25 @@ if (NOT ENABLE_PARQUET) return() endif() -# Freebsd: ../contrib/arrow/cpp/src/arrow/util/bit-util.h:27:10: fatal error: endian.h: No such file or directory -if (OS_FREEBSD) - message (FATAL_ERROR "Using internal parquet library on FreeBSD is not supported") +# Support C11 +if(NOT DEFINED CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 11) +endif() + +# This ensures that a standard higher than the minimum can be passed correctly +if(NOT DEFINED CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 20) +elseif(${CMAKE_CXX_STANDARD} VERSION_LESS 20) + message(FATAL_ERROR "Cannot set a CMAKE_CXX_STANDARD smaller than 20") endif() -set (CMAKE_CXX_STANDARD 17) +# We require a C++20 compliant compiler +set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(ARROW_VERSION "11.0.0") +set(ARROW_VERSION "23.0.0") string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") -set(ARROW_VERSION_MAJOR "11") +set(ARROW_VERSION_MAJOR "23") set(ARROW_VERSION_MINOR "0") set(ARROW_VERSION_PATCH "0") @@ -221,8 +229,8 @@ target_include_directories(_orc SYSTEM PRIVATE set(LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/arrow") set(ARROW_GENERATED_SRC_DIR ${CMAKE_CURRENT_BINARY_DIR}/cpp/src) -configure_file(${LIBRARY_DIR}/util/config.h.cmake ${ARROW_GENERATED_SRC_DIR}/arrow/util/config.h) - +configure_file("${LIBRARY_DIR}/util/config.h.cmake" "${ARROW_GENERATED_SRC_DIR}/arrow/util/config.h" ESCAPE_QUOTES) +configure_file("${LIBRARY_DIR}/util/config_internal.h.cmake" 
"${ARROW_GENERATED_SRC_DIR}/arrow/util/config_internal.h" ESCAPE_QUOTES) # arrow/cpp/src/arrow/CMakeLists.txt (ARROW_SRCS + ARROW_COMPUTE + ARROW_IPC) # find . \( -iname \*.cc -o -iname \*.cpp -o -iname \*.c \) | sort | awk '{print "\"${LIBRARY_DIR}" substr($1,2) "\"" }' | grep -v 'test.cc' | grep -v 'json' | grep -v 'flight' \| @@ -250,6 +258,7 @@ set(ARROW_SRCS "${LIBRARY_DIR}/array/concatenate.cc" "${LIBRARY_DIR}/array/data.cc" "${LIBRARY_DIR}/array/diff.cc" + "${LIBRARY_DIR}/array/statistics.cc" "${LIBRARY_DIR}/array/util.cc" "${LIBRARY_DIR}/array/validate.cc" "${LIBRARY_DIR}/buffer.cc" @@ -276,7 +285,6 @@ set(ARROW_SRCS "${LIBRARY_DIR}/compute/kernels/codegen_internal.cc" "${LIBRARY_DIR}/compute/kernels/hash_aggregate.cc" "${LIBRARY_DIR}/compute/kernels/ree_util_internal.cc" - "${LIBRARY_DIR}/compute/kernels/row_encoder.cc" "${LIBRARY_DIR}/compute/kernels/scalar_arithmetic.cc" "${LIBRARY_DIR}/compute/kernels/scalar_boolean.cc" "${LIBRARY_DIR}/compute/kernels/scalar_cast_boolean.cc" @@ -298,6 +306,7 @@ set(ARROW_SRCS "${LIBRARY_DIR}/compute/kernels/scalar_temporal_binary.cc" "${LIBRARY_DIR}/compute/kernels/scalar_temporal_unary.cc" "${LIBRARY_DIR}/compute/kernels/scalar_validity.cc" + "${LIBRARY_DIR}/compute/kernels/temporal_internal.cc" "${LIBRARY_DIR}/compute/kernels/util_internal.cc" "${LIBRARY_DIR}/compute/kernels/vector_array_sort.cc" "${LIBRARY_DIR}/compute/kernels/vector_cumulative_ops.cc" @@ -313,6 +322,7 @@ set(ARROW_SRCS "${LIBRARY_DIR}/compute/kernels/vector_selection_internal.cc" "${LIBRARY_DIR}/compute/kernels/vector_selection_take_internal.cc" "${LIBRARY_DIR}/compute/kernels/vector_sort.cc" + "${LIBRARY_DIR}/compute/kernels/vector_swizzle.cc" "${LIBRARY_DIR}/compute/key_hash_internal.cc" "${LIBRARY_DIR}/compute/key_map_internal.cc" "${LIBRARY_DIR}/compute/light_array_internal.cc" @@ -326,7 +336,11 @@ set(ARROW_SRCS "${LIBRARY_DIR}/config.cc" "${LIBRARY_DIR}/datum.cc" "${LIBRARY_DIR}/device.cc" + "${LIBRARY_DIR}/device_allocation_type_set.cc" 
"${LIBRARY_DIR}/extension_type.cc" + "${LIBRARY_DIR}/extension/bool8.cc" + "${LIBRARY_DIR}/extension/json.cc" + "${LIBRARY_DIR}/extension/uuid.cc" "${LIBRARY_DIR}/integration/c_data_integration_internal.cc" "${LIBRARY_DIR}/io/buffered.cc" "${LIBRARY_DIR}/io/caching.cc" @@ -374,7 +388,10 @@ set(ARROW_SRCS "${LIBRARY_DIR}/util/bitmap_builders.cc" "${LIBRARY_DIR}/util/bitmap_ops.cc" "${LIBRARY_DIR}/util/bpacking.cc" + "${LIBRARY_DIR}/util/bpacking_scalar.cc" + "${LIBRARY_DIR}/util/bpacking_simd_default.cc" "${LIBRARY_DIR}/util/byte_size.cc" + "${LIBRARY_DIR}/util/byte_stream_split_internal.cc" "${LIBRARY_DIR}/util/cancel.cc" "${LIBRARY_DIR}/util/compression.cc" "${LIBRARY_DIR}/util/counting_semaphore.cc" @@ -384,20 +401,25 @@ set(ARROW_SRCS "${LIBRARY_DIR}/util/decimal.cc" "${LIBRARY_DIR}/util/delimiting.cc" "${LIBRARY_DIR}/util/dict_util.cc" + "${LIBRARY_DIR}/util/fixed_width_internal.cc" "${LIBRARY_DIR}/util/float16.cc" "${LIBRARY_DIR}/util/formatting.cc" "${LIBRARY_DIR}/util/future.cc" + "${LIBRARY_DIR}/util/fuzz_internal.cc" "${LIBRARY_DIR}/util/hashing.cc" "${LIBRARY_DIR}/util/int_util.cc" "${LIBRARY_DIR}/util/io_util.cc" "${LIBRARY_DIR}/util/key_value_metadata.cc" "${LIBRARY_DIR}/util/list_util.cc" + "${LIBRARY_DIR}/util/logger.cc" "${LIBRARY_DIR}/util/logging.cc" + "${LIBRARY_DIR}/util/math_internal.cc" "${LIBRARY_DIR}/util/memory.cc" "${LIBRARY_DIR}/util/mutex.cc" "${LIBRARY_DIR}/util/ree_util.cc" + "${LIBRARY_DIR}/util/secure_string.cc" "${LIBRARY_DIR}/util/string.cc" - "${LIBRARY_DIR}/util/string_builder.cc" + "${LIBRARY_DIR}/util/string_util.cc" "${LIBRARY_DIR}/util/task_group.cc" "${LIBRARY_DIR}/util/tdigest.cc" "${LIBRARY_DIR}/util/thread_pool.cc" @@ -410,7 +432,7 @@ set(ARROW_SRCS "${LIBRARY_DIR}/util/utf8.cc" "${LIBRARY_DIR}/util/value_parsing.cc" "${LIBRARY_DIR}/vendored/base64.cpp" - "${LIBRARY_DIR}/vendored/datetime/tz.cpp" + "${LIBRARY_DIR}/vendored/datetime.cpp" "${LIBRARY_DIR}/vendored/double-conversion/bignum-dtoa.cc" 
"${LIBRARY_DIR}/vendored/double-conversion/bignum.cc" "${LIBRARY_DIR}/vendored/double-conversion/cached-powers.cc" @@ -442,6 +464,12 @@ set(ARROW_SRCS "${ARROW_SRC_DIR}/arrow/adapters/orc/options.cc" ) +add_definitions(-DARROW_WITH_BROTLI) +SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_brotli.cc" ${ARROW_SRCS}) + +add_definitions(-DARROW_WITH_BZ2) +SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_bz2.cc" ${ARROW_SRCS}) + add_definitions(-DARROW_WITH_LZ4) SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_lz4.cc" ${ARROW_SRCS}) @@ -454,9 +482,6 @@ SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zlib.cc" ${ARROW_SRCS}) add_definitions(-DARROW_WITH_ZSTD) SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zstd.cc" ${ARROW_SRCS}) -add_definitions(-DARROW_WITH_BROTLI) -SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_brotli.cc" ${ARROW_SRCS}) - add_library(_arrow ${ARROW_SRCS}) add_library(ch_contrib::arrow ALIAS _arrow) @@ -473,6 +498,8 @@ target_link_libraries(_arrow PRIVATE ch_contrib::zlib ch_contrib::zstd ch_contrib::brotli + ch_contrib::bzip2 + ch_contrib::curl ) target_link_libraries(_arrow PUBLIC _orc) @@ -493,19 +520,23 @@ set(LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/parquet") set(GEN_LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/generated") # arrow/cpp/src/parquet/CMakeLists.txt set(PARQUET_SRCS + "${LIBRARY_DIR}/arrow/fuzz_internal.cc" "${LIBRARY_DIR}/arrow/path_internal.cc" "${LIBRARY_DIR}/arrow/reader.cc" "${LIBRARY_DIR}/arrow/reader_internal.cc" "${LIBRARY_DIR}/arrow/schema.cc" "${LIBRARY_DIR}/arrow/schema_internal.cc" + "${LIBRARY_DIR}/arrow/variant_internal.cc" "${LIBRARY_DIR}/arrow/writer.cc" "${LIBRARY_DIR}/benchmark_util.cc" "${LIBRARY_DIR}/bloom_filter.cc" "${LIBRARY_DIR}/bloom_filter_reader.cc" + "${LIBRARY_DIR}/chunker_internal.cc" "${LIBRARY_DIR}/column_reader.cc" "${LIBRARY_DIR}/column_scanner.cc" "${LIBRARY_DIR}/column_writer.cc" - "${LIBRARY_DIR}/encoding.cc" + "${LIBRARY_DIR}/decoder.cc" + "${LIBRARY_DIR}/encoder.cc" 
"${LIBRARY_DIR}/encryption/crypto_factory.cc" "${LIBRARY_DIR}/encryption/encryption.cc" "${LIBRARY_DIR}/encryption/encryption_internal.cc" @@ -525,6 +556,9 @@ set(PARQUET_SRCS "${LIBRARY_DIR}/exception.cc" "${LIBRARY_DIR}/file_reader.cc" "${LIBRARY_DIR}/file_writer.cc" + "${LIBRARY_DIR}/geospatial/statistics.cc" + "${LIBRARY_DIR}/geospatial/util_internal.cc" + "${LIBRARY_DIR}/geospatial/util_json_internal.cc" "${LIBRARY_DIR}/level_comparison.cc" "${LIBRARY_DIR}/level_comparison_avx2.cc" "${LIBRARY_DIR}/level_conversion.cc" @@ -535,13 +569,13 @@ set(PARQUET_SRCS "${LIBRARY_DIR}/printer.cc" "${LIBRARY_DIR}/properties.cc" "${LIBRARY_DIR}/schema.cc" + "${LIBRARY_DIR}/size_statistics.cc" "${LIBRARY_DIR}/statistics.cc" "${LIBRARY_DIR}/stream_reader.cc" "${LIBRARY_DIR}/stream_writer.cc" "${LIBRARY_DIR}/types.cc" "${LIBRARY_DIR}/xxhasher.cc" - "${GEN_LIBRARY_DIR}/parquet_constants.cpp" "${GEN_LIBRARY_DIR}/parquet_types.cpp" ) #list(TRANSFORM PARQUET_SRCS PREPEND "${LIBRARY_DIR}/") # cmake 3.12 @@ -558,7 +592,8 @@ target_link_libraries(_parquet PRIVATE boost::headers_only boost::regex - OpenSSL::Crypto OpenSSL::SSL) + OpenSSL::Crypto OpenSSL::SSL + ch_contrib::rapidjson) if (SANITIZE STREQUAL "undefined") target_compile_options(_parquet PRIVATE -fno-sanitize=undefined) diff --git a/contrib/arrow-cmake/cpp/src/arrow/util/config.h b/contrib/arrow-cmake/cpp/src/arrow/util/config.h index cacff7b16cb5..5fa5782f364f 100644 --- a/contrib/arrow-cmake/cpp/src/arrow/util/config.h +++ b/contrib/arrow-cmake/cpp/src/arrow/util/config.h @@ -15,25 +15,21 @@ // specific language governing permissions and limitations // under the License. 
-#define ARROW_VERSION_MAJOR 11 +#define ARROW_VERSION_MAJOR 23 #define ARROW_VERSION_MINOR 0 #define ARROW_VERSION_PATCH 0 #define ARROW_VERSION ((ARROW_VERSION_MAJOR * 1000) + ARROW_VERSION_MINOR) * 1000 + ARROW_VERSION_PATCH -#define ARROW_VERSION_STRING "11.0.0" +#define ARROW_VERSION_STRING "23.0.0" -#define ARROW_SO_VERSION "1100" -#define ARROW_FULL_SO_VERSION "1100.0.0" +#define ARROW_SO_VERSION "2300" +#define ARROW_FULL_SO_VERSION "2300.0.0" #define ARROW_CXX_COMPILER_ID "Clang" #define ARROW_CXX_COMPILER_VERSION "ClickHouse" -#define ARROW_CXX_COMPILER_FLAGS "" #define ARROW_BUILD_TYPE "" -#define ARROW_GIT_ID "" -#define ARROW_GIT_DESCRIPTION "" - #define ARROW_PACKAGE_KIND "" /* #undef ARROW_COMPUTE */ @@ -47,15 +43,26 @@ /* #undef ARROW_JEMALLOC */ /* #undef ARROW_JEMALLOC_VENDORED */ /* #undef ARROW_JSON */ +/* #undef ARROW_MIMALLOC */ /* #undef ARROW_ORC */ /* #undef ARROW_PARQUET */ /* #undef ARROW_SUBSTRAIT */ +/* #undef ARROW_AZURE */ +/* #undef ARROW_ENABLE_THREADING */ /* #undef ARROW_GCS */ +/* #undef ARROW_HDFS */ /* #undef ARROW_S3 */ +/* #undef ARROW_USE_GLOG */ /* #undef ARROW_USE_NATIVE_INT128 */ +/* #undef ARROW_WITH_BROTLI */ +/* #undef ARROW_WITH_BZ2 */ +/* #undef ARROW_WITH_LZ4 */ /* #undef ARROW_WITH_MUSL */ /* #undef ARROW_WITH_OPENTELEMETRY */ -/* #undef ARROW_WITH_UCX */ - -/* #undef GRPCPP_PP_INCLUDE */ +/* #undef ARROW_WITH_RE2 */ +/* #undef ARROW_WITH_SNAPPY */ +/* #undef ARROW_WITH_UTF8PROC */ +/* #undef ARROW_WITH_ZLIB */ +/* #undef ARROW_WITH_ZSTD */ +/* #undef PARQUET_REQUIRE_ENCRYPTION */ diff --git a/contrib/arrow-cmake/cpp/src/arrow/util/config_internal.h b/contrib/arrow-cmake/cpp/src/arrow/util/config_internal.h new file mode 100644 index 000000000000..01112c4db37d --- /dev/null +++ b/contrib/arrow-cmake/cpp/src/arrow/util/config_internal.h @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// These variables are not exposed as they can make compilation caching +// and incremental builds less efficient. + +#define ARROW_CXX_COMPILER_FLAGS "" + +#define ARROW_GIT_ID "" +#define ARROW_GIT_DESCRIPTION "" diff --git a/contrib/arrow-cmake/flight.cmake b/contrib/arrow-cmake/flight.cmake index f29fcdec1e0b..c028ab8de13c 100644 --- a/contrib/arrow-cmake/flight.cmake +++ b/contrib/arrow-cmake/flight.cmake @@ -37,11 +37,6 @@ add_custom_command( # protobuf-internal.cc set(ARROW_FLIGHT_SRCS ${ARROW_FLIGHT_GENERATED_SRC_DIR}/Flight.pb.cc - ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/grpc_client.cc - ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/grpc_server.cc - ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/protocol_grpc_internal.cc - ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/serialization_internal.cc - ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/util_internal.cc ${ARROW_FLIGHT_SRC_DIR}/client.cc ${ARROW_FLIGHT_SRC_DIR}/client_cookie_middleware.cc ${ARROW_FLIGHT_SRC_DIR}/client_tracing_middleware.cc @@ -50,10 +45,14 @@ set(ARROW_FLIGHT_SRCS ${ARROW_FLIGHT_SRC_DIR}/serialization_internal.cc ${ARROW_FLIGHT_SRC_DIR}/server.cc ${ARROW_FLIGHT_SRC_DIR}/server_auth.cc - ${ARROW_FLIGHT_SRC_DIR}/server_middleware.cc ${ARROW_FLIGHT_SRC_DIR}/server_tracing_middleware.cc ${ARROW_FLIGHT_SRC_DIR}/transport.cc 
${ARROW_FLIGHT_SRC_DIR}/transport_server.cc + ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/grpc_client.cc + ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/grpc_server.cc + ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/protocol_grpc_internal.cc + ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/serialization_internal.cc + ${ARROW_FLIGHT_SRC_DIR}/transport/grpc/util_internal.cc ${ARROW_FLIGHT_SRC_DIR}/types.cc ) diff --git a/contrib/flatbuffers b/contrib/flatbuffers index 0100f6a57798..0bed8cd4a001 160000 --- a/contrib/flatbuffers +++ b/contrib/flatbuffers @@ -1 +1 @@ -Subproject commit 0100f6a5779831fa7a651e4b67ef389a8752bd9b +Subproject commit 0bed8cd4a001850de9591563df99b435349ba05e diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md index de12fbc9ed2e..5fdafd0c0b34 100644 --- a/docs/en/sql-reference/statements/system.md +++ b/docs/en/sql-reference/statements/system.md @@ -101,6 +101,10 @@ Clears the mark cache. Clears the iceberg metadata cache. +## SYSTEM DROP PARQUET METADATA CACHE {#drop-parquet-metadata-cache} + +Clears the parquet metadata cache. + ## SYSTEM CLEAR|DROP TEXT INDEX CACHES {#drop-text-index-caches} Clears the text index's header, dictionary and postings caches. 
diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index aed93185172e..a1fd5c5df2c5 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -123,6 +123,10 @@ namespace ServerSetting extern const ServerSettingsUInt64 iceberg_metadata_files_cache_size; extern const ServerSettingsUInt64 iceberg_metadata_files_cache_max_entries; extern const ServerSettingsDouble iceberg_metadata_files_cache_size_ratio; + extern const ServerSettingsString parquet_metadata_cache_policy; + extern const ServerSettingsUInt64 parquet_metadata_cache_size; + extern const ServerSettingsUInt64 parquet_metadata_cache_max_entries; + extern const ServerSettingsDouble parquet_metadata_cache_size_ratio; extern const ServerSettingsUInt64 max_active_parts_loading_thread_pool_size; extern const ServerSettingsUInt64 max_io_thread_pool_free_size; extern const ServerSettingsUInt64 max_io_thread_pool_size; @@ -972,6 +976,18 @@ void LocalServer::processConfig() } global_context->setIcebergMetadataFilesCache(iceberg_metadata_files_cache_policy, iceberg_metadata_files_cache_size, iceberg_metadata_files_cache_max_entries, iceberg_metadata_files_cache_size_ratio); #endif +#if USE_PARQUET + String parquet_metadata_cache_policy = server_settings[ServerSetting::parquet_metadata_cache_policy]; + size_t parquet_metadata_cache_size = server_settings[ServerSetting::parquet_metadata_cache_size]; + size_t parquet_metadata_cache_max_entries = server_settings[ServerSetting::parquet_metadata_cache_max_entries]; + double parquet_metadata_cache_size_ratio = server_settings[ServerSetting::parquet_metadata_cache_size_ratio]; + if (parquet_metadata_cache_size > max_cache_size) + { + parquet_metadata_cache_size = max_cache_size; + LOG_INFO(log, "Lowered Parquet metadata cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(parquet_metadata_cache_size)); + } + global_context->setParquetMetadataCache(parquet_metadata_cache_policy, 
parquet_metadata_cache_size, parquet_metadata_cache_max_entries, parquet_metadata_cache_size_ratio); +#endif Names allowed_disks_table_engines; splitInto<','>(allowed_disks_table_engines, server_settings[ServerSetting::allowed_disks_for_table_engines].value); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 891b305c910e..d60c7d6d654e 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -164,10 +164,6 @@ # include #endif -#if USE_PARQUET -# include -#endif - /// A minimal file used when the server is run without installation constexpr unsigned char resource_embedded_xml[] = @@ -267,6 +263,10 @@ namespace ServerSetting extern const ServerSettingsUInt64 iceberg_metadata_files_cache_size; extern const ServerSettingsUInt64 iceberg_metadata_files_cache_max_entries; extern const ServerSettingsDouble iceberg_metadata_files_cache_size_ratio; + extern const ServerSettingsString parquet_metadata_cache_policy; + extern const ServerSettingsUInt64 parquet_metadata_cache_size; + extern const ServerSettingsUInt64 parquet_metadata_cache_max_entries; + extern const ServerSettingsDouble parquet_metadata_cache_size_ratio; extern const ServerSettingsUInt64 io_thread_pool_queue_size; extern const ServerSettingsBool jemalloc_enable_global_profiler; extern const ServerSettingsBool jemalloc_collect_global_profile_samples_in_trace_log; @@ -423,7 +423,6 @@ namespace ServerSetting extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl; extern const ServerSettingsUInt64 object_storage_list_objects_cache_size; extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries; - extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; } namespace ErrorCodes @@ -2038,6 +2037,18 @@ try } global_context->setIcebergMetadataFilesCache(iceberg_metadata_files_cache_policy, iceberg_metadata_files_cache_size, iceberg_metadata_files_cache_max_entries, iceberg_metadata_files_cache_size_ratio); #endif +#if 
USE_PARQUET + String parquet_metadata_cache_policy = server_settings[ServerSetting::parquet_metadata_cache_policy]; + size_t parquet_metadata_cache_size = server_settings[ServerSetting::parquet_metadata_cache_size]; + size_t parquet_metadata_cache_max_entries = server_settings[ServerSetting::parquet_metadata_cache_max_entries]; + double parquet_metadata_cache_size_ratio = server_settings[ServerSetting::parquet_metadata_cache_size_ratio]; + if (parquet_metadata_cache_size > max_cache_size) + { + parquet_metadata_cache_size = max_cache_size; + LOG_INFO(log, "Lowered Parquet metadata cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(parquet_metadata_cache_size)); + } + global_context->setParquetMetadataCache(parquet_metadata_cache_policy, parquet_metadata_cache_size, parquet_metadata_cache_max_entries, parquet_metadata_cache_size_ratio); +#endif Names allowed_disks_table_engines; splitInto<','>(allowed_disks_table_engines, server_settings[ServerSetting::allowed_disks_for_table_engines].value); @@ -2757,10 +2768,6 @@ try auto replicas_reconnector = ReplicasReconnector::init(global_context); -#if USE_PARQUET - ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]); -#endif - /// Set current database name before loading tables and databases because /// system logs may copy global context. 
std::string default_database = server_settings[ServerSetting::default_database]; diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index d8a3c4df0699..1e49963a444c 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -320,6 +320,8 @@ M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \ M(IcebergMetadataFilesCacheBytes, "Size of the Iceberg metadata cache in bytes") \ M(IcebergMetadataFilesCacheFiles, "Number of cached files in the Iceberg metadata cache") \ + M(ParquetMetadataCacheBytes, "Size of the Parquet metadata cache in bytes") \ + M(ParquetMetadataCacheFiles, "Number of cached files in the Parquet metadata cache") \ M(AvroSchemaCacheBytes, "Size of the Avro schema cache in bytes") \ M(AvroSchemaCacheCells, "Number of cached Avro schemas") \ M(AvroSchemaRegistryCacheBytes, "Size of the Avro schema registry cache in bytes") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 1c9f2d9b9ac0..399c2da7a0c2 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -103,6 +103,9 @@ M(IcebergMetadataFilesCacheStaleMisses, "Number of times iceberg metadata files have been found in the cache, but were considered stale and had to be read from (remote) disk.", ValueType::Number) \ M(IcebergMetadataFilesCacheWeightLost, "Approximate number of bytes evicted from the iceberg metadata cache.", ValueType::Number) \ M(IcebergMetadataReadWaitTimeMicroseconds, "Total time data readers spend waiting for iceberg metadata files to be read and parsed, summed across all reader threads.", ValueType::Microseconds) \ + M(ParquetMetadataCacheHits, "Number of times parquet metadata has been found in the cache.", ValueType::Number) \ + M(ParquetMetadataCacheMisses, "Number of times parquet metadata has not been found in the cache and had to be read from disk.", ValueType::Number) \ + M(ParquetMetadataCacheWeightLost, "Approximate number of bytes evicted from the parquet 
metadata cache.", ValueType::Number) \ M(IcebergIteratorInitializationMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \ M(IcebergMetadataUpdateMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \ M(IcebergMetadataReturnedObjectInfos, "Total number of returned object infos from iceberg iterator.", ValueType::Number) \ @@ -1328,8 +1331,6 @@ The server successfully detected this situation and will download merged part fr M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \ M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \ M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \ - M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \ - M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Core/Defines.h b/src/Core/Defines.h index 6d4d7af70503..4c369d5c2531 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -119,6 +119,10 @@ static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_POLICY = "SLRU"; static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_MAX_SIZE = 1_GiB; static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_SIZE_RATIO = 0.5; static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_MAX_ENTRIES = 1000; +static constexpr auto DEFAULT_PARQUET_METADATA_CACHE_POLICY = "SLRU"; +static constexpr auto DEFAULT_PARQUET_METADATA_CACHE_MAX_SIZE = 512_MiB; +static constexpr auto 
DEFAULT_PARQUET_METADATA_CACHE_SIZE_RATIO = 0.5; +static constexpr auto DEFAULT_PARQUET_METADATA_CACHE_MAX_ENTRIES = 5000; static constexpr auto DEFAULT_QUERY_CONDITION_CACHE_POLICY = "SLRU"; static constexpr auto DEFAULT_QUERY_CONDITION_CACHE_MAX_SIZE = 100_MiB; static constexpr auto DEFAULT_QUERY_CONDITION_CACHE_SIZE_RATIO = 0.5l; diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index 3ee1da38c3d2..b8d7d4b8c4bd 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -1527,7 +1527,6 @@ Allow to write information about geo columns in parquet metadata and encode colu DECLARE(Bool, into_outfile_create_parent_directories, false, R"( Automatically create parent directories when using INTO OUTFILE if they do not already exists. )", 0) \ - DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ @@ -1537,6 +1536,7 @@ Automatically create parent directories when using INTO OUTFILE if they do not a MAKE_OBSOLETE(M, Bool, input_format_orc_import_nested, false) \ MAKE_OBSOLETE(M, Bool, output_format_enable_streaming, false) \ MAKE_OBSOLETE(M, Bool, input_format_parquet_use_native_reader, false) \ + MAKE_OBSOLETE(M, Bool, input_format_parquet_use_metadata_cache, false) \ #endif // __CLION_IDE__ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index fa2dc98c303e..f5459d0513b0 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -498,6 +498,10 @@ namespace DECLARE(UInt64, iceberg_metadata_files_cache_size, DEFAULT_ICEBERG_METADATA_CACHE_MAX_SIZE, "Maximum size of iceberg metadata cache in bytes. Zero means disabled.", 0) \ DECLARE(UInt64, iceberg_metadata_files_cache_max_entries, DEFAULT_ICEBERG_METADATA_CACHE_MAX_ENTRIES, "Maximum size of iceberg metadata files cache in entries. 
Zero means disabled.", 0) \ DECLARE(Double, iceberg_metadata_files_cache_size_ratio, DEFAULT_ICEBERG_METADATA_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the iceberg metadata cache relative to the cache's total size.", 0) \ + DECLARE(String, parquet_metadata_cache_policy, DEFAULT_PARQUET_METADATA_CACHE_POLICY, "Parquet metadata cache policy name.", 0) \ + DECLARE(UInt64, parquet_metadata_cache_size, DEFAULT_PARQUET_METADATA_CACHE_MAX_SIZE, "Maximum size of parquet metadata cache in bytes. Zero means disabled.", 0) \ + DECLARE(UInt64, parquet_metadata_cache_max_entries, DEFAULT_PARQUET_METADATA_CACHE_MAX_ENTRIES, "Maximum size of parquet metadata files cache in entries. Zero means disabled.", 0) \ + DECLARE(Double, parquet_metadata_cache_size_ratio, DEFAULT_PARQUET_METADATA_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the parquet metadata cache relative to the cache's total size.", 0) \ DECLARE(String, allowed_disks_for_table_engines, "", "List of disks allowed for use with Iceberg", 0) \ DECLARE(String, vector_similarity_index_cache_policy, DEFAULT_VECTOR_SIMILARITY_INDEX_CACHE_POLICY, "Vector similarity index cache policy name.", 0) \ DECLARE(UInt64, vector_similarity_index_cache_size, DEFAULT_VECTOR_SIMILARITY_INDEX_CACHE_MAX_SIZE, R"(Size of cache for vector similarity indexes. Zero means disabled. @@ -1485,7 +1489,6 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_ DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \ DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) \ DECLARE(Bool, enable_experimental_export_merge_tree_partition_feature, false, "Enable export replicated merge tree partition feature. 
It is experimental and not yet ready for production use.", 0) \ - DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) /// Settings with a path are server settings with at least one layer of nesting that have a fixed structure (no lists, lists, enumerations, repetitions, ...). #define LIST_OF_SERVER_SETTINGS_WITH_PATH(DECLARE, ALIAS) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 8882874422e6..3a78b66a5c3f 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5161,6 +5161,14 @@ If turned on, iceberg table function and iceberg storage may utilize the iceberg Possible values: +- 0 - Disabled +- 1 - Enabled +)", 0) \ + DECLARE(Bool, use_parquet_metadata_cache, true, R"( +If turned on, parquet format may utilize the parquet metadata cache. + +Possible values: + - 0 - Disabled - 1 - Enabled )", 0) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 7c212510482c..1ea8ea4810bc 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -45,6 +45,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() // {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, {"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"}, + {"use_parquet_metadata_cache", false, true, "Enables cache of parquet file metadata."}, + {"input_format_parquet_use_metadata_cache", true, false, "Obsolete. 
No-op"}, // https://github.com/Altinity/ClickHouse/pull/586 {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"}, }); addSettingsChanges(settings_changes_history, "26.1", @@ -238,7 +240,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_database_iceberg", false, true, "Turned ON by default for Antalya (alias)."}, {"allow_database_unity_catalog", false, true, "Turned ON by default for Antalya (alias)."}, {"allow_database_glue_catalog", false, true, "Turned ON by default for Antalya (alias)."}, - {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586 {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}, {"object_storage_remote_initiator", false, false, "New setting."}, diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 3220ad6f01a6..5ebb293588f6 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -407,12 +408,13 @@ void FormatFactory::registerFileBucketInfo(const String & format, FileBucketInfo creators.file_bucket_info_creator = std::move(bucket_info); } -InputFormatPtr FormatFactory::getInput( +InputFormatPtr FormatFactory::getInputImpl( const String & name, ReadBuffer & _buf, const Block & sample, const ContextPtr & context, UInt64 max_block_size, + const std::optional & object_with_metadata, const std::optional & _format_settings, FormatParserSharedResourcesPtr parser_shared_resources, FormatFilterInfoPtr format_filter_info, @@ -424,7 +426,7 @@ InputFormatPtr FormatFactory::getInput( const std::optional & min_block_size_bytes) const { const auto& creators = getCreators(name); - if (!creators.input_creator && !creators.random_access_input_creator) + if (!creators.input_creator && 
!creators.random_access_input_creator && !creators.random_access_input_creator_with_metadata) throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT, "Format {} is not suitable for input", name); /// Some formats use this thread pool. Lazily initialize it. @@ -434,7 +436,10 @@ InputFormatPtr FormatFactory::getInput( const FormatSettings format_settings = _format_settings ? *_format_settings : getFormatSettings(context); const Settings & settings = context->getSettingsRef(); - if (format_filter_info && (format_filter_info->prewhere_info || format_filter_info->row_level_filter) && (!creators.random_access_input_creator || !creators.prewhere_support_checker || !creators.prewhere_support_checker(format_settings))) + if (format_filter_info + && (format_filter_info->prewhere_info || format_filter_info->row_level_filter) + && ((!creators.random_access_input_creator && !creators.random_access_input_creator_with_metadata) + || !creators.prewhere_support_checker || !creators.prewhere_support_checker(format_settings))) throw Exception(ErrorCodes::LOGICAL_ERROR, "{} passed to format that doesn't support it", format_filter_info->prewhere_info ? 
"PREWHERE" : "ROW LEVEL FILTER"); @@ -465,7 +470,8 @@ InputFormatPtr FormatFactory::getInput( size_t max_parsing_threads = parser_shared_resources->getParsingThreadsPerReader(); bool parallel_parsing = max_parsing_threads > 1 && settings[Setting::input_format_parallel_parsing] - && creators.file_segmentation_engine_creator && !creators.random_access_input_creator && !need_only_count; + && creators.file_segmentation_engine_creator && !(creators.random_access_input_creator || creators.random_access_input_creator_with_metadata) + && !need_only_count; if (settings[Setting::max_memory_usage] && settings[Setting::min_chunk_bytes_for_parallel_parsing] * max_parsing_threads * 2 > settings[Setting::max_memory_usage]) @@ -482,10 +488,10 @@ InputFormatPtr FormatFactory::getInput( parallel_parsing = false; } - // Create the InputFormat in one of 3 ways. + // Create the InputFormat in one of a few ways. InputFormatPtr format; - + // 1. Try parallel processing first if (parallel_parsing) { const auto & input_getter = creators.input_creator; @@ -511,11 +517,22 @@ InputFormatPtr FormatFactory::getInput( format = std::make_shared(params); } + // 2. Prefer to use metadata-aware creator if we have object metadata + else if (creators.random_access_input_creator_with_metadata + && object_with_metadata.has_value() && format_settings.parquet.use_native_reader_v3) + { + format = creators.random_access_input_creator_with_metadata( + buf, sample, format_settings, context->getReadSettings(), is_remote_fs, + parser_shared_resources, format_filter_info, object_with_metadata, context); + } + // 3. 
Use the normal random access creator for formats that need to jump around in the file else if (creators.random_access_input_creator) { format = creators.random_access_input_creator( - buf, sample, format_settings, context->getReadSettings(), is_remote_fs, parser_shared_resources, format_filter_info); + buf, sample, format_settings, context->getReadSettings(), is_remote_fs, + parser_shared_resources, format_filter_info); } + // 4. Use the normal creator for sequential reading else { format = creators.input_creator(buf, sample, row_input_format_params, format_settings); @@ -540,6 +557,53 @@ InputFormatPtr FormatFactory::getInput( return format; } +InputFormatPtr FormatFactory::getInput( + const String & name, + ReadBuffer & buf, + const Block & sample, + const ContextPtr & context, + UInt64 max_block_size, + const std::optional & format_settings, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info, + bool is_remote_fs, + CompressionMethod compression, + bool need_only_count, + const std::optional & max_block_size_bytes, + const std::optional & min_block_size_rows, + const std::optional & min_block_size_bytes) const +{ + return getInputImpl(name, buf, sample, context, max_block_size, std::nullopt, + format_settings, parser_shared_resources, format_filter_info, + is_remote_fs, compression, need_only_count, + max_block_size_bytes, min_block_size_rows, min_block_size_bytes); +} + +// Overload with metadata +InputFormatPtr FormatFactory::getInputWithMetadata( + const String & name, + ReadBuffer & buf, + const Block & sample, + const ContextPtr & context, + UInt64 max_block_size, + const std::optional & object_with_metadata, + const std::optional & format_settings, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info, + bool is_remote_fs, + CompressionMethod compression, + bool need_only_count, + const std::optional & max_block_size_bytes, + const std::optional & 
min_block_size_rows, + const std::optional & min_block_size_bytes) const +{ + chassert(object_with_metadata.has_value()); + return getInputImpl(name, buf, sample, context, max_block_size, object_with_metadata, + format_settings, parser_shared_resources, format_filter_info, + is_remote_fs, compression, need_only_count, + max_block_size_bytes, min_block_size_rows, min_block_size_bytes); +} + std::unique_ptr FormatFactory::wrapReadBufferIfNeeded( ReadBuffer & buf, CompressionMethod compression, @@ -765,6 +829,17 @@ void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomA KnownFormatNames::instance().add(name, /* case_insensitive = */ true); } +void FormatFactory::registerRandomAccessInputFormatWithMetadata(const String & name, RandomAccessInputCreatorWithMetadata input_creator) +{ + chassert(input_creator); + auto & creators = getOrCreateCreators(name); + if (creators.input_creator || creators.random_access_input_creator_with_metadata) + throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Input format {} is already registered", name); + creators.random_access_input_creator_with_metadata = std::move(input_creator); + registerFileExtension(name, name); + KnownFormatNames::instance().add(name, /* case_insensitive = */ true); +} + void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker) { auto & target = getOrCreateCreators(name).non_trivial_prefix_and_suffix_checker; @@ -993,7 +1068,7 @@ String FormatFactory::getAdditionalInfoForSchemaCache(const String & name, const bool FormatFactory::isInputFormat(const String & name) const { auto it = dict.find(boost::to_lower_copy(name)); - return it != dict.end() && (it->second.input_creator || it->second.random_access_input_creator); + return it != dict.end() && (it->second.input_creator || it->second.random_access_input_creator || it->second.random_access_input_creator_with_metadata); } bool 
FormatFactory::isOutputFormat(const String & name) const @@ -1052,7 +1127,7 @@ std::vector FormatFactory::getAllInputFormats() const std::vector input_formats; for (const auto & [format_name, creators] : dict) { - if (creators.input_creator || creators.random_access_input_creator) + if (creators.input_creator || creators.random_access_input_creator || creators.random_access_input_creator_with_metadata) input_formats.push_back(format_name); } diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 3186ca7ad95d..6df5484850c2 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -112,11 +113,22 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> FormatParserSharedResourcesPtr parser_shared_resources, FormatFilterInfoPtr format_filter_info)>; + using RandomAccessInputCreatorWithMetadata = std::function & object_with_metadata, + const ContextPtr & context)>; + using OutputCreator = std::function; + WriteBuffer & buf, + const Block & sample, + const FormatSettings & settings, + FormatFilterInfoPtr format_filter_info)>; /// Some input formats can have non trivial readPrefix() and readSuffix(), /// so in some cases there is no possibility to use parallel parsing. @@ -130,7 +142,10 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> /// Obtain HTTP content-type for the output format. 
using ContentTypeGetter = std::function & settings)>; - using SchemaReaderCreator = std::function; + using SchemaReaderCreator = std::function; + using ExternalSchemaReaderCreator = std::function; /// Some formats can extract different schemas from the same source depending on @@ -153,6 +168,7 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> FileBucketInfoCreator file_bucket_info_creator; BucketSplitterCreator bucket_splitter_creator; RandomAccessInputCreator random_access_input_creator; + RandomAccessInputCreatorWithMetadata random_access_input_creator_with_metadata; OutputCreator output_creator; FileSegmentationEngineCreator file_segmentation_engine_creator; SchemaReaderCreator schema_reader_creator; @@ -171,6 +187,22 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> using FormatsDictionary = std::unordered_map; using FileExtensionFormats = std::unordered_map; + InputFormatPtr getInputImpl( + const String & name, + ReadBuffer & buf, + const Block & sample, + const ContextPtr & context, + UInt64 max_block_size, + const std::optional & object_with_metadata, + const std::optional & format_settings, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info, + bool is_remote_fs, + CompressionMethod compression, + bool need_only_count, + const std::optional & max_block_size_bytes, + const std::optional & min_block_size_rows, + const std::optional & min_block_size_bytes) const; public: static FormatFactory & instance(); @@ -199,6 +231,27 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> const std::optional & min_block_size_rows = std::nullopt, const std::optional & min_block_size_bytes = std::nullopt) const; + /// much the same as getInput but allows for passing metadata from object storage + InputFormatPtr getInputWithMetadata( + const String & name, + ReadBuffer & buf, + const Block & sample, + const ContextPtr & context, + UInt64 max_block_size, + 
const std::optional & object_with_metadata, + const std::optional & format_settings = std::nullopt, + FormatParserSharedResourcesPtr parser_shared_resources = nullptr, + FormatFilterInfoPtr format_filter_info = nullptr, + // affects things like buffer sizes and parallel reading + bool is_remote_fs = false, + // allows to do: buf -> parallel read -> decompression, + // because parallel read after decompression is not possible + CompressionMethod compression = CompressionMethod::None, + bool need_only_count = false, + const std::optional & max_block_size_bytes = std::nullopt, + const std::optional & min_block_size_rows = std::nullopt, + const std::optional & min_block_size_bytes = std::nullopt) const; + /// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done. OutputFormatPtr getOutputFormatParallelIfPossible( const String & name, @@ -222,6 +275,14 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> /// Content-Type to set when sending HTTP response with this output format. String getContentType(const String & name, const std::optional & settings) const; + /// overload for formats that support object storage metadata + SchemaReaderPtr getSchemaReader( + const String & name, + ReadBuffer & buf, + const ContextPtr & context, + const RelativePathWithMetadata & metadata, + const std::optional & format_settings = std::nullopt) const; + SchemaReaderPtr getSchemaReader( const String & name, ReadBuffer & buf, @@ -250,6 +311,7 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> /// Register format by its name. 
void registerInputFormat(const String & name, InputCreator input_creator); void registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator); + void registerRandomAccessInputFormatWithMetadata(const String & name, RandomAccessInputCreatorWithMetadata input_creator_with_metadata); void registerOutputFormat(const String & name, OutputCreator output_creator); /// Register file extension for format diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 876180dad66d..9055901a6e5a 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -538,6 +539,9 @@ struct ContextSharedPart : boost::noncopyable mutable MMappedFileCachePtr mmap_cache TSA_GUARDED_BY(mutex); /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads. #if USE_AVRO mutable IcebergMetadataFilesCachePtr iceberg_metadata_files_cache TSA_GUARDED_BY(mutex); /// Cache of deserialized iceberg metadata files. +#endif +#if USE_PARQUET + mutable ParquetMetadataCachePtr parquet_metadata_cache TSA_GUARDED_BY(mutex); /// Cache of deserialized parquet metadata files. #endif AsynchronousMetrics * asynchronous_metrics TSA_GUARDED_BY(mutex) = nullptr; /// Points to asynchronous metrics mutable PageCachePtr page_cache TSA_GUARDED_BY(mutex); /// Userspace page cache. 
@@ -4123,6 +4127,49 @@ void Context::clearIcebergMetadataFilesCache() const } #endif +#if USE_PARQUET +void Context::setParquetMetadataCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_entries, double size_ratio) +{ + std::lock_guard lock(shared->mutex); + + if (shared->parquet_metadata_cache) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Parquet metadata cache has been already created."); + + shared->parquet_metadata_cache = std::make_shared(cache_policy, max_size_in_bytes, max_entries, size_ratio); +} + +void Context::updateParquetMetadataCacheConfiguration(const Poco::Util::AbstractConfiguration & config) +{ + std::lock_guard lock(shared->mutex); + + if (!shared->parquet_metadata_cache) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Parquet metadata cache was not created yet."); + + size_t max_size_in_bytes = config.getUInt64("parquet_metadata_cache_size", DEFAULT_PARQUET_METADATA_CACHE_MAX_SIZE); + size_t max_entries = config.getUInt64("parquet_metadata_cache_max_entries", DEFAULT_PARQUET_METADATA_CACHE_MAX_ENTRIES); + shared->parquet_metadata_cache->setMaxSizeInBytes(max_size_in_bytes); + shared->parquet_metadata_cache->setMaxCount(max_entries); +} + +std::shared_ptr Context::getParquetMetadataCache() const +{ + SharedLockGuard lock(shared->mutex); + + if (!shared->parquet_metadata_cache) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Parquet metadata cache was not created yet."); + return shared->parquet_metadata_cache; +} + +void Context::clearParquetMetadataCache() const +{ + auto cache = getParquetMetadataCache(); + + /// Clear the cache without holding context mutex to avoid blocking context for a long time + if (cache) + cache->clear(); +} +#endif + void Context::setQueryConditionCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index a67d236151c0..b5087c44b01c 100644 --- 
a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -102,6 +102,7 @@ class PageCache; class MMappedFileCache; class UncompressedCache; class IcebergMetadataFilesCache; +class ParquetMetadataCache; class VectorSimilarityIndexCache; class TextIndexDictionaryBlockCache; class TextIndexHeaderCache; @@ -1376,6 +1377,13 @@ class Context: public ContextData, public std::enable_shared_from_this void clearIcebergMetadataFilesCache() const; #endif +#if USE_PARQUET + void setParquetMetadataCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_entries, double size_ratio); + void updateParquetMetadataCacheConfiguration(const Poco::Util::AbstractConfiguration & config); + std::shared_ptr getParquetMetadataCache() const; + void clearParquetMetadataCache() const; +#endif + void setAllowedDisksForTableEngines(std::unordered_set && allowed_disks_) { allowed_disks = std::move(allowed_disks_); } const std::unordered_set & getAllowedDisksForTableEngines() const { return allowed_disks; } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 201b2bea43f5..b49caa8d8e4c 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -78,10 +78,6 @@ #include #endif -#if USE_PARQUET -#include -#endif - #if USE_AWS_S3 #include #endif @@ -403,6 +399,14 @@ BlockIO InterpreterSystemQuery::execute() break; #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for AVRO"); +#endif + case Type::CLEAR_PARQUET_METADATA_CACHE: +#if USE_PARQUET + getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE); + system_context->clearParquetMetadataCache(); + break; +#else + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet"); #endif case Type::CLEAR_PRIMARY_INDEX_CACHE: getContext()->checkAccess(AccessType::SYSTEM_DROP_PRIMARY_INDEX_CACHE); @@ -458,16 +462,6 @@ 
BlockIO InterpreterSystemQuery::execute() getContext()->clearQueryResultCache(query.query_result_cache_tag); break; } - case Type::DROP_PARQUET_METADATA_CACHE: - { -#if USE_PARQUET - getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE); - ParquetFileMetaDataCache::instance()->clear(); - break; -#else - throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet"); -#endif - } case Type::CLEAR_COMPILED_EXPRESSION_CACHE: #if USE_EMBEDDED_COMPILER getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE); @@ -2012,6 +2006,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::CLEAR_CONNECTIONS_CACHE: case Type::CLEAR_MARK_CACHE: case Type::CLEAR_ICEBERG_METADATA_CACHE: + case Type::CLEAR_PARQUET_METADATA_CACHE: case Type::CLEAR_PRIMARY_INDEX_CACHE: case Type::CLEAR_MMAP_CACHE: case Type::CLEAR_QUERY_CONDITION_CACHE: @@ -2031,7 +2026,6 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::CLEAR_PAGE_CACHE: case Type::CLEAR_SCHEMA_CACHE: case Type::CLEAR_FORMAT_SCHEMA_CACHE: - case Type::DROP_PARQUET_METADATA_CACHE: case Type::CLEAR_S3_CLIENT_CACHE: case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: { diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 28b399e10ef3..c0e40ff3766a 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -575,9 +575,9 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::CLEAR_TEXT_INDEX_CACHES: case Type::CLEAR_COMPILED_EXPRESSION_CACHE: case Type::CLEAR_S3_CLIENT_CACHE: - case Type::DROP_PARQUET_METADATA_CACHE: case Type::CLEAR_ICEBERG_METADATA_CACHE: case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE: + case Type::CLEAR_PARQUET_METADATA_CACHE: case Type::RESET_COVERAGE: case Type::RESTART_REPLICAS: case Type::JEMALLOC_PURGE: diff --git a/src/Parsers/ASTSystemQuery.h 
b/src/Parsers/ASTSystemQuery.h index 358d4806257d..640ec5667742 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -43,8 +43,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster CLEAR_QUERY_CONDITION_CACHE, CLEAR_QUERY_CACHE, CLEAR_COMPILED_EXPRESSION_CACHE, - DROP_PARQUET_METADATA_CACHE, CLEAR_ICEBERG_METADATA_CACHE, + CLEAR_PARQUET_METADATA_CACHE, CLEAR_FILESYSTEM_CACHE, CLEAR_DISTRIBUTED_CACHE, CLEAR_DISK_METADATA_CACHE, diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp index a70c0feca955..38e1a7ff7e1f 100644 --- a/src/Parsers/ParserSystemQuery.cpp +++ b/src/Parsers/ParserSystemQuery.cpp @@ -258,6 +258,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & {"DROP QUERY CACHE", Type::CLEAR_QUERY_CACHE}, {"DROP COMPILED EXPRESSION CACHE", Type::CLEAR_COMPILED_EXPRESSION_CACHE}, {"DROP ICEBERG METADATA CACHE", Type::CLEAR_ICEBERG_METADATA_CACHE}, + {"DROP PARQUET METADATA CACHE", Type::CLEAR_PARQUET_METADATA_CACHE}, {"DROP FILESYSTEM CACHE", Type::CLEAR_FILESYSTEM_CACHE}, {"DROP DISTRIBUTED CACHE", Type::CLEAR_DISTRIBUTED_CACHE}, {"DROP DISK METADATA CACHE", Type::CLEAR_DISK_METADATA_CACHE}, diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index bf0eccf2fa88..1a9cb76450f7 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -129,9 +129,6 @@ class IInputFormat : public ISource void needOnlyCount() { need_only_count = true; } - /// Set additional info/key/id related to underlying storage of the ReadBuffer - virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {} - protected: ReadBuffer & getReadBuffer() const { chassert(in); return *in; } diff --git a/src/Processors/Formats/Impl/Parquet/Decoding.cpp b/src/Processors/Formats/Impl/Parquet/Decoding.cpp index 609ecfd32dc4..f137cdf3f01a 100644 --- 
a/src/Processors/Formats/Impl/Parquet/Decoding.cpp +++ b/src/Processors/Formats/Impl/Parquet/Decoding.cpp @@ -4,7 +4,7 @@ #include #include -#include +#include namespace DB::ErrorCodes { diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index d88f9b59a6cb..e08a71019927 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -47,7 +47,9 @@ std::optional AtomicBitSet::findFirst() void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, const std::optional> & buckets_to_read_) { parser_shared_resources = parser_shared_resources_; - reader.file_metadata = Reader::readFileMetaData(reader.prefetcher); + + if (reader.file_metadata.schema.empty()) + reader.file_metadata = Reader::readFileMetaData(reader.prefetcher); if (buckets_to_read_) { diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp index 35b5e2c59891..42af8e997f0a 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.cpp +++ b/src/Processors/Formats/Impl/Parquet/Write.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -651,19 +651,19 @@ PODArray & compress(PODArray & source, PODArray & scratch, Com void encodeRepDefLevelsRLE(const UInt8 * data, size_t size, UInt8 max_level, PODArray & out) { - using arrow::util::RleEncoder; + using arrow::util::RleBitPackedEncoder; chassert(max_level > 0); size_t offset = out.size(); size_t prefix_size = sizeof(Int32); int bit_width = bitScanReverse(max_level) + 1; - int max_rle_size = RleEncoder::MaxBufferSize(bit_width, static_cast(size)) + - RleEncoder::MinBufferSize(bit_width); + auto max_rle_size = RleBitPackedEncoder::MaxBufferSize(bit_width, static_cast(size)) + + RleBitPackedEncoder::MinBufferSize(bit_width); out.resize(offset + prefix_size + max_rle_size); - RleEncoder encoder(reinterpret_cast(out.data() + offset 
+ prefix_size), max_rle_size, bit_width); + RleBitPackedEncoder encoder(reinterpret_cast(out.data() + offset + prefix_size), static_cast(max_rle_size), bit_width); for (size_t i = 0; i < size; ++i) encoder.Put(data[i]); encoder.Flush(); diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 3a60e4666ce0..c25418fb3268 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include #include #include @@ -31,13 +33,13 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include #include #include #include @@ -49,8 +51,6 @@ namespace ProfileEvents extern const Event ParquetFetchWaitTimeMicroseconds; extern const Event ParquetReadRowGroups; extern const Event ParquetPrunedRowGroups; - extern const Event ParquetMetaDataCacheHits; - extern const Event ParquetMetaDataCacheMisses; } namespace CurrentMetrics @@ -69,12 +69,7 @@ namespace DB namespace Setting { - extern const SettingsBool input_format_parquet_use_metadata_cache; -} - -namespace ServerSetting -{ - extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; + extern const SettingsBool use_parquet_metadata_cache; } namespace ErrorCodes @@ -83,6 +78,7 @@ namespace ErrorCodes extern const int MEMORY_LIMIT_EXCEEDED; extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_PARSE_NUMBER; + extern const int LOGICAL_ERROR; } namespace @@ -561,49 +557,6 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } -std::shared_ptr ParquetBlockInputFormat::readMetadataFromFile() -{ - createArrowFileIfNotCreated(); - return parquet::ReadMetaData(arrow_file); -} - -std::shared_ptr ParquetBlockInputFormat::getFileMetaData() -{ - // in-memory cache is not implemented for local file operations, only for 
remote files - // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation - // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key - if (!metadata_cache.use_cache || metadata_cache.key.empty()) - { - return readMetadataFromFile(); - } - - auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( - metadata_cache.key, - [&]() - { - return readMetadataFromFile(); - } - ); - if (loaded) - ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); - else - ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); - return parquet_file_metadata; -} - -void ParquetBlockInputFormat::createArrowFileIfNotCreated() -{ - if (arrow_file) - { - return; - } - - // Create arrow file adapter. - // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that - // we'll need to read (which we know in advance). Use max_download_threads for that. - arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); -} - std::unordered_set getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn) { std::unordered_set column_keys; @@ -750,7 +703,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() if (is_stopped) return; - metadata = getFileMetaData(); + metadata = parquet::ReadMetaData(arrow_file); if (buckets_to_read) { std::unordered_set set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end()); @@ -866,8 +819,6 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } - bool has_row_groups_to_read = false; - auto skip_row_group_based_on_filters = [&](int row_group) { if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down) @@ -926,20 +877,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() row_group_batches.back().total_bytes_compressed += row_group_size; auto rows = 
adaptive_chunk_size(row_group); row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size; - - has_row_groups_to_read = true; } - - if (has_row_groups_to_read) - { - createArrowFileIfNotCreated(); - } -} - -void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) -{ - metadata_cache.key = key_; - metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) @@ -1476,8 +1414,7 @@ void registerInputFormatParquet(FormatFactory & factory) return std::make_shared(); } ); - - factory.registerRandomAccessInputFormat( + factory.registerRandomAccessInputFormatWithMetadata( "Parquet", [](ReadBuffer & buf, const Block & sample, @@ -1485,31 +1422,75 @@ void registerInputFormatParquet(FormatFactory & factory) const ReadSettings & read_settings, bool is_remote_fs, FormatParserSharedResourcesPtr parser_shared_resources, - FormatFilterInfoPtr format_filter_info) -> InputFormatPtr + FormatFilterInfoPtr format_filter_info, + const std::optional & object_with_metadata, + const ContextPtr & context) -> InputFormatPtr { + auto lambda_logger = getLogger("ParquetMetadataCache"); size_t min_bytes_for_seek = is_remote_fs ? 
read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; if (settings.parquet.use_native_reader_v3) { + LOG_TRACE(lambda_logger, "using native reader v3 in ParquetBlockInputFormat with metadata cache"); + ParquetMetadataCachePtr metadata_cache = context->getParquetMetadataCache(); return std::make_shared( buf, std::make_shared(sample), settings, std::move(parser_shared_resources), std::move(format_filter_info), - min_bytes_for_seek); + min_bytes_for_seek, + metadata_cache, + object_with_metadata + ); } else { - return std::make_shared( - buf, - std::make_shared(sample), - settings, - std::move(parser_shared_resources), - std::move(format_filter_info), - min_bytes_for_seek); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Implementation of ParquetBlockInputFormat using arrow reader didn't require blob metadata for initialization"); } }); + factory.registerRandomAccessInputFormat( + "Parquet", + [](ReadBuffer & buf, + const Block & sample, + const FormatSettings & settings, + const ReadSettings & read_settings, + bool is_remote_fs, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info) -> InputFormatPtr + { + auto lambda_logger = getLogger("ParquetMetadataCache"); + size_t min_bytes_for_seek + = is_remote_fs ? 
read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; + if (settings.parquet.use_native_reader_v3) + { + LOG_TRACE(lambda_logger, "using native reader v3 in ParquetBlockInputFormat with no metadata cache"); + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek, + nullptr, + std::nullopt + ); + } + else + { + LOG_TRACE(lambda_logger, "using arrow reader in ParquetBlockInputFormat without metadata cache"); + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek + ); + } + }); factory.markFormatSupportsSubsetOfColumns("Parquet"); factory.registerPrewhereSupportChecker("Parquet", [](const FormatSettings & settings) { @@ -1524,15 +1505,21 @@ void registerParquetSchemaReader(FormatFactory & factory) return std::make_shared(); }); factory.registerSchemaReader( - "Parquet", - [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr + "Parquet", [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr { + auto lambda_logger = getLogger("ParquetMetadataCache"); if (settings.parquet.use_native_reader_v3) + { + LOG_TRACE(lambda_logger, "using native reader v3 in ParquetSchemaReader"); return std::make_shared(buf, settings); + } else + { + LOG_TRACE(lambda_logger, "using arrow reader in ParquetSchemaReader"); return std::make_shared(buf, settings); + } } - ); + ); factory.registerAdditionalInfoForSchemaCacheGetter( "Parquet", diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 29e34171e432..28cf9ce60c80 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -94,7 +94,6 @@ class ParquetBlockInputFormat : public IInputFormat size_t 
getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override; - void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; private: Chunk read() override; @@ -114,11 +113,6 @@ class ParquetBlockInputFormat : public IInputFormat void threadFunction(size_t row_group_batch_idx); - void createArrowFileIfNotCreated(); - std::shared_ptr readMetadataFromFile(); - - std::shared_ptr getFileMetaData(); - inline bool supportPrefetch() const; // Data layout in the file: @@ -369,13 +363,6 @@ class ParquetBlockInputFormat : public IInputFormat bool is_initialized = false; std::optional> parquet_names_to_clickhouse; std::optional> clickhouse_names_to_parquet; - struct Cache - { - String key; - bool use_cache = false; - }; - - Cache metadata_cache; }; class ArrowParquetSchemaReader : public ISchemaReader diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp index b5729d12b039..fc55f5469beb 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -365,9 +365,9 @@ void ParquetBlockOutputFormat::writeUsingArrow(std::vector chunks) builder.compression_level(static_cast(format_settings.parquet.output_compression_level)); } - // Writing page index is disabled by default. - if (format_settings.parquet.write_page_index) - builder.enable_write_page_index(); + // Writing page index is enabled by default. 
+ if (!format_settings.parquet.write_page_index) + builder.disable_write_page_index(); parquet::ArrowWriterProperties::Builder writer_props_builder; if (format_settings.parquet.output_compliant_nested_types) diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp deleted file mode 100644 index 4aeb1d48bada..000000000000 --- a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp +++ /dev/null @@ -1,30 +0,0 @@ -#include - -#if USE_PARQUET - -#include - -namespace DB -{ - -size_t ParquetFileMetaDataWeightFunction::operator()(const parquet::FileMetaData & metadata) const -{ - /// TODO fix-me: using the size on disk is not ideal, but it is the simplest and best we can do for now. - /// this implementation is only used by the v1 reader, which is going to be deprecated and a new implementation for the v3 - /// reader will be added in the future. - return metadata.size(); -} - -ParquetFileMetaDataCache::ParquetFileMetaDataCache() - : CacheBase, ParquetFileMetaDataWeightFunction>(CurrentMetrics::end(), CurrentMetrics::end(), 0) -{} - -ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance() -{ - static ParquetFileMetaDataCache instance; - return &instance; -} - -} - -#endif diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h deleted file mode 100644 index 4578f2582391..000000000000 --- a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h +++ /dev/null @@ -1,35 +0,0 @@ -#pragma once - -#include "config.h" - -#if USE_PARQUET - -namespace parquet -{ - -class FileMetaData; - -} - -#include - -namespace DB -{ - -struct ParquetFileMetaDataWeightFunction -{ - size_t operator()(const parquet::FileMetaData & metadata) const; -}; - -class ParquetFileMetaDataCache : public CacheBase, ParquetFileMetaDataWeightFunction> -{ -public: - static ParquetFileMetaDataCache * instance(); - -private: - ParquetFileMetaDataCache(); -}; - -} 
- -#endif diff --git a/src/Processors/Formats/Impl/ParquetMetadataCache.cpp b/src/Processors/Formats/Impl/ParquetMetadataCache.cpp new file mode 100644 index 000000000000..dce344a0554b --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetMetadataCache.cpp @@ -0,0 +1,128 @@ +#include + +#if USE_PARQUET + +namespace CurrentMetrics +{ +extern const Metric ParquetMetadataCacheBytes; +extern const Metric ParquetMetadataCacheFiles; +} + +namespace ProfileEvents +{ +extern const Event ParquetMetadataCacheWeightLost; +} + +namespace DB +{ + +bool ParquetMetadataCacheKey::operator==(const ParquetMetadataCacheKey & other) const +{ + return file_path == other.file_path && etag == other.etag; +} + +size_t ParquetMetadataCacheKeyHash::operator()(const ParquetMetadataCacheKey & key) const +{ + size_t hash = 0; + boost::hash_combine(hash, CityHash_v1_0_2::CityHash64(key.file_path.data(), key.file_path.size())); + boost::hash_combine(hash, CityHash_v1_0_2::CityHash64(key.etag.data(), key.etag.size())); + return hash; +} + +ParquetMetadataCacheCell::ParquetMetadataCacheCell(parquet::format::FileMetaData metadata_) + : metadata(std::move(metadata_)) + , memory_bytes(calculateMemorySize() + SIZE_IN_MEMORY_OVERHEAD) +{ +} +size_t ParquetMetadataCacheCell::calculateMemorySize() const +{ + size_t total_size = sizeof(metadata); + + // SchemaElements parquet::format::SchemaElement + for (const parquet::format::SchemaElement & element : metadata.schema) + { + total_size += sizeof(element); + total_size += element.name.capacity(); + } + + // RowGroups parquet::format::RowGroup + for (const parquet::format::RowGroup & row_group : metadata.row_groups) + { + total_size += sizeof(row_group); + total_size += row_group.sorting_columns.capacity() * sizeof(parquet::format::SortingColumn); + for (const parquet::format::ColumnChunk & column_chunk : row_group.columns) + { + total_size += sizeof(column_chunk); + + total_size += column_chunk.file_path.capacity(); + total_size += 
column_chunk.encrypted_column_metadata.capacity(); + + // ColumnMetaData parquet::format::ColumnMetaData + for (const auto & path : column_chunk.meta_data.path_in_schema) + total_size += path.capacity(); + + for (const auto & kv : column_chunk.meta_data.key_value_metadata) + total_size += kv.key.capacity() + kv.value.capacity(); + + total_size += column_chunk.meta_data.encodings.capacity() * sizeof(parquet::format::Encoding::type); + total_size += column_chunk.meta_data.encoding_stats.capacity() * sizeof(parquet::format::PageEncodingStats); + total_size += column_chunk.meta_data.geospatial_statistics.geospatial_types.capacity() * sizeof(int32_t); + + total_size += column_chunk.meta_data.statistics.min_value.capacity(); + total_size += column_chunk.meta_data.statistics.max_value.capacity(); + total_size += column_chunk.meta_data.statistics.min.capacity(); + total_size += column_chunk.meta_data.statistics.max.capacity(); + + total_size += column_chunk.meta_data.size_statistics.repetition_level_histogram.capacity() * sizeof(int64_t); + total_size += column_chunk.meta_data.size_statistics.definition_level_histogram.capacity() * sizeof(int64_t); + } + } + + // KeyValueMetadata parquet::format::KeyValueMetadata + for (const auto & kv : metadata.key_value_metadata) + total_size += kv.key.capacity() + kv.value.capacity(); + + // ColumnOrder parquet::format::ColumnOrders + for (const auto & order : metadata.column_orders) + total_size += sizeof(order); + + // Top-level fields + total_size += metadata.created_by.capacity(); + total_size += metadata.footer_signing_key_metadata.capacity(); + + /// String fields for metadata.encryption_algorithm.* generally are either very small + /// within SSO limits or are empty in the case where we don't use encryption at all + /// so I'm skipping accounting for them as they'll fall within our SIZE_IN_MEMORY_OVERHEAD + return total_size; +} + +size_t ParquetMetadataCacheWeightFunction::operator()(const ParquetMetadataCacheCell & cell) const 
+{ + return cell.memory_bytes; +} + +ParquetMetadataCache::ParquetMetadataCache( + const String & cache_policy, + size_t max_size_in_bytes, + size_t max_count, + double size_ratio) + : Base(cache_policy, + CurrentMetrics::ParquetMetadataCacheBytes, + CurrentMetrics::ParquetMetadataCacheFiles, + max_size_in_bytes, + max_count, + size_ratio) + , log(getLogger("ParquetMetadataCache")) +{ +} +ParquetMetadataCacheKey ParquetMetadataCache::createKey(const String & file_path, const String & file_attr) +{ + return ParquetMetadataCacheKey{file_path, file_attr}; +} +void ParquetMetadataCache::onEntryRemoval(const size_t weight_loss, const MappedPtr &) +{ + LOG_TRACE(log, "cache eviction"); + ProfileEvents::increment(ProfileEvents::ParquetMetadataCacheWeightLost, weight_loss); +} +} +#endif diff --git a/src/Processors/Formats/Impl/ParquetMetadataCache.h b/src/Processors/Formats/Impl/ParquetMetadataCache.h new file mode 100644 index 000000000000..3306c555562d --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetMetadataCache.h @@ -0,0 +1,101 @@ +#pragma once +#include "config.h" + +#if USE_PARQUET + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if USE_AWS_S3 +#include +#include +#endif + +namespace ProfileEvents +{ + extern const Event ParquetMetadataCacheMisses; + extern const Event ParquetMetadataCacheHits; +} + +namespace DB +{ + +struct ParquetMetadataCacheKey +{ + String file_path; + String etag; + bool operator==(const ParquetMetadataCacheKey & other) const; +}; + +/// Hash function for ParquetMetadataCacheKey +struct ParquetMetadataCacheKeyHash +{ + size_t operator()(const ParquetMetadataCacheKey & key) const; +}; + +/// Cache cell containing Parquet metadata +struct ParquetMetadataCacheCell : private boost::noncopyable +{ + parquet::format::FileMetaData metadata; + UInt64 memory_bytes; + explicit ParquetMetadataCacheCell(parquet::format::FileMetaData metadata_); +private: + static constexpr size_t 
SIZE_IN_MEMORY_OVERHEAD = 200; + size_t calculateMemorySize() const; +}; + +/// Weight function for metadata cache +struct ParquetMetadataCacheWeightFunction +{ + size_t operator()(const ParquetMetadataCacheCell & cell) const; +}; + +/// Parquet metadata cache +class ParquetMetadataCache : public CacheBase +{ +public: + using Base = CacheBase; + ParquetMetadataCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio); + static ParquetMetadataCacheKey createKey(const String & file_path, const String & file_attr); + /// Get or load Parquet metadata with caching + template + parquet::format::FileMetaData getOrSetMetadata(const ParquetMetadataCacheKey & key, LoadFunc && load_fn) + { + auto load_fn_wrapper = [&]() + { + auto metadata = load_fn(); + LOG_TRACE(log, "got metadata from cache {} | {}", key.file_path, key.etag); + return std::make_shared(std::move(metadata)); + }; + auto result = Base::getOrSet(key, load_fn_wrapper); + if (result.second) + { + LOG_TRACE(log, "cache miss {} | {}", key.file_path, key.etag); + ProfileEvents::increment(ProfileEvents::ParquetMetadataCacheMisses); + } + else + { + LOG_TRACE(log, "cache hit {} | {}", key.file_path, key.etag); + ProfileEvents::increment(ProfileEvents::ParquetMetadataCacheHits); + } + return result.first->metadata; + } + +private: + LoggerPtr log; + /// Called for each individual entry being evicted from cache + void onEntryRemoval(size_t weight_loss, const MappedPtr &) override; +}; + +using ParquetMetadataCachePtr = std::shared_ptr; +} +#endif diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp index f7c2dab49db1..66c11d637e69 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp @@ -24,15 +24,8 @@ #include #include #include -#include -namespace ProfileEvents -{ -extern const Event ParquetMetaDataCacheHits; -extern 
const Event ParquetMetaDataCacheMisses; -} - namespace DB { @@ -41,11 +34,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -namespace Setting -{ -extern const SettingsBool input_format_parquet_use_metadata_cache; -} - static NamesAndTypesList getHeaderForParquetMetadata() { NamesAndTypesList names_and_types{ @@ -144,35 +132,10 @@ void checkHeader(const Block & header) static std::shared_ptr getFileMetadata( ReadBuffer & in, const FormatSettings & format_settings, - std::atomic & is_stopped, - ParquetMetadataInputFormat::Cache metadata_cache) + std::atomic & is_stopped) { - // in-memory cache is not implemented for local file operations, only for remote files - // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation - // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key - if (!metadata_cache.use_cache || metadata_cache.key.empty()) - { - auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); - return parquet::ReadMetaData(arrow_file); - } - - auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( - metadata_cache.key, - [&]() - { - auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); - return parquet::ReadMetaData(arrow_file); - } - ); - - if (loaded) - ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); - else - ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); - - return parquet_file_metadata; - - + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); } ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, SharedHeader header_, const FormatSettings & format_settings_) @@ -187,7 +150,7 @@ Chunk ParquetMetadataInputFormat::read() if 
(done) return res; - auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache); + auto metadata = getFileMetadata(*in, format_settings, is_stopped); const auto & header = getPort().getHeader(); auto names_and_types = getHeaderForParquetMetadata(); @@ -528,12 +491,6 @@ void ParquetMetadataInputFormat::resetParser() done = false; } -void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) -{ - metadata_cache.key = key_; - metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; -} - ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_) : ISchemaReader(in_) { diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h index 6b667dcc5b1e..81cf7890ee7e 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h @@ -62,14 +62,6 @@ class ParquetMetadataInputFormat : public IInputFormat void resetParser() override; - void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; - - struct Cache - { - String key; - bool use_cache = false; - }; - private: Chunk read() override; @@ -86,8 +78,6 @@ class ParquetMetadataInputFormat : public IInputFormat const FormatSettings format_settings; bool done = false; std::atomic is_stopped{0}; - - Cache metadata_cache; }; class ParquetMetadataSchemaReader : public ISchemaReader diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index cea175837710..f0398658408b 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -37,12 +37,16 @@ ParquetV3BlockInputFormat::ParquetV3BlockInputFormat( const FormatSettings & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, 
FormatFilterInfoPtr format_filter_info_, - size_t min_bytes_for_seek) + size_t min_bytes_for_seek, + ParquetMetadataCachePtr metadata_cache_, + const std::optional & object_with_metadata_) : IInputFormat(header_, &buf) , format_settings(format_settings_) , read_options(convertReadOptions(format_settings)) , parser_shared_resources(parser_shared_resources_) , format_filter_info(format_filter_info_) + , metadata_cache(metadata_cache_) + , object_with_metadata(object_with_metadata_) { read_options.min_bytes_for_seek = min_bytes_for_seek; read_options.bytes_per_read_task = min_bytes_for_seek * 4; @@ -94,12 +98,29 @@ void ParquetV3BlockInputFormat::initializeIfNeeded() std::lock_guard lock(reader_mutex); reader.emplace(); reader->reader.prefetcher.init(in, read_options, parser_shared_resources); + reader->reader.file_metadata = getFileMetadata(reader->reader.prefetcher); reader->reader.init(read_options, getPort().getHeader(), format_filter_info); reader->init(parser_shared_resources, buckets_to_read ? std::optional(buckets_to_read->row_group_ids) : std::nullopt); } } } +parquet::format::FileMetaData ParquetV3BlockInputFormat::getFileMetadata(Parquet::Prefetcher & prefetcher) const +{ + if (metadata_cache && object_with_metadata.has_value() && object_with_metadata->metadata.has_value()) + { + String file_name = object_with_metadata->getPath(); + String etag = object_with_metadata->metadata->etag; + ParquetMetadataCacheKey cache_key = ParquetMetadataCache::createKey(file_name, etag); + return metadata_cache->getOrSetMetadata( + cache_key, [&]() { return Parquet::Reader::readFileMetaData(prefetcher); }); + } + else + { + return Parquet::Reader::readFileMetaData(prefetcher); + } +} + Chunk ParquetV3BlockInputFormat::read() { if (need_only_count) @@ -110,7 +131,8 @@ Chunk ParquetV3BlockInputFormat::read() /// Don't init Reader and ReadManager if we only need file metadata. 
Parquet::Prefetcher temp_prefetcher; temp_prefetcher.init(in, read_options, parser_shared_resources); - auto file_metadata = Parquet::Reader::readFileMetaData(temp_prefetcher); + parquet::format::FileMetaData file_metadata = getFileMetadata(temp_prefetcher); + auto chunk = getChunkForCount(size_t(file_metadata.num_rows)); chunk.getChunkInfos().add(std::make_shared(0)); @@ -155,9 +177,9 @@ void ParquetV3BlockInputFormat::resetParser() IInputFormat::resetParser(); } -NativeParquetSchemaReader::NativeParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings) +NativeParquetSchemaReader::NativeParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) : ISchemaReader(in_) - , read_options(convertReadOptions(format_settings)) + , read_options(convertReadOptions(format_settings_)) { } diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index e90559d93d90..dffd7c28605e 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -6,6 +6,7 @@ #include #include #include +#include #include namespace DB @@ -20,7 +21,9 @@ class ParquetV3BlockInputFormat : public IInputFormat const FormatSettings & format_settings, FormatParserSharedResourcesPtr parser_shared_resources_, FormatFilterInfoPtr format_filter_info_, - size_t min_bytes_for_seek); + size_t min_bytes_for_seek, + ParquetMetadataCachePtr metadata_cache_ = nullptr, + const std::optional & object_with_metadata_ = std::nullopt); void resetParser() override; @@ -44,6 +47,8 @@ class ParquetV3BlockInputFormat : public IInputFormat Parquet::ReadOptions read_options; FormatParserSharedResourcesPtr parser_shared_resources; FormatFilterInfoPtr format_filter_info; + ParquetMetadataCachePtr metadata_cache; + const std::optional object_with_metadata; /// (This mutex is not important. 
It protects `reader.emplace` in a weird case where onCancel() /// may be called in parallel with first read(). ReadManager itself is thread safe for that, @@ -58,6 +63,8 @@ class ParquetV3BlockInputFormat : public IInputFormat void initializeIfNeeded(); std::shared_ptr buckets_to_read; + + parquet::format::FileMetaData getFileMetadata(Parquet::Prefetcher & prefetcher) const; }; class NativeParquetSchemaReader : public ISchemaReader @@ -72,7 +79,7 @@ class NativeParquetSchemaReader : public ISchemaReader void initializeIfNeeded(); Parquet::ReadOptions read_options; - Parquet::parq::FileMetaData file_metadata; + parquet::format::FileMetaData file_metadata; bool initialized = false; }; diff --git a/src/Server/ArrowFlightHandler.cpp b/src/Server/ArrowFlightHandler.cpp index 77877a329a31..e7a4e4888b1f 100644 --- a/src/Server/ArrowFlightHandler.cpp +++ b/src/Server/ArrowFlightHandler.cpp @@ -972,7 +972,7 @@ arrow::Status ArrowFlightHandler::GetFlightInfo( total_bytes += block.bytes(); auto ticket_info = calls_data->createTicket(std::make_shared(std::move(block)), ch_to_arrow_converter); arrow::flight::FlightEndpoint endpoint; - endpoint.ticket = arrow::flight::Ticket{.ticket = ticket_info->ticket}; + endpoint.ticket = arrow::flight::Ticket{ticket_info->ticket}; endpoint.expiration_time = ticket_info->expiration_time; endpoints.emplace_back(endpoint); } @@ -1132,7 +1132,7 @@ arrow::Status ArrowFlightHandler::PollFlightInfo( if (poll_info->ticket) { arrow::flight::FlightEndpoint endpoint; - endpoint.ticket = arrow::flight::Ticket{.ticket = *poll_info->ticket}; + endpoint.ticket = arrow::flight::Ticket{*poll_info->ticket}; endpoint.expiration_time = calls_data->getTicketExpirationTime(*poll_info->ticket); endpoints.emplace_back(endpoint); } diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp index c44349b4ac1e..9677cfc69bde 100644 --- a/src/Storages/Hive/HiveFile.cpp +++ b/src/Storages/Hive/HiveFile.cpp @@ -285,7 +285,9 @@ void 
HiveParquetFile::prepareReader() in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef(), getContext()->getReadSettings()); auto format_settings = getFormatSettings(getContext()); std::atomic is_stopped{0}; - THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), arrow::default_memory_pool(), &reader)); + auto open_file_res = parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), arrow::default_memory_pool()); + THROW_ARROW_NOT_OK(open_file_res.status()); + reader = *std::move(open_file_res); } void HiveParquetFile::loadSplitMinMaxIndexesImpl() diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index 4eb9e1f095ff..66227ae68caa 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -498,12 +498,10 @@ struct DeltaLakeMetadataImpl std::atomic is_stopped{0}; - std::unique_ptr reader; - THROW_ARROW_NOT_OK( - parquet::arrow::OpenFile( - asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), - arrow::default_memory_pool(), - &reader)); + auto open_file_res = parquet::arrow::OpenFile( + asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), arrow::default_memory_pool()); + THROW_ARROW_NOT_OK(open_file_res.status()); + auto reader = *std::move(open_file_res); ArrowColumnToCHColumn column_reader( header, "Parquet", diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index d3ff0c5a6906..d90ea0030aac 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -79,6 +80,8 @@ 
namespace Setting extern const SettingsUInt64 s3_path_filter_limit; extern const SettingsBool allow_experimental_iceberg_read_optimization; extern const SettingsBool use_object_storage_list_objects_cache; + extern const SettingsBool use_parquet_metadata_cache; + extern const SettingsBool input_format_parquet_use_native_reader_v3; } namespace ErrorCodes @@ -856,6 +859,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade return std::make_shared(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper, nullptr, nullptr); }(); + chassert(object_info->getObjectMetadata().has_value()); + LOG_DEBUG( log, "Reading object '{}', size: {} bytes, with format: {}", @@ -863,7 +868,36 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade object_info->getObjectMetadata()->size_bytes, object_info->getFileFormat().value_or(configuration->getFormat())); - auto input_format = FormatFactory::instance().getInput( + bool use_native_reader_v3 = format_settings.has_value() + ? 
format_settings->parquet.use_native_reader_v3 + : context_->getSettingsRef()[Setting::input_format_parquet_use_native_reader_v3]; + + InputFormatPtr input_format; + if (context_->getSettingsRef()[Setting::use_parquet_metadata_cache] && use_native_reader_v3 + && (object_info->getFileFormat().value_or(configuration->getFormat()) == "Parquet") + && !object_info->getObjectMetadata()->etag.empty()) + { + const std::optional object_with_metadata = object_info->relative_path_with_metadata; + input_format = FormatFactory::instance().getInputWithMetadata( + object_info->getFileFormat().value_or(configuration->getFormat()), + *read_buf, + initial_header, + context_, + max_block_size, + object_with_metadata, + format_settings, + parser_shared_resources, + filter_info, + true /* is_remote_fs */, + compression_method, + need_only_count, + std::nullopt /*min_block_size_bytes*/, + std::nullopt /*min_block_size_rows*/, + std::nullopt /*max_block_size_bytes*/); + } + else + { + input_format = FormatFactory::instance().getInput( object_info->getFileFormat().value_or(configuration->getFormat()), *read_buf, initial_header, @@ -875,6 +909,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade true /* is_remote_fs */, compression_method, need_only_count); + } input_format->setBucketsToRead(object_info->file_bucket_info); input_format->setSerializationHints(read_from_format_info.serialization_hints); @@ -882,14 +917,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (need_only_count) input_format->needOnlyCount(); - if (!object_info->getPath().empty()) - { - if (const auto & metadata = object_info->relative_path_with_metadata.metadata) - { - input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + metadata->etag); - } - } - builder.init(Pipe(input_format)); configuration->addDeleteTransformers(object_info, builder, format_settings, parser_shared_resources, context_); @@ -1001,9 
+1028,10 @@ std::unique_ptr createReadBuffer( || object_storage->getType() == ObjectStorageType::S3); } - /// We need object metadata for two cases: + /// We need object metadata for a few use cases: /// 1. object size suggests whether we need to use prefetch /// 2. object etag suggests a cache key in case we use filesystem cache + /// 3. object etag as a cache key for parquet metadata caching if (!object_info.metadata) object_info.metadata = object_storage->getObjectMetadata(object_info.getPath(), /*with_tags=*/ false); diff --git a/src/Storages/System/StorageSystemFormats.cpp b/src/Storages/System/StorageSystemFormats.cpp index 7be5139b08c7..eb3be6e23427 100644 --- a/src/Storages/System/StorageSystemFormats.cpp +++ b/src/Storages/System/StorageSystemFormats.cpp @@ -36,11 +36,11 @@ void StorageSystemFormats::fillData(MutableColumns & res_columns, ContextPtr con const auto & [name, creators] = pair; String format_name = creators.name; - bool has_input_format(creators.input_creator != nullptr || creators.random_access_input_creator != nullptr); + bool has_input_format(creators.input_creator != nullptr || creators.random_access_input_creator != nullptr || creators.random_access_input_creator_with_metadata != nullptr); bool has_output_format(creators.output_creator != nullptr); bool supports_parallel_parsing(creators.file_segmentation_engine_creator != nullptr || creators.random_access_input_creator != nullptr); String content_type = has_output_format ? 
FormatFactory::instance().getContentType(format_name, settings) : ""; - bool supports_random_access = creators.random_access_input_creator != nullptr; + bool supports_random_access = creators.random_access_input_creator != nullptr || creators.random_access_input_creator_with_metadata != nullptr; bool has_schema_inference = creators.schema_reader_creator != nullptr; bool has_external_schema = creators.external_schema_reader_creator != nullptr; bool prefers_large_blocks = creators.prefers_large_blocks; diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 77626af827ee..b698ce4a79d5 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -951,7 +951,7 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') WHERE key <= 2 FORMAT TSV - SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0 + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0, use_parquet_metadata_cache = 0 """, query_id=query_id_full, ) @@ -965,7 +965,7 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') WHERE key <= 2 FORMAT TSV - SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1 + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1, use_parquet_metadata_cache = 0 """, query_id=query_id_optimized, ) @@ -979,7 +979,7 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): FROM s3Cluster(cluster_simple, 
'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') WHERE key <= 2 FORMAT TSV - SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0 + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0, use_parquet_metadata_cache = 0 """, query_id=query_id_cluster_full, ) @@ -993,7 +993,7 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') WHERE key <= 2 FORMAT TSV - SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1 + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1, use_parquet_metadata_cache = 0 """, query_id=query_id_cluster_optimized, ) diff --git a/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml b/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml deleted file mode 100644 index bc34464e30da..000000000000 --- a/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - 0 - - - diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index 639e70102fcd..fe59f3c5c96d 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -110,7 +110,6 @@ def started_cluster(): user_configs=[ "configs/users.d/users.xml", "configs/users.d/enable_writes.xml", - "configs/users.d/disable_parquet_metadata_caching.xml", ], env_variables={ "RUST_BACKTRACE": "1", @@ -132,7 +131,6 @@ def started_cluster(): user_configs=[ 
"configs/users.d/users.xml", "configs/users.d/enable_writes.xml", - "configs/users.d/disable_parquet_metadata_caching.xml", ], with_minio=True, stay_alive=True, @@ -183,7 +181,6 @@ def started_cluster(): user_configs=[ "configs/users.d/users.xml", "configs/users.d/disabled_delta_kernel.xml", - "configs/users.d/disable_parquet_metadata_caching.xml", ], with_minio=True, with_azurite=True, @@ -1386,7 +1383,7 @@ def test_session_token(started_cluster): parquet_data_path = create_initial_data_file( started_cluster, instance, - "SELECT toUInt64(number), toString(number) FROM numbers(100) SETTINGS input_format_parquet_use_metadata_cache=0", + "SELECT toUInt64(number), toString(number) FROM numbers(100)", TABLE_NAME, node_name=node_name, ) @@ -1399,7 +1396,7 @@ def test_session_token(started_cluster): f""" SELECT count() FROM deltaLake( 'http://{started_cluster.minio_host}:{started_cluster.minio_port}/{started_cluster.minio_bucket}/{TABLE_NAME}/', - SETTINGS allow_experimental_delta_kernel_rs=1, input_format_parquet_use_metadata_cache=0) + SETTINGS allow_experimental_delta_kernel_rs=1) """ ) ) @@ -2336,7 +2333,7 @@ def test_column_pruning(started_cluster): query_id = f"query_{TABLE_NAME}_2" assert sum == int( instance.query( - f"SELECT sum(id) FROM {table_function} SETTINGS enable_filesystem_cache=0, max_read_buffer_size_remote_fs=100, remote_read_min_bytes_for_seek=1, input_format_parquet_use_native_reader_v3=1", + f"SELECT sum(id) FROM {table_function} SETTINGS enable_filesystem_cache=0, max_read_buffer_size_remote_fs=100, remote_read_min_bytes_for_seek=1, input_format_parquet_use_native_reader_v3=1, use_parquet_metadata_cache=0", query_id=query_id, ) ) diff --git a/tests/queries/0_stateless/02735_parquet_encoder.reference b/tests/queries/0_stateless/02735_parquet_encoder.reference index d26a331ec2fb..c89b3a6c5300 100644 --- a/tests/queries/0_stateless/02735_parquet_encoder.reference +++ b/tests/queries/0_stateless/02735_parquet_encoder.reference @@ -43,7 +43,7 @@ 
datetime DateTime64(3, \'UTC\') (1000000,NULL,NULL,'0','-1294970296') (1000000,NULL,NULL,'-2147483296','2147481000') (100000,900000,NULL,'100009','999999') -[(2,NULL,NULL,'','[]')] +[(2,NULL,NULL,NULL,NULL)] 1 1 0 1 5090915589685802007 diff --git a/tests/queries/0_stateless/02995_settings_25_11_1.tsv b/tests/queries/0_stateless/02995_settings_25_11_1.tsv index 0969de0f86de..1fc5f4b2f8b1 100644 --- a/tests/queries/0_stateless/02995_settings_25_11_1.tsv +++ b/tests/queries/0_stateless/02995_settings_25_11_1.tsv @@ -646,7 +646,6 @@ input_format_parquet_case_insensitive_column_matching 0 input_format_parquet_enable_json_parsing 1 input_format_parquet_enable_row_group_prefetch 1 input_format_parquet_filter_push_down 1 -input_format_parquet_use_metadata_cache 1 input_format_parquet_import_nested 0 input_format_parquet_local_file_min_bytes_for_seek 8192 input_format_parquet_local_time_as_utc 1 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference deleted file mode 100644 index c87ad9008b60..000000000000 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference +++ /dev/null @@ -1,8 +0,0 @@ -10 -10 -10 -10 -10 -10 -0 -10 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql deleted file mode 100644 index c82b3f4dc0e3..000000000000 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ /dev/null @@ -1,63 +0,0 @@ --- Tags: no-parallel, no-fasttest, no-parallel-replicas - -SET input_format_parquet_use_native_reader_v3=0; - -DROP TABLE IF EXISTS t_parquet_03262; - -CREATE TABLE t_parquet_03262 (a UInt64) -ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet) -PARTITION BY a; - -INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1; 
- -SELECT COUNT(*) -FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) -SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0,optimize_count_from_files=0; - -SELECT COUNT(*) -FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) -SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache'; - -SELECT COUNT(*) -FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata) -SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_format_metadata_cache'; - -SYSTEM FLUSH LOGS; - -SELECT ProfileEvents['ParquetMetaDataCacheHits'] -FROM system.query_log -where log_comment = 'test_03262_parquet_metadata_cache' -AND type = 'QueryFinish' -ORDER BY event_time desc -LIMIT 1 SETTINGS use_query_condition_cache=0; - -SELECT ProfileEvents['ParquetMetaDataCacheHits'] -FROM system.query_log -where log_comment = 'test_03262_parquet_metadata_format_metadata_cache' -AND type = 'QueryFinish' -ORDER BY event_time desc -LIMIT 1 SETTINGS use_query_condition_cache=0; - -SYSTEM DROP PARQUET METADATA CACHE; - -SELECT COUNT(*) -FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) -SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache_cache_empty'; - -SYSTEM FLUSH LOGS; - -SELECT ProfileEvents['ParquetMetaDataCacheHits'] -FROM system.query_log -where log_comment = 'test_03262_parquet_metadata_cache_cache_empty' -AND type = 'QueryFinish' -ORDER BY event_time desc -LIMIT 1 SETTINGS use_query_condition_cache=0; - -SELECT ProfileEvents['ParquetMetaDataCacheMisses'] -FROM system.query_log -where log_comment = 'test_03262_parquet_metadata_cache_cache_empty' -AND type = 'QueryFinish' -ORDER BY event_time desc -LIMIT 1 SETTINGS 
use_query_condition_cache=0; - -DROP TABLE t_parquet_03262; diff --git a/tests/queries/0_stateless/03707_parquet_metadata_cache.reference b/tests/queries/0_stateless/03707_parquet_metadata_cache.reference new file mode 100644 index 000000000000..f0f6f1bf5b4c --- /dev/null +++ b/tests/queries/0_stateless/03707_parquet_metadata_cache.reference @@ -0,0 +1,12 @@ +1 test1 1.23 +2 test2 4.5600000000000005 +0 1 +1 TEST1 2.23 +2 TEST2 5.5600000000000005 +1 TEST1 2.23 +2 TEST2 5.5600000000000005 +1 0 +0 0 +test1 2.46 +test2 9.120000000000001 +0 1 diff --git a/tests/queries/0_stateless/03707_parquet_metadata_cache.sql b/tests/queries/0_stateless/03707_parquet_metadata_cache.sql new file mode 100644 index 000000000000..460550baf6e0 --- /dev/null +++ b/tests/queries/0_stateless/03707_parquet_metadata_cache.sql @@ -0,0 +1,74 @@ +-- Tags: no-fasttest, no-parallel, no-parallel-replicas, no-random-settings +-- no-fasttest: depends on s3 storage +-- no-parallel: cache is system-wide and tests can affect each other in unexpected way +-- no-parallel-replicas: profile events are not available on the second replica +-- no-random-settings: we need to test the interaction of specific setting combinations + +/* +Because the parquet metadata cache is system-wide, parallel runs of +SYSTEM DROP PARQUET METADATA CACHE will lead to non-deterministic results +*/ +SET log_queries = 1; +SYSTEM DROP PARQUET METADATA CACHE; + +-- Triggers caching of the file +-- should be a cache miss as we load the cache the first time +SELECT * +FROM s3(s3_conn, filename = '03707_cache_test.parquet', format = 'Parquet') +SETTINGS log_comment = '03707-first-test-query', use_parquet_metadata_cache = 1; + +SYSTEM FLUSH LOGS query_log; + +SELECT + ProfileEvents['ParquetMetadataCacheHits'] AS hits, + ProfileEvents['ParquetMetadataCacheMisses'] AS misses +FROM system.query_log +WHERE (log_comment = '03707-first-test-query') AND (type = 'QueryFinish') AND (current_database = currentDatabase()); + +-- Should be a cache 
hit as we use the same file +SELECT + id, + upper(name), + value + 1 +FROM s3(s3_conn, filename = '03707_cache_test.parquet', format = 'Parquet') +SETTINGS log_comment = '03707-second-test-query', use_parquet_metadata_cache = 1; + +-- Should not be a cache hit because we are not using the native v3 reader +SELECT + id, + upper(name), + value + 1 +FROM s3(s3_conn, filename = '03707_cache_test.parquet', format = 'Parquet') +SETTINGS log_comment = '03707-second-test-query-no-v3', use_parquet_metadata_cache = 1, input_format_parquet_use_native_reader_v3 = 0; + +SYSTEM FLUSH LOGS query_log; + +SELECT + ProfileEvents['ParquetMetadataCacheHits'] AS hits, + ProfileEvents['ParquetMetadataCacheMisses'] AS misses +FROM system.query_log +WHERE (log_comment = '03707-second-test-query') AND (type = 'QueryFinish') AND (current_database = currentDatabase()); + +SELECT + ProfileEvents['ParquetMetadataCacheHits'] AS hits, + ProfileEvents['ParquetMetadataCacheMisses'] AS misses +FROM system.query_log +WHERE (log_comment = '03707-second-test-query-no-v3') AND (type = 'QueryFinish') AND (current_database = currentDatabase()); + +SYSTEM DROP PARQUET METADATA CACHE; + +-- Should be back to a cache miss since we dropped the cache +SELECT + name, + value + value +FROM s3(s3_conn, filename = '03707_cache_test.parquet', format = 'Parquet') +SETTINGS log_comment = '03707-third-test-query', use_parquet_metadata_cache = 1; + +SYSTEM FLUSH LOGS query_log; + +SELECT + ProfileEvents['ParquetMetadataCacheHits'] AS hits, + ProfileEvents['ParquetMetadataCacheMisses'] AS misses +FROM system.query_log +WHERE (log_comment = '03707-third-test-query') AND (type = 'QueryFinish') AND (current_database = currentDatabase()); + diff --git a/tests/queries/0_stateless/03723_parquet_prefetcher_read_big_at.sql b/tests/queries/0_stateless/03723_parquet_prefetcher_read_big_at.sql index 157faad932c9..c6bb1af5956d 100644 --- a/tests/queries/0_stateless/03723_parquet_prefetcher_read_big_at.sql +++ 
b/tests/queries/0_stateless/03723_parquet_prefetcher_read_big_at.sql @@ -17,7 +17,7 @@ SELECT * FROM t_parquet_prefetcher_read_big_at ORDER BY a,c FORMAT Null -SETTINGS log_comment = 'test_03723_parquet_prefetcher_read_big_at'; +SETTINGS log_comment = 'test_03723_parquet_prefetcher_read_big_at', use_parquet_metadata_cache = 0; -- Ensure that profiling is available for analysis SYSTEM FLUSH LOGS query_log; -- Check profiling data to visualize what logic has been used diff --git a/tests/queries/0_stateless/04027_parquet_v3_setting_override_s3.reference b/tests/queries/0_stateless/04027_parquet_v3_setting_override_s3.reference new file mode 100644 index 000000000000..573541ac9702 --- /dev/null +++ b/tests/queries/0_stateless/04027_parquet_v3_setting_override_s3.reference @@ -0,0 +1 @@ +0 diff --git a/tests/queries/0_stateless/04027_parquet_v3_setting_override_s3.sql b/tests/queries/0_stateless/04027_parquet_v3_setting_override_s3.sql new file mode 100644 index 000000000000..ea89ae7d75f9 --- /dev/null +++ b/tests/queries/0_stateless/04027_parquet_v3_setting_override_s3.sql @@ -0,0 +1,20 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: Depends on S3 + +-- Test that `input_format_parquet_use_native_reader_v3` setting at SELECT time +-- overrides the setting specified at CREATE TABLE time. 
+ +SET s3_truncate_on_insert = 1; + +INSERT INTO FUNCTION s3(s3_conn, url = 'http://localhost:11111/test/04027_parquet_v3_setting_override.parquet', format = Parquet) +SELECT number AS id FROM numbers(1); + +DROP TABLE IF EXISTS t_04027_parquet_v3_override; + +CREATE TABLE t_04027_parquet_v3_override (id UInt64) +ENGINE = S3(s3_conn, url = 'http://localhost:11111/test/04027_parquet_v3_setting_override.parquet', format = Parquet) +SETTINGS input_format_parquet_use_native_reader_v3 = false; + +SELECT * FROM t_04027_parquet_v3_override ORDER BY id LIMIT 5 SETTINGS input_format_parquet_use_native_reader_v3 = true; + +DROP TABLE t_04027_parquet_v3_override; diff --git a/tests/queries/0_stateless/04034_parquet_v3_metadata_cache_no_query_context.reference b/tests/queries/0_stateless/04034_parquet_v3_metadata_cache_no_query_context.reference new file mode 100644 index 000000000000..ea4483ec305e --- /dev/null +++ b/tests/queries/0_stateless/04034_parquet_v3_metadata_cache_no_query_context.reference @@ -0,0 +1 @@ +100 4950 diff --git a/tests/queries/0_stateless/04034_parquet_v3_metadata_cache_no_query_context.sh b/tests/queries/0_stateless/04034_parquet_v3_metadata_cache_no_query_context.sh new file mode 100755 index 000000000000..90cd0dea3e48 --- /dev/null +++ b/tests/queries/0_stateless/04034_parquet_v3_metadata_cache_no_query_context.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +# Tags: no-fasttest +# Tag no-fasttest: Depends on S3 + +# Regression test: S3Queue background threads read Parquet files without a +# query context on CurrentThread. Before the fix, the format factory lambda +# called CurrentThread::getQueryContext()->getParquetMetadataCache() which +# crashed because getQueryContext() returned null in background threads. + +set -e + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +S3_PATH="test/${CLICKHOUSE_DATABASE}_04034" + +# Write a Parquet file to S3 +$CLICKHOUSE_CLIENT -q " + INSERT INTO FUNCTION s3(s3_conn, url = 'http://localhost:11111/${S3_PATH}/data.parquet', format = Parquet) + SELECT number AS id, toString(number) AS name FROM numbers(100) + SETTINGS s3_truncate_on_insert = 1 +" + +# Create destination table +$CLICKHOUSE_CLIENT -q " + CREATE TABLE ${CLICKHOUSE_DATABASE}.dest (id UInt64, name String) + ENGINE = MergeTree ORDER BY id +" + +# Create S3Queue table reading Parquet with metadata cache + native reader v3. +# The background processing thread has no query context on CurrentThread. +$CLICKHOUSE_CLIENT --send_logs_level=error -q " + CREATE TABLE ${CLICKHOUSE_DATABASE}.queue (id UInt64, name String) + ENGINE = S3Queue('http://localhost:11111/${S3_PATH}/*.parquet', 'Parquet') + SETTINGS + keeper_path = '/clickhouse/${CLICKHOUSE_DATABASE}/04034_s3queue', + mode = 'unordered', + after_processing = 'keep', + s3queue_processing_threads_num = 1, + s3queue_polling_min_timeout_ms = 100, + s3queue_polling_max_timeout_ms = 500, + input_format_parquet_use_native_reader_v3 = 1, + use_parquet_metadata_cache = 1 +" + +# Create MV to trigger background reads from S3Queue into dest table +$CLICKHOUSE_CLIENT -q " + CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}.mv TO ${CLICKHOUSE_DATABASE}.dest + AS SELECT id, name FROM ${CLICKHOUSE_DATABASE}.queue +" + +# Wait for S3Queue to process the file (up to 30 seconds) +for _ in {1..60}; do + count=$($CLICKHOUSE_CLIENT -q "SELECT count() FROM ${CLICKHOUSE_DATABASE}.dest") + if [ "$count" -ge 100 ]; then + break + fi + sleep 0.5 +done + +$CLICKHOUSE_CLIENT -q "SELECT count(), sum(id) FROM ${CLICKHOUSE_DATABASE}.dest" + +# Cleanup +$CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS ${CLICKHOUSE_DATABASE}.mv SYNC" +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.queue SYNC" +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dest SYNC" diff 
--git a/tests/queries/0_stateless/data_minio/03707_cache_test.parquet b/tests/queries/0_stateless/data_minio/03707_cache_test.parquet new file mode 100644 index 000000000000..8616977c038d Binary files /dev/null and b/tests/queries/0_stateless/data_minio/03707_cache_test.parquet differ