diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index a3f635072fc0..dcb4c48f73a3 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -319,8 +319,10 @@ IcebergIterator::IcebergIterator( std::sort(equality_deletes_files.begin(), equality_deletes_files.end()); std::sort(position_deletes_files.begin(), position_deletes_files.end()); producer_task.emplace( - [this]() + [this, thread_group = DB::CurrentThread::getGroup()]() { + ThreadGroupSwitcher switcher(thread_group, "IcebergKeys"); + while (!blocking_queue.isFinished()) { std::optional entry; diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp index 08714909e63d..1ee214526429 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.cpp +++ b/src/Storages/ObjectStorage/IObjectIterator.cpp @@ -91,6 +91,9 @@ ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets( ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id) { + /// pending_objects_info is not thread-safe + std::lock_guard lock(mutex); + if (!pending_objects_info.empty()) { auto result = pending_objects_info.front(); diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 47e603a36dc8..82abf44896f3 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -41,6 +41,8 @@ class ObjectIteratorWithPathAndFileFilter : public IObjectIterator, private With size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } + bool has_concurrent_next() const override { return iterator->has_concurrent_next(); } + private: const ObjectIterator iterator; const std::string object_namespace; @@ -62,11 +64,14 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } + bool has_concurrent_next() const override { return true; } + private: const ObjectIterator iterator; String format; ObjectStoragePtr object_storage; FormatSettings format_settings; + std::mutex mutex; std::queue pending_objects_info; const LoggerPtr log = getLogger("GlobIterator");