Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t
auto next_file = files.back();
files.pop_back();

auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getIdentifier());
auto file_identifier = getFileIdentifier(next_file);
auto it = unprocessed_files.find(file_identifier);
if (it == unprocessed_files.end())
continue;
Expand Down Expand Up @@ -170,18 +170,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
}
}

String file_identifier;
if (send_over_whole_archive && object_info->isArchive())
{
file_identifier = object_info->getPathOrPathToArchiveIfArchive();
LOG_TEST(log, "Will send over the whole archive {} to replicas. "
"This will be suboptimal, consider turning on "
"cluster_function_process_archive_on_multiple_nodes setting", file_identifier);
}
else
{
file_identifier = getAbsolutePathFromObjectInfo(object_info).value_or(object_info->getIdentifier());
}
String file_identifier = getFileIdentifier(object_info, true);

if (iceberg_read_optimization_enabled)
{
Expand Down Expand Up @@ -250,7 +239,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
auto next_file = it->second.first;
unprocessed_files.erase(it);

auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getPath());
auto file_path = getFileIdentifier(next_file);
LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}",
Expand Down Expand Up @@ -312,11 +301,28 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_

for (const auto & file : processed_file_list_ptr->second)
{
auto file_replica_idx = getReplicaForFile(file->getPath());
unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx));
auto file_identifier = getFileIdentifier(file);
auto file_replica_idx = getReplicaForFile(file_identifier);
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
connection_to_files[file_replica_idx].push_back(file);
}
replica_to_files_to_be_processed.erase(number_of_current_replica);
}

/// Computes the stable identifier used to route a file to a replica.
/// When archives are sent whole (send_over_whole_archive) and the object is an
/// archive, the identifier is the path to the archive itself; otherwise it is
/// the object's absolute path, falling back to its generic identifier.
/// @param file_object  the object whose routing identifier is requested
/// @param write_to_log when true, emits a test-level log line for the
///                     whole-archive case (used on the iterator path only)
String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const
{
    // Common case: identify the file by its absolute path (or generic identifier).
    const bool whole_archive = send_over_whole_archive && file_object->isArchive();
    if (!whole_archive)
        return getAbsolutePathFromObjectInfo(file_object).value_or(file_object->getIdentifier());

    // Whole-archive case: the archive path itself is the routing key.
    const auto archive_path = file_object->getPathOrPathToArchiveIfArchive();
    if (write_to_log)
    {
        LOG_TEST(log, "Will send over the whole archive {} to replicas. "
        "This will be suboptimal, consider turning on "
        "cluster_function_process_archive_on_multiple_nodes setting", archive_path);
    }
    return archive_path;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class StorageObjectStorageStableTaskDistributor

void saveLastNodeActivity(size_t number_of_current_replica);

String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const;

const std::shared_ptr<IObjectIterator> iterator;
const bool send_over_whole_archive;

Expand Down
Loading