From 373734371b5f89a901dc788fac886e2cc91d6e1b Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Sun, 1 Mar 2026 12:06:16 +0530 Subject: [PATCH] fix: delete hottier, query cache metric --- src/hottier.rs | 19 +++++++++---------- src/query/stream_schema_provider.rs | 13 +++++++------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index dec846a92..91e9ec361 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -105,8 +105,7 @@ impl HotTierManager { for tenant_id in tenants { for stream in PARSEABLE.streams.list(&tenant_id) { if self.check_stream_hot_tier_exists(&stream, &tenant_id) - && stream != current_stream - && tenant_id != *current_tenant_id + && !(stream == current_stream && tenant_id == *current_tenant_id) { let stream_hot_tier = self.get_hot_tier(&stream, &tenant_id).await?; total_hot_tier_size += &stream_hot_tier.size; @@ -213,13 +212,17 @@ impl HotTierManager { if !self.check_stream_hot_tier_exists(stream, tenant_id) { return Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } - let path = self.hot_tier_path.join(stream); + let path = if let Some(tenant_id) = tenant_id.as_ref() { + self.hot_tier_path.join(tenant_id).join(stream) + } else { + self.hot_tier_path.join(stream) + }; fs::remove_dir_all(path).await?; Ok(()) } - ///put the hot tier metadata file for the stream + /// put the hot tier metadata file for the stream /// set the updated_date_range in the hot tier metadata file pub async fn put_hot_tier( &self, @@ -239,10 +242,6 @@ impl HotTierManager { stream: &str, tenant_id: &Option, ) -> Result { - // let path = self - // .hot_tier_path - // .join(stream) - // .join(STREAM_HOT_TIER_FILENAME); let path = if let Some(tenant_id) = tenant_id.as_ref() { self.hot_tier_path .join(tenant_id) @@ -258,7 +257,7 @@ impl HotTierManager { Ok(path) } - ///schedule the download of the hot tier files from S3 every minute + /// schedule the download of the hot tier files from S3 every minute pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where 'a: 'static, @@ -282,7 +281,7 @@ impl HotTierManager { Ok(()) } - ///sync the hot tier files from S3 to the hot tier directory for all streams + /// sync the hot tier files from S3 to the hot tier directory for all streams async fn sync_hot_tier(&self) -> Result<(), HotTierError> { // Before syncing, check if pstats stream was created and needs hot tier if let Err(e) = self.create_pstats_hot_tier().await { diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 77788a262..ccf31764b 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -629,16 +629,17 @@ impl TableProvider for StandardTableProvider { .await?; } if manifest_files.is_empty() { - QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); + QUERY_CACHE_HIT + .with_label_values(&[ + &self.stream, + self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), + ]) + .inc(); return self.final_plan(execution_plans, projection); } let (partitioned_files, statistics) = self.partitioned_files(manifest_files); - // let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() { - // glob_storage.store_url().join(tenant_id).unwrap() - // } else { - // glob_storage.store_url() - // }; + let object_store_url = glob_storage.store_url(); self.create_parquet_physical_plan(