From d01eb1575f46a1583ea52142faf6f5e0e22e6b3f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 10 Mar 2026 10:42:45 +0100 Subject: [PATCH 1/2] Fix iterator locking --- .../ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp | 4 +++- src/Storages/ObjectStorage/IObjectIterator.h | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index a3f635072fc0..0aa2b83c5ae5 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -319,8 +319,10 @@ IcebergIterator::IcebergIterator( std::sort(equality_deletes_files.begin(), equality_deletes_files.end()); std::sort(position_deletes_files.begin(), position_deletes_files.end()); producer_task.emplace( - [this]() + [this, thread_group = DB::CurrentThread::getGroup()]() { + ThreadGroupSwitcher switcher(thread_group, "IcebergKeysIterator"); + while (!blocking_queue.isFinished()) { std::optional entry; diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 47e603a36dc8..1854e0cde422 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -41,6 +41,8 @@ class ObjectIteratorWithPathAndFileFilter : public IObjectIterator, private With size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } + bool has_concurrent_next() const override { return iterator->has_concurrent_next(); } + private: const ObjectIterator iterator; const std::string object_namespace; @@ -62,6 +64,8 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } + bool has_concurrent_next() const override { return iterator->has_concurrent_next(); } + private: const ObjectIterator iterator; String format; From 55cef3c1356f5fd6248539f9c9c53b29272eb3b6 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 10 Mar 2026 11:25:47 +0100 Subject: [PATCH 2/2] Fix --- .../ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp | 2 +- src/Storages/ObjectStorage/IObjectIterator.cpp | 3 +++ src/Storages/ObjectStorage/IObjectIterator.h | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 0aa2b83c5ae5..dcb4c48f73a3 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -321,7 +321,7 @@ IcebergIterator::IcebergIterator( producer_task.emplace( [this, thread_group = DB::CurrentThread::getGroup()]() { - ThreadGroupSwitcher switcher(thread_group, "IcebergKeysIterator"); + ThreadGroupSwitcher switcher(thread_group, "IcebergKeys"); while (!blocking_queue.isFinished()) { diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp index 08714909e63d..1ee214526429 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.cpp +++ b/src/Storages/ObjectStorage/IObjectIterator.cpp @@ -91,6 +91,9 @@ ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets( ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id) { + /// pending_objects_info is not thread-safe + std::lock_guard lock(mutex); + if (!pending_objects_info.empty()) { auto result = pending_objects_info.front(); diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 1854e0cde422..82abf44896f3 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -64,13 +64,14 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } - bool has_concurrent_next() const override { return iterator->has_concurrent_next(); } + bool has_concurrent_next() const override { return true; } private: const ObjectIterator iterator; String format; ObjectStoragePtr object_storage; FormatSettings format_settings; + std::mutex mutex; std::queue pending_objects_info; const LoggerPtr log = getLogger("GlobIterator");