Conversation
Walkthrough
This pull request introduces in-memory caching mechanisms (LRU and TTL-based) throughout the system and optimizes I/O operations by refactoring sequential object fetching into concurrent two-phase approaches across storage backends and manifest retrieval paths.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing Touches
🧪 Generate unit tests (beta)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/hottier.rs (1)
746-755: ⚠️ Potential issue | 🟡 Minor
Potential cache inconsistency when deleting directories.
Line 748 uses `remove_dir_all` on the parent directory, which deletes all files in that directory, not just the single file being processed. However, line 755 only removes the one file (`file_to_delete.file_path`) from the cache. If the minute directory contains multiple parquet files, other files would be deleted from disk but remain in the cache as stale entries. This could cause `get_hot_tier_manifest_files` to incorrectly classify deleted files as available in hot tier.
🔧 Suggested fix: Remove all cached entries for the deleted directory
+ // Build prefix for all files in the deleted directory
+ let deleted_dir = path_to_delete.parent().unwrap();
+ if let Ok(rel_dir) = deleted_dir.strip_prefix(self.hot_tier_path) {
+     let prefix = rel_dir.to_string_lossy();
+     self.cached_files.retain(|k, _| !k.starts_with(prefix.as_ref()));
+ }
  fs::remove_dir_all(path_to_delete.parent().unwrap()).await?;
  delete_empty_directory_hot_tier(
      path_to_delete.parent().unwrap().to_path_buf(),
  )
  .await?;
- // Remove deleted file from in-memory index
- self.cached_files.remove(&file_to_delete.file_path);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/hottier.rs` around lines 746 - 755, The code currently calls fs::remove_dir_all on path_to_delete.parent() but only removes the single file entry self.cached_files.remove(&file_to_delete.file_path), leaving other deleted parquet files stale in the cache; update the cleanup after remove_dir_all to remove all cached entries under that parent directory (e.g., iterate over self.cached_files keys and remove/retain only those not starting with the parent path or matching its children) so cached_files no longer contains entries for files deleted by remove_dir_all—this will keep get_hot_tier_manifest_files consistent with on-disk state; ensure you reference the parent directory obtained from path_to_delete.parent().unwrap() and perform the cache mutation on self.cached_files (using retain or explicit removal) in the same async context as delete_empty_directory_hot_tier.
src/storage/localfs.rs (1)
295-303: ⚠️ Potential issue | 🟡 Minor
Avoid `expect` on filename UTF-8 conversion in directory scans.
Non-UTF8 entry names will cause a panic on this request path. Use `to_string_lossy()` to handle all valid filesystem entries gracefully.
🔧 Proposed fix
- .to_str()
- .expect("file name is parseable to str")
- .to_owned();
+ .to_string_lossy()
+ .into_owned();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/localfs.rs` around lines 295 - 303, The code panics by calling expect() after file_name().to_str(); update the directory-scan path where you build name from entry (the let name = entry.path().file_name()...? sequence) to use to_string_lossy() instead of to_str().expect(...) so non-UTF8 filenames are handled safely, and keep the existing NoSuchKey error when file_name() is None; ensure name remains an owned String produced from the lossy conversion.
🧹 Nitpick comments (3)
src/hottier.rs (2)
289-328: Consider handling potential panics from the blocking task.
The `JoinHandle` from `spawn_blocking` is immediately dropped, which means any panic inside `walk_dir` will be silently swallowed. While the comment explains the graceful degradation behavior (falling through to remote storage), panics during cache population could mask bugs or disk issues.
Consider logging if the task fails:
♻️ Suggested improvement for observability
- drop(tokio::task::spawn_blocking(move || {
+ let handle = tokio::task::spawn_blocking(move || {
      fn walk_dir(
          dir: &Path,
          hot_tier_root: &Path,
          index: &DashMap<String, u64>,
          streams: &DashSet<String>,
      ) {
          // ... existing code ...
      }
      walk_dir(hot_tier_path, hot_tier_path, cached_files, enabled_streams);
- }));
+ });
+ tokio::spawn(async move {
+     if let Err(e) = handle.await {
+         tracing::warn!("Hot tier cache population task failed: {e}");
+     }
+ });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/hottier.rs` around lines 289 - 328, The blocking cache-population task is spawned and its JoinHandle dropped, so any panic inside walk_dir is swallowed; capture the handle returned by tokio::task::spawn_blocking (instead of drop(...)), then spawn an async waiter that awaits the handle and logs failures (e.g. using log::error!) if handle.await returns Err(JoinError) or the closure returned a Result::Err; reference the existing symbols walk_dir, hot_tier_path, cached_files, and enabled_streams and ensure the spawned waiter logs the JoinError and/or any internal error to make panics visible.
303-306: Silent error handling may hide disk issues.
Errors from `std::fs::read_dir` are silently ignored, which could mask permission problems or disk failures during startup cache population. Consider logging at debug/trace level:
♻️ Suggested logging
  let entries = match std::fs::read_dir(dir) {
      Ok(entries) => entries,
-     Err(_) => return,
+     Err(e) => {
+         tracing::trace!("Failed to read hot tier directory {}: {e}", dir.display());
+         return;
+     }
  };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/hottier.rs` around lines 303 - 306, The code silently returns when std::fs::read_dir(dir) fails (the match producing entries), which hides disk/permission errors; change Err(_) => return to capture the error (e.g. Err(e)) and log it at debug or trace level including the error and the dir path before returning so you still bail out but surface the underlying cause (modify the match around std::fs::read_dir(dir), referencing the entries binding and the dir variable, and use the crate's logger e.g. log::debug! or log::trace! with a message like "failed to read_dir {}: {:?}", dir, e).
src/handlers/http/query.rs (1)
464-467: Deduplicate `streams_to_create` before spawning tasks.
If a query references the same stream multiple times (aliases/self-joins), duplicate create tasks can still run.
♻️ Proposed tweak
  let streams_to_create: Vec<_> = streams
      .into_iter()
      .filter(|stream_name| PARSEABLE.get_stream(stream_name, tenant_id).is_err())
+     .unique()
      .collect();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/query.rs` around lines 464 - 467, The code builds streams_to_create by filtering stream names with PARSEABLE.get_stream(stream_name, tenant_id) but doesn't remove duplicates, causing redundant create tasks; update the logic that computes streams_to_create in query.rs (the variable streams_to_create) to deduplicate stream names before spawning tasks — e.g., collect into a HashSet (or use an ordered dedupe if order matters) or call .unique/.dedup on the iterator so each stream is created only once, then proceed to spawn tasks for the deduplicated collection.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
Cargo.toml
src/catalog/snapshot.rs
src/handlers/http/query.rs
src/hottier.rs
src/metastore/metastores/object_store_metastore.rs
src/parseable/mod.rs
src/query/stream_schema_provider.rs
src/storage/azure_blob.rs
src/storage/gcs.rs
src/storage/localfs.rs
src/storage/s3.rs
/// Cache for stream JSON files with TTL, keyed by (stream_name, tenant_id)
stream_json_cache: Mutex<HashMap<String, CachedEntry<Bytes>>>,
Bound stream_json_cache; TTL-only HashMap can grow without limit.
stream_json_cache uses an unbounded HashMap. Expiration checks happen on lookup, but key cardinality is never bounded, so memory can grow indefinitely with stream/tenant churn.
Suggested bounded-cache refactor
const MANIFEST_CACHE_SIZE: usize = 512;
+const STREAM_JSON_CACHE_SIZE: usize = 2048;
const STREAM_JSON_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes
pub struct ObjectStoreMetastore {
pub storage: Arc<dyn ObjectStorage>,
manifest_cache: Mutex<LruCache<String, Manifest>>,
- stream_json_cache: Mutex<HashMap<String, CachedEntry<Bytes>>>,
+ stream_json_cache: Mutex<LruCache<String, CachedEntry<Bytes>>>,
}
impl ObjectStoreMetastore {
pub fn new(storage: Arc<dyn ObjectStorage>) -> Self {
Self {
storage,
manifest_cache: Mutex::new(LruCache::new(
NonZeroUsize::new(MANIFEST_CACHE_SIZE).unwrap(),
)),
- stream_json_cache: Mutex::new(HashMap::new()),
+ stream_json_cache: Mutex::new(LruCache::new(
+ NonZeroUsize::new(STREAM_JSON_CACHE_SIZE).unwrap(),
+ )),
}
}
}
- {
- let cache = self.stream_json_cache.lock();
- if let Some(entry) = cache.get(&cache_key)
- && !entry.is_expired(STREAM_JSON_CACHE_TTL)
- {
- return Ok(entry.value.clone());
- }
- }
+ {
+ let mut cache = self.stream_json_cache.lock();
+ if let Some(entry) = cache.get(&cache_key) {
+ if !entry.is_expired(STREAM_JSON_CACHE_TTL) {
+ return Ok(entry.value.clone());
+ }
+ cache.pop(&cache_key);
+ }
+ }
let bytes = self.storage.get_object(&path, tenant_id).await?;
self.stream_json_cache
.lock()
- .insert(cache_key, CachedEntry::new(bytes.clone()));
+ .put(cache_key, CachedEntry::new(bytes.clone()));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/metastore/metastores/object_store_metastore.rs` around lines 96 - 97,
stream_json_cache currently uses an unbounded HashMap inside Mutex in
ObjectStoreMetastore, so entries only expire on lookup and the map can grow
forever; replace it with a bounded, TTL-aware cache (e.g., an LruCache with a
max capacity or a TTL-capable cache like moka/evmap) keyed by the same String
and storing CachedEntry<Bytes>, or wrap an LRU+expiration structure to enforce
capacity on insert and evict least-recently-used items; update places that
read/write stream_json_cache (the Mutex<HashMap>-accessing code paths) to use
the new cache API and add an optional background cleanup task if using a
TTL-only cache to proactively remove expired entries.
for col in columns {
    match column_statistics.entry(col.name) {
        std::collections::hash_map::Entry::Occupied(mut entry) => {
            if let (Some(existing), Some(new_stats)) =
                (entry.get_mut().take(), col.stats)
            {
                *x = Some(stats.update(col_stats));
                *entry.get_mut() = Some(existing.update(new_stats));
            }
        })
        .or_insert_with(|| col.stats.as_ref().cloned());
    });
}
        std::collections::hash_map::Entry::Vacant(entry) => {
            entry.insert(col.stats);
        }
    }
}
Preserve existing column stats when incoming stats are missing.
At Line 398, entry.get_mut().take() clears the slot before merge. If col.stats is None (or existing is None), the previous stats are dropped permanently. This regresses planner stats quality and pruning effectiveness.
Proposed fix
for col in columns {
match column_statistics.entry(col.name) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
- if let (Some(existing), Some(new_stats)) =
- (entry.get_mut().take(), col.stats)
- {
- *entry.get_mut() = Some(existing.update(new_stats));
- }
+ let slot = entry.get_mut();
+ match (slot.take(), col.stats) {
+ (Some(existing), Some(new_stats)) => {
+ *slot = Some(existing.update(new_stats));
+ }
+ (Some(existing), None) => {
+ *slot = Some(existing);
+ }
+ (None, Some(new_stats)) => {
+ *slot = Some(new_stats);
+ }
+ (None, None) => {}
+ }
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(col.stats);
}
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
for col in columns {
    match column_statistics.entry(col.name) {
        std::collections::hash_map::Entry::Occupied(mut entry) => {
            let slot = entry.get_mut();
            match (slot.take(), col.stats) {
                (Some(existing), Some(new_stats)) => {
                    *slot = Some(existing.update(new_stats));
                }
                (Some(existing), None) => {
                    *slot = Some(existing);
                }
                (None, Some(new_stats)) => {
                    *slot = Some(new_stats);
                }
                (None, None) => {}
            }
        }
        std::collections::hash_map::Entry::Vacant(entry) => {
            entry.insert(col.stats);
        }
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/query/stream_schema_provider.rs` around lines 394 - 407, The current loop
in the column_statistics merge (inside the for columns loop, in the
Entry::Occupied branch) calls entry.get_mut().take(), which clears the stored
Option and drops prior stats even when col.stats is None; instead, change the
logic to read the existing Option by reference (let existing_opt =
entry.get_mut();), and only modify it when col.stats is Some—if both
existing_opt and col.stats are Some call existing_opt =
Some(existing.update(new_stats)), if existing_opt is None and col.stats is Some
set it to Some(new_stats), and if col.stats is None leave existing_opt
untouched; this preserves prior stats when incoming stats are missing and uses
the same symbols column_statistics, columns, col.stats, and entry.get_mut() to
locate the code.
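The four-way merge described above reduces to a small truth table over `Option`s. In this sketch, integer addition stands in for the real `update` merge of column statistics:

```rust
/// Merge behavior the review asks for: never lose existing stats when the
/// incoming side is absent. Addition is a placeholder for the real
/// `existing.update(new_stats)` in the codebase.
fn merge(existing: Option<i64>, incoming: Option<i64>) -> Option<i64> {
    match (existing, incoming) {
        (Some(e), Some(n)) => Some(e + n), // merge both sides
        (Some(e), None) => Some(e),        // keep prior stats
        (None, Some(n)) => Some(n),        // adopt new stats
        (None, None) => None,
    }
}

fn main() {
    assert_eq!(merge(Some(10), Some(5)), Some(15));
    // The buggy take() would yield None here, dropping the prior stats:
    assert_eq!(merge(Some(10), None), Some(10));
    assert_eq!(merge(None, Some(5)), Some(5));
    println!("merge semantics verified");
}
```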
let manifest_futures: Vec<_> = manifest_items
    .into_iter()
    .map(|item| async move {
        let result = PARSEABLE
            .metastore
            .get_manifest(
                stream_name,
                item.time_lower_bound,
                item.time_upper_bound,
                Some(item.manifest_path),
                tenant_id,
            )
            .await;
        (result, item.time_lower_bound, item.time_upper_bound)
    })
    .collect();

let results = join_all(manifest_futures).await;
Avoid unbounded manifest fetch fan-out.
Lines 459-477 issue one async fetch per manifest at once. Large snapshots can create request storms against the metastore/object storage and hurt query-node stability. Add bounded concurrency (chunking/semaphore) here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/query/stream_schema_provider.rs` around lines 459 - 477, Unbounded
concurrent fetches are created by mapping manifest_items into manifest_futures
and awaiting join_all; change this to a bounded concurrency approach by
streaming the items and applying a concurrency limiter (e.g.,
futures::stream::iter(manifest_items).map(|item| async { ...
PARSEABLE.metastore.get_manifest(...) ...
}).buffer_unordered(CONCURRENCY_LIMIT).collect()). Replace the manifest_futures
+ join_all pattern with the buffered stream so PARSEABLE.metastore.get_manifest
calls are limited; pick a sensible CONCURRENCY_LIMIT constant and keep the
returned tuple shape (result, time_lower_bound, time_upper_bound) unchanged.
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
    let obs = PARSEABLE
        .metastore
        .get_all_stream_jsons(&self.stream, None, &self.tenant_id)
        .await;
    let mut time_partition = None;
    let mut snapshot = Snapshot::default();
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(osf) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                if time_partition.is_none() {
                    time_partition = osf.time_partition;
                }
                snapshot.manifest_list.extend(osf.snapshot.manifest_list);
            }
        }
    }
    (time_partition, snapshot)
} else {
Do not silently continue on metadata fetch/parse errors in Query/Prism mode.
At lines 552 and 554, errors from get_all_stream_jsons and serde_json::from_slice are ignored, then query execution proceeds with a default Snapshot. This can return partial/empty results instead of failing fast.
Proposed fix
let (time_partition, merged_snapshot) =
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
- let obs = PARSEABLE
+ let obs = PARSEABLE
.metastore
.get_all_stream_jsons(&self.stream, None, &self.tenant_id)
- .await;
+ .await
+ .map_err(|e| DataFusionError::Plan(e.to_string()))?;
let mut time_partition = None;
let mut snapshot = Snapshot::default();
- if let Ok(obs) = obs {
- for ob in obs {
- if let Ok(osf) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
- if time_partition.is_none() {
- time_partition = osf.time_partition;
- }
- snapshot.manifest_list.extend(osf.snapshot.manifest_list);
- }
+ for ob in obs {
+ let osf: ObjectStoreFormat = serde_json::from_slice(&ob)
+ .map_err(|e| DataFusionError::Plan(e.to_string()))?;
+ if time_partition.is_none() {
+ time_partition = osf.time_partition.clone();
}
+ snapshot.manifest_list.extend(osf.snapshot.manifest_list);
}
(time_partition, snapshot)
} else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/query/stream_schema_provider.rs` around lines 545 - 563, When
PARSEABLE.options.mode is Query or Prism, do not ignore errors from
metastore.get_all_stream_jsons or from serde_json::from_slice for
ObjectStoreFormat: if get_all_stream_jsons returns Err, return or propagate that
error instead of continuing with Snapshot::default, and if
serde_json::from_slice fails for any returned object, surface that parsing error
(or at least return a descriptive error including the offending object/stream)
rather than skipping it; update the logic around get_all_stream_jsons,
ObjectStoreFormat deserialization, and the time_partition/snapshot assembly so
failures are propagated (or converted to a proper Result error) when
Mode::Query/Mode::Prism is active.
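The fail-fast collect pattern the prompt suggests looks like this in miniature, with integer parsing standing in for `serde_json::from_slice`:

```rust
// Collecting an iterator of Results into Result<Vec<_>, _> propagates the
// first error, mirroring the proposed fix.
fn parse_all(raw: &[&str]) -> Result<Vec<u32>, std::num::ParseIntError> {
    raw.iter().map(|s| s.parse::<u32>()).collect()
}

// The behavior under review: silently drops anything that fails to parse.
fn parse_skipping(raw: &[&str]) -> Vec<u32> {
    raw.iter().filter_map(|s| s.parse::<u32>().ok()).collect()
}

fn main() {
    let raw = ["1", "oops", "3"];
    // Silent skipping yields a partial result — analogous to answering a
    // query from an incomplete snapshot.
    assert_eq!(parse_skipping(&raw), vec![1, 3]);
    // Fail-fast surfaces the corrupt entry instead.
    assert!(parse_all(&raw).is_err());
    assert_eq!(parse_all(&["1", "2", "3"]).unwrap(), vec![1, 2, 3]);
    println!("fail-fast semantics verified");
}
```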
// Phase 2: Fetch all matching objects concurrently
let res: Vec<Bytes> = matching_paths
    .into_iter()
    .map(|path| {
        let tenant_id = tenant_id.clone();
        async move { self.get_object(&path, &tenant_id).await }
    })
    .collect::<FuturesUnordered<_>>()
    .try_collect()
    .await?;
🧩 Analysis chain
🏁 Script executed:
# First, let's read the file around lines 552-561 to see the actual implementation
sed -n '545,570p' src/storage/gcs.rs
Repository: parseablehq/parseable
Length of output: 901
🏁 Script executed:
# Search for MAX_OBJECT_STORE_REQUESTS constant
rg "MAX_OBJECT_STORE_REQUESTS" --type rust
Length of output: 521
🏁 Script executed:
# Check if there are similar patterns in other storage implementations (like S3)
rg "FuturesUnordered|buffer_unordered" --type rust -A 2 -B 2
Length of output: 9379
🏁 Script executed:
# Check the broader context of the get_objects method
sed -n '520,565p' src/storage/gcs.rs
Length of output: 1645
Bound concurrent GET fan-out in Phase 2.
Current implementation spawns one future per matched object without any concurrency limit, risking unbounded memory usage and request spikes when listing returns many results. Even though the GCS client is wrapped with LimitStore(MAX_OBJECT_STORE_REQUESTS) for rate limiting, this does not prevent all futures from being created simultaneously within a single get_objects call.
🔧 Proposed fix (bounded concurrency)
- let res: Vec<Bytes> = matching_paths
- .into_iter()
- .map(|path| {
- let tenant_id = tenant_id.clone();
- async move { self.get_object(&path, &tenant_id).await }
- })
- .collect::<FuturesUnordered<_>>()
- .try_collect()
- .await?;
+ let tenant_id = tenant_id.clone();
+ let res: Vec<Bytes> = futures::stream::iter(matching_paths.into_iter())
+ .map(|path| {
+ let tenant_id = tenant_id.clone();
+ async move { self.get_object(&path, &tenant_id).await }
+ })
+ .buffer_unordered(super::MAX_OBJECT_STORE_REQUESTS)
+ .try_collect()
+ .await?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/storage/gcs.rs` around lines 552 - 561, Phase 2 currently spawns an
unbounded future per path
(matching_paths.into_iter().map(...).collect::<FuturesUnordered<_>>()) which can
exhaust memory/requests; replace it with a bounded stream using
futures::stream::iter(matching_paths).map(|path| { let tenant_id =
tenant_id.clone(); async move { self.get_object(&path, &tenant_id).await }
}).buffer_unordered(MAX_OBJECT_STORE_REQUESTS).try_collect().await to limit
simultaneous get_object calls, and add the needed imports (futures::StreamExt,
futures::TryStreamExt); keep using the same MAX_OBJECT_STORE_REQUESTS constant
and ensure tenant_id is cloned inside the closure.
// Phase 2: Read all matching files concurrently
let tenant_str_owned = tenant_str.to_owned();
let res: Vec<Bytes> = matching_paths
    .into_iter()
    .map(|path| {
        let tenant = tenant_str_owned.clone();
        async move {
            let file = fs::read(&path).await.map_err(ObjectStorageError::from)?;
            increment_files_scanned_in_object_store_calls_by_date(
                "GET",
                1,
                &Utc::now().date_naive().to_string(),
                &tenant,
            );
            increment_object_store_calls_by_date(
                "GET",
                &Utc::now().date_naive().to_string(),
                &tenant,
            );
            Ok::<Bytes, ObjectStorageError>(file.into())
        }
    })
    .collect::<FuturesUnordered<_>>()
    .try_collect()
    .await?;
🧩 Analysis chain
🏁 Script executed:
# Find MAX_OBJECT_STORE_REQUESTS constant
rg "MAX_OBJECT_STORE_REQUESTS" --type rust
Length of output: 521
🏁 Script executed:
# Check the current code in localfs.rs around lines 323-347
wc -l src/storage/localfs.rs
Length of output: 91
🏁 Script executed:
# Read the actual code at the specified lines
sed -n '320,350p' src/storage/localfs.rs
Length of output: 1153
🏁 Script executed:
# Check imports at the top of localfs.rs to understand current dependencies
head -30 src/storage/localfs.rs
Length of output: 1132
🏁 Script executed:
# Search for buffer_unordered usage in the codebase to see if it's already used
rg "buffer_unordered" --type rust
Length of output: 682
🏁 Script executed:
# Check how get_objects is called/used
rg "get_objects" --type rust -B 2 -A 2 | head -40Repository: parseablehq/parseable
Length of output: 1968
🏁 Script executed:
# Check if LocalFS is used in multi-tenant or other contexts beyond single-node
rg "LocalFS" --type rust -B 2 -A 2 | head -50Repository: parseablehq/parseable
Length of output: 2017
🏁 Script executed:
# Verify if StreamExt is already available/imported elsewhere in the file
rg "use.*StreamExt" src/storage/localfs.rsRepository: parseablehq/parseable
Length of output: 119
🏁 Script executed:
# Check the entire Phase 2 context to understand the function and listing size
sed -n '290,350p' src/storage/localfs.rsRepository: parseablehq/parseable
Length of output: 2167
Add concurrency limit to Phase 2 file reads in LocalFS to prevent file descriptor exhaustion.
LocalFS currently reads all matched files concurrently without bounds, while S3, GCS, and Azure backends limit concurrency to MAX_OBJECT_STORE_REQUESTS (1000). Apply the same limit here for consistency and to prevent exceeding OS file descriptor limits.
Required import addition:

```rust
use futures::StreamExt;
```

🔧 Proposed fix
```diff
-    let res: Vec<Bytes> = matching_paths
-        .into_iter()
+    let res: Vec<Bytes> = futures::stream::iter(matching_paths.into_iter())
         .map(|path| {
             let tenant = tenant_str_owned.clone();
             async move {
                 let file = fs::read(&path).await.map_err(ObjectStorageError::from)?;
                 increment_files_scanned_in_object_store_calls_by_date(
                     "GET",
                     1,
                     &Utc::now().date_naive().to_string(),
                     &tenant,
                 );
                 increment_object_store_calls_by_date(
                     "GET",
                     &Utc::now().date_naive().to_string(),
                     &tenant,
                 );
                 Ok::<Bytes, ObjectStorageError>(file.into())
             }
         })
-        .collect::<FuturesUnordered<_>>()
+        .buffer_unordered(super::MAX_OBJECT_STORE_REQUESTS)
         .try_collect()
         .await?;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/storage/localfs.rs` around lines 323 - 347, Phase 2 currently spawns
unbounded async file reads over matching_paths (using tenant_str_owned and
collecting into FuturesUnordered) which can exhaust file descriptors; change the
pipeline to convert matching_paths.into_iter() into a stream (use
futures::stream::iter or StreamExt) and apply .map(...) then
.buffer_unordered(MAX_OBJECT_STORE_REQUESTS) before collecting/try_collect to
cap concurrent fs::read operations; also add the required import use
futures::StreamExt; reference the existing variables/functions tenant_str_owned,
matching_paths, MAX_OBJECT_STORE_REQUESTS, and the res collection to locate
where to apply the change.
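The guarantee the fix asks for — never more than a fixed number of reads in flight at once — can be illustrated without the `futures` crate. The sketch below is a std-threads analogy, not the async code from the PR; all names (`bounded_read_all`, `read_file`, the 5 ms sleep) are illustrative. A fixed pool of `limit` worker threads drains a shared queue of paths, so the peak number of simultaneous "reads" can never exceed `limit`, which is the same cap `buffer_unordered(MAX_OBJECT_STORE_REQUESTS)` imposes on the async version.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// Simulated file read; tracks the peak number of simultaneous reads.
fn read_file(path: &str, in_flight: &AtomicUsize, peak: &AtomicUsize) -> String {
    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
    peak.fetch_max(now, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(5));
    in_flight.fetch_sub(1, Ordering::SeqCst);
    format!("bytes-of-{path}")
}

// Read every path with at most `limit` reads in flight; returns the
// results plus the observed peak concurrency.
fn bounded_read_all(paths: Vec<String>, limit: usize) -> (Vec<String>, usize) {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    for p in paths {
        tx.send(p).unwrap();
    }
    drop(tx); // closing the sender lets workers exit when the queue drains

    let mut handles = Vec::new();
    for _ in 0..limit {
        let rx = Arc::clone(&rx);
        let in_flight = Arc::clone(&in_flight);
        let peak = Arc::clone(&peak);
        handles.push(thread::spawn(move || {
            let mut out = Vec::new();
            loop {
                // Hold the lock only long enough to pull one path.
                let path = { rx.lock().unwrap().recv() };
                match path {
                    Ok(p) => out.push(read_file(&p, &in_flight, &peak)),
                    Err(_) => break,
                }
            }
            out
        }));
    }

    let mut results = Vec::new();
    for h in handles {
        results.extend(h.join().unwrap());
    }
    (results, peak.load(Ordering::SeqCst))
}

fn main() {
    let paths: Vec<String> = (0..50).map(|i| format!("file-{i}")).collect();
    let (results, peak) = bounded_read_all(paths, 8);
    assert_eq!(results.len(), 50);
    assert!(peak <= 8); // the cap holds regardless of how many paths matched
    println!("{} results, peak concurrency {}", results.len(), peak);
}
```

The same reasoning explains why the unbounded `FuturesUnordered` version risks file descriptor exhaustion: without the cap, every matched file can be open at once.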
```rust
// Phase 2: Fetch all matching objects concurrently
let res: Vec<Bytes> = matching_paths
    .into_iter()
    .map(|path| {
        let tenant_id = tenant_id.clone();
        async move { self.get_object(&path, &tenant_id).await }
    })
    .collect::<FuturesUnordered<_>>()
    .try_collect()
    .await?;
```
🧩 Analysis chain
🏁 Scripts executed:

```shell
# First, let's look at the actual code at lines 757-766 in src/storage/s3.rs
head -780 src/storage/s3.rs | tail -50

# Search for MAX_OBJECT_STORE_REQUESTS constant
rg "MAX_OBJECT_STORE_REQUESTS" --type rs

# Search for FuturesUnordered usage in the file to understand the pattern
rg "FuturesUnordered|buffer_unordered" src/storage/s3.rs -A 5 -B 2

# Check what imports are available and how futures are being used
head -100 src/storage/s3.rs | grep -E "use|import"

# Search for MAX_OBJECT_STORE_REQUESTS without file type filter
rg "MAX_OBJECT_STORE_REQUESTS"

# Search for buffer_unordered usage in the codebase
rg "buffer_unordered" -A 3 -B 2

# Check how concurrency is managed elsewhere in the codebase
rg "LimitStore|buffered|limit" src/storage/ -A 2 -B 1

# Let's understand the full context around this function
sed -n '724,780p' src/storage/s3.rs
```
Bound Phase-2 fetch concurrency to avoid memory spikes with large listings.
The code creates one async future per matched object up front via `map().collect::<FuturesUnordered<_>>()`, which instantiates every future in memory before any of them is polled. While the underlying S3 client is wrapped with `LimitStore` (capped at 1000 concurrent requests), the futures themselves are unbounded, causing unnecessary memory overhead on large prefix listings. Use `buffer_unordered` to limit in-flight futures and follow the concurrent pattern established elsewhere in the codebase (e.g., field_stats.rs).
🔧 Proposed fix (bounded concurrency)
```diff
-    let res: Vec<Bytes> = matching_paths
-        .into_iter()
-        .map(|path| {
-            let tenant_id = tenant_id.clone();
-            async move { self.get_object(&path, &tenant_id).await }
-        })
-        .collect::<FuturesUnordered<_>>()
-        .try_collect()
-        .await?;
+    let res: Vec<Bytes> = futures::stream::iter(matching_paths.into_iter())
+        .map(|path| {
+            let tenant_id = tenant_id.clone();
+            async move { self.get_object(&path, &tenant_id).await }
+        })
+        .buffer_unordered(super::MAX_OBJECT_STORE_REQUESTS)
+        .try_collect()
+        .await?;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/storage/s3.rs` around lines 757 - 766, The Phase 2 fetch eagerly builds
one future per matched object by collecting into a FuturesUnordered, which can
spike memory for large matching_paths; change the pattern to stream the paths
and apply bounded concurrency (use a Stream::iter or equivalent over
matching_paths, map each path to the async self.get_object(&path, &tenant_id)
call, then call buffer_unordered with a concurrency limit constant—matching the
project pattern used in field_stats.rs—before try_collect().await) so only a
limited number of in-flight futures exist instead of allocating them all at
once.
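The memory-overhead argument above hinges on when tasks are materialized, and that distinction can be shown with plain iterators and closures, no async runtime needed. The sketch below is a std-only analogy (the counter, the `* 2` work, and all names are illustrative): collecting a mapped iterator builds every "task" immediately, the way `.map(..).collect::<FuturesUnordered<_>>()` allocates every future before the first poll, whereas a lazy iterator builds each task only when it is pulled, which is what streaming through `buffer_unordered` approximates (at most N tasks alive at a time).

```rust
use std::cell::Cell;

fn main() {
    let created = Cell::new(0usize);
    let paths: Vec<u32> = (0..10_000).collect();

    // Eager: collecting materializes all 10_000 task closures up front.
    let tasks: Vec<Box<dyn Fn() -> u32>> = paths
        .iter()
        .map(|&p| {
            created.set(created.get() + 1);
            Box::new(move || p * 2) as Box<dyn Fn() -> u32>
        })
        .collect();
    assert_eq!(created.get(), 10_000); // every closure allocated before any runs
    assert_eq!(tasks.len(), 10_000);

    // Lazy: a plain iterator creates each task only on demand.
    created.set(0);
    let mut lazy = paths.iter().map(|&p| {
        created.set(created.get() + 1);
        move || p * 2
    });
    let first = lazy.next().unwrap()();
    assert_eq!(first, 0);
    assert_eq!(created.get(), 1); // only one task built so far
    println!("eager builds all tasks up front; lazy builds on demand");
}
```

With `buffer_unordered(N)` the stream sits between these extremes: it pulls lazily but keeps up to N futures in flight, so memory scales with N rather than with the size of the listing.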
Summary by CodeRabbit
Release Notes
New Features
Performance Improvements
Chores