From 400255b00f16ad3d301faa25da2c16ddce63c4dd Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 22 Feb 2026 00:42:54 +1100 Subject: [PATCH 1/4] add system defined tags and free form labels to datasets PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation PUT /api/prism/v1/datasets/{name} - update tags and labels GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag include tags and labels in home api response --- Cargo.lock | 1 + src/connectors/kafka/processor.rs | 3 +- src/handlers/http/ingest.rs | 6 +- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/server.rs | 41 ++++++++++--- .../http/modal/utils/logstream_utils.rs | 33 ++++++----- src/handlers/mod.rs | 58 ++++++++++++++++--- src/metadata.rs | 9 ++- src/migration/mod.rs | 6 +- src/parseable/mod.rs | 27 ++++++--- src/parseable/streams.rs | 24 +++++++- src/prism/home/mod.rs | 18 ++++-- src/storage/field_stats.rs | 3 +- src/storage/mod.rs | 18 ++++-- src/storage/object_storage.rs | 25 ++++++++ 15 files changed, 210 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43b16d4ba..2dcde59ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3734,6 +3734,7 @@ dependencies = [ "once_cell", "openid", "opentelemetry-proto", + "parking_lot", "parquet", "path-clean", "prometheus", diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 4a61258d7..f918b6d26 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -64,7 +64,8 @@ impl ParseableSinkProcessor { vec![log_source_entry], TelemetryType::default(), tenant_id, - None, + vec![], + vec![], ) .await?; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 22cfb7413..ac4176005 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -120,7 +120,8 @@ pub async fn ingest( vec![log_source_entry.clone()], telemetry_type, &tenant_id, - None, + vec![], + vec![], ) .await .map_err(|e| { @@ -239,7 +240,8 @@ pub async fn setup_otel_stream( vec![log_source_entry.clone()], telemetry_type, &tenant_id, - None, + vec![], + vec![], ) .await?; let mut time_partition = None; diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 993a40b10..a4136e939 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -32,6 +32,7 @@ pub mod about; pub mod alerts; pub mod cluster; pub mod correlation; +pub mod datasets; pub mod demo_data; pub mod health_check; pub mod ingest; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 4eb464705..34109e023 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -198,14 +198,39 @@ impl Server { } pub fn get_prism_datasets() -> Scope { - web::scope("/datasets").route( - "", - web::post() - .to(http::prism_logstream::post_datasets) - .authorize_for_resource(Action::GetStreamInfo) - .authorize_for_resource(Action::GetStats) - .authorize_for_resource(Action::GetRetention), - ) + web::scope("/datasets") + .route( + "", + web::post() + .to(http::prism_logstream::post_datasets) + .authorize_for_resource(Action::GetStreamInfo) + .authorize_for_resource(Action::GetStats) + .authorize_for_resource(Action::GetRetention), + ) + .route( + "/tags/{tag}", + web::get() + .to(http::datasets::get_datasets_by_tag) + .authorize_for_resource(Action::GetStreamInfo), + ) + .route( + "/{name}/correlated", + web::get() + .to(http::datasets::get_correlated_datasets) + .authorize_for_resource(Action::GetStreamInfo), + ) + .route( + "/{name}/tags", + web::put() + .to(http::datasets::put_dataset_tags) + .authorize_for_resource(Action::CreateStream), + ) + .route( + "/{name}/labels", + web::put() + .to(http::datasets::put_dataset_labels) + .authorize_for_resource(Action::CreateStream), + ) } pub fn get_demo_data_webscope() -> Scope { diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 7b93fafbc..7930df77f 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -16,17 +16,18 @@ * */ +use actix_web::http::header::HeaderMap; + use crate::{ event::format::LogSource, handlers::{ - CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, - STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, - TelemetryType, UPDATE_STREAM_KEY, + CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag, + LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, + TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY, + parse_dataset_labels, parse_dataset_tags, }, storage::StreamType, }; -use actix_web::http::header::HeaderMap; -use tracing::warn; #[derive(Debug, Default)] pub struct PutStreamHeaders { @@ -38,7 +39,8 @@ pub struct PutStreamHeaders { pub stream_type: StreamType, pub log_source: LogSource, pub telemetry_type: TelemetryType, - pub dataset_tag: Option, + pub dataset_tags: Vec, + pub dataset_labels: Vec, } impl From<&HeaderMap> for PutStreamHeaders { @@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders { .get(TELEMETRY_TYPE_KEY) .and_then(|v| v.to_str().ok()) .map_or(TelemetryType::Logs, TelemetryType::from), - dataset_tag: headers - .get(DATASET_TAG_KEY) + dataset_tags: headers + .get(DATASET_TAGS_KEY) + .or_else(|| headers.get(DATASET_TAG_KEY)) .and_then(|v| v.to_str().ok()) - .and_then(|v| match DatasetTag::try_from(v) { - Ok(tag) => Some(tag), - Err(err) => { - warn!("Invalid dataset tag '{v}': {err}"); - None - } - }), + .map(parse_dataset_tags) + .unwrap_or_default(), + dataset_labels: headers + .get(DATASET_LABELS_KEY) + .and_then(|v| v.to_str().ok()) + .map(parse_dataset_labels) + .unwrap_or_default(), } } } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 034e524b1..ebbea833b 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -16,9 +16,11 @@ * */ +use std::collections::HashSet; use std::fmt::Display; use serde::{Deserialize, Serialize}; +use tracing::warn; pub mod airplane; pub mod http; @@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; pub const STREAM_TYPE_KEY: &str = "x-p-stream-type"; pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type"; pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag"; +pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags"; +pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels"; pub const TENANT_ID: &str = "x-p-tenant"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; @@ -85,12 +89,14 @@ impl Display for TelemetryType { } /// Tag for categorizing datasets/streams by observability domain -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "kebab-case")] pub enum DatasetTag { - AgentObservability, - K8sObservability, + AgentMonitoring, + K8sMonitoring, DatabaseObservability, + ApplicationMonitoring, + ServiceMap, } impl TryFrom<&str> for DatasetTag { @@ -98,11 +104,13 @@ impl TryFrom<&str> for DatasetTag { fn try_from(s: &str) -> Result { match s.to_lowercase().as_str() { - "agent-observability" => Ok(DatasetTag::AgentObservability), - "k8s-observability" => Ok(DatasetTag::K8sObservability), + "agent-monitoring" => Ok(DatasetTag::AgentMonitoring), + "k8s-monitoring" => Ok(DatasetTag::K8sMonitoring), "database-observability" => Ok(DatasetTag::DatabaseObservability), + "application-monitoring" => Ok(DatasetTag::ApplicationMonitoring), + "service-map" => Ok(DatasetTag::ServiceMap), _ => Err( - "Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability", + "Invalid dataset tag. Supported values: agent-monitoring, k8s-monitoring, database-observability, application-monitoring, service-map", ), } } @@ -111,9 +119,43 @@ impl TryFrom<&str> for DatasetTag { impl Display for DatasetTag { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(match self { - DatasetTag::AgentObservability => "agent-observability", - DatasetTag::K8sObservability => "k8s-observability", + DatasetTag::AgentMonitoring => "agent-monitoring", + DatasetTag::K8sMonitoring => "k8s-monitoring", DatasetTag::DatabaseObservability => "database-observability", + DatasetTag::ApplicationMonitoring => "application-monitoring", + DatasetTag::ServiceMap => "service-map", }) } } + +pub fn parse_dataset_tags(header_value: &str) -> Vec { + header_value + .split(',') + .filter_map(|s| { + let trimmed = s.trim(); + if trimmed.is_empty() { + None + } else { + match DatasetTag::try_from(trimmed) { + Ok(tag) => Some(tag), + Err(err) => { + warn!("Invalid dataset tag '{trimmed}': {err}"); + None + } + } + } + }) + .collect::>() + .into_iter() + .collect() +} + +pub fn parse_dataset_labels(header_value: &str) -> Vec { + header_value + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect::>() + .into_iter() + .collect() +} diff --git a/src/metadata.rs b/src/metadata.rs index b0fa024fd..9ca1dd1e7 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -93,7 +93,8 @@ pub struct LogStreamMetadata { pub stream_type: StreamType, pub log_source: Vec, pub telemetry_type: TelemetryType, - pub dataset_tag: Option, + pub dataset_tags: Vec, + pub dataset_labels: Vec, } impl LogStreamMetadata { @@ -109,7 +110,8 @@ impl LogStreamMetadata { schema_version: SchemaVersion, log_source: Vec, telemetry_type: TelemetryType, - dataset_tag: Option, + dataset_tags: Vec, + dataset_labels: Vec, ) -> Self { LogStreamMetadata { created_at: if created_at.is_empty() { @@ -134,7 +136,8 @@ impl LogStreamMetadata { schema_version, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, ..Default::default() } } diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 7d7027fe4..8e67d59e6 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -454,7 +454,8 @@ async fn setup_logstream_metadata( stream_type, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, .. } = serde_json::from_value(stream_metadata_value).unwrap_or_default(); @@ -500,7 +501,8 @@ async fn setup_logstream_metadata( stream_type, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, }; Ok(metadata) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 2b379cfd2..271ff8a05 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -415,7 +415,8 @@ impl Parseable { let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; let telemetry_type = stream_metadata.telemetry_type; - let dataset_tag = stream_metadata.dataset_tag; + let dataset_tags = stream_metadata.dataset_tags; + let dataset_labels = stream_metadata.dataset_labels; let mut metadata = LogStreamMetadata::new( created_at, time_partition, @@ -427,7 +428,8 @@ impl Parseable { schema_version, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, ); // Set hot tier fields from the stored metadata @@ -521,7 +523,8 @@ impl Parseable { log_source: Vec, telemetry_type: TelemetryType, tenant_id: &Option, - dataset_tag: Option, + dataset_tags: Vec, + dataset_labels: Vec, ) -> Result { if self.streams.contains(stream_name, tenant_id) { return Ok(true); @@ -554,7 +557,8 @@ impl Parseable { log_source, telemetry_type, tenant_id, - dataset_tag, + dataset_tags, + dataset_labels, ) .await?; @@ -631,7 +635,8 @@ impl Parseable { stream_type, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, } = headers.into(); let stream_in_memory_dont_update = @@ -705,7 +710,8 @@ impl Parseable { vec![log_source_entry], telemetry_type, tenant_id, - dataset_tag, + dataset_tags, + dataset_labels, ) .await?; @@ -767,7 +773,8 @@ impl Parseable { log_source: Vec, telemetry_type: TelemetryType, tenant_id: &Option, - dataset_tag: Option, + dataset_tags: Vec, + dataset_labels: Vec, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal { @@ -792,7 +799,8 @@ impl Parseable { }, log_source: log_source.clone(), telemetry_type, - dataset_tag, + dataset_tags: dataset_tags.clone(), + dataset_labels: dataset_labels.clone(), ..Default::default() }; @@ -822,7 +830,8 @@ impl Parseable { SchemaVersion::V1, // New stream log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, ); let ingestor_id = INGESTOR_META .get() diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 0bf8bf2ca..1230946ab 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -963,8 +963,28 @@ impl Stream { self.metadata.read().expect(LOCK_EXPECT).log_source.clone() } - pub fn get_dataset_tag(&self) -> Option { - self.metadata.read().expect(LOCK_EXPECT).dataset_tag + pub fn get_dataset_tags(&self) -> Vec { + self.metadata + .read() + .expect(LOCK_EXPECT) + .dataset_tags + .clone() + } + + pub fn get_dataset_labels(&self) -> Vec { + self.metadata + .read() + .expect(LOCK_EXPECT) + .dataset_labels + .clone() + } + + pub fn set_dataset_tags(&self, tags: Vec) { + self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags; + } + + pub fn set_dataset_labels(&self, labels: Vec) { + self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels; } pub fn add_log_source(&self, log_source: LogSourceEntry) { diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 30b699c13..484bb8ef6 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -45,7 +45,8 @@ struct StreamMetadata { time_partition: Option, dataset_format: LogSource, ingestion: bool, - tag: Option, + tags: Vec, + labels: Vec, } type StreamMetadataResponse = Result; @@ -59,8 +60,10 @@ pub struct DataSet { time_partition: Option, dataset_format: LogSource, ingestion: bool, - #[serde(skip_serializing_if = "Option::is_none")] - tag: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + tags: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + labels: Vec, } #[derive(Debug, Serialize, Deserialize, Default)] @@ -147,7 +150,8 @@ pub async fn generate_home_response( time_partition: sm.time_partition, dataset_format: sm.dataset_format, ingestion: sm.ingestion, - tag: sm.tag, + tags: sm.tags, + labels: sm.labels, }); } Err(e) => { @@ -233,7 +237,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre let ingested = stream_jsons .iter() .any(|s| s.stats.current_stats.events > 0); - let dataset_tag = stream_jsons[0].dataset_tag; + let dataset_tags = stream_jsons[0].dataset_tags.clone(); + let dataset_labels = stream_jsons[0].dataset_labels.clone(); Ok(StreamMetadata { stream, @@ -242,7 +247,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre time_partition, dataset_format, ingestion: ingested, - tag: dataset_tag, + tags: dataset_tags, + labels: dataset_labels, }) } diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index 8ac33e8a3..a054a3adb 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -154,7 +154,8 @@ pub async fn calculate_field_stats( vec![log_source_entry], TelemetryType::Logs, tenant_id, - None, + vec![], + vec![], ) .await?; let vec_json = apply_generic_flattening_for_partition( diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 84d48d3c3..1694e5f5b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -131,8 +131,10 @@ pub struct ObjectStoreFormat { pub log_source: Vec, #[serde(default)] pub telemetry_type: TelemetryType, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub dataset_tag: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_tags: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_labels: Vec, } impl MetastoreObject for ObjectStoreFormat { @@ -173,8 +175,10 @@ pub struct StreamInfo { pub telemetry_type: TelemetryType, #[serde(default)] pub hot_tier_enabled: bool, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub dataset_tag: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_tags: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_labels: Vec, } impl StreamInfo { @@ -197,7 +201,8 @@ impl StreamInfo { log_source: metadata.log_source.clone(), telemetry_type: metadata.telemetry_type, hot_tier_enabled: metadata.hot_tier_enabled, - dataset_tag: metadata.dataset_tag, + dataset_tags: metadata.dataset_tags.clone(), + dataset_labels: metadata.dataset_labels.clone(), } } } @@ -279,7 +284,8 @@ impl Default for ObjectStoreFormat { hot_tier: None, log_source: vec![LogSourceEntry::default()], telemetry_type: TelemetryType::Logs, - dataset_tag: None, + dataset_tags: Vec::new(), + dataset_labels: Vec::new(), } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 398b7e0cf..54000b3d4 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -45,6 +45,7 @@ use ulid::Ulid; use crate::catalog::{self, snapshot::Snapshot}; use crate::event::format::LogSource; use crate::event::format::LogSourceEntry; +use crate::handlers::DatasetTag; use crate::handlers::http::fetch_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; @@ -495,6 +496,30 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } + async fn update_dataset_tags_and_labels_in_stream( + &self, + stream_name: &str, + tags: &[DatasetTag], + labels: &[String], + ) -> Result<(), ObjectStorageError> { + let mut format: ObjectStoreFormat = serde_json::from_slice( + &PARSEABLE + .metastore + .get_stream_json(stream_name, false) + .await + .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?, + )?; + format.dataset_tags = tags.to_owned(); + format.dataset_labels = labels.to_owned(); + PARSEABLE + .metastore + .put_stream_json(&format, stream_name) + .await + .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?; + + Ok(()) + } + /// Updates the first event timestamp in the object store for the specified stream. /// /// This function retrieves the current object-store format for the given stream, From 3d69b6c10542bc5fd5d00878b50e0009fde4a445 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 22 Feb 2026 01:18:08 +1100 Subject: [PATCH 2/4] add dataset handler file --- src/handlers/http/datasets.rs | 212 ++++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 src/handlers/http/datasets.rs diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs new file mode 100644 index 000000000..22f259fa2 --- /dev/null +++ b/src/handlers/http/datasets.rs @@ -0,0 +1,212 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::HashSet; + +use actix_web::http::StatusCode; +use actix_web::{HttpResponse, web}; +use serde::{Deserialize, Serialize}; + +use crate::{ + handlers::DatasetTag, + parseable::PARSEABLE, + storage::{ObjectStorageError, StreamType}, +}; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct CorrelatedDataset { + name: String, + shared_tags: Vec, + shared_labels: Vec, +} + +/// GET /api/v1/datasets/{name}/correlated +/// Returns all datasets sharing at least one tag or label with the named dataset. +pub async fn get_correlated_datasets( + path: web::Path, +) -> Result { + let dataset_name = path.into_inner(); + + let stream = PARSEABLE + .get_stream(&dataset_name) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + + let target_tags: HashSet = stream.get_dataset_tags().into_iter().collect(); + let target_labels: HashSet = stream.get_dataset_labels().into_iter().collect(); + + if target_tags.is_empty() && target_labels.is_empty() { + return Ok(HttpResponse::Ok().json(Vec::::new())); + } + + let all_streams = PARSEABLE.streams.list(); + let mut correlated = Vec::new(); + + for name in all_streams { + if name == dataset_name { + continue; + } + if let Ok(s) = PARSEABLE.get_stream(&name) { + // Skip internal streams + if s.get_stream_type() == StreamType::Internal { + continue; + } + + let s_tags: HashSet = s.get_dataset_tags().into_iter().collect(); + let s_labels: HashSet = s.get_dataset_labels().into_iter().collect(); + + let shared_tags: Vec = target_tags.intersection(&s_tags).copied().collect(); + let shared_labels: Vec = + target_labels.intersection(&s_labels).cloned().collect(); + + if !shared_tags.is_empty() || !shared_labels.is_empty() { + correlated.push(CorrelatedDataset { + name, + shared_tags, + shared_labels, + }); + } + } + } + + Ok(HttpResponse::Ok().json(correlated)) +} + +/// GET /api/v1/datasets/tags/{tag} +/// Returns all datasets that have the specified tag. +pub async fn get_datasets_by_tag(path: web::Path) -> Result { + let tag_str = path.into_inner(); + let tag = + DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?; + + let all_streams = PARSEABLE.streams.list(); + let mut matching = Vec::new(); + + for name in all_streams { + if let Ok(s) = PARSEABLE.get_stream(&name) { + if s.get_stream_type() == StreamType::Internal { + continue; + } + if s.get_dataset_tags().contains(&tag) { + matching.push(name); + } + } + } + + Ok(HttpResponse::Ok().json(matching)) +} + +#[derive(Debug, Deserialize)] +pub struct PutTagsBody { + pub tags: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct PutLabelsBody { + pub labels: Vec, +} + +/// PUT /api/v1/datasets/{name}/tags +/// Replaces the dataset's tags with the provided list. +pub async fn put_dataset_tags( + path: web::Path, + body: web::Json, +) -> Result { + let dataset_name = path.into_inner(); + let new_tags: Vec = body + .into_inner() + .tags + .into_iter() + .collect::>() + .into_iter() + .collect(); + + let stream = PARSEABLE + .get_stream(&dataset_name) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + + // Update storage first, then in-memory + let storage = PARSEABLE.storage.get_object_store(); + let existing_labels = stream.get_dataset_labels(); + storage + .update_dataset_tags_and_labels_in_stream(&dataset_name, &new_tags, &existing_labels) + .await + .map_err(DatasetsError::Storage)?; + + stream.set_dataset_tags(new_tags.clone()); + + Ok(HttpResponse::Ok().json(serde_json::json!({ "tags": new_tags }))) +} + +/// PUT /api/v1/datasets/{name}/labels +/// Replaces the dataset's labels with the provided list. +pub async fn put_dataset_labels( + path: web::Path, + body: web::Json, +) -> Result { + let dataset_name = path.into_inner(); + let new_labels: Vec = body + .into_inner() + .labels + .into_iter() + .collect::>() + .into_iter() + .collect(); + + let stream = PARSEABLE + .get_stream(&dataset_name) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + + // Update storage first, then in-memory + let storage = PARSEABLE.storage.get_object_store(); + let existing_tags = stream.get_dataset_tags(); + storage + .update_dataset_tags_and_labels_in_stream(&dataset_name, &existing_tags, &new_labels) + .await + .map_err(DatasetsError::Storage)?; + + stream.set_dataset_labels(new_labels.clone()); + + Ok(HttpResponse::Ok().json(serde_json::json!({ "labels": new_labels }))) +} + +#[derive(Debug, thiserror::Error)] +pub enum DatasetsError { + #[error("Dataset not found: {0}")] + DatasetNotFound(String), + #[error("Invalid tag: {0}")] + InvalidTag(String), + #[error("Storage error: {0}")] + Storage(ObjectStorageError), +} + +impl actix_web::ResponseError for DatasetsError { + fn status_code(&self) -> StatusCode { + match self { + DatasetsError::DatasetNotFound(_) => StatusCode::NOT_FOUND, + DatasetsError::InvalidTag(_) => StatusCode::BAD_REQUEST, + DatasetsError::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()).json(serde_json::json!({ + "error": self.to_string() + })) + } +} From 87ff8e08dce454702e44446389af2a85563ec103 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 25 Feb 2026 17:30:01 +1100 Subject: [PATCH 3/4] single endpoint for tags and labels --- src/handlers/http/datasets.rs | 85 ++++++++++++------------------- src/handlers/http/modal/server.rs | 10 +--- 2 files changed, 34 insertions(+), 61 deletions(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 22f259fa2..2e9d5ba7a 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -112,77 +112,56 @@ pub async fn get_datasets_by_tag(path: web::Path) -> Result, +pub struct PutDatasetMetadataBody { + pub tags: Option>, + pub labels: Option>, } -#[derive(Debug, Deserialize)] -pub struct PutLabelsBody { - pub labels: Vec, -} - -/// PUT /api/v1/datasets/{name}/tags -/// Replaces the dataset's tags with the provided list. -pub async fn put_dataset_tags( +/// PUT /api/v1/datasets/{name} +/// Replaces the dataset's tags and/or labels. +/// Only fields present in the body are updated; absent fields are left unchanged. +pub async fn put_dataset_metadata( path: web::Path, - body: web::Json, + body: web::Json, ) -> Result { let dataset_name = path.into_inner(); - let new_tags: Vec = body - .into_inner() - .tags - .into_iter() - .collect::>() - .into_iter() - .collect(); + let body = body.into_inner(); let stream = PARSEABLE .get_stream(&dataset_name) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; - // Update storage first, then in-memory - let storage = PARSEABLE.storage.get_object_store(); - let existing_labels = stream.get_dataset_labels(); - storage - .update_dataset_tags_and_labels_in_stream(&dataset_name, &new_tags, &existing_labels) - .await - .map_err(DatasetsError::Storage)?; - - stream.set_dataset_tags(new_tags.clone()); - - Ok(HttpResponse::Ok().json(serde_json::json!({ "tags": new_tags }))) -} - -/// PUT /api/v1/datasets/{name}/labels -/// Replaces the dataset's labels with the provided list. -pub async fn put_dataset_labels( - path: web::Path, - body: web::Json, -) -> Result { - let dataset_name = path.into_inner(); - let new_labels: Vec = body - .into_inner() - .labels - .into_iter() - .collect::>() - .into_iter() - .collect(); - - let stream = PARSEABLE - .get_stream(&dataset_name) - .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + let final_tags = match body.tags { + Some(tags) => tags + .into_iter() + .collect::>() + .into_iter() + .collect(), + None => stream.get_dataset_tags(), + }; + let final_labels = match body.labels { + Some(labels) => labels + .into_iter() + .collect::>() + .into_iter() + .collect(), + None => stream.get_dataset_labels(), + }; // Update storage first, then in-memory let storage = PARSEABLE.storage.get_object_store(); - let existing_tags = stream.get_dataset_tags(); storage - .update_dataset_tags_and_labels_in_stream(&dataset_name, &existing_tags, &new_labels) + .update_dataset_tags_and_labels_in_stream(&dataset_name, &final_tags, &final_labels) .await .map_err(DatasetsError::Storage)?; - stream.set_dataset_labels(new_labels.clone()); + stream.set_dataset_tags(final_tags.clone()); + stream.set_dataset_labels(final_labels.clone()); - Ok(HttpResponse::Ok().json(serde_json::json!({ "labels": new_labels }))) + Ok(HttpResponse::Ok().json(serde_json::json!({ + "tags": final_tags, + "labels": final_labels, + }))) } #[derive(Debug, thiserror::Error)] diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 34109e023..b02b1a1f5 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -220,15 +220,9 @@ impl Server { .authorize_for_resource(Action::GetStreamInfo), ) .route( - "/{name}/tags", + "/{name}", web::put() - .to(http::datasets::put_dataset_tags) - .authorize_for_resource(Action::CreateStream), - ) - .route( - "/{name}/labels", - web::put() - .to(http::datasets::put_dataset_labels) + .to(http::datasets::put_dataset_metadata) .authorize_for_resource(Action::CreateStream), ) } From 4a3e23cab0c5b9ef42b7c17756e142eb92e079b7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 25 Feb 2026 17:49:47 +1100 Subject: [PATCH 4/4] add tenant_id params in new handlers --- src/handlers/http/datasets.rs | 33 +++++++++++++++++++++++---------- src/parseable/mod.rs | 3 ++- src/storage/object_storage.rs | 5 +++-- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 2e9d5ba7a..9bd6f533f 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -19,9 +19,10 @@ use std::collections::HashSet; use actix_web::http::StatusCode; -use actix_web::{HttpResponse, web}; +use actix_web::{HttpRequest, HttpResponse, web}; use serde::{Deserialize, Serialize}; +use crate::utils::get_tenant_id_from_request; use crate::{ handlers::DatasetTag, parseable::PARSEABLE, @@ -39,12 +40,13 @@ struct CorrelatedDataset { /// GET /api/v1/datasets/{name}/correlated /// Returns all datasets sharing at least one tag or label with the named dataset. pub async fn get_correlated_datasets( + req: HttpRequest, path: web::Path, ) -> Result { let dataset_name = path.into_inner(); - + let tenant_id = get_tenant_id_from_request(&req); let stream = PARSEABLE - .get_stream(&dataset_name) + .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; let target_tags: HashSet = stream.get_dataset_tags().into_iter().collect(); @@ -54,14 +56,14 @@ pub async fn get_correlated_datasets( return Ok(HttpResponse::Ok().json(Vec::::new())); } - let all_streams = PARSEABLE.streams.list(); + let all_streams = PARSEABLE.streams.list(&tenant_id); let mut correlated = Vec::new(); for name in all_streams { if name == dataset_name { continue; } - if let Ok(s) = PARSEABLE.get_stream(&name) { + if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) { // Skip internal streams if s.get_stream_type() == StreamType::Internal { continue; @@ -89,16 +91,20 @@ pub async fn get_correlated_datasets( /// GET /api/v1/datasets/tags/{tag} /// Returns all datasets that have the specified tag. -pub async fn get_datasets_by_tag(path: web::Path) -> Result { +pub async fn get_datasets_by_tag( + req: HttpRequest, + path: web::Path, +) -> Result { + let tenant_id = get_tenant_id_from_request(&req); let tag_str = path.into_inner(); let tag = DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?; - let all_streams = PARSEABLE.streams.list(); + let all_streams = PARSEABLE.streams.list(&tenant_id); let mut matching = Vec::new(); for name in all_streams { - if let Ok(s) = PARSEABLE.get_stream(&name) { + if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) { if s.get_stream_type() == StreamType::Internal { continue; } @@ -121,14 +127,16 @@ pub struct PutDatasetMetadataBody { /// Replaces the dataset's tags and/or labels. /// Only fields present in the body are updated; absent fields are left unchanged. pub async fn put_dataset_metadata( + req: HttpRequest, path: web::Path, body: web::Json, ) -> Result { let dataset_name = path.into_inner(); let body = body.into_inner(); + let tenant_id = get_tenant_id_from_request(&req); let stream = PARSEABLE - .get_stream(&dataset_name) + .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; let final_tags = match body.tags { @@ -151,7 +159,12 @@ pub async fn put_dataset_metadata( // Update storage first, then in-memory let storage = PARSEABLE.storage.get_object_store(); storage - .update_dataset_tags_and_labels_in_stream(&dataset_name, &final_tags, &final_labels) + .update_dataset_tags_and_labels_in_stream( + &dataset_name, + &final_tags, + &final_labels, + &tenant_id, + ) .await .map_err(DatasetsError::Storage)?; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 271ff8a05..0f8ca2cd3 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -475,7 +475,8 @@ impl Parseable { vec![log_source_entry.clone()], TelemetryType::Logs, &tenant_id, - None, + vec![], + vec![] ) .await; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 54000b3d4..c82e32737 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -501,11 +501,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { stream_name: &str, tags: &[DatasetTag], labels: &[String], + tenant_id: &Option, ) -> Result<(), ObjectStorageError> { let mut format: ObjectStoreFormat = serde_json::from_slice( &PARSEABLE .metastore - .get_stream_json(stream_name, false) + .get_stream_json(stream_name, false, tenant_id) .await .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?, )?; @@ -513,7 +514,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { format.dataset_labels = labels.to_owned(); PARSEABLE .metastore - .put_stream_json(&format, stream_name) + .put_stream_json(&format, stream_name, tenant_id) .await .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;