Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ prost = "0.13.1"
dashmap = "6.1.0"
parking_lot = "0.12.5"
indexmap = { version = "2.13.0", features = ["serde"] }
lru = "0.16.3"

[build-dependencies]
cargo_toml = "0.21"
Expand Down
55 changes: 27 additions & 28 deletions src/catalog/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,33 @@ impl Default for Snapshot {

impl super::Snapshot for Snapshot {
    /// Returns the manifest items whose time range satisfies every supplied
    /// time predicate.
    ///
    /// Filters by reference and clones only the matching items, instead of
    /// cloning the entire manifest list upfront.
    fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem> {
        self.manifest_list
            .iter()
            .filter(|item| {
                // An item survives only if it satisfies ALL predicates.
                time_predicates.iter().all(|predicate| match predicate {
                    PartialTimeFilter::Low(Bound::Included(time)) => {
                        item.time_upper_bound >= time.and_utc()
                    }
                    PartialTimeFilter::Low(Bound::Excluded(time)) => {
                        item.time_upper_bound > time.and_utc()
                    }
                    PartialTimeFilter::High(Bound::Included(time)) => {
                        item.time_lower_bound <= time.and_utc()
                    }
                    PartialTimeFilter::High(Bound::Excluded(time)) => {
                        item.time_lower_bound < time.and_utc()
                    }
                    PartialTimeFilter::Eq(time) => {
                        let time = time.and_utc();
                        item.time_lower_bound <= time && time <= item.time_upper_bound
                    }
                    // Remaining variants (e.g. unbounded) do not constrain the range.
                    _ => true,
                })
            })
            .cloned()
            .collect()
    }
}

Expand Down
12 changes: 11 additions & 1 deletion src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,18 @@ pub async fn create_streams_for_distributed(
if PARSEABLE.options.mode != Mode::Query && PARSEABLE.options.mode != Mode::Prism {
return Ok(());
}
// Skip streams already loaded in memory — avoids redundant S3 list_streams + metadata calls
let streams_to_create: Vec<_> = streams
.into_iter()
.filter(|stream_name| PARSEABLE.get_stream(stream_name, tenant_id).is_err())
.collect();

if streams_to_create.is_empty() {
return Ok(());
}

let mut join_set = JoinSet::new();
for stream_name in streams {
for stream_name in streams_to_create {
let id = tenant_id.to_owned();
join_set.spawn(async move {
let result = PARSEABLE
Expand Down
116 changes: 92 additions & 24 deletions src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::{
path::{Path, PathBuf},
};

use dashmap::{DashMap, DashSet};

use crate::{
catalog::manifest::{File, Manifest},
handlers::http::cluster::PMETA_STREAM_NAME,
Expand Down Expand Up @@ -67,6 +69,12 @@ pub struct StreamHotTier {
/// Manages the on-disk hot tier cache along with in-memory indexes of its
/// contents, so queries can check local availability without filesystem calls.
pub struct HotTierManager {
    // Filesystem-backed object store client; used to persist per-stream
    // hot tier metadata files (see the `put` call in `put_hot_tier`).
    filesystem: LocalFileSystem,
    // Root directory of the hot tier; all stream paths are joined under it.
    hot_tier_path: &'static Path,
    /// In-memory index of files present in hot tier: file_path -> file_size.
    /// Avoids per-file filesystem stat calls during query.
    cached_files: DashMap<String, u64>,
    /// Tracks which streams have hot tier enabled (key: "tenant/stream" or "stream").
    /// Avoids blocking filesystem stat calls in async context on every query.
    enabled_streams: DashSet<String>,
}

impl HotTierManager {
Expand All @@ -75,6 +83,8 @@ impl HotTierManager {
HotTierManager {
filesystem: LocalFileSystem::new(),
hot_tier_path,
cached_files: DashMap::new(),
enabled_streams: DashSet::new(),
}
}

Expand Down Expand Up @@ -213,12 +223,23 @@ impl HotTierManager {
if !self.check_stream_hot_tier_exists(stream, tenant_id) {
return Err(HotTierValidationError::NotFound(stream.to_owned()).into());
}
// Build the prefix to match index entries for this stream
let prefix = if let Some(tenant_id) = tenant_id.as_ref() {
format!("{tenant_id}/{stream}/")
} else {
format!("{stream}/")
};
// Remove all cached entries for this stream from the in-memory index
self.cached_files.retain(|k, _| !k.starts_with(&prefix));

let path = if let Some(tenant_id) = tenant_id.as_ref() {
self.hot_tier_path.join(tenant_id).join(stream)
} else {
self.hot_tier_path.join(stream)
};
fs::remove_dir_all(path).await?;
self.enabled_streams
.remove(&Self::stream_key(stream, tenant_id));

Ok(())
}
Expand All @@ -234,6 +255,8 @@ impl HotTierManager {
let path = self.hot_tier_file_path(stream, tenant_id)?;
let bytes = serde_json::to_vec(&hot_tier)?.into();
self.filesystem.put(&path, bytes).await?;
self.enabled_streams
.insert(Self::stream_key(stream, tenant_id));
Ok(())
}

Expand Down Expand Up @@ -263,6 +286,47 @@ impl HotTierManager {
where
'a: 'static,
{
// Populate the in-memory cache from existing hot tier files on disk.
// Runs in a blocking thread pool task to avoid stalling the async runtime.
// Queries arriving before this completes will simply not find files in the
// cache and fall through to remote storage (same behavior as before caching).
let hot_tier_path = self.hot_tier_path;
let cached_files = &self.cached_files;
let enabled_streams = &self.enabled_streams;
drop(tokio::task::spawn_blocking(move || {
/// Recursively scans `current` (a subtree of `root`) and records findings:
/// each parquet file is inserted into `files` keyed by its path relative to
/// `root` (value: file size), and each directory holding a hot tier metadata
/// file is registered in `stream_set`. Unreadable directories or entries are
/// silently skipped — this is a best-effort scan.
fn walk_dir(
    current: &Path,
    root: &Path,
    files: &DashMap<String, u64>,
    stream_set: &DashSet<String>,
) {
    let Ok(dir_entries) = std::fs::read_dir(current) else {
        return;
    };
    for dir_entry in dir_entries.filter_map(Result::ok) {
        let entry_path = dir_entry.path();
        if entry_path.is_dir() {
            // Descend into subdirectories.
            walk_dir(&entry_path, root, files, stream_set);
            continue;
        }
        let is_parquet = entry_path.extension().is_some_and(|ext| ext == "parquet");
        if is_parquet
            && let Ok(stat) = dir_entry.metadata()
            && let Ok(relative) = entry_path.strip_prefix(root)
        {
            files.insert(relative.to_string_lossy().into_owned(), stat.len());
        } else if entry_path
            .file_name()
            .is_some_and(|name| name == STREAM_HOT_TIER_FILENAME)
            && let Ok(relative) = entry_path.strip_prefix(root)
            && let Some(stream_dir) = relative.parent()
        {
            // The metadata file's parent is "tenant/stream" or "stream".
            stream_set.insert(stream_dir.to_string_lossy().into_owned());
        }
    }
}
walk_dir(hot_tier_path, hot_tier_path, cached_files, enabled_streams);
}));

let mut scheduler = AsyncScheduler::new();
scheduler
.every(HOT_TIER_SYNC_DURATION)
Expand Down Expand Up @@ -444,6 +508,9 @@ impl HotTierManager {
.get_object(&parquet_file_path, tenant_id)
.await?;
file.write_all(&parquet_data).await?;
// Update the in-memory index with the newly downloaded file
self.cached_files
.insert(parquet_file.file_path.clone(), parquet_file.file_size);
*parquet_file_size += parquet_file.file_size;
stream_hot_tier.used_size = *parquet_file_size;

Expand Down Expand Up @@ -550,33 +617,29 @@ impl HotTierManager {
}
}

/// Returns the list of manifest files present in hot tier directory for the stream
/// Returns the list of manifest files present in hot tier directory for the stream.
/// Uses the in-memory index for O(1) lookups per file instead of filesystem stat calls.
pub async fn get_hot_tier_manifest_files(
&self,
manifest_files: &mut Vec<File>,
) -> Result<Vec<File>, HotTierError> {
// Check which query-relevant files exist locally in the hot tier directory.
let mut hot_tier_files = Vec::new();
let mut remaining = Vec::with_capacity(manifest_files.len());

for file in manifest_files.drain(..) {
let hot_tier_path = self.hot_tier_path.join(&file.file_path);
if let Ok(meta) = fs::metadata(&hot_tier_path).await
&& meta.len() == file.file_size
if self
.cached_files
.get(&file.file_path)
.is_some_and(|size| *size == file.file_size)
{
hot_tier_files.push(file);
continue;
} else {
remaining.push(file);
}

remaining.push(file);
}

*manifest_files = remaining;

// Sort both lists in descending order by file path.
hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));

Ok(hot_tier_files)
}

Expand Down Expand Up @@ -612,19 +675,21 @@ impl HotTierManager {
Ok(hot_tier_parquet_files)
}

///check if the hot tier metadata file exists for the stream
pub fn check_stream_hot_tier_exists(&self, stream: &str, tenant_id: &Option<String>) -> bool {
let path = if let Some(tenant_id) = tenant_id.as_ref() {
self.hot_tier_path
.join(tenant_id)
.join(stream)
.join(STREAM_HOT_TIER_FILENAME)
/// Key for the enabled_streams set: "tenant/stream" or "stream".
fn stream_key(stream: &str, tenant_id: &Option<String>) -> String {
if let Some(tenant_id) = tenant_id.as_ref() {
format!("{tenant_id}/{stream}")
} else {
self.hot_tier_path
.join(stream)
.join(STREAM_HOT_TIER_FILENAME)
};
path.exists()
stream.to_owned()
}
}

/// Check if the hot tier is enabled for the stream.
/// Uses the in-memory set populated at startup and kept in sync
/// by put_hot_tier / delete_hot_tier, avoiding blocking I/O.
pub fn check_stream_hot_tier_exists(&self, stream: &str, tenant_id: &Option<String>) -> bool {
self.enabled_streams
.contains(&Self::stream_key(stream, tenant_id))
}

///delete the parquet file from the hot tier directory for the stream
Expand Down Expand Up @@ -686,6 +751,9 @@ impl HotTierManager {
)
.await?;

// Remove deleted file from in-memory index
self.cached_files.remove(&file_to_delete.file_path);

stream_hot_tier.used_size -= file_size;
stream_hot_tier.available_size += file_size;
self.put_hot_tier(stream, stream_hot_tier, tenant_id)
Expand Down
Loading
Loading