diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 4a61258d7..f918b6d26 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
vec![log_source_entry],
TelemetryType::default(),
tenant_id,
- None,
+ vec![],
+ vec![],
)
.await?;
diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs
new file mode 100644
index 000000000..9bd6f533f
--- /dev/null
+++ b/src/handlers/http/datasets.rs
@@ -0,0 +1,204 @@
+/*
+ * Parseable Server (C) 2022 - 2025 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::collections::HashSet;
+
+use actix_web::http::StatusCode;
+use actix_web::{HttpRequest, HttpResponse, web};
+use serde::{Deserialize, Serialize};
+
+use crate::utils::get_tenant_id_from_request;
+use crate::{
+ handlers::DatasetTag,
+ parseable::PARSEABLE,
+ storage::{ObjectStorageError, StreamType},
+};
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct CorrelatedDataset {
+    name: String,
+    shared_tags: Vec<DatasetTag>,
+    shared_labels: Vec<String>,
+}
+
+/// GET /api/v1/datasets/{name}/correlated
+/// Returns all datasets sharing at least one tag or label with the named dataset.
+pub async fn get_correlated_datasets(
+    req: HttpRequest,
+    path: web::Path<String>,
+) -> Result<HttpResponse, DatasetsError> {
+    let dataset_name = path.into_inner();
+    let tenant_id = get_tenant_id_from_request(&req);
+    let stream = PARSEABLE
+        .get_stream(&dataset_name, &tenant_id)
+        .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+    let target_tags: HashSet<DatasetTag> = stream.get_dataset_tags().into_iter().collect();
+    let target_labels: HashSet<String> = stream.get_dataset_labels().into_iter().collect();
+
+    if target_tags.is_empty() && target_labels.is_empty() {
+        return Ok(HttpResponse::Ok().json(Vec::<CorrelatedDataset>::new()));
+    }
+
+    let all_streams = PARSEABLE.streams.list(&tenant_id);
+    let mut correlated = Vec::new();
+
+    for name in all_streams {
+        if name == dataset_name {
+            continue;
+        }
+        if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) {
+            // Skip internal streams
+            if s.get_stream_type() == StreamType::Internal {
+                continue;
+            }
+
+            let s_tags: HashSet<DatasetTag> = s.get_dataset_tags().into_iter().collect();
+            let s_labels: HashSet<String> = s.get_dataset_labels().into_iter().collect();
+
+            let shared_tags: Vec<DatasetTag> = target_tags.intersection(&s_tags).copied().collect();
+            let shared_labels: Vec<String> =
+                target_labels.intersection(&s_labels).cloned().collect();
+
+            if !shared_tags.is_empty() || !shared_labels.is_empty() {
+                correlated.push(CorrelatedDataset {
+                    name,
+                    shared_tags,
+                    shared_labels,
+                });
+            }
+        }
+    }
+
+    Ok(HttpResponse::Ok().json(correlated))
+}
+
+/// GET /api/v1/datasets/tags/{tag}
+/// Returns all datasets that have the specified tag.
+pub async fn get_datasets_by_tag(
+    req: HttpRequest,
+    path: web::Path<String>,
+) -> Result<HttpResponse, DatasetsError> {
+    let tenant_id = get_tenant_id_from_request(&req);
+    let tag_str = path.into_inner();
+    let tag =
+        DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?;
+
+    let all_streams = PARSEABLE.streams.list(&tenant_id);
+    let mut matching = Vec::new();
+
+    for name in all_streams {
+        if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) {
+            if s.get_stream_type() == StreamType::Internal {
+                continue;
+            }
+            if s.get_dataset_tags().contains(&tag) {
+                matching.push(name);
+            }
+        }
+    }
+
+    Ok(HttpResponse::Ok().json(matching))
+}
+
+#[derive(Debug, Deserialize)]
+pub struct PutDatasetMetadataBody {
+    pub tags: Option<Vec<DatasetTag>>,
+    pub labels: Option<Vec<String>>,
+}
+
+/// PUT /api/v1/datasets/{name}
+/// Replaces the dataset's tags and/or labels.
+/// Only fields present in the body are updated; absent fields are left unchanged.
+pub async fn put_dataset_metadata(
+    req: HttpRequest,
+    path: web::Path<String>,
+    body: web::Json<PutDatasetMetadataBody>,
+) -> Result<HttpResponse, DatasetsError> {
+    let dataset_name = path.into_inner();
+    let body = body.into_inner();
+    let tenant_id = get_tenant_id_from_request(&req);
+
+    let stream = PARSEABLE
+        .get_stream(&dataset_name, &tenant_id)
+        .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+    // Round-trip through a HashSet deduplicates the incoming values.
+    let final_tags = match body.tags {
+        Some(tags) => tags
+            .into_iter()
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect(),
+        None => stream.get_dataset_tags(),
+    };
+    let final_labels = match body.labels {
+        Some(labels) => labels
+            .into_iter()
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect(),
+        None => stream.get_dataset_labels(),
+    };
+
+    // Update storage first, then in-memory
+    let storage = PARSEABLE.storage.get_object_store();
+    storage
+        .update_dataset_tags_and_labels_in_stream(
+            &dataset_name,
+            &final_tags,
+            &final_labels,
+            &tenant_id,
+        )
+        .await
+        .map_err(DatasetsError::Storage)?;
+
+    stream.set_dataset_tags(final_tags.clone());
+    stream.set_dataset_labels(final_labels.clone());
+
+    Ok(HttpResponse::Ok().json(serde_json::json!({
+        "tags": final_tags,
+        "labels": final_labels,
+    })))
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DatasetsError {
+    #[error("Dataset not found: {0}")]
+    DatasetNotFound(String),
+    #[error("Invalid tag: {0}")]
+    InvalidTag(String),
+    #[error("Storage error: {0}")]
+    Storage(#[from] ObjectStorageError),
+}
+
+impl actix_web::ResponseError for DatasetsError {
+    fn status_code(&self) -> StatusCode {
+        use DatasetsError::*;
+        match self {
+            DatasetNotFound(_) => StatusCode::NOT_FOUND,
+            InvalidTag(_) => StatusCode::BAD_REQUEST,
+            Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
+        }
+    }
+
+    fn error_response(&self) -> HttpResponse {
+        let payload = serde_json::json!({ "error": self.to_string() });
+        HttpResponse::build(self.status_code()).json(payload)
+    }
+}
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 0f3a87744..a02fe9323 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -120,7 +120,8 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
- None,
+ vec![],
+ vec![],
)
.await
.map_err(|e| {
@@ -239,7 +240,8 @@ pub async fn setup_otel_stream(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
- None,
+ vec![],
+ vec![],
)
.await?;
let mut time_partition = None;
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index 993a40b10..a4136e939 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -32,6 +32,7 @@ pub mod about;
pub mod alerts;
pub mod cluster;
pub mod correlation;
+pub mod datasets;
pub mod demo_data;
pub mod health_check;
pub mod ingest;
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 9b7aa4aea..d7460db7e 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -199,14 +199,33 @@ impl Server {
}
pub fn get_prism_datasets() -> Scope {
- web::scope("/datasets").route(
- "",
- web::post()
- .to(http::prism_logstream::post_datasets)
- .authorize_for_resource(Action::GetStreamInfo)
- .authorize_for_resource(Action::GetStats)
- .authorize_for_resource(Action::GetRetention),
- )
+ web::scope("/datasets")
+ .route(
+ "",
+ web::post()
+ .to(http::prism_logstream::post_datasets)
+ .authorize_for_resource(Action::GetStreamInfo)
+ .authorize_for_resource(Action::GetStats)
+ .authorize_for_resource(Action::GetRetention),
+ )
+ .route(
+ "/tags/{tag}",
+ web::get()
+ .to(http::datasets::get_datasets_by_tag)
+ .authorize_for_resource(Action::GetStreamInfo),
+ )
+ .route(
+ "/{name}/correlated",
+ web::get()
+ .to(http::datasets::get_correlated_datasets)
+ .authorize_for_resource(Action::GetStreamInfo),
+ )
+ .route(
+ "/{name}",
+ web::put()
+ .to(http::datasets::put_dataset_metadata)
+ .authorize_for_resource(Action::CreateStream),
+ )
}
pub fn get_demo_data_webscope() -> Scope {
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index 7b93fafbc..7930df77f 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -16,17 +16,18 @@
*
*/
+use actix_web::http::header::HeaderMap;
+
use crate::{
event::format::LogSource,
handlers::{
- CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
- STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
- TelemetryType, UPDATE_STREAM_KEY,
+ CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
+ LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
+ TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
+ parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};
-use actix_web::http::header::HeaderMap;
-use tracing::warn;
#[derive(Debug, Default)]
pub struct PutStreamHeaders {
@@ -38,7 +39,8 @@ pub struct PutStreamHeaders {
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
- pub dataset_tag: Option,
+ pub dataset_tags: Vec,
+ pub dataset_labels: Vec,
}
impl From<&HeaderMap> for PutStreamHeaders {
@@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
- dataset_tag: headers
- .get(DATASET_TAG_KEY)
+ dataset_tags: headers
+ .get(DATASET_TAGS_KEY)
+ .or_else(|| headers.get(DATASET_TAG_KEY))
.and_then(|v| v.to_str().ok())
- .and_then(|v| match DatasetTag::try_from(v) {
- Ok(tag) => Some(tag),
- Err(err) => {
- warn!("Invalid dataset tag '{v}': {err}");
- None
- }
- }),
+ .map(parse_dataset_tags)
+ .unwrap_or_default(),
+ dataset_labels: headers
+ .get(DATASET_LABELS_KEY)
+ .and_then(|v| v.to_str().ok())
+ .map(parse_dataset_labels)
+ .unwrap_or_default(),
}
}
}
diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs
index 034e524b1..ebbea833b 100644
--- a/src/handlers/mod.rs
+++ b/src/handlers/mod.rs
@@ -16,9 +16,11 @@
*
*/
+use std::collections::HashSet;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
+use tracing::warn;
pub mod airplane;
pub mod http;
@@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
+pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags";
+pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels";
pub const TENANT_ID: &str = "x-p-tenant";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
@@ -85,12 +89,14 @@ impl Display for TelemetryType {
}
/// Tag for categorizing datasets/streams by observability domain
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum DatasetTag {
- AgentObservability,
- K8sObservability,
+ AgentMonitoring,
+ K8sMonitoring,
DatabaseObservability,
+ ApplicationMonitoring,
+ ServiceMap,
}
impl TryFrom<&str> for DatasetTag {
@@ -98,11 +104,13 @@ impl TryFrom<&str> for DatasetTag {
fn try_from(s: &str) -> Result {
match s.to_lowercase().as_str() {
- "agent-observability" => Ok(DatasetTag::AgentObservability),
- "k8s-observability" => Ok(DatasetTag::K8sObservability),
+ "agent-monitoring" => Ok(DatasetTag::AgentMonitoring),
+ "k8s-monitoring" => Ok(DatasetTag::K8sMonitoring),
"database-observability" => Ok(DatasetTag::DatabaseObservability),
+ "application-monitoring" => Ok(DatasetTag::ApplicationMonitoring),
+ "service-map" => Ok(DatasetTag::ServiceMap),
_ => Err(
- "Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
+ "Invalid dataset tag. Supported values: agent-monitoring, k8s-monitoring, database-observability, application-monitoring, service-map",
),
}
}
@@ -111,9 +119,43 @@ impl TryFrom<&str> for DatasetTag {
impl Display for DatasetTag {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
- DatasetTag::AgentObservability => "agent-observability",
- DatasetTag::K8sObservability => "k8s-observability",
+ DatasetTag::AgentMonitoring => "agent-monitoring",
+ DatasetTag::K8sMonitoring => "k8s-monitoring",
DatasetTag::DatabaseObservability => "database-observability",
+ DatasetTag::ApplicationMonitoring => "application-monitoring",
+ DatasetTag::ServiceMap => "service-map",
})
}
}
+
+pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
+    header_value
+        .split(',')
+        .filter_map(|s| {
+            let trimmed = s.trim();
+            if trimmed.is_empty() {
+                None
+            } else {
+                match DatasetTag::try_from(trimmed) {
+                    Ok(tag) => Some(tag),
+                    Err(err) => {
+                        warn!("Invalid dataset tag '{trimmed}': {err}");
+                        None
+                    }
+                }
+            }
+        })
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect()
+}
+
+pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
+    header_value
+        .split(',')
+        .map(|s| s.trim().to_string())
+        .filter(|s| !s.is_empty())
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect()
+}
diff --git a/src/metadata.rs b/src/metadata.rs
index b0fa024fd..9ca1dd1e7 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -93,7 +93,8 @@ pub struct LogStreamMetadata {
pub stream_type: StreamType,
pub log_source: Vec,
pub telemetry_type: TelemetryType,
- pub dataset_tag: Option,
+ pub dataset_tags: Vec,
+ pub dataset_labels: Vec,
}
impl LogStreamMetadata {
@@ -109,7 +110,8 @@ impl LogStreamMetadata {
schema_version: SchemaVersion,
log_source: Vec,
telemetry_type: TelemetryType,
- dataset_tag: Option,
+ dataset_tags: Vec,
+ dataset_labels: Vec,
) -> Self {
LogStreamMetadata {
created_at: if created_at.is_empty() {
@@ -134,7 +136,8 @@ impl LogStreamMetadata {
schema_version,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
..Default::default()
}
}
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index 7d7027fe4..8e67d59e6 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -454,7 +454,8 @@ async fn setup_logstream_metadata(
stream_type,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
..
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();
@@ -500,7 +501,8 @@ async fn setup_logstream_metadata(
stream_type,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
};
Ok(metadata)
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index c6072798d..ac3d9ea69 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -416,7 +416,8 @@ impl Parseable {
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
let telemetry_type = stream_metadata.telemetry_type;
- let dataset_tag = stream_metadata.dataset_tag;
+ let dataset_tags = stream_metadata.dataset_tags;
+ let dataset_labels = stream_metadata.dataset_labels;
let mut metadata = LogStreamMetadata::new(
created_at,
time_partition,
@@ -428,7 +429,8 @@ impl Parseable {
schema_version,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
);
// Set hot tier fields from the stored metadata
@@ -474,7 +476,8 @@ impl Parseable {
vec![log_source_entry.clone()],
TelemetryType::Logs,
&tenant_id,
- None,
+ vec![],
+ vec![]
)
.await;
@@ -533,7 +536,8 @@ impl Parseable {
log_source: Vec,
telemetry_type: TelemetryType,
tenant_id: &Option,
- dataset_tag: Option,
+ dataset_tags: Vec,
+ dataset_labels: Vec,
) -> Result {
if self.streams.contains(stream_name, tenant_id) {
return Ok(true);
@@ -566,7 +570,8 @@ impl Parseable {
log_source,
telemetry_type,
tenant_id,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
)
.await?;
@@ -643,7 +648,8 @@ impl Parseable {
stream_type,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
} = headers.into();
let stream_in_memory_dont_update =
@@ -717,7 +723,8 @@ impl Parseable {
vec![log_source_entry],
telemetry_type,
tenant_id,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
)
.await?;
@@ -779,7 +786,8 @@ impl Parseable {
log_source: Vec,
telemetry_type: TelemetryType,
tenant_id: &Option,
- dataset_tag: Option,
+ dataset_tags: Vec,
+ dataset_labels: Vec,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal {
@@ -804,7 +812,8 @@ impl Parseable {
},
log_source: log_source.clone(),
telemetry_type,
- dataset_tag,
+ dataset_tags: dataset_tags.clone(),
+ dataset_labels: dataset_labels.clone(),
..Default::default()
};
@@ -834,7 +843,8 @@ impl Parseable {
SchemaVersion::V1, // New stream
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
);
let ingestor_id = INGESTOR_META
.get()
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 0bf8bf2ca..1230946ab 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -963,8 +963,28 @@ impl Stream {
self.metadata.read().expect(LOCK_EXPECT).log_source.clone()
}
- pub fn get_dataset_tag(&self) -> Option {
- self.metadata.read().expect(LOCK_EXPECT).dataset_tag
+    pub fn get_dataset_tags(&self) -> Vec<DatasetTag> {
+        self.metadata
+            .read()
+            .expect(LOCK_EXPECT)
+            .dataset_tags
+            .clone()
+    }
+
+    pub fn get_dataset_labels(&self) -> Vec<String> {
+        self.metadata
+            .read()
+            .expect(LOCK_EXPECT)
+            .dataset_labels
+            .clone()
+    }
+
+    pub fn set_dataset_tags(&self, tags: Vec<DatasetTag>) {
+        self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags;
+    }
+
+    pub fn set_dataset_labels(&self, labels: Vec<String>) {
+        self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels;
+    }
pub fn add_log_source(&self, log_source: LogSourceEntry) {
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index 30b699c13..484bb8ef6 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -45,7 +45,8 @@ struct StreamMetadata {
time_partition: Option,
dataset_format: LogSource,
ingestion: bool,
- tag: Option,
+ tags: Vec,
+ labels: Vec,
}
type StreamMetadataResponse = Result;
@@ -59,8 +60,10 @@ pub struct DataSet {
time_partition: Option,
dataset_format: LogSource,
ingestion: bool,
- #[serde(skip_serializing_if = "Option::is_none")]
- tag: Option,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ tags: Vec,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ labels: Vec,
}
#[derive(Debug, Serialize, Deserialize, Default)]
@@ -147,7 +150,8 @@ pub async fn generate_home_response(
time_partition: sm.time_partition,
dataset_format: sm.dataset_format,
ingestion: sm.ingestion,
- tag: sm.tag,
+ tags: sm.tags,
+ labels: sm.labels,
});
}
Err(e) => {
@@ -233,7 +237,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre
let ingested = stream_jsons
.iter()
.any(|s| s.stats.current_stats.events > 0);
- let dataset_tag = stream_jsons[0].dataset_tag;
+ let dataset_tags = stream_jsons[0].dataset_tags.clone();
+ let dataset_labels = stream_jsons[0].dataset_labels.clone();
Ok(StreamMetadata {
stream,
@@ -242,7 +247,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre
time_partition,
dataset_format,
ingestion: ingested,
- tag: dataset_tag,
+ tags: dataset_tags,
+ labels: dataset_labels,
})
}
diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs
index 78e6106e2..02e8480e5 100644
--- a/src/storage/field_stats.rs
+++ b/src/storage/field_stats.rs
@@ -156,7 +156,8 @@ pub async fn calculate_field_stats(
vec![log_source_entry],
TelemetryType::Logs,
tenant_id,
- None,
+ vec![],
+ vec![],
)
.await?;
let vec_json = apply_generic_flattening_for_partition(
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 84d48d3c3..1694e5f5b 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -131,8 +131,10 @@ pub struct ObjectStoreFormat {
pub log_source: Vec,
#[serde(default)]
pub telemetry_type: TelemetryType,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub dataset_tag: Option,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_tags: Vec,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_labels: Vec,
}
impl MetastoreObject for ObjectStoreFormat {
@@ -173,8 +175,10 @@ pub struct StreamInfo {
pub telemetry_type: TelemetryType,
#[serde(default)]
pub hot_tier_enabled: bool,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub dataset_tag: Option,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_tags: Vec,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_labels: Vec,
}
impl StreamInfo {
@@ -197,7 +201,8 @@ impl StreamInfo {
log_source: metadata.log_source.clone(),
telemetry_type: metadata.telemetry_type,
hot_tier_enabled: metadata.hot_tier_enabled,
- dataset_tag: metadata.dataset_tag,
+ dataset_tags: metadata.dataset_tags.clone(),
+ dataset_labels: metadata.dataset_labels.clone(),
}
}
}
@@ -279,7 +284,8 @@ impl Default for ObjectStoreFormat {
hot_tier: None,
log_source: vec![LogSourceEntry::default()],
telemetry_type: TelemetryType::Logs,
- dataset_tag: None,
+ dataset_tags: Vec::new(),
+ dataset_labels: Vec::new(),
}
}
}
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 398b7e0cf..c82e32737 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -45,6 +45,7 @@ use ulid::Ulid;
use crate::catalog::{self, snapshot::Snapshot};
use crate::event::format::LogSource;
use crate::event::format::LogSourceEntry;
+use crate::handlers::DatasetTag;
use crate::handlers::http::fetch_schema;
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
@@ -495,6 +496,31 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}
+    async fn update_dataset_tags_and_labels_in_stream(
+        &self,
+        stream_name: &str,
+        tags: &[DatasetTag],
+        labels: &[String],
+        tenant_id: &Option<String>,
+    ) -> Result<(), ObjectStorageError> {
+        let mut format: ObjectStoreFormat = serde_json::from_slice(
+            &PARSEABLE
+                .metastore
+                .get_stream_json(stream_name, false, tenant_id)
+                .await
+                .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?,
+        )?;
+        format.dataset_tags = tags.to_owned();
+        format.dataset_labels = labels.to_owned();
+        PARSEABLE
+            .metastore
+            .put_stream_json(&format, stream_name, tenant_id)
+            .await
+            .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;
+
+        Ok(())
+    }
+
/// Updates the first event timestamp in the object store for the specified stream.
///
/// This function retrieves the current object-store format for the given stream,