Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,14 @@ pub async fn sync_streams_with_ingestors(
base_path_without_preceding_slash(),
stream_name
);
let headers = reqwest_headers_clone.clone();
let mut headers = reqwest_headers_clone.clone();

if !headers.contains_key("intra-cluster-userid") {
headers.insert(
reqwest::header::HeaderName::from_static("intra-cluster-userid"),
reqwest::header::HeaderValue::from_str(&PARSEABLE.options.username).unwrap(),
);
}
let body = body_clone.clone();
async move {
let res = INTRA_CLUSTER_CLIENT
Expand Down
23 changes: 22 additions & 1 deletion src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use tracing::{error, warn};

pub static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

use crate::handlers::http::middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER};
use crate::parseable::DEFAULT_TENANT;
use crate::utils::get_user_from_request;
use crate::{
handlers::{
UPDATE_STREAM_KEY,
Expand Down Expand Up @@ -126,7 +129,7 @@ pub async fn put_stream(
let stream_name = stream_name.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
let _guard = CREATE_STREAM_LOCK.lock().await;
let headers = PARSEABLE
let mut headers = PARSEABLE
.create_update_stream(req.headers(), &body, &stream_name, &tenant_id)
.await?;

Expand All @@ -136,6 +139,24 @@ pub async fn put_stream(
false
};

if let Some((_, hash)) = CLUSTER_SECRET.get() {
let userid = get_user_from_request(&req).unwrap();
headers.insert(
actix_web::http::header::HeaderName::from_static(CLUSTER_SECRET_HEADER),
actix_web::http::header::HeaderValue::from_str(hash).unwrap(),
);
headers.insert(
actix_web::http::header::HeaderName::from_static("intra-cluster-tenant"),
actix_web::http::header::HeaderValue::from_str(
tenant_id.as_deref().unwrap_or(DEFAULT_TENANT),
)
.unwrap(),
);
headers.insert(
actix_web::http::header::HeaderName::from_static("intra-cluster-userid"),
actix_web::http::header::HeaderValue::from_str(&userid).unwrap(),
);
}
sync_streams_with_ingestors(headers, body, &stream_name, &tenant_id).await?;

if is_update {
Expand Down
12 changes: 12 additions & 0 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::{
cluster::{PMETA_STREAM_NAME, sync_streams_with_ingestors},
ingest::PostError,
logstream::error::{CreateStreamError, StreamError},
middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER},
modal::{
ingest_server::INGESTOR_META,
utils::{logstream_utils::PutStreamHeaders, rbac_utils::get_metadata},
Expand Down Expand Up @@ -494,6 +495,17 @@ impl Parseable {
);
header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

if let Some((_, hash)) = CLUSTER_SECRET.get() {
header_map.insert(
HeaderName::from_static(CLUSTER_SECRET_HEADER),
HeaderValue::from_str(hash).unwrap(),
);
header_map.insert(
HeaderName::from_static("intra-cluster-tenant"),
HeaderValue::from_str(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)).unwrap(),
);
}

// Sync only the streams that were created successfully
if matches!(internal_stream_result, Ok(false))
&& let Err(e) = sync_streams_with_ingestors(
Expand Down
Loading