From 06b4edea156a9b9ecd6b58aaf176958539ca86ee Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 9 Mar 2026 17:31:16 +0100 Subject: [PATCH] Fix file identifier in rescheduleTasksFromReplica --- ...rageObjectStorageStableTaskDistributor.cpp | 38 +++++++++++-------- ...torageObjectStorageStableTaskDistributor.h | 2 + 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 13e3721a076e..8700ddad19a0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -125,7 +125,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getIdentifier()); + auto file_identifier = getFileIdentifier(next_file); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -170,18 +170,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) - { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. " - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); - } - else - { - file_identifier = getAbsolutePathFromObjectInfo(object_info).value_or(object_info->getIdentifier()); - } + String file_identifier = getFileIdentifier(object_info, true); if (iceberg_read_optimization_enabled) { @@ -250,7 +239,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getPath()); + auto file_path = getFileIdentifier(next_file); LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", @@ -312,11 +301,28 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ for (const auto & file : processed_file_list_ptr->second) { - auto file_replica_idx = getReplicaForFile(file->getPath()); - unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } replica_to_files_to_be_processed.erase(number_of_current_replica); } +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. " + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return getAbsolutePathFromObjectInfo(file_object).value_or(file_object->getIdentifier()); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 0cd10ac7188e..3a5a16998be3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -41,6 +41,8 @@ class StorageObjectStorageStableTaskDistributor void saveLastNodeActivity(size_t number_of_current_replica); + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + const std::shared_ptr iterator; const bool send_over_whole_archive;