diff --git a/rs/moq-mux/src/import/hls.rs b/rs/moq-mux/src/import/hls.rs index 4409d4a2f..048806448 100644 --- a/rs/moq-mux/src/import/hls.rs +++ b/rs/moq-mux/src/import/hls.rs @@ -5,15 +5,13 @@ //! independent of any particular HTTP client; callers provide an implementation //! of [`Fetcher`] to perform the actual network I/O. -use std::collections::HashMap; -use std::collections::hash_map::Entry; use std::path::PathBuf; use std::time::Duration; use anyhow::Context; use bytes::Bytes; use m3u8_rs::{ - AlternativeMedia, AlternativeMediaType, Map, MasterPlaylist, MediaPlaylist, MediaSegment, Resolution, VariantStream, + Map, MasterPlaylist, MediaPlaylist, MediaSegment, }; use reqwest::Client; use tracing::{debug, info, warn}; @@ -76,6 +74,12 @@ struct StepOutcome { /// /// Provides `init()` to prime the ingest with initial segments, and `service()` /// to run the continuous ingest loop. +/// +/// In addition to importing media, this generates HLS playlists as MoQ tracks: +/// - `playlist.m3u8`: Main/master playlist referencing all renditions +/// - `video{n}.m3u8`: Media playlist for each video rendition +/// - `audio.m3u8`: Media playlist for the audio rendition (if present) +/// @TODO: multi audio pub struct Hls { /// Broadcast that all CMAF importers write into. broadcast: moq_lite::BroadcastProducer, @@ -83,6 +87,8 @@ pub struct Hls { /// The catalog being produced. catalog: hang::CatalogProducer, + master_playlist_track: Option, + /// fMP4 importers for each discovered video rendition. /// Each importer feeds a separate MoQ track but shares the same catalog. video_importers: Vec, @@ -108,14 +114,20 @@ enum TrackKind { struct TrackState { playlist: Url, + playlist_track: moq_lite::TrackProducer, + track_id: String, + init_track: Option, next_sequence: Option, init_ready: bool, } impl TrackState { - fn new(playlist: Url) -> Self { + fn new(playlist: Url, playlist_track: moq_lite::TrackProducer, track_id: String) -> Self { Self { playlist, + playlist_track, + track_id, + init_track: None, next_sequence: None, init_ready: false, } @@ -140,6 +152,7 @@ impl Hls { Ok(Self { broadcast, catalog, + master_playlist_track: None, video_importers: Vec::new(), passthrough, audio_importer: None, @@ -220,15 +233,17 @@ impl Hls { /// and returns how many segments were written along with the target /// duration to guide scheduling of the next step. async fn step(&mut self) -> anyhow::Result { + // Fetch master playlist, create tracks for each variant advertised. self.ensure_tracks().await?; let mut wrote = 0usize; let mut target_duration = None; + // @TODO: consolidate audio_tracks/video_tracks into media_tracks // Ingest a step from all active video variants. let video_tracks = std::mem::take(&mut self.video); for (index, mut track) in video_tracks.into_iter().enumerate() { - let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + let mut playlist = self.fetch_media_playlist(track.playlist.clone()).await?; // Use the first video's target duration as the base. if target_duration.is_none() { target_duration = Some(playlist.target_duration); @@ -237,12 +252,16 @@ impl Hls { .consume_segments(TrackKind::Video(index), &mut track, &playlist, None) .await?; wrote += count; + + self.rewrite_segment_locations(&mut playlist, &track.track_id); + + publish_playlist(track.playlist_track.clone(), Playlist::Media(playlist.clone())); self.video.push(track); } // Ingest from the shared audio track, if present. if let Some(mut track) = self.audio.take() { - let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + let mut playlist = self.fetch_media_playlist(track.playlist.clone()).await?; if target_duration.is_none() { target_duration = Some(playlist.target_duration); } @@ -250,6 +269,10 @@ impl Hls { .consume_segments(TrackKind::Audio, &mut track, &playlist, None) .await?; wrote += count; + + self.rewrite_segment_locations(&mut playlist, &track.track_id); + + publish_playlist(track.playlist_track.clone(), Playlist::Media(playlist.clone())); self.audio = Some(track); } @@ -288,42 +311,56 @@ impl Hls { } let body = self.fetch_bytes(self.base_url.clone()).await?; - if let Ok((_, master)) = m3u8_rs::parse_master_playlist(&body) { - let variants = select_variants(&master); - anyhow::ensure!(!variants.is_empty(), "no usable variants found in master playlist"); - + if let Ok((_, mut master)) = m3u8_rs::parse_master_playlist(&body) { // Create a video track state for every usable variant. - for variant in &variants { - let video_url = resolve_uri(&self.base_url, &variant.uri)?; - self.video.push(TrackState::new(video_url)); + for (index, variant) in master.variants.iter_mut().enumerate() { + let media_url = resolve_uri(&self.base_url, &variant.uri)?; + let track_id = format!("{}{}", if variant.resolution.is_some() { "video" } else { "audio" }, index); + variant.uri = format!("{}.m3u8", track_id); + + let playlist_track = self.broadcast.create_track(moq_lite::Track::new(format!("{}.m3u8", track_id))); + + // @TODO: do we really need separate state for Audio and Video tracks? Can this just be a vec of Media Tracks? + // no reason this couldn't handle subtitles as well. + if track_id.contains("video") { + self.video.push(TrackState::new(media_url, playlist_track, track_id)); + } + // @NOTE: ignoring audio tracks because these really should only be advertised as #EXT-X-MEDIA variants + // I think this is just a bug in the output of our `just hls bbb` ingest, "real" hls playlists shouldn't do this } - // Choose an audio rendition based on the first variant with an audio group. - if let Some(group_id) = variants.iter().find_map(|v| v.audio.as_deref()) { - if let Some(audio_tag) = select_audio(&master, group_id) { - if let Some(uri) = &audio_tag.uri { - let audio_url = resolve_uri(&self.base_url, uri)?; - self.audio = Some(TrackState::new(audio_url)); - } else { - warn!(%group_id, "audio rendition missing URI"); - } - } else { - warn!(%group_id, "audio group not found in master playlist"); + // Audio tracks all live under "alternatives", we'll also handle captions/text tracks here eventually + for (index, alternative) in master.alternatives.iter_mut().enumerate() { + if let Some(uri) = &alternative.uri { + let media_url = resolve_uri(&self.base_url, uri)?; + let track_id = format!("{}{}", &alternative.media_type.to_string().to_lowercase(), index); + alternative.uri = Some(format!("{}.m3u8", track_id)); + let playlist_track = self.broadcast.create_track(moq_lite::Track::new(format!("{}.m3u8", track_id))); + + // @TODO: push this into a generic media track array + self.audio = Some(TrackState::new(media_url, playlist_track, track_id)); } } let audio_url = self.audio.as_ref().map(|a| a.playlist.to_string()); info!( - video_variants = variants.len(), + video_variants = master.variants.len(), audio = audio_url.as_deref().unwrap_or("none"), "selected master playlist renditions" ); + self.master_playlist_track = Some(self.broadcast.create_track(moq_lite::Track::new("playlist.m3u8"))); + if let Some(ref mut track) = self.master_playlist_track { + publish_playlist(track.clone(), Playlist::Master(master)) + } return Ok(()); } // Fallback: treat the provided URL as a single media playlist. - self.video.push(TrackState::new(self.base_url.clone())); + self.video.push(TrackState::new(self.base_url.clone(), + self.broadcast.create_track(moq_lite::Track::new("video0.m3u8")), + "video0".to_string() + )); Ok(()) } @@ -405,7 +442,20 @@ impl Hls { let map = self.find_map(playlist).context("playlist missing EXT-X-MAP")?; let url = resolve_uri(&track.playlist, &map.uri)?; + + if track.init_track.is_none() { + let init_track_name = format!("{}.init.mp4", track.track_id); + track.init_track = Some(self.broadcast.create_track(moq_lite::Track::new(init_track_name))); + } + let mut bytes = self.fetch_bytes(url).await?; + + // Publish init segment to its track + let init_track = track.init_track.as_mut().expect("init_track was just created"); + let mut group = init_track.append_group(); + group.write_frame(bytes.clone()); + group.close(); + let importer = match kind { TrackKind::Video(index) => self.ensure_video_importer_for(index), TrackKind::Audio => self.ensure_audio_importer(), @@ -507,6 +557,17 @@ impl Hls { .get_or_insert_with(|| Fmp4::new(self.broadcast.clone(), self.catalog.clone(), Fmp4Config { passthrough })) } + fn rewrite_segment_locations(&mut self, playlist: &mut MediaPlaylist, track_id: &str) { + let msn = playlist.media_sequence; + // Find and modify the first segment with a map + for (index, segment) in playlist.segments.iter_mut().enumerate() { + if let Some(ref mut map) = segment.map { + map.uri = format!("{}.init.mp4?group=0", track_id); // @TODO: do I need ?group=0 for this? + } + segment.uri = format!("{}.m4s?group={}", track_id, msn + index as u64); + } + } + #[cfg(test)] fn has_video_importer(&self) -> bool { !self.video_importers.is_empty() @@ -518,107 +579,33 @@ impl Hls { } } -fn select_audio<'a>(master: &'a MasterPlaylist, group_id: &str) -> Option<&'a AlternativeMedia> { - let mut first = None; - let mut default = None; - - for alternative in master - .alternatives - .iter() - .filter(|alt| alt.media_type == AlternativeMediaType::Audio && alt.group_id == group_id) - { - if first.is_none() { - first = Some(alternative); - } - if alternative.default { - default = Some(alternative); - break; - } - } - - default.or(first) -} - -fn select_variants(master: &MasterPlaylist) -> Vec<&VariantStream> { - // Helper to extract the first video codec token from the CODECS attribute. - fn first_video_codec(variant: &VariantStream) -> Option<&str> { - let codecs = variant.codecs.as_deref()?; - codecs.split(',').map(|s| s.trim()).find(|s| !s.is_empty()) - } - - // Map codec strings into a coarse "family" so we can prefer H.264 over others. - fn codec_family(codec: &str) -> Option<&'static str> { - if codec.starts_with("avc1.") || codec.starts_with("avc3.") { - Some("h264") - } else { - None - } - } - - // Consider only non-i-frame variants with a URI and a known codec family. - let candidates: Vec<(&VariantStream, &str, &str)> = master - .variants - .iter() - .filter(|variant| !variant.is_i_frame && !variant.uri.is_empty()) - .filter_map(|variant| { - let codec = first_video_codec(variant)?; - let family = codec_family(codec)?; - Some((variant, codec, family)) - }) - .collect(); - if candidates.is_empty() { - return Vec::new(); +fn resolve_uri(base: &Url, value: &str) -> std::result::Result { + if let Ok(url) = Url::parse(value) { + return Ok(url); } - // Prefer families in this order, falling back to the first available. - const FAMILY_PREFERENCE: &[&str] = &["h264"]; - - let families_present: Vec<&str> = candidates.iter().map(|(_, _, fam)| *fam).collect(); - - let target_family = FAMILY_PREFERENCE - .iter() - .find(|fav| families_present.iter().any(|fam| fam == *fav)) - .copied() - .unwrap_or(families_present[0]); - - // Keep only variants in the chosen family. - let family_variants: Vec<&VariantStream> = candidates - .into_iter() - .filter(|(_, _, fam)| *fam == target_family) - .map(|(variant, _, _)| variant) - .collect(); - - // Deduplicate by resolution, keeping the lowest-bandwidth variant for each size. - let mut by_resolution: HashMap, &VariantStream> = HashMap::new(); + base.join(value) +} - for variant in family_variants { - let key = variant.resolution; - let bandwidth = variant.average_bandwidth.unwrap_or(variant.bandwidth); +#[derive(Debug)] +enum Playlist { + Master(MasterPlaylist), + Media(MediaPlaylist), +} - match by_resolution.entry(key) { - Entry::Vacant(entry) => { - entry.insert(variant); - } - Entry::Occupied(mut entry) => { - let existing = entry.get(); - let existing_bw = existing.average_bandwidth.unwrap_or(existing.bandwidth); - if bandwidth < existing_bw { - entry.insert(variant); - } - } - } - } +fn publish_playlist(mut playlist_track: moq_lite::TrackProducer, playlist: Playlist) { + let mut group = playlist_track.append_group(); - by_resolution.values().cloned().collect() -} + let mut v: Vec = Vec::new(); -fn resolve_uri(base: &Url, value: &str) -> std::result::Result { - if let Ok(url) = Url::parse(value) { - return Ok(url); + match playlist { + Playlist::Master(master) => master.write_to(&mut v).unwrap(), + Playlist::Media(media) => media.write_to(&mut v).unwrap(), } - base.join(value) + group.write_frame(v); + group.close(); } #[cfg(test)] diff --git a/rs/moq-relay/src/web.rs b/rs/moq-relay/src/web.rs index abac8117c..dab6061de 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -309,6 +309,7 @@ async fn serve_fetch( group: None, frame: Some(frame), deadline, + content_type: None, }), Ok(None) => Err(StatusCode::NOT_FOUND), Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), @@ -317,6 +318,7 @@ async fn serve_fetch( group: Some(group), frame: None, deadline, + content_type: content_type_for_track(&track.info.name), }), } }) @@ -329,10 +331,21 @@ async fn serve_fetch( } } +fn content_type_for_track(track_name: &str) -> Option<&'static str> { + if track_name.ends_with(".m3u8") { + Some("application/vnd.apple.mpegurl") + } else if track_name.ends_with(".init") || track_name.ends_with(".m4s") || track_name.ends_with(".mp4") { + Some("video/mp4") + } else { + None + } +} + struct ServeGroup { group: Option, frame: Option, deadline: tokio::time::Instant, + content_type: Option<&'static str>, } impl ServeGroup { @@ -362,7 +375,15 @@ impl ServeGroup { impl IntoResponse for ServeGroup { fn into_response(self) -> Response { - Response::new(Body::new(self)) + let content_type = self.content_type; + let mut response = Response::new(Body::new(self)); + if let Some(ct) = content_type { + response.headers_mut().insert( + axum::http::header::CONTENT_TYPE, + axum::http::HeaderValue::from_static(ct), + ); + } + response } }