diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index 476738471079..6e9d53efe167 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -29,8 +29,8 @@ static struct InitFiu /// We should define different types of failpoints here. There are four types of them: /// - ONCE: the failpoint will only be triggered once. /// - REGULAR: the failpoint will always be triggered until disableFailPoint is called. -/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, util disableFailPoint is called. -/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, util disableFailPoint is called. +/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, until disableFailPoint is called. +/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, until disableFailPoint is called. #define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \ ONCE(replicated_merge_tree_commit_zk_fail_after_op) \ @@ -131,9 +131,13 @@ static struct InitFiu REGULAR(rmt_delay_commit_part) \ ONCE(smt_commit_exception_before_op) \ ONCE(backup_add_empty_memory_table) \ + ONCE(local_object_storage_network_error_during_remove) \ + ONCE(parallel_replicas_check_read_mode_always) \ + REGULAR(lightweight_show_tables) \ + PAUSEABLE_ONCE(drop_database_before_exclusive_ddl_lock) \ + REGULAR(storage_merge_tree_background_schedule_merge_fail) \ REGULAR(refresh_task_stop_racing_for_running_refresh) - namespace FailPoints { #define M(NAME) extern const char(NAME)[] = #NAME ""; diff --git a/src/Storages/MergeTree/IMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/IMergeTreeCleanupThread.cpp new file mode 100644 index 000000000000..74af7f991c47 --- /dev/null +++ b/src/Storages/MergeTree/IMergeTreeCleanupThread.cpp @@ -0,0 +1,162 @@ +#include + +#include +#include +#include +#include + +namespace DB +{ + +namespace MergeTreeSetting +{ + extern const 
MergeTreeSettingsUInt64 cleanup_delay_period; + extern const MergeTreeSettingsUInt64 cleanup_delay_period_random_add; + extern const MergeTreeSettingsUInt64 cleanup_thread_preferred_points_per_iteration; + extern const MergeTreeSettingsUInt64 max_cleanup_delay_period; +} + +IMergeTreeCleanupThread::IMergeTreeCleanupThread(MergeTreeData & data_) + : data(data_) + , log_name(data.getStorageID().getFullTableName() + " (CleanupThread)") + , log(getLogger(log_name)) + , sleep_ms((*data.getSettings())[MergeTreeSetting::cleanup_delay_period] * 1000) +{ + task = data.getContext()->getSchedulePool().createTask(data.getStorageID(), log_name, [this] { run(); }); +} + +IMergeTreeCleanupThread::~IMergeTreeCleanupThread() = default; + +void IMergeTreeCleanupThread::start() +{ + task->activateAndSchedule(); +} + +void IMergeTreeCleanupThread::wakeup() +{ + task->schedule(); +} + +void IMergeTreeCleanupThread::stop() +{ + task->deactivate(); +} + +void IMergeTreeCleanupThread::wakeupEarlierIfNeeded() +{ + /// It may happen that the tables was idle for a long time, but then a user started to aggressively insert (or mutate) data. + /// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon, + /// but the number of objects to clean up is growing. We need to wakeup the task earlier. + auto storage_settings = data.getSettings(); + if (!(*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) + return; + + /// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts. + /// Do not wake up unless we have too many. 
+ size_t number_of_outdated_objects = data.getOutdatedPartsCount(); + if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) + return; + + /// A race condition is possible here, but it's okay + if (is_running.load(std::memory_order_relaxed)) + return; + + /// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime()) + if (!wakeup_check_timer.compareAndRestart(static_cast((*storage_settings)[MergeTreeSetting::cleanup_delay_period]) / 4.0)) + return; + + UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); + UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000; + if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms) + return; + + /// Don't run it more often than cleanup_delay_period + UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000; + if (seconds_passed < (*storage_settings)[MergeTreeSetting::cleanup_delay_period]) + return; + + /// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many. 
+ number_of_outdated_objects = data.getNumberOfOutdatedPartsWithExpiredRemovalTime(); + if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) + return; + + LOG_TRACE( + log, + "Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago", + number_of_outdated_objects, + seconds_passed); + + wakeup(); +} + +void IMergeTreeCleanupThread::run() +{ + if (cleanup_blocker.isCancelled()) + { + LOG_TRACE(LogFrequencyLimiter(log, 30), "Cleanup is cancelled, exiting"); + return; + } + + SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); }); + is_running.store(true, std::memory_order_relaxed); + + auto storage_settings = data.getSettings(); + + Float32 cleanup_points = 0; + try + { + cleanup_points = iterate(); + } + catch (const Coordination::Exception & e) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + + if (e.code == Coordination::Error::ZSESSIONEXPIRED) + return; + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + + UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); + UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000; + + /// Do not adjust sleep_ms on the first run after starting the server + if (prev_timestamp && (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) + { + /// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup. + /// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s) + /// when we removed something, because inserting one part per 30s will lead to running cleanup each 30s just to remove one part. + /// So we need some interpolation based on preferred batch size. 
+ auto expected_cleanup_points = (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]; + + /// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration? + Float32 ratio = cleanup_points / static_cast(expected_cleanup_points); + if (ratio == 0) + sleep_ms = (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000; + else + sleep_ms = static_cast(static_cast(sleep_ms) / ratio); + + sleep_ms = std::clamp( + sleep_ms, + (*storage_settings)[MergeTreeSetting::cleanup_delay_period] * 1000, + (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000); + + UInt64 interval_ms = now_ms - prev_timestamp; + LOG_TRACE( + log, + "Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})", + sleep_ms, + cleanup_points, + interval_ms, + ratio, + cleanup_points / static_cast(interval_ms * 60'000)); + } + prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed); + + sleep_ms += std::uniform_int_distribution(0, (*storage_settings)[MergeTreeSetting::cleanup_delay_period_random_add] * 1000)(rng); + task->scheduleAfter(sleep_ms); +} + +} diff --git a/src/Storages/MergeTree/IMergeTreeCleanupThread.h b/src/Storages/MergeTree/IMergeTreeCleanupThread.h new file mode 100644 index 000000000000..3621d9e177e7 --- /dev/null +++ b/src/Storages/MergeTree/IMergeTreeCleanupThread.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include + +#include + +namespace DB +{ + +class MergeTreeData; + +/// Removes obsolete data from a table of type [Replicated]MergeTree. 
+class IMergeTreeCleanupThread +{ +public: + explicit IMergeTreeCleanupThread(MergeTreeData & data_); + + virtual ~IMergeTreeCleanupThread(); + + void start(); + + void wakeup(); + + void stop(); + + void wakeupEarlierIfNeeded(); + + ActionLock getCleanupLock() { return cleanup_blocker.cancel(); } + +protected: + MergeTreeData & data; + + String log_name; + LoggerPtr log; + BackgroundSchedulePoolTaskHolder task; + pcg64 rng{randomSeed()}; + + UInt64 sleep_ms; + + std::atomic prev_cleanup_timestamp_ms = 0; + std::atomic is_running = false; + + AtomicStopwatch wakeup_check_timer; + + ActionBlocker cleanup_blocker; + + void run(); + + /// Returns a number this is directly proportional to the number of cleaned up blocks + virtual Float32 iterate() = 0; +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeCleanupThread.cpp b/src/Storages/MergeTree/MergeTreeCleanupThread.cpp new file mode 100644 index 000000000000..a23e16a3db14 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeCleanupThread.cpp @@ -0,0 +1,63 @@ +#include + +#include +#include + +namespace DB +{ + +namespace MergeTreeSetting +{ + extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations; + extern const MergeTreeSettingsUInt64 merge_tree_clear_old_parts_interval_seconds; + extern const MergeTreeSettingsUInt64 merge_tree_clear_old_temporary_directories_interval_seconds; + extern const MergeTreeSettingsSeconds temporary_directories_lifetime; +} + +MergeTreeCleanupThread::MergeTreeCleanupThread(StorageMergeTree & storage_) + : IMergeTreeCleanupThread(storage_) + , storage(storage_) +{ +} + +void MergeTreeCleanupThread::start() +{ + time_after_previous_cleanup_parts.restart(); + time_after_previous_cleanup_temporary_directories.restart(); + IMergeTreeCleanupThread::start(); +} + +Float32 MergeTreeCleanupThread::iterate() +{ + size_t cleaned_other = 0; + size_t cleaned_part_like = 0; + size_t cleaned_parts = 0; + + auto storage_settings = storage.getSettings(); + + auto shared_lock 
+ = storage.lockForShare(RWLockImpl::NO_QUERY, (*storage_settings)[MergeTreeSetting::lock_acquire_timeout_for_background_operations]); + if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred( + static_cast((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_temporary_directories_interval_seconds]))) + { + /// Both use relative_data_path which changes during rename, so we do it under share lock + cleaned_part_like += storage.clearOldTemporaryDirectories( + (*storage.getSettings())[MergeTreeSetting::temporary_directories_lifetime].totalSeconds()); + } + + if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred( + static_cast((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_parts_interval_seconds]))) + { + cleaned_parts += storage.clearOldPartsFromFilesystem(/* force */ false, /* with_pause_point */ true); + cleaned_other += storage.clearOldMutations(); + cleaned_part_like += storage.clearEmptyParts(); + cleaned_part_like += storage.clearUnusedPatchParts(); + cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts(); + } + + constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time + Float32 cleaned_inserted_parts = static_cast(cleaned_parts) / parts_number_amplification; + return cleaned_inserted_parts + static_cast(cleaned_part_like) + static_cast(cleaned_other); +} + +} diff --git a/src/Storages/MergeTree/MergeTreeCleanupThread.h b/src/Storages/MergeTree/MergeTreeCleanupThread.h new file mode 100644 index 000000000000..aa3ce0f2a684 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeCleanupThread.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class StorageMergeTree; + +class MergeTreeCleanupThread : public IMergeTreeCleanupThread +{ +public: + explicit MergeTreeCleanupThread(StorageMergeTree & storage_); + + /// Shadows IMergeTreeCleanupThread::start() to restart cleanup timers + /// before activating the 
background task. This ensures the thread waits + /// a full interval after the manual cleanup done in startup(). + void start(); + +private: + StorageMergeTree & storage; + + AtomicStopwatch time_after_previous_cleanup_parts; + AtomicStopwatch time_after_previous_cleanup_temporary_directories; + + /// Returns a number that is directly proportional to the number of cleaned up objects + Float32 iterate() override; +}; + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index ae11b95ec511..32bef6aa7245 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -1,28 +1,21 @@ -#include -#include #include + +#include #include -#include -#include +#include -#include #include #include -#include - +#include namespace DB { namespace MergeTreeSetting { - extern const MergeTreeSettingsUInt64 cleanup_delay_period; - extern const MergeTreeSettingsUInt64 cleanup_delay_period_random_add; - extern const MergeTreeSettingsUInt64 cleanup_thread_preferred_points_per_iteration; extern const MergeTreeSettingsUInt64 finished_mutations_to_keep; extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations; - extern const MergeTreeSettingsUInt64 max_cleanup_delay_period; extern const MergeTreeSettingsUInt64 max_replicated_logs_to_keep; extern const MergeTreeSettingsUInt64 min_replicated_logs_to_keep; extern const MergeTreeSettingsUInt64 replicated_deduplication_window; @@ -41,132 +34,9 @@ namespace ErrorCodes ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_) - : storage(storage_) - , log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeCleanupThread)") - , log(getLogger(log_name)) - , sleep_ms((*storage.getSettings())[MergeTreeSetting::cleanup_delay_period] * 1000) -{ - task = 
storage.getContext()->getSchedulePool().createTask(log_name, [this]{ run(); }); -} - -void ReplicatedMergeTreeCleanupThread::run() -{ - if (cleanup_blocker.isCancelled()) - { - LOG_TRACE(LogFrequencyLimiter(log, 30), "Cleanup is cancelled, exiting"); - return; - } - - SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); }); - is_running.store(true, std::memory_order_relaxed); - - auto storage_settings = storage.getSettings(); - - Float32 cleanup_points = 0; - try - { - cleanup_points = iterate(); - } - catch (const Coordination::Exception & e) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - - if (e.code == Coordination::Error::ZSESSIONEXPIRED) - return; - } - catch (...) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - } - - UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); - UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000; - - /// Do not adjust sleep_ms on the first run after starting the server - if (prev_timestamp && (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) - { - /// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup. - /// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s) - /// when we removed something, because inserting one part per 30s will lead to running cleanup each 30s just to remove one part. - /// So we need some interpolation based on preferred batch size. - auto expected_cleanup_points = (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]; - - /// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration? 
- Float32 ratio = cleanup_points / expected_cleanup_points; - if (ratio == 0) - sleep_ms = (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000; - else - sleep_ms = static_cast(sleep_ms / ratio); - - sleep_ms = std::clamp(sleep_ms, (*storage_settings)[MergeTreeSetting::cleanup_delay_period] * 1000, (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000); - - UInt64 interval_ms = now_ms - prev_timestamp; - LOG_TRACE(log, "Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})", - sleep_ms, cleanup_points, interval_ms, ratio, cleanup_points / interval_ms * 60'000); - } - prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed); - - sleep_ms += std::uniform_int_distribution(0, (*storage_settings)[MergeTreeSetting::cleanup_delay_period_random_add] * 1000)(rng); - task->scheduleAfter(sleep_ms); -} - -void ReplicatedMergeTreeCleanupThread::wakeupEarlierIfNeeded() -{ - /// It may happen that the tables was idle for a long time, but then a user started to aggressively insert (or mutate) data. - /// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon, - /// but the number of objects to clean up is growing. We need to wakeup the task earlier. - auto storage_settings = storage.getSettings(); - if (!(*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) - return; - - /// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts. - /// Do not wake up unless we have too many. 
- size_t number_of_outdated_objects = storage.getOutdatedPartsCount(); - if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) - return; - - /// A race condition is possible here, but it's okay - if (is_running.load(std::memory_order_relaxed)) - return; - - /// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime()) - if (!wakeup_check_timer.compareAndRestart((*storage_settings)[MergeTreeSetting::cleanup_delay_period] / 4.0)) - return; - - UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); - UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000; - if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms) - return; - - /// Don't run it more often than cleanup_delay_period - UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000; - if (seconds_passed < (*storage_settings)[MergeTreeSetting::cleanup_delay_period]) - return; - - /// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many. 
- number_of_outdated_objects = storage.getNumberOfOutdatedPartsWithExpiredRemovalTime(); - if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) - return; - - LOG_TRACE(log, "Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago", - number_of_outdated_objects, seconds_passed); - - wakeup(); -} - -void ReplicatedMergeTreeCleanupThread::start() -{ - task->activateAndSchedule(); -} - -void ReplicatedMergeTreeCleanupThread::wakeup() -{ - task->schedule(); -} - -void ReplicatedMergeTreeCleanupThread::stop() + : IMergeTreeCleanupThread(storage_) + , storage(storage_) { - task->deactivate(); } Float32 ReplicatedMergeTreeCleanupThread::iterate() @@ -191,18 +61,31 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate() if (storage.is_leader) { cleaned_logs = clearOldLogs(); - size_t normal_blocks = clearOldBlocks("blocks", (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds], - (*storage_settings)[MergeTreeSetting::replicated_deduplication_window], cached_block_stats_for_sync_inserts); - size_t async_blocks = clearOldBlocks("async_blocks", - (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds_for_async_inserts], - (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts], - cached_block_stats_for_async_inserts); + auto zookeeper = storage.getZooKeeper(); + + size_t deduplication_blocks = clearOldBlocks(storage.zookeeper_path, "deduplication_hashes", *zookeeper, + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds], + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window], + cached_stats_for_insert_deduplication_hashes, + log); + + size_t normal_blocks = clearOldBlocks(storage.zookeeper_path, "blocks", *zookeeper, + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds], + 
(*storage_settings)[MergeTreeSetting::replicated_deduplication_window], + cached_block_stats_for_sync_inserts, + log); + + size_t async_blocks = clearOldBlocks(storage.zookeeper_path, "async_blocks", *zookeeper, + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds_for_async_inserts], + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts], + cached_block_stats_for_async_inserts, + log); /// Many async blocks are transformed into one ordinary block Float32 async_blocks_per_block = static_cast((*storage_settings)[MergeTreeSetting::replicated_deduplication_window]) / - ((*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts] + 1); - cleaned_blocks = (normal_blocks + async_blocks * async_blocks_per_block) / 2; + static_cast((*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts] + 1); + cleaned_blocks = (static_cast(deduplication_blocks) + static_cast(normal_blocks) + static_cast(async_blocks) * async_blocks_per_block) / 2; cleaned_other += clearOldMutations(); cleaned_part_like += storage.clearEmptyParts(); @@ -222,8 +105,8 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate() /// many Outdated parts, and WALs usually contain many parts too). We count then as one part for simplicity. 
constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time - Float32 cleaned_inserted_parts = (cleaned_blocks + (cleaned_logs + cleaned_parts) / parts_number_amplification) / 3; - return cleaned_inserted_parts + cleaned_part_like + cleaned_other; + Float32 cleaned_inserted_parts = (cleaned_blocks + static_cast(cleaned_logs + cleaned_parts) / parts_number_amplification) / 3; + return cleaned_inserted_parts + static_cast(cleaned_part_like + cleaned_other); } @@ -460,22 +343,33 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat { String node; Int64 ctime = 0; + Int64 czxid = 0; Int32 version = 0; - NodeWithStat(String node_, Int64 ctime_, Int32 version_) : node(std::move(node_)), ctime(ctime_), version(version_) {} + NodeWithStat(String node_, Int64 ctime_, Int64 czxid_, Int32 version_) : node(std::move(node_)), ctime(ctime_), czxid(czxid_), version(version_) {} + /// Sort by (ctime, czxid) rather than (ctime, node_name) to ensure consistent ordering + /// across different deduplication directories (blocks/, deduplication_hashes/). + /// With COMPATIBLE_DOUBLE_HASHES, entries for the same insert exist in both directories + /// with different node names but the same czxid (created in the same multi-op). + /// Using czxid ensures both directories remove entries for the same logical inserts. 
static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs) { - return std::forward_as_tuple(lhs.ctime, lhs.node) > std::forward_as_tuple(rhs.ctime, rhs.node); + return std::forward_as_tuple(lhs.ctime, lhs.czxid) > std::forward_as_tuple(rhs.ctime, rhs.czxid); } }; -size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats) +size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + UInt64 window_seconds, + UInt64 window_size, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_) { - auto zookeeper = storage.getZooKeeper(); - std::vector timed_blocks; - getBlocksSortedByTime(blocks_dir_name, *zookeeper, timed_blocks, cached_block_stats); + getBlocksSortedByTime(zookeeper_path, blocks_dir_name, zookeeper, timed_blocks, cached_block_stats, log_); if (timed_blocks.empty()) return 0; @@ -487,7 +381,7 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di current_time - static_cast(1000 * window_seconds)); /// Virtual node, all nodes that are "greater" than this one will be deleted - NodeWithStat block_threshold{{}, time_threshold, 0}; + NodeWithStat block_threshold{{}, time_threshold, 0, 0}; size_t current_deduplication_window = std::min(timed_blocks.size(), window_size); auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window; @@ -500,15 +394,15 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di return 0; auto last_outdated_block = timed_blocks.end() - 1; - LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete, + LOG_TRACE(log_, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete, first_outdated_block->node, first_outdated_block->ctime, 
last_outdated_block->node, last_outdated_block->ctime); zkutil::AsyncResponses try_remove_futures; for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) { - String path = storage.zookeeper_path + "/" + blocks_dir_name + "/" + it->node; - try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path, it->version)); + String path = zookeeper_path + "/" + blocks_dir_name + "/" + it->node; + try_remove_futures.emplace_back(path, zookeeper.asyncTryRemove(path, it->version)); } for (auto & pair : try_remove_futures) @@ -518,7 +412,7 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di if (rc == Coordination::Error::ZNOTEMPTY) { /// Can happen if there are leftover block nodes with children created by previous server versions. - zookeeper->removeRecursive(path); + zookeeper.removeRecursive(path); cached_block_stats.erase(first_outdated_block->node); } else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE || rc == Coordination::Error::ZBADVERSION) @@ -529,24 +423,30 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di } else { - LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, rc); + LOG_WARNING(log_, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, rc); } first_outdated_block++; } - LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete); + LOG_TRACE(log_, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete); return num_nodes_to_delete; } -void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats) +void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + std::vector & timed_blocks, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_) 
{ timed_blocks.clear(); Strings blocks; Coordination::Stat stat; - if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/" + blocks_dir_name, blocks, &stat)) - throw Exception(ErrorCodes::NOT_FOUND_NODE, "{}/{} doesn't exist", storage.zookeeper_path, blocks_dir_name); + if (Coordination::Error::ZOK != zookeeper.tryGetChildren(zookeeper_path + "/" + blocks_dir_name, blocks, &stat)) + throw Exception(ErrorCodes::NOT_FOUND_NODE, "{}/{} doesn't exist", zookeeper_path, blocks_dir_name); /// Seems like this code is obsolete, because we delete blocks from cache /// when they are deleted from zookeeper. But we don't know about all (maybe future) places in code @@ -565,7 +465,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc auto not_cached_blocks = stat.numChildren - cached_block_stats.size(); if (not_cached_blocks) { - LOG_TRACE(log, "Checking {} {} ({} are not cached){}, path is {}", stat.numChildren, blocks_dir_name, not_cached_blocks, " to clear old ones from ZooKeeper.", storage.zookeeper_path + "/" + blocks_dir_name); + LOG_TRACE(log_, "Checking {} {} ({} are not cached) to clear old ones from ZooKeeper., path is {}/{}", stat.numChildren, blocks_dir_name, not_cached_blocks, zookeeper_path, blocks_dir_name); } std::vector exists_paths; @@ -575,13 +475,13 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc if (it == cached_block_stats.end()) { /// New block. Fetch its stat asynchronously. 
- exists_paths.emplace_back(storage.zookeeper_path + "/" + blocks_dir_name + "/" + block); + exists_paths.emplace_back(zookeeper_path + "/" + blocks_dir_name + "/" + block); } else { /// Cached block - const auto & ctime_and_version = it->second; - timed_blocks.emplace_back(block, ctime_and_version.first, ctime_and_version.second); + const auto & entry = it->second; + timed_blocks.emplace_back(block, entry.ctime, entry.czxid, entry.version); } } @@ -595,8 +495,8 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc if (status.error != Coordination::Error::ZNONODE) { auto node_name = fs::path(exists_paths[i]).filename(); - cached_block_stats.emplace(node_name, std::make_pair(status.stat.ctime, status.stat.version)); - timed_blocks.emplace_back(node_name, status.stat.ctime, status.stat.version); + cached_block_stats.emplace(node_name, NodeCacheEntry{status.stat.ctime, status.stat.czxid, status.stat.version}); + timed_blocks.emplace_back(node_name, status.stat.ctime, status.stat.czxid, status.stat.version); } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index 268c9a08202c..fc69583145f4 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -1,61 +1,48 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include +#include +#include -#include -#include -#include +namespace zkutil +{ + +class ZooKeeper; +using ZooKeeperPtr = std::shared_ptr; +} namespace DB { class StorageReplicatedMergeTree; - -/** Removes obsolete data from a table of type ReplicatedMergeTree. 
- */ -class ReplicatedMergeTreeCleanupThread +class ReplicatedMergeTreeCleanupThread : public IMergeTreeCleanupThread { public: explicit ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_); - void start(); - - void wakeup(); - - void stop(); - - void wakeupEarlierIfNeeded(); - - ActionLock getCleanupLock() { return cleanup_blocker.cancel(); } + struct NodeCacheEntry + { + Int64 ctime = 0; + Int64 czxid = 0; + Int32 version = 0; + }; + using NodeCTimeAndVersionCache = std::map; + /// Remove old block hashes from ZooKeeper. This is done by the leader replica. Returns the number of removed blocks + static size_t clearOldBlocks( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + UInt64 window_seconds, + UInt64 window_size, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_); private: StorageReplicatedMergeTree & storage; - String log_name; - LoggerPtr log; - BackgroundSchedulePoolTaskHolder task; - pcg64 rng{randomSeed()}; - - UInt64 sleep_ms; - - std::atomic prev_cleanup_timestamp_ms = 0; - std::atomic is_running = false; - - AtomicStopwatch wakeup_check_timer; - - ActionBlocker cleanup_blocker; - - void run(); /// Returns a number this is directly proportional to the number of cleaned up blocks - Float32 iterate(); + Float32 iterate() override; /// Remove old records from ZooKeeper. Returns the number of removed logs size_t clearOldLogs(); @@ -63,26 +50,30 @@ class ReplicatedMergeTreeCleanupThread /// The replica is marked as "lost" if it is inactive and its log pointer /// is far behind and we are not going to keep logs for it. /// Lost replicas will use different strategy for repair. 
- void markLostReplicas(const std::unordered_map & host_versions_lost_replicas, - const std::unordered_map & log_pointers_candidate_lost_replicas, - size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper); - - using NodeCTimeAndVersionCache = std::map>; - /// Remove old block hashes from ZooKeeper. This is done by the leader replica. Returns the number of removed blocks - size_t clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats); + void markLostReplicas( + const std::unordered_map & host_versions_lost_replicas, + const std::unordered_map & log_pointers_candidate_lost_replicas, + size_t replicas_count, + const zkutil::ZooKeeperPtr & zookeeper); /// Remove old mutations that are done from ZooKeeper. This is done by the leader replica. Returns the number of removed mutations size_t clearOldMutations(); + NodeCTimeAndVersionCache cached_stats_for_insert_deduplication_hashes; NodeCTimeAndVersionCache cached_block_stats_for_sync_inserts; NodeCTimeAndVersionCache cached_block_stats_for_async_inserts; struct NodeWithStat; /// Returns list of blocks (with their stat) sorted by ctime in descending order. 
- void getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats); + static void getBlocksSortedByTime( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + std::vector & timed_blocks, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_); /// TODO Removing old quorum/failed_parts }; - } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index acfe2905f91d..7e37c6f2845b 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -64,6 +64,11 @@ namespace DB namespace FailPoints { extern const char storage_merge_tree_background_clear_old_parts_pause[]; + extern const char smt_merge_selecting_task_pause_when_scheduled[]; + extern const char mt_select_parts_to_mutate_no_free_threads[]; + extern const char mt_select_parts_to_mutate_max_part_size[]; + extern const char storage_shared_merge_tree_mutate_pause_before_wait[]; + extern const char storage_merge_tree_background_schedule_merge_fail[]; } namespace Setting @@ -127,6 +132,7 @@ namespace ActionLocks extern const StorageActionBlockType PartsMerge; extern const StorageActionBlockType PartsTTLMerge; extern const StorageActionBlockType PartsMove; + extern const StorageActionBlockType Cleanup; } static MergeTreeTransactionPtr tryGetTransactionForMutation(const MergeTreeMutationEntry & mutation, LoggerPtr log = nullptr) @@ -166,6 +172,7 @@ StorageMergeTree::StorageMergeTree( , reader(*this) , writer(*this) , merger_mutator(*this) + , cleanup_thread(*this) { initializeDirectoriesAndFormatVersion(relative_data_path_, LoadingStrictnessLevel::ATTACH <= mode, date_column_name); @@ -203,11 +210,10 @@ void StorageMergeTree::startup() clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); /// NOTE background task will also do the above cleanups periodically. 
- time_after_previous_cleanup_parts.restart(); - time_after_previous_cleanup_temporary_directories.restart(); try { + cleanup_thread.start(); background_operations_assignee.start(); startBackgroundMovesIfNeeded(); startOutdatedAndUnexpectedDataPartsLoadingTask(); @@ -231,6 +237,23 @@ void StorageMergeTree::startup() } } +void StorageMergeTree::flushAndPrepareForShutdown() +{ + LOG_TRACE(log, "Start preparing for shutdown"); + + if (flush_called.exchange(true)) + return; + + merger_mutator.merges_blocker.cancelForever(); + parts_mover.moves_blocker.cancelForever(); + + background_operations_assignee.finish(); + background_moves_assignee.finish(); + + cleanup_thread.stop(); + + LOG_TRACE(log, "Finished preparing for shutdown"); +} void StorageMergeTree::shutdown(bool) { if (shutdown_called.exchange(true)) @@ -1505,6 +1528,9 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign assert(!isStaticStorage()); + FailPointInjection::pauseFailPoint(FailPoints::smt_merge_selecting_task_pause_when_scheduled); + + cleanup_thread.wakeupEarlierIfNeeded(); auto metadata_snapshot = getInMemoryMetadataPtr(); MergeMutateSelectedEntryPtr merge_entry; MergeMutateSelectedEntryPtr mutate_entry; @@ -1571,6 +1597,12 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign /// in MergePlainMergeTreeTask. So, this slot will never be freed. 
if (!scheduled && isTTLMergeType(merge_entry->future_part->merge_type)) getContext()->getMergeList().cancelMergeWithTTL(); + + fiu_do_on(FailPoints::storage_merge_tree_background_schedule_merge_fail, + { + scheduled = false; + }); + return scheduled; } if (mutate_entry) @@ -1592,39 +1624,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign mutation_wait_event.notify_all(); } - bool scheduled = false; - if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred( - (*getSettings())[MergeTreeSetting::merge_tree_clear_old_temporary_directories_interval_seconds])) - { - assignee.scheduleCommonTask(std::make_shared( - [this, shared_lock] () - { - return clearOldTemporaryDirectories((*getSettings())[MergeTreeSetting::temporary_directories_lifetime].totalSeconds()); - }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); - scheduled = true; - } - - if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred( - (*getSettings())[MergeTreeSetting::merge_tree_clear_old_parts_interval_seconds])) - { - assignee.scheduleCommonTask(std::make_shared( - [this, shared_lock] () - { - /// All use relative_data_path which changes during rename - /// so execute under share lock. 
- size_t cleared_count = 0; - cleared_count += clearOldPartsFromFilesystem(/* force */ false, /* with_pause_point */true); - cleared_count += clearOldMutations(); - cleared_count += clearEmptyParts(); - cleared_count += clearUnusedPatchParts(); - cleared_count += unloadPrimaryKeysAndClearCachesOfOutdatedParts(); - return cleared_count; - /// TODO maybe take into account number of cleared objects when calculating backoff - }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); - scheduled = true; - } - - return scheduled; + return false; } UInt64 StorageMergeTree::getCurrentMutationVersion(UInt64 data_version, std::unique_lock & /*currently_processing_in_background_mutex_lock*/) const @@ -2598,6 +2598,8 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) return merger_mutator.ttl_merges_blocker.cancel(); if (action_type == ActionLocks::PartsMove) return parts_mover.moves_blocker.cancel(); + if (action_type == ActionLocks::Cleanup) + return cleanup_thread.getCleanupLock(); return {}; } @@ -2608,6 +2610,8 @@ void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type) background_operations_assignee.trigger(); else if (action_type == ActionLocks::PartsMove) background_moves_assignee.trigger(); + else if (action_type == ActionLocks::Cleanup) + cleanup_thread.wakeup(); } IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList( diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 00cbc7acdad8..f2df01dc8778 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -131,16 +132,13 @@ class StorageMergeTree final : public MergeTreeData MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; + MergeTreeCleanupThread cleanup_thread; std::unique_ptr deduplication_log; /// For block numbers. 
SimpleIncrement increment; - /// For clearOldParts - AtomicStopwatch time_after_previous_cleanup_parts; - /// For clearOldTemporaryDirectories. - AtomicStopwatch time_after_previous_cleanup_temporary_directories; /// For clearOldBrokenDetachedParts AtomicStopwatch time_after_previous_cleanup_broken_detached_parts; @@ -304,6 +302,7 @@ class StorageMergeTree final : public MergeTreeData friend class MergeTreeData; friend class MergePlainMergeTreeTask; friend class MutatePlainMergeTreeTask; + friend class MergeTreeCleanupThread; struct DataValidationTasks : public IStorage::DataValidationTasksBase { diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index ebd68b70bf1b..9c1da7ef4a19 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -76,7 +76,12 @@ def test_merge_tree_load_parts(started_cluster): == "1\n" ) - node1.query("ALTER TABLE mt_load_parts MODIFY SETTING old_parts_lifetime = 1") + node1.query( + "ALTER TABLE mt_load_parts MODIFY SETTING " + "old_parts_lifetime = 1, cleanup_delay_period = 1, " + "cleanup_delay_period_random_add = 0, " + "cleanup_thread_preferred_points_per_iteration = 0" + ) node1.query("DETACH TABLE mt_load_parts") node1.query("ATTACH TABLE mt_load_parts") diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index bf28ae9b3ca2..ec0dd56f784a 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -95,6 +95,9 @@ def create_table(node, table_name, **additional_settings): "index_granularity": 512, "temporary_directories_lifetime": 1, "write_marks_for_substreams_in_compact_parts": 1, + "cleanup_delay_period": 1, + "cleanup_delay_period_random_add": 0, + "cleanup_thread_preferred_points_per_iteration": 0, } settings.update(additional_settings) diff --git 
a/tests/integration/test_s3_plain_rewritable/test.py b/tests/integration/test_s3_plain_rewritable/test.py index f02f20d954ff..8b1730a23d69 100644 --- a/tests/integration/test_s3_plain_rewritable/test.py +++ b/tests/integration/test_s3_plain_rewritable/test.py @@ -63,7 +63,7 @@ def create_insert(node, table_name, insert_values): ) ENGINE=MergeTree() PARTITION BY id % 10 ORDER BY id - SETTINGS storage_policy='{}' + SETTINGS storage_policy='{}', cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0 """.format( table_name, storage_policy ) diff --git a/tests/queries/0_stateless/00652_mergetree_mutations.sh b/tests/queries/0_stateless/00652_mergetree_mutations.sh index edb306d38831..dec4adb720cf 100755 --- a/tests/queries/0_stateless/00652_mergetree_mutations.sh +++ b/tests/queries/0_stateless/00652_mergetree_mutations.sh @@ -53,7 +53,7 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'" ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner" # Create a table with finished_mutations_to_keep = 2 -${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner(x UInt32) ENGINE MergeTree ORDER BY x SETTINGS finished_mutations_to_keep = 2" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner(x UInt32) ENGINE MergeTree ORDER BY x SETTINGS finished_mutations_to_keep = 2, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0" # Insert some data ${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (1), (2), (3), (4)" @@ -65,9 +65,9 @@ ${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner DELETE WHERE x = 3" wait_for_mutation "mutations_cleaner" "mutation_4.txt" -# Sleep and then do an INSERT to wakeup the background task that will clean up the old mutations +# Sleep and then wake up the background cleanup thread that will clean up the old mutations sleep 1 -${CLICKHOUSE_CLIENT} --query="INSERT INTO 
mutations_cleaner(x) VALUES (4)" +${CLICKHOUSE_CLIENT} --query="SYSTEM START CLEANUP mutations_cleaner" sleep 0.1 for i in {1..10} @@ -77,12 +77,12 @@ do break fi - if [[ $i -eq 100 ]]; then + if [[ $i -eq 10 ]]; then echo "Timed out while waiting for outdated mutation record to be deleted!" fi sleep 1 - ${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (4)" + ${CLICKHOUSE_CLIENT} --query="SYSTEM START CLEANUP mutations_cleaner" done # Check that the first mutation is cleaned diff --git a/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh b/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh index b65e6019a2a6..2d21f60d55ef 100755 --- a/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh +++ b/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh @@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) ${CLICKHOUSE_CLIENT} -q 'DROP TABLE IF EXISTS ttl_empty_parts' ${CLICKHOUSE_CLIENT} -q ' - CREATE TABLE ttl_empty_parts (id UInt32, d Date) ENGINE = MergeTree ORDER BY tuple() PARTITION BY id SETTINGS old_parts_lifetime=5 + CREATE TABLE ttl_empty_parts (id UInt32, d Date) ENGINE = MergeTree ORDER BY tuple() PARTITION BY id SETTINGS old_parts_lifetime = 5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0 ' ${CLICKHOUSE_CLIENT} -q "INSERT INTO ttl_empty_parts SELECT 0, toDate('2005-01-01') + number from numbers(500);" diff --git a/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh b/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh index 1d94afa42949..aaa1d0cdafe7 100755 --- a/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh +++ b/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh @@ -11,7 +11,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT 
'Collapsing';" -${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, s Int8, data JSON) ENGINE = CollapsingMergeTree(s) ORDER BY id SETTINGS old_parts_lifetime=5;" --enable_json_type 1 +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, s Int8, data JSON) ENGINE = CollapsingMergeTree(s) ORDER BY id SETTINGS old_parts_lifetime=5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0;" --enable_json_type 1 ${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_empty_parts VALUES (1, 1, '{\"k1\": \"aaa\"}') (1, -1, '{\"k2\": \"bbb\"}');" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;" @@ -19,7 +19,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT DISTINCT arrayJoin(JSONAllPathsWithTypes(data)) ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT 'DELETE all';" -${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, data JSON) ENGINE = MergeTree ORDER BY id SETTINGS old_parts_lifetime=5;" --enable_json_type 1 +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, data JSON) ENGINE = MergeTree ORDER BY id SETTINGS old_parts_lifetime=5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0;" --enable_json_type 1 ${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_empty_parts VALUES (1, '{\"k1\": \"aaa\"}') (1, '{\"k2\": \"bbb\"}');" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;" @@ -32,7 +32,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT DISTINCT arrayJoin(JSONAllPathsWithTypes(data)) 
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT 'TTL';" -${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, d Date, data JSON) ENGINE = MergeTree ORDER BY id TTL d WHERE id % 2 = 1 SETTINGS old_parts_lifetime=5;" --enable_json_type 1 +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, d Date, data JSON) ENGINE = MergeTree ORDER BY id TTL d WHERE id % 2 = 1 SETTINGS old_parts_lifetime=5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0;" --enable_json_type 1 ${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_empty_parts VALUES (1, '2000-01-01', '{\"k1\": \"aaa\"}') (2, '2000-01-01', '{\"k2\": \"bbb\"}');" ${CLICKHOUSE_CLIENT} -q "OPTIMIZE TABLE t_json_empty_parts FINAL;" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_json_empty_parts;" diff --git a/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh b/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh index 514092c40f54..e9d17d0bed3d 100755 --- a/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh +++ b/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh @@ -64,7 +64,7 @@ SETTINGS endpoint = 'http://localhost:11111/test/03008_test_s3_mt_fault/', access_key_id = clickhouse, secret_access_key = clickhouse), - old_parts_lifetime = 1; + old_parts_lifetime = 1, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; " ${CLICKHOUSE_CLIENT} --query " diff --git a/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh b/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh index f43fd6bc310c..edcdaf873ada 100755 --- a/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh +++ b/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh @@ -9,7 +9,7 @@ 
$CLICKHOUSE_CLIENT " CREATE TABLE t_unload_primary_key (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a - SETTINGS old_parts_lifetime = 10000, use_primary_key_cache = 0; + SETTINGS old_parts_lifetime = 10000, use_primary_key_cache = 0, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO t_unload_primary_key VALUES (1, 1); diff --git a/tests/queries/0_stateless/03305_mutations_counters.sh b/tests/queries/0_stateless/03305_mutations_counters.sh index 4b3bdd5f5e98..32b5d6ba49db 100755 --- a/tests/queries/0_stateless/03305_mutations_counters.sh +++ b/tests/queries/0_stateless/03305_mutations_counters.sh @@ -26,7 +26,7 @@ function wait_for_mutation_cleanup() $CLICKHOUSE_CLIENT --query " DROP TABLE IF EXISTS t_mutations_counters; - CREATE TABLE t_mutations_counters (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a; + CREATE TABLE t_mutations_counters (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO t_mutations_counters VALUES (1, 2) (2, 3); diff --git a/tests/queries/0_stateless/03305_rename_mutations_counter.sh b/tests/queries/0_stateless/03305_rename_mutations_counter.sh index 8f204453da46..cebc7fe55c5e 100755 --- a/tests/queries/0_stateless/03305_rename_mutations_counter.sh +++ b/tests/queries/0_stateless/03305_rename_mutations_counter.sh @@ -26,7 +26,7 @@ function wait_for_mutation_cleanup() $CLICKHOUSE_CLIENT --query " DROP TABLE IF EXISTS t_mutations_counters_rename; - CREATE TABLE t_mutations_counters_rename (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a; + CREATE TABLE t_mutations_counters_rename (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO 
t_mutations_counters_rename VALUES (1, 2) (2, 3); diff --git a/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh b/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh index 5c1557d8e580..831122d43a4a 100755 --- a/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh +++ b/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh @@ -26,7 +26,7 @@ SET alter_sync = 2; DROP TABLE IF EXISTS replacing; -CREATE TABLE replacing (key int, value int, version int, deleted UInt8) ENGINE = ReplacingMergeTree(version, deleted) ORDER BY key SETTINGS merge_tree_clear_old_parts_interval_seconds = 1; +CREATE TABLE replacing (key int, value int, version int, deleted UInt8) ENGINE = ReplacingMergeTree(version, deleted) ORDER BY key SETTINGS merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO replacing VALUES (1, 1, 1, 0), (1, 1, 2, 1); @@ -59,6 +59,9 @@ CREATE TABLE replacing2 (key int, value int, version int, deleted UInt8) ENGINE SETTINGS allow_experimental_replacing_merge_with_cleanup = true, enable_replacing_merge_with_cleanup_for_min_age_to_force_merge = true, merge_tree_clear_old_parts_interval_seconds = 1, + cleanup_delay_period = 1, + cleanup_delay_period_random_add = 0, + cleanup_thread_preferred_points_per_iteration = 0, number_of_free_entries_in_pool_to_execute_optimize_entire_partition = 1, min_age_to_force_merge_on_partition_only = true, min_age_to_force_merge_seconds = 1, diff --git a/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference new file mode 100644 index 000000000000..f188ceda5ef4 --- /dev/null +++ b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference @@ -0,0 +1 @@ +OK: parts count reached 1 diff --git a/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh 
b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh new file mode 100755 index 000000000000..a6509ef22390 --- /dev/null +++ b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# Tags: no-parallel +# Tag no-parallel: enables a server-wide failpoint that would interfere with concurrently running tests + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +on_exit() { + $CLICKHOUSE_CLIENT --query "SYSTEM DISABLE FAILPOINT storage_merge_tree_background_schedule_merge_fail;" +} + +trap on_exit EXIT + +# Prepare +$CLICKHOUSE_CLIENT --query " + DROP TABLE IF EXISTS m; + + CREATE TABLE m ( + i Int32 + ) ENGINE = MergeTree + ORDER BY i + SETTINGS old_parts_lifetime = 1, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; + + SYSTEM ENABLE FAILPOINT storage_merge_tree_background_schedule_merge_fail; + + INSERT INTO m VALUES (1); + INSERT INTO m VALUES (2); + + OPTIMIZE TABLE m FINAL; +" + +function parts_count() { + $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'm';" +} + +# Wait up to 60 seconds until count = 1 +ok=0 +for _ in $(seq 1 60); do + CNT=$(parts_count) + if [ "$CNT" -eq 1 ]; then + ok=1 + break + fi + + sleep 1 +done + +if [ "$ok" -eq 1 ]; then + echo "OK: parts count reached 1" +else + echo "FAIL: parts count never reached 1 within 60 seconds" +fi + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS m;" + +$CLICKHOUSE_CLIENT --query " + DROP TABLE IF EXISTS m; +" diff --git a/tests/queries/0_stateless/03745_system_background_schedule_pool.reference b/tests/queries/0_stateless/03745_system_background_schedule_pool.reference new file mode 100644 index 000000000000..91958bf0cb1e --- /dev/null +++ b/tests/queries/0_stateless/03745_system_background_schedule_pool.reference @@ -0,0 +1,5 @@ +1 
+buffer_flush default test_buffer_03745 1 StorageBuffer (default.test_buffer_03745)/Bg +schedule default test_merge_tree_03745 1 BackgroundJobsAssignee:DataProcessing +schedule default test_merge_tree_03745 1 default.test_merge_tree_03745 (CleanupThread) +distributed default test_distributed_03745 1 default.test_distributed_03745.DistributedInsertQueue.default/Bg diff --git a/tests/queries/0_stateless/03745_system_background_schedule_pool.sql b/tests/queries/0_stateless/03745_system_background_schedule_pool.sql new file mode 100644 index 000000000000..7824732f22d4 --- /dev/null +++ b/tests/queries/0_stateless/03745_system_background_schedule_pool.sql @@ -0,0 +1,32 @@ +SELECT count() >= 0 AS has_tasks FROM system.background_schedule_pool; + +-- Test 1: Buffer table (buffer_flush pool) +DROP TABLE IF EXISTS test_table_03745; +DROP TABLE IF EXISTS test_buffer_03745; + +CREATE TABLE test_table_03745 (x UInt64) ENGINE = Memory; +CREATE TABLE test_buffer_03745 (x UInt64) ENGINE = Buffer(currentDatabase(), test_table_03745, 1, 10, 100, 10000, 1000000, 10000000, 100000000); +INSERT INTO test_buffer_03745 VALUES (1), (2), (3); +SELECT pool, database, table, table_uuid != toUUIDOrDefault(0) AS has_uuid, log_name FROM system.background_schedule_pool WHERE database = currentDatabase() ORDER BY ALL; +DROP TABLE test_buffer_03745; +DROP TABLE test_table_03745; + +-- Test 2: MergeTree table (schedule pool) +DROP TABLE IF EXISTS test_merge_tree_03745; +CREATE TABLE test_merge_tree_03745 (x UInt64, y String) ENGINE = MergeTree() ORDER BY x SETTINGS refresh_statistics_interval = '0'; +INSERT INTO test_merge_tree_03745 VALUES (1, 'a'), (2, 'b'); +SELECT pool, database, table, table_uuid != toUUIDOrDefault(0) AS has_uuid, log_name FROM system.background_schedule_pool WHERE database = currentDatabase() ORDER BY ALL; +DROP TABLE test_merge_tree_03745; + +-- Test 3: Distributed table (distributed pool) +DROP TABLE IF EXISTS test_local_03745; +DROP TABLE IF EXISTS test_distributed_03745; 
+CREATE TABLE test_local_03745 (x UInt64) ENGINE = Memory; +-- Use a longer sleep time to keep the task in delayed_tasks longer, avoiding race with getTasks() +CREATE TABLE test_distributed_03745 (x UInt64) ENGINE = Distributed(test_shard_localhost, currentDatabase(), test_local_03745) SETTINGS background_insert_sleep_time_ms = 10000; +SYSTEM STOP DISTRIBUTED SENDS test_distributed_03745; +-- Pool is created only for async INSERTs +INSERT INTO test_distributed_03745 SETTINGS prefer_localhost_replica=0, distributed_foreground_insert=0 VALUES (1), (2), (3); +SELECT pool, database, table, table_uuid != toUUIDOrDefault(0) AS has_uuid, log_name FROM system.background_schedule_pool WHERE database = currentDatabase() ORDER BY ALL; +DROP TABLE test_distributed_03745; +DROP TABLE test_local_03745;