Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,10 @@ IcebergIterator::IcebergIterator(
std::sort(equality_deletes_files.begin(), equality_deletes_files.end());
std::sort(position_deletes_files.begin(), position_deletes_files.end());
producer_task.emplace(
[this]()
[this, thread_group = DB::CurrentThread::getGroup()]()
{
ThreadGroupSwitcher switcher(thread_group, "IcebergKeys");

while (!blocking_queue.isFinished())
{
std::optional<ManifestFileEntry> entry;
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/IObjectIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets(

ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
{
/// pending_objects_info is not thread-safe
std::lock_guard<std::mutex> lock(mutex);

if (!pending_objects_info.empty())
{
auto result = pending_objects_info.front();
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/ObjectStorage/IObjectIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ObjectIteratorWithPathAndFileFilter : public IObjectIterator, private With
size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); }
std::optional<UInt64> getSnapshotVersion() const override { return iterator->getSnapshotVersion(); }

bool has_concurrent_next() const override { return iterator->has_concurrent_next(); }

private:
const ObjectIterator iterator;
const std::string object_namespace;
Expand All @@ -62,11 +64,14 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext
size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); }
std::optional<UInt64> getSnapshotVersion() const override { return iterator->getSnapshotVersion(); }

bool has_concurrent_next() const override { return true; }

private:
const ObjectIterator iterator;
String format;
ObjectStoragePtr object_storage;
FormatSettings format_settings;
std::mutex mutex;

std::queue<ObjectInfoPtr> pending_objects_info;
const LoggerPtr log = getLogger("GlobIterator");
Expand Down
Loading