diff --git a/Cargo.lock b/Cargo.lock index 16edf186f..670b6e404 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,7 +53,7 @@ dependencies = [ "derive_more 2.1.1", "encoding_rs", "flate2", - "foldhash", + "foldhash 0.1.5", "futures-core", "h2 0.3.27", "http 0.2.12", @@ -187,7 +187,7 @@ dependencies = [ "cookie 0.16.2", "derive_more 2.1.1", "encoding_rs", - "foldhash", + "foldhash 0.1.5", "futures-core", "futures-util", "impl-more", @@ -2383,6 +2383,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -2623,7 +2629,7 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -2631,6 +2637,11 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "heck" @@ -3221,6 +3232,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" +dependencies = [ + "hashbrown 0.16.1", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -3729,6 +3749,7 @@ dependencies = [ "indexmap", "itertools 0.14.0", "lazy_static", + "lru", "num_cpus", "object_store", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index dfe6a9ea5..3f40c403b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,6 +166,7 @@ prost = "0.13.1" dashmap = "6.1.0" parking_lot = "0.12.5" indexmap = { version = "2.13.0", features = ["serde"] } +lru = "0.16.3" [build-dependencies] cargo_toml = "0.21" diff --git a/src/catalog/snapshot.rs b/src/catalog/snapshot.rs index a1cf04c9a..69b9eceef 100644 --- a/src/catalog/snapshot.rs +++ b/src/catalog/snapshot.rs @@ -40,34 +40,33 @@ impl Default for Snapshot { impl super::Snapshot for Snapshot { fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec { - let mut manifests = self.manifest_list.clone(); - for predicate in time_predicates { - match predicate { - PartialTimeFilter::Low(Bound::Included(time)) => manifests.retain(|item| { - let time = time.and_utc(); - item.time_upper_bound >= time - }), - PartialTimeFilter::Low(Bound::Excluded(time)) => manifests.retain(|item| { - let time = time.and_utc(); - item.time_upper_bound > time - }), - PartialTimeFilter::High(Bound::Included(time)) => manifests.retain(|item| { - let time = time.and_utc(); - item.time_lower_bound <= time - }), - PartialTimeFilter::High(Bound::Excluded(time)) => manifests.retain(|item| { - let time = time.and_utc(); - item.time_lower_bound < time - }), - PartialTimeFilter::Eq(time) => manifests.retain(|item| { - let time = time.and_utc(); - item.time_lower_bound <= time && time <= item.time_upper_bound - }), - _ => (), - } - } - - manifests + // Avoid cloning the entire manifest list upfront; instead filter by reference + // and only clone items that match all predicates. + self.manifest_list + .iter() + .filter(|item| { + time_predicates.iter().all(|predicate| match predicate { + PartialTimeFilter::Low(Bound::Included(time)) => { + item.time_upper_bound >= time.and_utc() + } + PartialTimeFilter::Low(Bound::Excluded(time)) => { + item.time_upper_bound > time.and_utc() + } + PartialTimeFilter::High(Bound::Included(time)) => { + item.time_lower_bound <= time.and_utc() + } + PartialTimeFilter::High(Bound::Excluded(time)) => { + item.time_lower_bound < time.and_utc() + } + PartialTimeFilter::Eq(time) => { + let time = time.and_utc(); + item.time_lower_bound <= time && time <= item.time_upper_bound + } + _ => true, + }) + }) + .cloned() + .collect() } } diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index fd7804f7f..2d5e175aa 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -460,8 +460,18 @@ pub async fn create_streams_for_distributed( if PARSEABLE.options.mode != Mode::Query && PARSEABLE.options.mode != Mode::Prism { return Ok(()); } + // Skip streams already loaded in memory — avoids redundant S3 list_streams + metadata calls + let streams_to_create: Vec<_> = streams + .into_iter() + .filter(|stream_name| PARSEABLE.get_stream(stream_name, tenant_id).is_err()) + .collect(); + + if streams_to_create.is_empty() { + return Ok(()); + } + let mut join_set = JoinSet::new(); - for stream_name in streams { + for stream_name in streams_to_create { let id = tenant_id.to_owned(); join_set.spawn(async move { let result = PARSEABLE diff --git a/src/hottier.rs b/src/hottier.rs index d4641dd1f..1d0b56415 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -22,6 +22,8 @@ use std::{ path::{Path, PathBuf}, }; +use dashmap::{DashMap, DashSet}; + use crate::{ catalog::manifest::{File, Manifest}, handlers::http::cluster::PMETA_STREAM_NAME, @@ -67,6 +69,12 @@ pub struct StreamHotTier { pub struct HotTierManager { filesystem: LocalFileSystem, hot_tier_path: &'static Path, + /// In-memory index of files present in hot tier: file_path -> file_size. + /// Avoids per-file filesystem stat calls during query. + cached_files: DashMap, + /// Tracks which streams have hot tier enabled (key: "tenant/stream" or "stream"). + /// Avoids blocking filesystem stat calls in async context on every query. + enabled_streams: DashSet, } impl HotTierManager { @@ -75,6 +83,8 @@ impl HotTierManager { HotTierManager { filesystem: LocalFileSystem::new(), hot_tier_path, + cached_files: DashMap::new(), + enabled_streams: DashSet::new(), } } @@ -213,12 +223,23 @@ impl HotTierManager { if !self.check_stream_hot_tier_exists(stream, tenant_id) { return Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } + // Build the prefix to match index entries for this stream + let prefix = if let Some(tenant_id) = tenant_id.as_ref() { + format!("{tenant_id}/{stream}/") + } else { + format!("{stream}/") + }; + // Remove all cached entries for this stream from the in-memory index + self.cached_files.retain(|k, _| !k.starts_with(&prefix)); + let path = if let Some(tenant_id) = tenant_id.as_ref() { self.hot_tier_path.join(tenant_id).join(stream) } else { self.hot_tier_path.join(stream) }; fs::remove_dir_all(path).await?; + self.enabled_streams + .remove(&Self::stream_key(stream, tenant_id)); Ok(()) } @@ -234,6 +255,8 @@ impl HotTierManager { let path = self.hot_tier_file_path(stream, tenant_id)?; let bytes = serde_json::to_vec(&hot_tier)?.into(); self.filesystem.put(&path, bytes).await?; + self.enabled_streams + .insert(Self::stream_key(stream, tenant_id)); Ok(()) } @@ -263,6 +286,47 @@ impl HotTierManager { where 'a: 'static, { + // Populate the in-memory cache from existing hot tier files on disk. + // Runs in a blocking thread pool task to avoid stalling the async runtime. + // Queries arriving before this completes will simply not find files in the + // cache and fall through to remote storage (same behavior as before caching). + let hot_tier_path = self.hot_tier_path; + let cached_files = &self.cached_files; + let enabled_streams = &self.enabled_streams; + drop(tokio::task::spawn_blocking(move || { + fn walk_dir( + dir: &Path, + hot_tier_root: &Path, + index: &DashMap, + streams: &DashSet, + ) { + let entries = match std::fs::read_dir(dir) { + Ok(entries) => entries, + Err(_) => return, + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + walk_dir(&path, hot_tier_root, index, streams); + } else if path.extension().is_some_and(|ext| ext == "parquet") + && let Ok(meta) = entry.metadata() + && let Ok(rel) = path.strip_prefix(hot_tier_root) + { + index.insert(rel.to_string_lossy().into_owned(), meta.len()); + } else if path + .file_name() + .is_some_and(|n| n == STREAM_HOT_TIER_FILENAME) + && let Ok(rel) = path.strip_prefix(hot_tier_root) + && let Some(stream_dir) = rel.parent() + { + // Register "tenant/stream" or "stream" in the enabled set + streams.insert(stream_dir.to_string_lossy().into_owned()); + } + } + } + walk_dir(hot_tier_path, hot_tier_path, cached_files, enabled_streams); + })); + let mut scheduler = AsyncScheduler::new(); scheduler .every(HOT_TIER_SYNC_DURATION) @@ -444,6 +508,9 @@ impl HotTierManager { .get_object(&parquet_file_path, tenant_id) .await?; file.write_all(&parquet_data).await?; + // Update the in-memory index with the newly downloaded file + self.cached_files + .insert(parquet_file.file_path.clone(), parquet_file.file_size); *parquet_file_size += parquet_file.file_size; stream_hot_tier.used_size = *parquet_file_size; @@ -550,33 +617,29 @@ impl HotTierManager { } } - /// Returns the list of manifest files present in hot tier directory for the stream + /// Returns the list of manifest files present in hot tier directory for the stream. + /// Uses the in-memory index for O(1) lookups per file instead of filesystem stat calls. pub async fn get_hot_tier_manifest_files( &self, manifest_files: &mut Vec, ) -> Result, HotTierError> { - // Check which query-relevant files exist locally in the hot tier directory. let mut hot_tier_files = Vec::new(); let mut remaining = Vec::with_capacity(manifest_files.len()); for file in manifest_files.drain(..) { - let hot_tier_path = self.hot_tier_path.join(&file.file_path); - if let Ok(meta) = fs::metadata(&hot_tier_path).await - && meta.len() == file.file_size + if self + .cached_files + .get(&file.file_path) + .is_some_and(|size| *size == file.file_size) { hot_tier_files.push(file); - continue; + } else { + remaining.push(file); } - - remaining.push(file); } *manifest_files = remaining; - // Sort both lists in descending order by file path. - hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); - manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); - Ok(hot_tier_files) } @@ -612,19 +675,21 @@ impl HotTierManager { Ok(hot_tier_parquet_files) } - ///check if the hot tier metadata file exists for the stream - pub fn check_stream_hot_tier_exists(&self, stream: &str, tenant_id: &Option) -> bool { - let path = if let Some(tenant_id) = tenant_id.as_ref() { - self.hot_tier_path - .join(tenant_id) - .join(stream) - .join(STREAM_HOT_TIER_FILENAME) + /// Key for the enabled_streams set: "tenant/stream" or "stream". + fn stream_key(stream: &str, tenant_id: &Option) -> String { + if let Some(tenant_id) = tenant_id.as_ref() { + format!("{tenant_id}/{stream}") } else { - self.hot_tier_path - .join(stream) - .join(STREAM_HOT_TIER_FILENAME) - }; - path.exists() + stream.to_owned() + } + } + + /// Check if the hot tier is enabled for the stream. + /// Uses the in-memory set populated at startup and kept in sync + /// by put_hot_tier / delete_hot_tier, avoiding blocking I/O. + pub fn check_stream_hot_tier_exists(&self, stream: &str, tenant_id: &Option) -> bool { + self.enabled_streams + .contains(&Self::stream_key(stream, tenant_id)) } ///delete the parquet file from the hot tier directory for the stream @@ -686,6 +751,9 @@ impl HotTierManager { ) .await?; + // Remove deleted file from in-memory index + self.cached_files.remove(&file_to_delete.file_path); + stream_hot_tier.used_size -= file_size; stream_hot_tier.available_size += file_size; self.put_hot_tier(stream, stream_hot_tier, tenant_id) diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 904bcca24..70906507d 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -18,7 +18,9 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, + num::NonZeroUsize, sync::Arc, + time::{Duration, Instant}, }; use actix_web::http::StatusCode; @@ -26,6 +28,8 @@ use arrow_schema::Schema; use bytes::Bytes; use chrono::{DateTime, Utc}; use dashmap::DashMap; +use lru::LruCache; +use parking_lot::Mutex; use relative_path::RelativePathBuf; use tonic::async_trait; use tracing::warn; @@ -59,10 +63,58 @@ use crate::{ users::filters::{Filter, migrate_v1_v2}, }; +/// Default manifest LRU cache capacity +const MANIFEST_CACHE_SIZE: usize = 512; +/// Default stream JSON cache TTL +const STREAM_JSON_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes + +/// Cached entry with a TTL +#[derive(Debug, Clone)] +struct CachedEntry { + value: T, + inserted_at: Instant, +} + +impl CachedEntry { + fn new(value: T) -> Self { + Self { + value, + inserted_at: Instant::now(), + } + } + + fn is_expired(&self, ttl: Duration) -> bool { + self.inserted_at.elapsed() > ttl + } +} + /// Using PARSEABLE's storage as a metastore (default) -#[derive(Debug)] pub struct ObjectStoreMetastore { pub storage: Arc, + /// LRU cache for manifest files, keyed by manifest path + manifest_cache: Mutex>, + /// Cache for stream JSON files with TTL, keyed by (stream_name, tenant_id) + stream_json_cache: Mutex>>, +} + +impl std::fmt::Debug for ObjectStoreMetastore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ObjectStoreMetastore") + .field("storage", &self.storage) + .finish() + } +} + +impl ObjectStoreMetastore { + pub fn new(storage: Arc) -> Self { + Self { + storage, + manifest_cache: Mutex::new(LruCache::new( + NonZeroUsize::new(MANIFEST_CACHE_SIZE).unwrap(), + )), + stream_json_cache: Mutex::new(HashMap::new()), + } + } } #[async_trait] @@ -869,7 +921,27 @@ impl Metastore for ObjectStoreMetastore { } else { stream_json_path(stream_name, tenant_id) }; - Ok(self.storage.get_object(&path, tenant_id).await?) + + let cache_key = path.to_string(); + + // Check stream JSON cache (with TTL) + { + let cache = self.stream_json_cache.lock(); + if let Some(entry) = cache.get(&cache_key) + && !entry.is_expired(STREAM_JSON_CACHE_TTL) + { + return Ok(entry.value.clone()); + } + } + + let bytes = self.storage.get_object(&path, tenant_id).await?; + + // Cache the result + self.stream_json_cache + .lock() + .insert(cache_key, CachedEntry::new(bytes.clone())); + + Ok(bytes) } /// Fetch all `ObjectStoreFormat` present in a stream folder @@ -922,10 +994,17 @@ impl Metastore for ObjectStoreMetastore { ) -> Result<(), MetastoreError> { let path = stream_json_path(stream_name, tenant_id); - Ok(self - .storage - .put_object(&path, to_bytes(obj), tenant_id) - .await?) + let bytes = to_bytes(obj); + self.storage + .put_object(&path, bytes.clone(), tenant_id) + .await?; + + // Update the stream JSON cache with the new value + self.stream_json_cache + .lock() + .insert(path.to_string(), CachedEntry::new(bytes)); + + Ok(()) } /// Fetch all `Manifest` files @@ -980,7 +1059,7 @@ impl Metastore for ObjectStoreMetastore { Ok(result_file_list) } - /// Fetch a specific `Manifest` file + /// Fetch a specific `Manifest` file (with LRU cache) async fn get_manifest( &self, stream_name: &str, @@ -996,26 +1075,24 @@ impl Metastore for ObjectStoreMetastore { manifest_path(path.as_str()) } }; + + let cache_key = path.to_string(); + + // Check LRU cache first + if let Some(cached) = self.manifest_cache.lock().get(&cache_key) { + return Ok(Some(cached.clone())); + } + match self.storage.get_object(&path, tenant_id).await { Ok(bytes) => { - let manifest = serde_json::from_slice(&bytes)?; + let manifest: Manifest = serde_json::from_slice(&bytes)?; + // Insert into LRU cache + self.manifest_cache.lock().put(cache_key, manifest.clone()); Ok(Some(manifest)) } Err(ObjectStorageError::NoSuchKey(_)) => Ok(None), Err(err) => Err(MetastoreError::ObjectStorageError(err)), } - // let path = partition_path(stream_name, lower_bound, upper_bound); - // // // need a 'ends with `manifest.json` condition here' - // // let obs = self - // // .storage - // // .get_objects( - // // path, - // // Box::new(|file_name| file_name.ends_with("manifest.json")), - // // ) - // // .await?; - // warn!(partition_path=?path); - // let path = manifest_path(path.as_str()); - // warn!(manifest_path=?path); } /// Get the path for a specific `Manifest` file @@ -1045,10 +1122,17 @@ impl Metastore for ObjectStoreMetastore { let path = partition_path(stream_name, lower_bound, upper_bound, tenant_id) .join(&manifest_file_name); - Ok(self - .storage - .put_object(&path, to_bytes(obj), tenant_id) - .await?) + let bytes = to_bytes(obj); + self.storage + .put_object(&path, bytes.clone(), tenant_id) + .await?; + + // Update the manifest cache with the newly written manifest + if let Ok(manifest) = serde_json::from_slice::(&bytes) { + self.manifest_cache.lock().put(path.to_string(), manifest); + } + + Ok(()) } async fn delete_manifest( @@ -1061,6 +1145,10 @@ impl Metastore for ObjectStoreMetastore { let manifest_file_name = manifest_path("").to_string(); let path = partition_path(stream_name, lower_bound, upper_bound, tenant_id) .join(&manifest_file_name); + + // Remove from manifest cache + self.manifest_cache.lock().pop(&path.to_string()); + Ok(self.storage.delete_object(&path, tenant_id).await?) } diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index c6072798d..1c92d5921 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -119,9 +119,7 @@ pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage } // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; + let metastore = ObjectStoreMetastore::new(args.storage.construct_client()); Parseable::new( args.options, @@ -133,9 +131,7 @@ pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage } StorageOptions::S3(args) => { // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; + let metastore = ObjectStoreMetastore::new(args.storage.construct_client()); Parseable::new( args.options, #[cfg(feature = "kafka")] @@ -146,9 +142,7 @@ pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage } StorageOptions::Blob(args) => { // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; + let metastore = ObjectStoreMetastore::new(args.storage.construct_client()); Parseable::new( args.options, #[cfg(feature = "kafka")] @@ -159,9 +153,7 @@ pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage } StorageOptions::Gcs(args) => { // for now create a metastore without using a CLI arg - let metastore = ObjectStoreMetastore { - storage: args.storage.construct_client(), - }; + let metastore = ObjectStoreMetastore::new(args.storage.construct_client()); Parseable::new( args.options, #[cfg(feature = "kafka")] diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 387a6e3d5..9162f3eb5 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -44,7 +44,7 @@ use datafusion::{ prelude::Expr, scalar::ScalarValue, }; -use futures_util::TryFutureExt; +use futures_util::{TryFutureExt, future::join_all}; use itertools::Itertools; use crate::{ @@ -391,17 +391,20 @@ impl StandardTableProvider { let pf = PartitionedFile::new(file_path, file.file_size); partitioned_files[index].push(pf); - columns.into_iter().for_each(|col| { - column_statistics - .entry(col.name) - .and_modify(|x| { - if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) + for col in columns { + match column_statistics.entry(col.name) { + std::collections::hash_map::Entry::Occupied(mut entry) => { + if let (Some(existing), Some(new_stats)) = + (entry.get_mut().take(), col.stats) { - *x = Some(stats.update(col_stats)); + *entry.get_mut() = Some(existing.update(new_stats)); } - }) - .or_insert_with(|| col.stats.as_ref().cloned()); - }); + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(col.stats); + } + } + } count += num_rows; } let statistics = self @@ -450,29 +453,41 @@ async fn collect_from_snapshot( stream_name: &str, tenant_id: &Option, ) -> Result, DataFusionError> { - let mut manifest_files = Vec::new(); - - for manifest_item in snapshot.manifests(time_filters) { - let manifest_opt = PARSEABLE - .metastore - .get_manifest( - stream_name, - manifest_item.time_lower_bound, - manifest_item.time_upper_bound, - Some(manifest_item.manifest_path), - tenant_id, - ) - .await - .map_err(|e| DataFusionError::Plan(e.to_string()))?; - if let Some(manifest) = manifest_opt { - manifest_files.push(manifest); - } else { - tracing::warn!( - "Manifest missing for stream={} [{:?} - {:?}]", - stream_name, - manifest_item.time_lower_bound, - manifest_item.time_upper_bound - ); + let manifest_items = snapshot.manifests(time_filters); + + // Fetch all manifests concurrently instead of sequentially + let manifest_futures: Vec<_> = manifest_items + .into_iter() + .map(|item| async move { + let result = PARSEABLE + .metastore + .get_manifest( + stream_name, + item.time_lower_bound, + item.time_upper_bound, + Some(item.manifest_path), + tenant_id, + ) + .await; + (result, item.time_lower_bound, item.time_upper_bound) + }) + .collect(); + + let results = join_all(manifest_futures).await; + + let mut manifest_files = Vec::with_capacity(results.len()); + for (result, lower, upper) in results { + match result { + Ok(Some(manifest)) => manifest_files.push(manifest), + Ok(None) => { + tracing::warn!( + "Manifest missing for stream={} [{:?} - {:?}]", + stream_name, + lower, + upper + ); + } + Err(e) => return Err(DataFusionError::Plan(e.to_string())), } } @@ -481,26 +496,20 @@ async fn collect_from_snapshot( .flat_map(|file| file.files) .rev() .collect(); - for filter in filters { - manifest_files.retain(|file| !file.can_be_pruned(filter)) - } - if let Some(limit) = limit { - let limit = limit as u64; - let mut curr_limit = 0; - let mut pos = None; - - for (index, file) in manifest_files.iter().enumerate() { - curr_limit += file.num_rows(); - if curr_limit >= limit { - pos = Some(index); - break; - } - } - if let Some(pos) = pos { - manifest_files.truncate(pos + 1); + // Single-pass: evaluate all filters per file and accumulate rows for limit. + let limit_rows = limit.map(|l| l as u64); + let mut curr_rows = 0u64; + manifest_files.retain(|file| { + if limit_rows.is_some_and(|l| curr_rows >= l) { + return false; } - } + if filters.iter().any(|f| file.can_be_pruned(f)) { + return false; + } + curr_rows += file.num_rows(); + true + }); Ok(manifest_files) } @@ -529,16 +538,43 @@ impl TableProvider for StandardTableProvider { let mut execution_plans = vec![]; let glob_storage = PARSEABLE.storage.get_object_store(); - let object_store_format: ObjectStoreFormat = serde_json::from_slice( - &PARSEABLE - .metastore - .get_stream_json(&self.stream, false, &self.tenant_id) - .await - .map_err(|e| DataFusionError::Plan(e.to_string()))?, - ) - .map_err(|e| DataFusionError::Plan(e.to_string()))?; + // In Query/Prism mode, get_all_stream_jsons already includes the base + // stream.json, so we extract time_partition and snapshot from it directly + // instead of making a separate get_stream_json call. + let (time_partition, merged_snapshot) = + if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism { + let obs = PARSEABLE + .metastore + .get_all_stream_jsons(&self.stream, None, &self.tenant_id) + .await; + let mut time_partition = None; + let mut snapshot = Snapshot::default(); + if let Ok(obs) = obs { + for ob in obs { + if let Ok(osf) = serde_json::from_slice::(&ob) { + if time_partition.is_none() { + time_partition = osf.time_partition; + } + snapshot.manifest_list.extend(osf.snapshot.manifest_list); + } + } + } + (time_partition, snapshot) + } else { + let object_store_format: ObjectStoreFormat = serde_json::from_slice( + &PARSEABLE + .metastore + .get_stream_json(&self.stream, false, &self.tenant_id) + .await + .map_err(|e| DataFusionError::Plan(e.to_string()))?, + ) + .map_err(|e| DataFusionError::Plan(e.to_string()))?; + ( + object_store_format.time_partition, + object_store_format.snapshot, + ) + }; - let time_partition = object_store_format.time_partition; let mut time_filters = extract_primary_filter(filters, &time_partition); if is_within_staging_window(&time_filters) { self.get_staging_execution_plan( @@ -551,27 +587,6 @@ impl TableProvider for StandardTableProvider { ) .await?; }; - let mut merged_snapshot = Snapshot::default(); - if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism { - let obs = PARSEABLE - .metastore - .get_all_stream_jsons(&self.stream, None, &self.tenant_id) - .await; - if let Ok(obs) = obs { - for ob in obs { - if let Ok(object_store_format) = - serde_json::from_slice::(&ob) - { - let snapshot = object_store_format.snapshot; - for manifest in snapshot.manifest_list { - merged_snapshot.manifest_list.push(manifest); - } - } - } - } - } else { - merged_snapshot = object_store_format.snapshot; - } // Is query timerange is overlapping with older data. // if true, then get listing table time filters and execution plan separately diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index b26853288..a3a5d4def 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -538,10 +538,10 @@ impl ObjectStorage for BlobStore { let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let mut list_stream = self.client.list(Some(&prefix)); - let mut res = vec![]; + // Phase 1: Collect matching paths from listing + let mut matching_paths = vec![]; let mut files_scanned = 0; - // Note: We track each streaming list item retrieval while let Some(meta_result) = list_stream.next().await { let meta = match meta_result { Ok(meta) => meta, @@ -551,20 +551,11 @@ impl ObjectStorage for BlobStore { }; files_scanned += 1; - let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); - - if !ingestor_file { - continue; + if filter_func(meta.location.filename().unwrap().to_string()) { + let path = RelativePathBuf::from_path(meta.location.as_ref()) + .map_err(ObjectStorageError::PathError)?; + matching_paths.push(path); } - - let byts = self - .get_object( - RelativePath::from_path(meta.location.as_ref()) - .map_err(ObjectStorageError::PathError)?, - tenant_id, - ) - .await?; - res.push(byts); } // Record total files scanned @@ -575,6 +566,18 @@ impl ObjectStorage for BlobStore { tenant, ); increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string(), tenant); + + // Phase 2: Fetch all matching objects concurrently + let res: Vec = matching_paths + .into_iter() + .map(|path| { + let tenant_id = tenant_id.clone(); + async move { self.get_object(&path, &tenant_id).await } + }) + .collect::>() + .try_collect() + .await?; + Ok(res) } diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index b89d595ae..44033e8d9 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -520,10 +520,10 @@ impl ObjectStorage for Gcs { let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let mut list_stream = self.client.list(Some(&prefix)); - let mut res = vec![]; + // Phase 1: Collect matching paths from listing + let mut matching_paths = vec![]; let mut files_scanned = 0; - // Note: We track each streaming list item retrieval while let Some(meta_result) = list_stream.next().await { let meta = match meta_result { Ok(meta) => meta, @@ -533,20 +533,11 @@ impl ObjectStorage for Gcs { }; files_scanned += 1; - let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); - - if !ingestor_file { - continue; + if filter_func(meta.location.filename().unwrap().to_string()) { + let path = RelativePathBuf::from_path(meta.location.as_ref()) + .map_err(ObjectStorageError::PathError)?; + matching_paths.push(path); } - - let byts = self - .get_object( - RelativePath::from_path(meta.location.as_ref()) - .map_err(ObjectStorageError::PathError)?, - tenant_id, - ) - .await?; - res.push(byts); } // Record total files scanned @@ -557,6 +548,18 @@ impl ObjectStorage for Gcs { tenant, ); increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string(), tenant); + + // Phase 2: Fetch all matching objects concurrently + let res: Vec = matching_paths + .into_iter() + .map(|path| { + let tenant_id = tenant_id.clone(); + async move { self.get_object(&path, &tenant_id).await } + }) + .collect::>() + .try_collect() + .await?; + Ok(res) } diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 6f981981c..8dfd70f51 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -288,10 +288,11 @@ impl ObjectStorage for LocalFS { } }; - let mut res = Vec::new(); + // Phase 1: Collect matching file paths from directory listing + let mut matching_paths = Vec::new(); let mut files_scanned = 0; while let Some(entry) = entries.next_entry().await? { - let path = entry + let name = entry .path() .file_name() .ok_or(ObjectStorageError::NoSuchKey( @@ -302,32 +303,8 @@ impl ObjectStorage for LocalFS { .to_owned(); files_scanned += 1; - let ingestor_file = filter_func(path); - - if !ingestor_file { - continue; - } - - let file_result = fs::read(entry.path()).await; - match file_result { - Ok(file) => { - // Record total files scanned - increment_files_scanned_in_object_store_calls_by_date( - "GET", - 1, - &Utc::now().date_naive().to_string(), - tenant_str, - ); - increment_object_store_calls_by_date( - "GET", - &Utc::now().date_naive().to_string(), - tenant_str, - ); - res.push(file.into()); - } - Err(err) => { - return Err(err.into()); - } + if filter_func(name) { + matching_paths.push(entry.path()); } } @@ -343,6 +320,32 @@ impl ObjectStorage for LocalFS { tenant_str, ); + // Phase 2: Read all matching files concurrently + let tenant_str_owned = tenant_str.to_owned(); + let res: Vec = matching_paths + .into_iter() + .map(|path| { + let tenant = tenant_str_owned.clone(); + async move { + let file = fs::read(&path).await.map_err(ObjectStorageError::from)?; + increment_files_scanned_in_object_store_calls_by_date( + "GET", + 1, + &Utc::now().date_naive().to_string(), + &tenant, + ); + increment_object_store_calls_by_date( + "GET", + &Utc::now().date_naive().to_string(), + &tenant, + ); + Ok::(file.into()) + } + }) + .collect::>() + .try_collect() + .await?; + Ok(res) } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 6577abe96..20551fb0e 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -721,10 +721,10 @@ impl ObjectStorage for S3 { let mut list_stream = self.client.list(Some(&prefix)); - let mut res = vec![]; + // Phase 1: Collect matching paths from listing + let mut matching_paths = vec![]; let mut files_scanned = 0; - // Note: We track each streaming list item retrieval while let Some(meta_result) = list_stream.next().await { let meta = match meta_result { Ok(meta) => meta, @@ -734,21 +734,13 @@ impl ObjectStorage for S3 { }; files_scanned += 1; - let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); - - if !ingestor_file { - continue; + if filter_func(meta.location.filename().unwrap().to_string()) { + let path = RelativePathBuf::from_path(meta.location.as_ref()) + .map_err(ObjectStorageError::PathError)?; + matching_paths.push(path); } - - let byts = self - .get_object( - RelativePath::from_path(meta.location.as_ref()) - .map_err(ObjectStorageError::PathError)?, - tenant_id, - ) - .await?; - res.push(byts); } + // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( "LIST", @@ -761,6 +753,18 @@ impl ObjectStorage for S3 { &Utc::now().date_naive().to_string(), tenant_str, ); + + // Phase 2: Fetch all matching objects concurrently + let res: Vec = matching_paths + .into_iter() + .map(|path| { + let tenant_id = tenant_id.clone(); + async move { self.get_object(&path, &tenant_id).await } + }) + .collect::>() + .try_collect() + .await?; + Ok(res) }