From 19b3a735b1a0d345911a93f3300d7575627ae84c Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Sat, 28 Feb 2026 00:58:17 +0530 Subject: [PATCH] fix: sync streams --- src/handlers/http/cluster/mod.rs | 9 +++++++- .../http/modal/query/querier_logstream.rs | 23 ++++++++++++++++++- src/parseable/mod.rs | 12 ++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 45ab79097..798718bc0 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -412,7 +412,14 @@ pub async fn sync_streams_with_ingestors( base_path_without_preceding_slash(), stream_name ); - let headers = reqwest_headers_clone.clone(); + let mut headers = reqwest_headers_clone.clone(); + + if !headers.contains_key("intra-cluster-userid") { + headers.insert( + reqwest::header::HeaderName::from_static("intra-cluster-userid"), + reqwest::header::HeaderValue::from_str(&PARSEABLE.options.username).unwrap(), + ); + } let body = body_clone.clone(); async move { let res = INTRA_CLUSTER_CLIENT diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 855feba2f..ecd1860dc 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -31,6 +31,9 @@ use tracing::{error, warn}; pub static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); +use crate::handlers::http::middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER}; +use crate::parseable::DEFAULT_TENANT; +use crate::utils::get_user_from_request; use crate::{ handlers::{ UPDATE_STREAM_KEY, @@ -126,7 +129,7 @@ pub async fn put_stream( let stream_name = stream_name.into_inner(); let tenant_id = get_tenant_id_from_request(&req); let _guard = CREATE_STREAM_LOCK.lock().await; - let headers = PARSEABLE + let mut headers = PARSEABLE .create_update_stream(req.headers(), &body, &stream_name, &tenant_id) .await?; @@ -136,6 +139,24 @@ pub async fn put_stream( false }; + if let Some((_, hash)) = CLUSTER_SECRET.get() { + let userid = get_user_from_request(&req).unwrap(); + headers.insert( + actix_web::http::header::HeaderName::from_static(CLUSTER_SECRET_HEADER), + actix_web::http::header::HeaderValue::from_str(hash).unwrap(), + ); + headers.insert( + actix_web::http::header::HeaderName::from_static("intra-cluster-tenant"), + actix_web::http::header::HeaderValue::from_str( + tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), + ) + .unwrap(), + ); + headers.insert( + actix_web::http::header::HeaderName::from_static("intra-cluster-userid"), + actix_web::http::header::HeaderValue::from_str(&userid).unwrap(), + ); + } sync_streams_with_ingestors(headers, body, &stream_name, &tenant_id).await?; if is_update { diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 2b379cfd2..c6072798d 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -57,6 +57,7 @@ use crate::{ cluster::{PMETA_STREAM_NAME, sync_streams_with_ingestors}, ingest::PostError, logstream::error::{CreateStreamError, StreamError}, + middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER}, modal::{ ingest_server::INGESTOR_META, utils::{logstream_utils::PutStreamHeaders, rbac_utils::get_metadata}, @@ -494,6 +495,17 @@ impl Parseable { ); header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + if let Some((_, hash)) = CLUSTER_SECRET.get() { + header_map.insert( + HeaderName::from_static(CLUSTER_SECRET_HEADER), + HeaderValue::from_str(hash).unwrap(), + ); + header_map.insert( + HeaderName::from_static("intra-cluster-tenant"), + HeaderValue::from_str(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)).unwrap(), + ); + } + // Sync only the streams that were created successfully if matches!(internal_stream_result, Ok(false)) && let Err(e) = sync_streams_with_ingestors(