Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ impl HotTierManager {
for tenant_id in tenants {
for stream in PARSEABLE.streams.list(&tenant_id) {
if self.check_stream_hot_tier_exists(&stream, &tenant_id)
&& stream != current_stream
&& tenant_id != *current_tenant_id
&& !(stream == current_stream && tenant_id == *current_tenant_id)
{
let stream_hot_tier = self.get_hot_tier(&stream, &tenant_id).await?;
total_hot_tier_size += &stream_hot_tier.size;
Expand Down Expand Up @@ -213,13 +212,17 @@ impl HotTierManager {
if !self.check_stream_hot_tier_exists(stream, tenant_id) {
return Err(HotTierValidationError::NotFound(stream.to_owned()).into());
}
let path = self.hot_tier_path.join(stream);
let path = if let Some(tenant_id) = tenant_id.as_ref() {
self.hot_tier_path.join(tenant_id).join(stream)
} else {
self.hot_tier_path.join(stream)
};
fs::remove_dir_all(path).await?;

Ok(())
}

///put the hot tier metadata file for the stream
/// put the hot tier metadata file for the stream
/// set the updated_date_range in the hot tier metadata file
pub async fn put_hot_tier(
&self,
Expand All @@ -239,10 +242,6 @@ impl HotTierManager {
stream: &str,
tenant_id: &Option<String>,
) -> Result<object_store::path::Path, HotTierError> {
// let path = self
// .hot_tier_path
// .join(stream)
// .join(STREAM_HOT_TIER_FILENAME);
let path = if let Some(tenant_id) = tenant_id.as_ref() {
self.hot_tier_path
.join(tenant_id)
Expand All @@ -258,7 +257,7 @@ impl HotTierManager {
Ok(path)
}

///schedule the download of the hot tier files from S3 every minute
/// schedule the download of the hot tier files from S3 every minute
pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError>
where
'a: 'static,
Expand All @@ -282,7 +281,7 @@ impl HotTierManager {
Ok(())
}

///sync the hot tier files from S3 to the hot tier directory for all streams
/// sync the hot tier files from S3 to the hot tier directory for all streams
async fn sync_hot_tier(&self) -> Result<(), HotTierError> {
// Before syncing, check if pstats stream was created and needs hot tier
if let Err(e) = self.create_pstats_hot_tier().await {
Expand Down
13 changes: 7 additions & 6 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,16 +629,17 @@ impl TableProvider for StandardTableProvider {
.await?;
}
if manifest_files.is_empty() {
QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
QUERY_CACHE_HIT
.with_label_values(&[
&self.stream,
self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT),
])
.inc();
return self.final_plan(execution_plans, projection);
}

let (partitioned_files, statistics) = self.partitioned_files(manifest_files);
// let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
// glob_storage.store_url().join(tenant_id).unwrap()
// } else {
// glob_storage.store_url()
// };

let object_store_url = glob_storage.store_url();

self.create_parquet_physical_plan(
Expand Down
Loading