Improvement locking Iceberg iterator #1436
@codex review |
|
Codex Review: Didn't find any major issues. You're on a roll. |
|
Why there is an |
|
The change makes sense for 26.1 too, but it would be good to confirm that it fixes the issue before porting. |
|
We're trying to abandon 25.8 :) We release it as is (hopefully, today/tomorrow), idk if we will do further releases. So I guess it is not a priority now (to verify something against 25.8, having in mind it will have to be re-verified later for 26.1) |
arthurpassos
left a comment
LGTM, left a few nit comments.
Did you benchmark it to see if there is any practical difference?
| virtual ObjectInfoPtr next(size_t) = 0; | ||
| virtual size_t estimatedKeysCount() = 0; | ||
| virtual std::optional<UInt64> getSnapshotVersion() const { return std::nullopt; } | ||
| virtual bool has_concurrent_next() const { return false; } |
Please add docs, it is not clear to the reader of this API what has_concurrent_next means. Plus, shouldn't it be hasConcurrentNext?
Perhaps something more descriptive like hasLockingMechanismInNextOperation
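A documented and renamed method along the lines requested in this thread might look like the sketch below. This is only an illustration: `ObjectInfo` and `UInt64` are stubbed stand-ins for the real ClickHouse types, `hasConcurrentNext` is the name suggested in the review rather than the final one, and `VectorIterator` and `checkDefault` are hypothetical helpers that exist only to show an implementation keeping the default.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-ins for the real ClickHouse types, assumed here for illustration only.
using UInt64 = std::uint64_t;
struct ObjectInfo { std::string path; };
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;

struct IObjectIterator
{
    virtual ~IObjectIterator() = default;
    virtual ObjectInfoPtr next(size_t) = 0;
    virtual size_t estimatedKeysCount() = 0;
    virtual std::optional<UInt64> getSnapshotVersion() const { return std::nullopt; }

    /// Returns true if next() synchronizes internally and is therefore safe
    /// to call from several threads at once. Callers may then skip their own
    /// mutex around next(). Defaults to false, so callers must serialize calls.
    virtual bool hasConcurrentNext() const { return false; }
};

// Hypothetical iterator without internal locking; it keeps the default.
class VectorIterator : public IObjectIterator
{
public:
    explicit VectorIterator(std::vector<std::string> paths_) : paths(std::move(paths_)) {}

    ObjectInfoPtr next(size_t) override
    {
        if (pos >= paths.size())
            return nullptr;
        return std::make_shared<ObjectInfo>(ObjectInfo{paths[pos++]});
    }

    size_t estimatedKeysCount() override { return paths.size(); }

private:
    std::vector<std::string> paths;
    size_t pos = 0;
};

// Small self-check: an iterator that does not override the method is
// treated as requiring external locking.
inline void checkDefault()
{
    VectorIterator it({"a"});
    assert(!it.hasConcurrentNext());
}
```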
| { | ||
| ObjectInfoPtr object_info; | ||
|
|
||
| if (iterator->has_concurrent_next()) |
I would add a comment similar to the below:
In case IObjectIterator::hasConcurrentNext is true, a locking mechanism is already in place in IObjectIterator::next. Therefore, there is no need to guard it with StorageObjectStorageStableTaskDistributor::mutex.
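The branch under review, with the suggested comment in place, could be sketched roughly as follows. This is not the PR's actual code: the types are stubbed, `nextGuardedIfNeeded` is a hypothetical free function standing in for the relevant part of the distributor, and `CountingIterator` is a hypothetical iterator used only to exercise both branches.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>

// Stubbed stand-ins for the real ClickHouse types (assumptions for this sketch).
struct ObjectInfo { std::string path; };
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;

struct IObjectIterator
{
    virtual ~IObjectIterator() = default;
    virtual ObjectInfoPtr next(size_t) = 0;
    virtual bool hasConcurrentNext() const { return false; }
};

// Hypothetical helper: takes the distributor mutex only when the iterator
// does not synchronize next() on its own.
ObjectInfoPtr nextGuardedIfNeeded(const std::shared_ptr<IObjectIterator> & iterator, std::mutex & distributor_mutex)
{
    assert(iterator);

    // In case IObjectIterator::hasConcurrentNext is true, a locking mechanism
    // is already in place in IObjectIterator::next. Therefore, there is no
    // need to guard it with the distributor mutex.
    if (iterator->hasConcurrentNext())
        return iterator->next(0);

    std::lock_guard<std::mutex> lock(distributor_mutex);
    return iterator->next(0);
}

// Hypothetical iterator for demonstration. The counter is atomic, so next()
// is safe to call concurrently regardless of the flag value.
struct CountingIterator : IObjectIterator
{
    explicit CountingIterator(bool concurrent_) : concurrent(concurrent_) {}

    ObjectInfoPtr next(size_t) override
    {
        return std::make_shared<ObjectInfo>(ObjectInfo{"file_" + std::to_string(counter++)});
    }

    bool hasConcurrentNext() const override { return concurrent; }

    bool concurrent;
    std::atomic<int> counter{0};
};
```

Both branches return the same objects; the only difference is whether the caller's mutex is held while `next` runs.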
|
Likely this was fixed in the branch. Let's try updating upon the latest changes |
Changelog category (leave one):
Improvement locking Iceberg iterator
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
What is going on:
In an IcebergCluster request, each replica creates a pool of threads, and each thread tries to get the next task. Each thread calls a callback.
All threads share the same socket, so only one sends a request while all the others wait on a mutex.
Requests from each replica are sent to the initiator. The initiator processes them in multiple threads, but all threads call StorageObjectStorageStableTaskDistributor::getNextTask. This method calls three other methods, each of which locks the mutex internally:
- getPreQueuedFile tries to find a path for the replica among the already extracted paths.
- getMatchingFileFromIterator calls IcebergIterator::next to extract the next path and calculates the replica for that path. If it is the replica that sent the request, the path is sent as the response; if the path belongs to another replica, it is put in a list for getPreQueuedFile.
- getAnyUnprocessedFile is executed only when all paths are extracted and nothing more is left for the current replica.
When getMatchingFileFromIterator calls IcebergIterator::next, it may block: IcebergIterator::next is built over ConcurrentBoundedQueue, so next waits until something is pushed into the queue or the queue is finished. When the queue is empty, all threads wait until something happens.
In a separate thread, IcebergIterator calls SingleThreadIcebergKeysIterator::next to extract the next path from Iceberg metadata. This can take a long time because of reading metadata from S3, pruning files, etc.
Possible scenario:
- StorageObjectStorageStableTaskDistributor::getNextTask finds a path for replica 2, puts it into the list, finds a file for replica 1, and returns it.
- SingleThreadIcebergKeysIterator::next runs, and it takes dozens of seconds.
- StorageObjectStorageStableTaskDistributor has an object for replica 2, but StorageObjectStorageStableTaskDistributor stays locked until SingleThreadIcebergKeysIterator has finished its work.
As a result, replica 2 gets its path only after all metadata has been processed.
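To make the blocking behaviour concrete, here is a minimal sketch of a bounded blocking queue with a "finished" state. It is a hypothetical `SimpleBoundedQueue`, not ClickHouse's `ConcurrentBoundedQueue`, and its method names are assumptions. The point is only that `pop` waits while the queue is empty and not finished, so any outer mutex held around such a call stays held for the whole wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical simplified queue; not the real ConcurrentBoundedQueue API.
class SimpleBoundedQueue
{
public:
    explicit SimpleBoundedQueue(size_t capacity_) : capacity(capacity_)
    {
        assert(capacity > 0);
    }

    // Blocks while the queue is full. Returns false if the queue is finished.
    bool push(std::string value)
    {
        std::unique_lock<std::mutex> lock(mutex);
        not_full.wait(lock, [this] { return finished || items.size() < capacity; });
        if (finished)
            return false;
        items.push_back(std::move(value));
        not_empty.notify_one();
        return true;
    }

    // Blocks while the queue is empty and not finished.
    // Returns std::nullopt once the queue is finished and drained.
    std::optional<std::string> pop()
    {
        std::unique_lock<std::mutex> lock(mutex);
        not_empty.wait(lock, [this] { return finished || !items.empty(); });
        if (items.empty())
            return std::nullopt;
        std::string value = std::move(items.front());
        items.pop_front();
        not_full.notify_one();
        return value;
    }

    // Marks the queue as finished and wakes every waiting thread.
    void finish()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            finished = true;
        }
        not_empty.notify_all();
        not_full.notify_all();
    }

private:
    const size_t capacity;
    std::mutex mutex;
    std::condition_variable not_empty;
    std::condition_variable not_full;
    std::deque<std::string> items;
    bool finished = false;
};
```

If a consumer calls `pop` while holding another lock, as in the scenario above, every thread that needs that other lock waits for the producer too, even when its own data is already available.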
Solution:
IcebergIterator is built over ConcurrentBoundedQueue, so its next method is thread-safe. It can be called without locking StorageObjectStorageStableTaskDistributor, and replica 2 can get its pre-queued path without waiting for SingleThreadIcebergKeysIterator::next.
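The solution described in this PR can be illustrated with a heavily simplified model of the distributor. Everything here is an assumption made for the sketch: paths are plain strings, `PathIterator`, `LockedListIterator` and `TaskDistributorSketch` are hypothetical names, the path-to-replica mapping is injected as a function, and the getAnyUnprocessedFile step is omitted. The sketch only shows that the iterator's `next` is called outside the distributor mutex when the iterator reports that it synchronizes itself, so a request served from the pre-queued list never waits for the iterator.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Simplified, hypothetical iterator interface: paths are plain strings here.
struct PathIterator
{
    virtual ~PathIterator() = default;
    virtual std::optional<std::string> next() = 0;
    virtual bool hasConcurrentNext() const { return false; }
};

// Hypothetical iterator that synchronizes next() itself, as a queue-backed one would.
class LockedListIterator : public PathIterator
{
public:
    explicit LockedListIterator(std::vector<std::string> paths_) : paths(std::move(paths_)) {}

    std::optional<std::string> next() override
    {
        std::lock_guard<std::mutex> lock(own_mutex);
        ++calls;
        if (pos >= paths.size())
            return std::nullopt;
        return paths[pos++];
    }

    bool hasConcurrentNext() const override { return true; }

    size_t calls = 0; // number of next() calls; read it only when no other thread is running

private:
    std::mutex own_mutex;
    std::vector<std::string> paths;
    size_t pos = 0;
};

class TaskDistributorSketch
{
public:
    using ReplicaForPath = std::function<size_t(const std::string &)>;

    TaskDistributorSketch(std::shared_ptr<PathIterator> iterator_, ReplicaForPath replica_for_path_)
        : iterator(std::move(iterator_)), replica_for_path(std::move(replica_for_path_))
    {
        assert(iterator && replica_for_path);
    }

    std::optional<std::string> getNextTask(size_t replica)
    {
        if (auto path = getPreQueuedFile(replica))
            return path;
        return getMatchingFileFromIterator(replica);
    }

private:
    // Short critical section: only touches the pre-queued lists.
    std::optional<std::string> getPreQueuedFile(size_t replica)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto & queue = pre_queued[replica];
        if (queue.empty())
            return std::nullopt;
        std::string path = std::move(queue.front());
        queue.pop_front();
        return path;
    }

    std::optional<std::string> getMatchingFileFromIterator(size_t replica)
    {
        while (true)
        {
            std::optional<std::string> path;
            if (iterator->hasConcurrentNext())
            {
                // The iterator locks internally, so the distributor mutex is not held here.
                path = iterator->next();
            }
            else
            {
                std::lock_guard<std::mutex> lock(mutex);
                path = iterator->next();
            }

            if (!path)
                return std::nullopt;

            const size_t target = replica_for_path(*path);
            if (target == replica)
                return path;

            // The path belongs to another replica: keep it for getPreQueuedFile.
            std::lock_guard<std::mutex> lock(mutex);
            pre_queued[target].push_back(std::move(*path));
        }
    }

    std::shared_ptr<PathIterator> iterator;
    ReplicaForPath replica_for_path;
    std::mutex mutex;
    std::unordered_map<size_t, std::deque<std::string>> pre_queued;
};
```

In this model, a request from replica 1 that finds a pre-queued path returns after one short critical section, without calling into the iterator at all.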