From 1ac44944f215fc4368dbf673e2947c093adbbbfd Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 23 Feb 2026 20:09:48 -0800 Subject: [PATCH 1/8] Bump the JS packages. --- js/hang/package.json | 2 +- js/publish/package.json | 4 ++-- js/watch/package.json | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/js/hang/package.json b/js/hang/package.json index e4d3b8a0e..3cacdf60a 100644 --- a/js/hang/package.json +++ b/js/hang/package.json @@ -2,7 +2,7 @@ "name": "@moq/hang", "type": "module", "version": "0.2.0", - "description": "Media over QUIC library", + "description": "WebCodecs-based media format for MoQ", "license": "(MIT OR Apache-2.0)", "repository": "github:moq-dev/moq", "exports": { diff --git a/js/publish/package.json b/js/publish/package.json index abd381a36..f963e4558 100644 --- a/js/publish/package.json +++ b/js/publish/package.json @@ -1,8 +1,8 @@ { "name": "@moq/publish", "type": "module", - "version": "0.1.1", - "description": "Publish media to Media over QUIC streams", + "version": "0.2.0", + "description": "Publish Media over QUIC broadcasts", "license": "(MIT OR Apache-2.0)", "repository": "github:moq-dev/moq", "exports": { diff --git a/js/watch/package.json b/js/watch/package.json index 1883e6d16..5649f7133 100644 --- a/js/watch/package.json +++ b/js/watch/package.json @@ -1,8 +1,8 @@ { "name": "@moq/watch", "type": "module", - "version": "0.1.1", - "description": "Watch/subscribe to Media over QUIC streams", + "version": "0.2.0", + "description": "Watch Media over QUIC broadcasts", "license": "(MIT OR Apache-2.0)", "repository": "github:moq-dev/moq", "exports": { From 2e462efa2db7d526c323196fce81fac8ba64f05d Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 05:46:32 -0800 Subject: [PATCH 2/8] Initial moq-lite-03 --- js/lite/src/lite/fetch.ts | 40 ++++++++ js/lite/src/lite/index.ts | 2 + js/lite/src/lite/probe.ts | 26 +++++ js/lite/src/lite/stream.ts | 2 + js/lite/src/lite/subscribe.ts | 148 +++++++++++++++++++++++------ js/lite/src/lite/version.ts | 4 + rs/moq-lite/src/lite/fetch.rs | 42 ++++++++ rs/moq-lite/src/lite/group.rs | 27 ++++++ rs/moq-lite/src/lite/mod.rs | 6 ++ rs/moq-lite/src/lite/probe.rs | 25 +++++ rs/moq-lite/src/lite/publisher.rs | 4 + rs/moq-lite/src/lite/stream.rs | 2 + rs/moq-lite/src/lite/subscribe.rs | 108 +++++++++++++++++++-- rs/moq-lite/src/lite/subscriber.rs | 4 + rs/moq-lite/src/lite/version.rs | 6 ++ rs/moq-lite/src/setup.rs | 20 +++- rs/moq-lite/src/version.rs | 2 +- 17 files changed, 428 insertions(+), 40 deletions(-) create mode 100644 js/lite/src/lite/fetch.ts create mode 100644 js/lite/src/lite/probe.ts create mode 100644 rs/moq-lite/src/lite/fetch.rs create mode 100644 rs/moq-lite/src/lite/probe.rs diff --git a/js/lite/src/lite/fetch.ts b/js/lite/src/lite/fetch.ts new file mode 100644 index 000000000..071eba687 --- /dev/null +++ b/js/lite/src/lite/fetch.ts @@ -0,0 +1,40 @@ +import * as Path from "../path.ts"; +import type { Reader, Writer } from "../stream.ts"; +import * as Message from "./message.ts"; + +export class Fetch { + broadcast: Path.Valid; + track: string; + priority: number; + group: number; + + constructor(broadcast: Path.Valid, track: string, priority: number, group: number) { + this.broadcast = broadcast; + this.track = track; + this.priority = priority; + this.group = group; + } + + async #encode(w: Writer) { + await w.string(this.broadcast); + await w.string(this.track); + await w.u8(this.priority); + await w.u53(this.group); + } + + static async #decode(r: Reader): Promise { + const broadcast = Path.from(await r.string()); + const track = await r.string(); + const priority = await r.u8(); + const group = await r.u53(); + return new Fetch(broadcast, track, priority, group); + } + + async encode(w: Writer): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader): Promise { + return Message.decode(r, Fetch.#decode); + } +} diff --git a/js/lite/src/lite/index.ts b/js/lite/src/lite/index.ts index a28f40a3d..9314476d6 100644 --- a/js/lite/src/lite/index.ts +++ b/js/lite/src/lite/index.ts @@ -1,6 +1,8 @@ export * from "./announce.ts"; export * from "./connection.ts"; +export * from "./fetch.ts"; export * from "./group.ts"; +export * from "./probe.ts"; export * from "./session.ts"; export * from "./stream.ts"; export * from "./subscribe.ts"; diff --git a/js/lite/src/lite/probe.ts b/js/lite/src/lite/probe.ts new file mode 100644 index 000000000..7b848b9ff --- /dev/null +++ b/js/lite/src/lite/probe.ts @@ -0,0 +1,26 @@ +import type { Reader, Writer } from "../stream.ts"; +import * as Message from "./message.ts"; + +export class Probe { + bitrate: number; + + constructor(bitrate: number) { + this.bitrate = bitrate; + } + + async #encode(w: Writer) { + await w.u53(this.bitrate); + } + + static async #decode(r: Reader): Promise { + return new Probe(await r.u53()); + } + + async encode(w: Writer): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader): Promise { + return Message.decode(r, Probe.#decode); + } +} diff --git a/js/lite/src/lite/stream.ts b/js/lite/src/lite/stream.ts index 10cdd7265..632771585 100644 --- a/js/lite/src/lite/stream.ts +++ b/js/lite/src/lite/stream.ts @@ -10,6 +10,8 @@ export const StreamId = { Session: 0, Announce: 1, Subscribe: 2, + Fetch: 3, + Probe: 4, ClientCompat: 0x20, ServerCompat: 0x21, } as const; diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index 7f2c0a756..fb68ddb62 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -5,30 +5,57 @@ import { Version } from "./version.ts"; export class SubscribeUpdate { priority: number; - - constructor(priority: number) { + ordered: boolean; + maxLatency: number; + startGroup: number; + endGroup: number; + + constructor( + priority: number, + ordered: boolean = true, + maxLatency: number = 0, + startGroup: number = 0, + endGroup: number = 0, + ) { this.priority = priority; + this.ordered = ordered; + this.maxLatency = maxLatency; + this.startGroup = startGroup; + this.endGroup = endGroup; } - async #encode(w: Writer) { + async #encode(w: Writer, version?: Version) { await w.u8(this.priority); + if (version === Version.DRAFT_03) { + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup); + await w.u53(this.endGroup); + } } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version?: Version): Promise { const priority = await r.u8(); + if (version === Version.DRAFT_03) { + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const startGroup = await r.u53(); + const endGroup = await r.u53(); + return new SubscribeUpdate(priority, ordered, maxLatency, startGroup, endGroup); + } return new SubscribeUpdate(priority); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version?: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, SubscribeUpdate.#decode); + static async decode(r: Reader, version?: Version): Promise { + return Message.decode(r, (r) => SubscribeUpdate.#decode(r, version)); } - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, SubscribeUpdate.#decode); + static async decodeMaybe(r: Reader, version?: Version): Promise { + return Message.decodeMaybe(r, (r) => SubscribeUpdate.#decode(r, version)); } } @@ -37,35 +64,65 @@ export class Subscribe { broadcast: Path.Valid; track: string; priority: number; - - constructor(id: bigint, broadcast: Path.Valid, track: string, priority: number) { + ordered: boolean; + maxLatency: number; + startGroup: number; + endGroup: number; + + constructor( + id: bigint, + broadcast: Path.Valid, + track: string, + priority: number, + ordered: boolean = true, + maxLatency: number = 0, + startGroup: number = 0, + endGroup: number = 0, + ) { this.id = id; this.broadcast = broadcast; this.track = track; this.priority = priority; + this.ordered = ordered; + this.maxLatency = maxLatency; + this.startGroup = startGroup; + this.endGroup = endGroup; } - async #encode(w: Writer) { + async #encode(w: Writer, version?: Version) { await w.u62(this.id); await w.string(this.broadcast); await w.string(this.track); await w.u8(this.priority); + if (version === Version.DRAFT_03) { + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup); + await w.u53(this.endGroup); + } } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version?: Version): Promise { const id = await r.u62(); const broadcast = Path.from(await r.string()); const track = await r.string(); const priority = await r.u8(); + if (version === Version.DRAFT_03) { + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const startGroup = await r.u53(); + const endGroup = await r.u53(); + return new Subscribe(id, broadcast, track, priority, ordered, maxLatency, startGroup, endGroup); + } return new Subscribe(id, broadcast, track, priority); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version?: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, Subscribe.#decode); + static async decode(r: Reader, version?: Version): Promise { + return Message.decode(r, (r) => Subscribe.#decode(r, version)); } } @@ -73,35 +130,68 @@ export class SubscribeOk { // The version readonly version: Version; priority?: number; - - constructor({ version, priority = undefined }: { version: Version; priority?: number }) { + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + + constructor({ + version, + priority = undefined, + ordered = undefined, + maxLatency = undefined, + startGroup = undefined, + endGroup = undefined, + }: { + version: Version; + priority?: number; + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + }) { this.version = version; this.priority = priority; + this.ordered = ordered; + this.maxLatency = maxLatency; + this.startGroup = startGroup; + this.endGroup = endGroup; } async #encode(w: Writer) { - if (this.version === Version.DRAFT_02) { + if (this.version === Version.DRAFT_03) { + await w.u8(this.priority ?? 0); + await w.bool(this.ordered ?? true); + await w.u53(this.maxLatency ?? 0); + await w.u53(this.startGroup ?? 0); + await w.u53(this.endGroup ?? 0); + } else if (this.version === Version.DRAFT_02) { // noop } else if (this.version === Version.DRAFT_01) { await w.u8(this.priority ?? 0); - } else { - const version: never = this.version; - throw new Error(`unsupported version: ${version}`); } } static async #decode(version: Version, r: Reader): Promise { let priority: number | undefined; - if (version === Version.DRAFT_02) { + let ordered: boolean | undefined; + let maxLatency: number | undefined; + let startGroup: number | undefined; + let endGroup: number | undefined; + + if (version === Version.DRAFT_03) { + priority = await r.u8(); + ordered = await r.bool(); + maxLatency = await r.u53(); + startGroup = await r.u53(); + endGroup = await r.u53(); + } else if (version === Version.DRAFT_02) { // noop } else if (version === Version.DRAFT_01) { priority = await r.u8(); - } else { - const v: never = version; - throw new Error(`unsupported version: ${v}`); } - return new SubscribeOk({ version, priority }); + return new SubscribeOk({ version, priority, ordered, maxLatency, startGroup, endGroup }); } async encode(w: Writer): Promise { diff --git a/js/lite/src/lite/version.ts b/js/lite/src/lite/version.ts index 06d64bfe5..152892310 100644 --- a/js/lite/src/lite/version.ts +++ b/js/lite/src/lite/version.ts @@ -1,6 +1,7 @@ export const Version = { DRAFT_01: 0xff0dad01, DRAFT_02: 0xff0dad02, + DRAFT_03: 0xff0dad03, } as const; export type Version = (typeof Version)[keyof typeof Version]; @@ -8,3 +9,6 @@ export type Version = (typeof Version)[keyof typeof Version]; /// The WebTransport subprotocol identifier for moq-lite. /// Version negotiation still happens via SETUP when this is used. export const ALPN = "moql"; + +/// The ALPN string for Draft03, which uses ALPN-based version negotiation. +export const ALPN_03 = "moq-lite-03"; diff --git a/rs/moq-lite/src/lite/fetch.rs b/rs/moq-lite/src/lite/fetch.rs new file mode 100644 index 000000000..fd0c6efb4 --- /dev/null +++ b/rs/moq-lite/src/lite/fetch.rs @@ -0,0 +1,42 @@ +use std::borrow::Cow; + +use crate::{ + Path, + coding::{Decode, DecodeError, Encode}, + lite::{Message, Version}, +}; + +/// Sent by the subscriber to fetch a specific group from a track. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct Fetch<'a> { + pub broadcast: Path<'a>, + pub track: Cow<'a, str>, + pub priority: u8, + pub group: u64, +} + +impl Message for Fetch<'_> { + fn decode_msg(r: &mut R, version: Version) -> Result { + let broadcast = Path::decode(r, version)?; + let track = Cow::::decode(r, version)?; + let priority = u8::decode(r, version)?; + let group = u64::decode(r, version)?; + + Ok(Self { + broadcast, + track, + priority, + group, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + self.broadcast.encode(w, version); + self.track.encode(w, version); + self.priority.encode(w, version); + self.group.encode(w, version); + } +} diff --git a/rs/moq-lite/src/lite/group.rs b/rs/moq-lite/src/lite/group.rs index 3e22a8a43..f3b1de0c7 100644 --- a/rs/moq-lite/src/lite/group.rs +++ b/rs/moq-lite/src/lite/group.rs @@ -25,3 +25,30 @@ impl Message for Group { self.sequence.encode(w, version); } } + +/// Indicates that one or more groups have been dropped. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct GroupDrop { + pub sequence: u64, + pub count: u64, + pub error: u64, +} + +impl Message for GroupDrop { + fn decode_msg(r: &mut R, version: Version) -> Result { + Ok(Self { + sequence: u64::decode(r, version)?, + count: u64::decode(r, version)?, + error: u64::decode(r, version)?, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + self.sequence.encode(w, version); + self.count.encode(w, version); + self.error.encode(w, version); + } +} diff --git a/rs/moq-lite/src/lite/mod.rs b/rs/moq-lite/src/lite/mod.rs index 37db57de1..454caf84f 100644 --- a/rs/moq-lite/src/lite/mod.rs +++ b/rs/moq-lite/src/lite/mod.rs @@ -5,11 +5,13 @@ //! Specification: [] mod announce; +mod fetch; mod group; mod info; mod message; mod parameters; mod priority; +mod probe; mod publisher; mod session; mod stream; @@ -18,10 +20,14 @@ mod subscriber; mod version; pub use announce::*; +#[allow(unused_imports)] +pub use fetch::*; pub use group::*; pub use info::*; pub use message::*; pub use parameters::*; +#[allow(unused_imports)] +pub use probe::*; use publisher::*; pub(super) use session::*; pub use stream::*; diff --git a/rs/moq-lite/src/lite/probe.rs b/rs/moq-lite/src/lite/probe.rs new file mode 100644 index 000000000..4612df424 --- /dev/null +++ b/rs/moq-lite/src/lite/probe.rs @@ -0,0 +1,25 @@ +use crate::{ + coding::*, + lite::{Message, Version}, +}; + +/// Sent to probe the available bitrate. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct Probe { + pub bitrate: u64, +} + +impl Message for Probe { + fn decode_msg(r: &mut R, version: Version) -> Result { + let bitrate = u64::decode(r, version)?; + + Ok(Self { bitrate }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + self.bitrate.encode(w, version); + } +} diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 893122989..b36b8bbe0 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -194,6 +194,10 @@ impl Publisher { let info = lite::SubscribeOk { priority: track.info.priority, + ordered: true, + max_latency: 0, + start_group: 0, + end_group: 0, }; stream.writer.encode(&info).await?; diff --git a/rs/moq-lite/src/lite/stream.rs b/rs/moq-lite/src/lite/stream.rs index 281ac20b3..4528c5cae 100644 --- a/rs/moq-lite/src/lite/stream.rs +++ b/rs/moq-lite/src/lite/stream.rs @@ -8,6 +8,8 @@ pub enum ControlType { Session = 0, Announce = 1, Subscribe = 2, + Fetch = 3, + Probe = 4, } impl Decode for ControlType { diff --git a/rs/moq-lite/src/lite/subscribe.rs b/rs/moq-lite/src/lite/subscribe.rs index 36052f2b8..5289c019f 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -15,6 +15,10 @@ pub struct Subscribe<'a> { pub broadcast: Path<'a>, pub track: Cow<'a, str>, pub priority: u8, + pub ordered: bool, + pub max_latency: u64, + pub start_group: u64, + pub end_group: u64, } impl Message for Subscribe<'_> { @@ -24,11 +28,25 @@ impl Message for Subscribe<'_> { let track = Cow::::decode(r, version)?; let priority = u8::decode(r, version)?; + let (ordered, max_latency, start_group, end_group) = if version == Version::Draft03 { + let ordered = u8::decode(r, version)? != 0; + let max_latency = u64::decode(r, version)?; + let start_group = u64::decode(r, version)?; + let end_group = u64::decode(r, version)?; + (ordered, max_latency, start_group, end_group) + } else { + (true, 0, 0, 0) + }; + Ok(Self { id, broadcast, track, priority, + ordered, + max_latency, + start_group, + end_group, }) } @@ -37,28 +55,106 @@ impl Message for Subscribe<'_> { self.broadcast.encode(w, version); self.track.encode(w, version); self.priority.encode(w, version); + + if version == Version::Draft03 { + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); + } } } #[derive(Clone, Debug)] pub struct SubscribeOk { pub priority: u8, + pub ordered: bool, + pub max_latency: u64, + pub start_group: u64, + pub end_group: u64, } impl Message for SubscribeOk { fn encode_msg(&self, w: &mut W, version: Version) { - if version == Version::Draft01 { + if version == Version::Draft03 { + self.priority.encode(w, version); + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); + } else if version == Version::Draft01 { self.priority.encode(w, version); } } fn decode_msg(r: &mut R, version: Version) -> Result { - let priority = if version == Version::Draft01 { - u8::decode(r, version)? + if version == Version::Draft03 { + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let max_latency = u64::decode(r, version)?; + let start_group = u64::decode(r, version)?; + let end_group = u64::decode(r, version)?; + + Ok(Self { + priority, + ordered, + max_latency, + start_group, + end_group, + }) } else { - 0 - }; + let priority = if version == Version::Draft01 { + u8::decode(r, version)? + } else { + 0 + }; - Ok(Self { priority }) + Ok(Self { + priority, + ordered: true, + max_latency: 0, + start_group: 0, + end_group: 0, + }) + } + } +} + +/// Sent by the subscriber to update subscription parameters. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct SubscribeUpdate { + pub priority: u8, + pub ordered: bool, + pub max_latency: u64, + pub start_group: u64, + pub end_group: u64, +} + +impl Message for SubscribeUpdate { + fn decode_msg(r: &mut R, version: Version) -> Result { + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let max_latency = u64::decode(r, version)?; + let start_group = u64::decode(r, version)?; + let end_group = u64::decode(r, version)?; + + Ok(Self { + priority, + ordered, + max_latency, + start_group, + end_group, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + self.priority.encode(w, version); + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); } } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9ab5cf41b..871296dba 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -177,6 +177,10 @@ impl Subscriber { broadcast: broadcast.to_owned(), track: (&track.info.name).into(), priority: track.info.priority, + ordered: true, + max_latency: 0, + start_group: 0, + end_group: 0, }; tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe started"); diff --git a/rs/moq-lite/src/lite/version.rs b/rs/moq-lite/src/lite/version.rs index 62e462f8d..a30e1f1de 100644 --- a/rs/moq-lite/src/lite/version.rs +++ b/rs/moq-lite/src/lite/version.rs @@ -6,11 +6,15 @@ use crate::coding; /// In the future we'll use ALPN instead. pub const ALPN: &str = "moql"; +/// The ALPN string for Draft03, which uses ALPN-based version negotiation. +pub const ALPN_03: &str = "moq-lite-03"; + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[repr(u64)] pub enum Version { Draft01 = 0xff0dad01, Draft02 = 0xff0dad02, + Draft03 = 0xff0dad03, } impl TryFrom for Version { @@ -21,6 +25,8 @@ impl TryFrom for Version { Ok(Self::Draft01) } else if value == Self::Draft02.coding() { Ok(Self::Draft02) + } else if value == Self::Draft03.coding() { + Ok(Self::Draft03) } else { Err(()) } diff --git a/rs/moq-lite/src/setup.rs b/rs/moq-lite/src/setup.rs index a0f4109d8..5e5e5d1bf 100644 --- a/rs/moq-lite/src/setup.rs +++ b/rs/moq-lite/src/setup.rs @@ -26,6 +26,7 @@ impl Client { // Draft15+: no versions list, parameters only. } Version::Ietf(ietf::Version::Draft14) + | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => self.versions.encode(w, v), }; @@ -45,7 +46,9 @@ impl Decode for Client { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::decode(r, v)? as usize } - Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, + Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { + u64::decode(r, v)? as usize + } }; if r.remaining() < size { @@ -60,6 +63,7 @@ impl Decode for Client { coding::Versions::from([v.into()]) } Version::Ietf(ietf::Version::Draft14) + | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Versions::decode(&mut msg, v)?, }; @@ -84,7 +88,9 @@ impl Encode for Client { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::try_from(size).expect("message too large for u16").encode(w, v) } - Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), + Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { + (size as u64).encode(w, v) + } } self.encode_inner(w, v); } @@ -107,6 +113,7 @@ impl Server { // Draft15+: No version field, parameters only. } Version::Ietf(ietf::Version::Draft14) + | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => self.version.encode(w, v), }; @@ -126,7 +133,9 @@ impl Encode for Server { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::try_from(size).expect("message too large for u16").encode(w, v) } - Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), + Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { + (size as u64).encode(w, v) + } } self.encode_inner(w, v); @@ -144,7 +153,9 @@ impl Decode for Server { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::decode(r, v)? as usize } - Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, + Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { + u64::decode(r, v)? as usize + } }; if r.remaining() < size { @@ -155,6 +166,7 @@ impl Decode for Server { let version = match v { Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft16) => v.into(), Version::Ietf(ietf::Version::Draft14) + | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Version::decode(&mut msg, v)?, }; diff --git a/rs/moq-lite/src/version.rs b/rs/moq-lite/src/version.rs index aefc36034..8a6083136 100644 --- a/rs/moq-lite/src/version.rs +++ b/rs/moq-lite/src/version.rs @@ -10,7 +10,7 @@ pub(crate) const NEGOTIATED: [Version; 3] = [ ]; /// ALPN strings for supported versions. -pub const ALPNS: &[&str] = &[lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; +pub const ALPNS: &[&str] = &[lite::ALPN_03, lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; // A combination of ietf::Version and lite::Version. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] From e2ff28abd9ea9da078c721085c98cb3be3cde957 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 09:23:08 -0800 Subject: [PATCH 3/8] Announce changes --- js/lite/src/lite/announce.ts | 55 ++++++++-- js/lite/src/lite/connection.ts | 2 +- js/lite/src/lite/publisher.ts | 27 +++-- js/lite/src/lite/subscribe.ts | 155 +++++++++++++++++++---------- js/lite/src/lite/subscriber.ts | 36 ++++--- rs/moq-lite/src/lite/announce.rs | 33 ++++-- rs/moq-lite/src/lite/publisher.rs | 64 ++++++++---- rs/moq-lite/src/lite/subscribe.rs | 100 ++++++++++--------- rs/moq-lite/src/lite/subscriber.rs | 17 +++- 9 files changed, 326 insertions(+), 163 deletions(-) diff --git a/js/lite/src/lite/announce.ts b/js/lite/src/lite/announce.ts index 3a5140f68..47a40709c 100644 --- a/js/lite/src/lite/announce.ts +++ b/js/lite/src/lite/announce.ts @@ -1,37 +1,69 @@ import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; +import { Version } from "./version.ts"; + +function unreachable(v: never): never { + throw new Error(`unsupported version: ${v}`); +} export class Announce { suffix: Path.Valid; active: boolean; + hops: number; - constructor(suffix: Path.Valid, active: boolean) { + constructor(suffix: Path.Valid, active: boolean, hops: number = 0) { this.suffix = suffix; this.active = active; + this.hops = hops; } - async #encode(w: Writer) { + async #encode(w: Writer, version: Version) { await w.bool(this.active); await w.string(this.suffix); + + switch (version) { + case Version.DRAFT_03: + await w.u53(this.hops); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: + unreachable(version); + } } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: Version): Promise { const active = await r.bool(); const suffix = Path.from(await r.string()); - return new Announce(suffix, active); + + let hops: number; + switch (version) { + case Version.DRAFT_03: + hops = await r.u53(); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + hops = 0; + break; + default: + unreachable(version); + } + + return new Announce(suffix, active, hops); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, Announce.#decode); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => Announce.#decode(r, version)); } - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, Announce.#decode); + static async decodeMaybe(r: Reader, version: Version): Promise { + return Message.decodeMaybe(r, (r) => Announce.#decode(r, version)); } } @@ -60,6 +92,9 @@ export class AnnounceInterest { } } +/// Sent after setup to communicate the initially announced paths. +/// +/// Used by Draft01/Draft02 only. Draft03 uses individual Announce messages instead. export class AnnounceInit { suffixes: Path.Valid[]; diff --git a/js/lite/src/lite/connection.ts b/js/lite/src/lite/connection.ts index c5bb3f07d..89efbc047 100644 --- a/js/lite/src/lite/connection.ts +++ b/js/lite/src/lite/connection.ts @@ -162,7 +162,7 @@ export class Connection implements Established { await this.#publisher.runAnnounce(msg, stream); return; } else if (typ === StreamId.Subscribe) { - const msg = await Subscribe.decode(stream.reader); + const msg = await Subscribe.decode(stream.reader, this.version); await this.#publisher.runSubscribe(msg, stream); return; } else { diff --git a/js/lite/src/lite/publisher.ts b/js/lite/src/lite/publisher.ts index 95ff646fe..6e8e5cfc3 100644 --- a/js/lite/src/lite/publisher.ts +++ b/js/lite/src/lite/publisher.ts @@ -8,7 +8,7 @@ import { error } from "../util/error.ts"; import { Announce, AnnounceInit, type AnnounceInterest } from "./announce.ts"; import { Group as GroupMessage } from "./group.ts"; import { type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; -import type { Version } from "./version.ts"; +import { Version } from "./version.ts"; /** * Handles publishing broadcasts and managing their lifecycle. @@ -64,7 +64,7 @@ export class Publisher { async runAnnounce(msg: AnnounceInterest, stream: Stream) { console.debug(`announce: prefix=${msg.prefix}`); - // Send ANNOUNCE_INIT as the first message with all currently active paths + // Send initial announcements let active = new Set(); const broadcasts = this.#broadcasts.peek(); @@ -77,8 +77,21 @@ export class Publisher { active.add(suffix); } - const init = new AnnounceInit([...active]); - await init.encode(stream.writer); + switch (this.version) { + case Version.DRAFT_03: + // Draft03: send individual Announce messages for initial state. + for (const suffix of active) { + const wire = new Announce(suffix, true); + await wire.encode(stream.writer, this.version); + } + break; + case Version.DRAFT_01: + case Version.DRAFT_02: { + const init = new AnnounceInit([...active]); + await init.encode(stream.writer); + break; + } + } // Wait for updates to the broadcasts. for (;;) { @@ -106,14 +119,14 @@ export class Publisher { for (const added of newActive.difference(active)) { console.debug(`announce: broadcast=${added} active=true`); const wire = new Announce(added, true); - await wire.encode(stream.writer); + await wire.encode(stream.writer, this.version); } // Announce any removed broadcasts. for (const removed of active.difference(newActive)) { console.debug(`announce: broadcast=${removed} active=false`); const wire = new Announce(removed, false); - await wire.encode(stream.writer); + await wire.encode(stream.writer, this.version); } // NOTE: This is kind of a hack that won't work with a rapid UNANNOUNCE/ANNOUNCE cycle. @@ -149,7 +162,7 @@ export class Publisher { const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer); for (;;) { - const decode = SubscribeUpdate.decodeMaybe(stream.reader); + const decode = SubscribeUpdate.decodeMaybe(stream.reader, this.version); const result = await Promise.any([serving, decode]); if (!result) break; diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index fb68ddb62..030fb7256 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -3,6 +3,10 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import { Version } from "./version.ts"; +function unreachable(v: never): never { + throw new Error(`unsupported version: ${v}`); +} + export class SubscribeUpdate { priority: number; ordered: boolean; @@ -24,37 +28,51 @@ export class SubscribeUpdate { this.endGroup = endGroup; } - async #encode(w: Writer, version?: Version) { - await w.u8(this.priority); - if (version === Version.DRAFT_03) { - await w.bool(this.ordered); - await w.u53(this.maxLatency); - await w.u53(this.startGroup); - await w.u53(this.endGroup); + async #encode(w: Writer, version: Version) { + switch (version) { + case Version.DRAFT_03: + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup); + await w.u53(this.endGroup); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + await w.u8(this.priority); + break; + default: + unreachable(version); } } - static async #decode(r: Reader, version?: Version): Promise { - const priority = await r.u8(); - if (version === Version.DRAFT_03) { - const ordered = await r.bool(); - const maxLatency = await r.u53(); - const startGroup = await r.u53(); - const endGroup = await r.u53(); - return new SubscribeUpdate(priority, ordered, maxLatency, startGroup, endGroup); + static async #decode(r: Reader, version: Version): Promise { + switch (version) { + case Version.DRAFT_03: { + const priority = await r.u8(); + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const startGroup = await r.u53(); + const endGroup = await r.u53(); + return new SubscribeUpdate(priority, ordered, maxLatency, startGroup, endGroup); + } + case Version.DRAFT_01: + case Version.DRAFT_02: + return new SubscribeUpdate(await r.u8()); + default: + unreachable(version); } - return new SubscribeUpdate(priority); } - async encode(w: Writer, version?: Version): Promise { + async encode(w: Writer, version: Version): Promise { return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader, version?: Version): Promise { + static async decode(r: Reader, version: Version): Promise { return Message.decode(r, (r) => SubscribeUpdate.#decode(r, version)); } - static async decodeMaybe(r: Reader, version?: Version): Promise { + static async decodeMaybe(r: Reader, version: Version): Promise { return Message.decodeMaybe(r, (r) => SubscribeUpdate.#decode(r, version)); } } @@ -89,39 +107,54 @@ export class Subscribe { this.endGroup = endGroup; } - async #encode(w: Writer, version?: Version) { + async #encode(w: Writer, version: Version) { await w.u62(this.id); await w.string(this.broadcast); await w.string(this.track); await w.u8(this.priority); - if (version === Version.DRAFT_03) { - await w.bool(this.ordered); - await w.u53(this.maxLatency); - await w.u53(this.startGroup); - await w.u53(this.endGroup); + + switch (version) { + case Version.DRAFT_03: + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup); + await w.u53(this.endGroup); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: + unreachable(version); } } - static async #decode(r: Reader, version?: Version): Promise { + static async #decode(r: Reader, version: Version): Promise { const id = await r.u62(); const broadcast = Path.from(await r.string()); const track = await r.string(); const priority = await r.u8(); - if (version === Version.DRAFT_03) { - const ordered = await r.bool(); - const maxLatency = await r.u53(); - const startGroup = await r.u53(); - const endGroup = await r.u53(); - return new Subscribe(id, broadcast, track, priority, ordered, maxLatency, startGroup, endGroup); + + switch (version) { + case Version.DRAFT_03: { + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const startGroup = await r.u53(); + const endGroup = await r.u53(); + return new Subscribe(id, broadcast, track, priority, ordered, maxLatency, startGroup, endGroup); + } + case Version.DRAFT_01: + case Version.DRAFT_02: + return new Subscribe(id, broadcast, track, priority); + default: + unreachable(version); } - return new Subscribe(id, broadcast, track, priority); } - async encode(w: Writer, version?: Version): Promise { + async encode(w: Writer, version: Version): Promise { return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader, version?: Version): Promise { + static async decode(r: Reader, version: Version): Promise { return Message.decode(r, (r) => Subscribe.#decode(r, version)); } } @@ -159,16 +192,22 @@ export class SubscribeOk { } async #encode(w: Writer) { - if (this.version === Version.DRAFT_03) { - await w.u8(this.priority ?? 0); - await w.bool(this.ordered ?? true); - await w.u53(this.maxLatency ?? 0); - await w.u53(this.startGroup ?? 0); - await w.u53(this.endGroup ?? 0); - } else if (this.version === Version.DRAFT_02) { - // noop - } else if (this.version === Version.DRAFT_01) { - await w.u8(this.priority ?? 0); + switch (this.version) { + case Version.DRAFT_03: + await w.u8(this.priority ?? 0); + await w.bool(this.ordered ?? true); + await w.u53(this.maxLatency ?? 0); + await w.u53(this.startGroup ?? 0); + await w.u53(this.endGroup ?? 0); + break; + case Version.DRAFT_02: + // noop + break; + case Version.DRAFT_01: + await w.u8(this.priority ?? 0); + break; + default: + unreachable(this.version); } } @@ -179,16 +218,22 @@ export class SubscribeOk { let startGroup: number | undefined; let endGroup: number | undefined; - if (version === Version.DRAFT_03) { - priority = await r.u8(); - ordered = await r.bool(); - maxLatency = await r.u53(); - startGroup = await r.u53(); - endGroup = await r.u53(); - } else if (version === Version.DRAFT_02) { - // noop - } else if (version === Version.DRAFT_01) { - priority = await r.u8(); + switch (version) { + case Version.DRAFT_03: + priority = await r.u8(); + ordered = await r.bool(); + maxLatency = await r.u53(); + startGroup = await r.u53(); + endGroup = await r.u53(); + break; + case Version.DRAFT_02: + // noop + break; + case Version.DRAFT_01: + priority = await r.u8(); + break; + default: + unreachable(version); } return new SubscribeOk({ version, priority, ordered, maxLatency, startGroup, endGroup }); diff --git a/js/lite/src/lite/subscriber.ts b/js/lite/src/lite/subscriber.ts index 4fe755dad..73c2c31f1 100644 --- a/js/lite/src/lite/subscriber.ts +++ b/js/lite/src/lite/subscriber.ts @@ -9,7 +9,7 @@ import { Announce, AnnounceInit, AnnounceInterest } from "./announce.ts"; import type { Group as GroupMessage } from "./group.ts"; import { StreamId } from "./stream.ts"; import { Subscribe, SubscribeOk } from "./subscribe.ts"; -import type { Version } from "./version.ts"; +import { Version } from "./version.ts"; /** * Handles subscribing to broadcasts and managing their lifecycle. @@ -55,19 +55,31 @@ export class Subscriber { await stream.writer.u53(StreamId.Announce); await msg.encode(stream.writer); - // First, receive ANNOUNCE_INIT - const init = await AnnounceInit.decode(stream.reader); - - // Process initial announcements - for (const suffix of init.suffixes) { - const path = Path.join(prefix, suffix); - console.debug(`announced: broadcast=${path} active=true`); - announced.append({ path, active: true }); + switch (this.version) { + case Version.DRAFT_01: + case Version.DRAFT_02: { + // Receive ANNOUNCE_INIT first + const init = await AnnounceInit.decode(stream.reader); + + // Process initial announcements + for (const suffix of init.suffixes) { + const path = Path.join(prefix, suffix); + console.debug(`announced: broadcast=${path} active=true`); + announced.append({ path, active: true }); + } + break; + } + case Version.DRAFT_03: + // Draft03: no AnnounceInit, initial state comes via Announce messages. + break; } - // Then receive updates + // Receive announce updates (for Draft03, this includes initial state) for (;;) { - const announce = await Promise.race([Announce.decodeMaybe(stream.reader), announced.closed]); + const announce = await Promise.race([ + Announce.decodeMaybe(stream.reader, this.version), + announced.closed, + ]); if (!announce) break; if (announce instanceof Error) throw announce; @@ -115,7 +127,7 @@ export class Subscriber { const stream = await Stream.open(this.#quic); await stream.writer.u53(StreamId.Subscribe); - await msg.encode(stream.writer); + await msg.encode(stream.writer, this.version); try { await SubscribeOk.decode(stream.reader, this.version); diff --git a/rs/moq-lite/src/lite/announce.rs b/rs/moq-lite/src/lite/announce.rs index 8f0e55ca5..70bfe0933 100644 --- a/rs/moq-lite/src/lite/announce.rs +++ b/rs/moq-lite/src/lite/announce.rs @@ -14,34 +14,47 @@ pub enum Announce<'a> { Active { #[cfg_attr(feature = "serde", serde(borrow))] suffix: Path<'a>, + hops: u64, }, Ended { #[cfg_attr(feature = "serde", serde(borrow))] suffix: Path<'a>, + hops: u64, }, } impl Message for Announce<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { - Ok(match AnnounceStatus::decode(r, version)? { - AnnounceStatus::Active => Self::Active { - suffix: Path::decode(r, version)?, - }, - AnnounceStatus::Ended => Self::Ended { - suffix: Path::decode(r, version)?, - }, + let status = AnnounceStatus::decode(r, version)?; + let suffix = Path::decode(r, version)?; + let hops = match version { + Version::Draft03 => u64::decode(r, version)?, + Version::Draft01 | Version::Draft02 => 0, + }; + + Ok(match status { + AnnounceStatus::Active => Self::Active { suffix, hops }, + AnnounceStatus::Ended => Self::Ended { suffix, hops }, }) } fn encode_msg(&self, w: &mut W, version: Version) { match self { - Self::Active { suffix } => { + Self::Active { suffix, hops } => { AnnounceStatus::Active.encode(w, version); suffix.encode(w, version); + match version { + Version::Draft03 => hops.encode(w, version), + Version::Draft01 | Version::Draft02 => {} + } } - Self::Ended { suffix } => { + Self::Ended { suffix, hops } => { AnnounceStatus::Ended.encode(w, version); suffix.encode(w, version); + match version { + Version::Draft03 => hops.encode(w, version), + Version::Draft01 | Version::Draft02 => {} + } } } } @@ -87,6 +100,8 @@ impl Encode for AnnounceStatus { } /// Sent after setup to communicate the initially announced paths. +/// +/// Used by Draft01/Draft02 only. Draft03 uses individual Announce messages instead. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct AnnounceInit<'a> { diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index b36b8bbe0..35b10a057 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -60,8 +60,9 @@ impl Publisher { .consume_only(&[prefix.as_path()]) .ok_or(Error::Unauthorized)?; + let version = self.version; web_async::spawn(async move { - if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix).await { + if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix, version).await { match &err { Error::Cancel => { tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); @@ -87,29 +88,54 @@ impl Publisher { stream: &mut Stream, origin: &mut OriginConsumer, prefix: impl AsPath, + version: Version, ) -> Result<(), Error> { let prefix = prefix.as_path(); - let mut init = Vec::new(); - // Send ANNOUNCE_INIT as the first message with all currently active paths - // We use `try_next()` to synchronously get the initial updates. - while let Some((path, active)) = origin.try_announced() { - let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); + match version { + Version::Draft01 | Version::Draft02 => { + let mut init = Vec::new(); + + // Send ANNOUNCE_INIT as the first message with all currently active paths + // We use `try_next()` to synchronously get the initial updates. + while let Some((path, active)) = origin.try_announced() { + let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); + + if active.is_some() { + tracing::debug!(broadcast = %origin.absolute(&path), "announce"); + init.push(suffix.to_owned()); + } else { + // A potential race. + tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); + init.retain(|path| path != &suffix); + } + } - if active.is_some() { - tracing::debug!(broadcast = %origin.absolute(&path), "announce"); - init.push(suffix.to_owned()); - } else { - // A potential race. - tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); - init.retain(|path| path != &suffix); + let announce_init = lite::AnnounceInit { suffixes: init }; + stream.writer.encode(&announce_init).await?; + } + Version::Draft03 => { + // Draft03: send individual Announce messages for initial state. + while let Some((path, active)) = origin.try_announced() { + let suffix = path + .strip_prefix(&prefix) + .expect("origin returned invalid path") + .to_owned(); + + if active.is_some() { + tracing::debug!(broadcast = %origin.absolute(&path), "announce"); + let msg = lite::Announce::Active { suffix, hops: 0 }; + stream.writer.encode(&msg).await?; + } else { + tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); + let msg = lite::Announce::Ended { suffix, hops: 0 }; + stream.writer.encode(&msg).await?; + } + } } } - let announce_init = lite::AnnounceInit { suffixes: init }; - stream.writer.encode(&announce_init).await?; - - // Flush any synchronously announced paths + // Send updates as they arrive. loop { tokio::select! { biased; @@ -121,11 +147,11 @@ impl Publisher { if active.is_some() { tracing::debug!(broadcast = %origin.absolute(&path), "announce"); - let msg = lite::Announce::Active { suffix }; + let msg = lite::Announce::Active { suffix, hops: 0 }; stream.writer.encode(&msg).await?; } else { tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); - let msg = lite::Announce::Ended { suffix }; + let msg = lite::Announce::Ended { suffix, hops: 0 }; stream.writer.encode(&msg).await?; } }, diff --git a/rs/moq-lite/src/lite/subscribe.rs b/rs/moq-lite/src/lite/subscribe.rs index 5289c019f..0c1c3c7b5 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -28,14 +28,15 @@ impl Message for Subscribe<'_> { let track = Cow::::decode(r, version)?; let priority = u8::decode(r, version)?; - let (ordered, max_latency, start_group, end_group) = if version == Version::Draft03 { - let ordered = u8::decode(r, version)? != 0; - let max_latency = u64::decode(r, version)?; - let start_group = u64::decode(r, version)?; - let end_group = u64::decode(r, version)?; - (ordered, max_latency, start_group, end_group) - } else { - (true, 0, 0, 0) + let (ordered, max_latency, start_group, end_group) = match version { + Version::Draft03 => { + let ordered = u8::decode(r, version)? != 0; + let max_latency = u64::decode(r, version)?; + let start_group = u64::decode(r, version)?; + let end_group = u64::decode(r, version)?; + (ordered, max_latency, start_group, end_group) + } + Version::Draft01 | Version::Draft02 => (true, 0, 0, 0), }; Ok(Self { @@ -56,11 +57,14 @@ impl Message for Subscribe<'_> { self.track.encode(w, version); self.priority.encode(w, version); - if version == Version::Draft03 { - (self.ordered as u8).encode(w, version); - self.max_latency.encode(w, version); - self.start_group.encode(w, version); - self.end_group.encode(w, version); + match version { + Version::Draft03 => { + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); + } + Version::Draft01 | Version::Draft02 => {} } } } @@ -76,46 +80,52 @@ pub struct SubscribeOk { impl Message for SubscribeOk { fn encode_msg(&self, w: &mut W, version: Version) { - if version == Version::Draft03 { - self.priority.encode(w, version); - (self.ordered as u8).encode(w, version); - self.max_latency.encode(w, version); - self.start_group.encode(w, version); - self.end_group.encode(w, version); - } else if version == Version::Draft01 { - self.priority.encode(w, version); + match version { + Version::Draft03 => { + self.priority.encode(w, version); + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); + } + Version::Draft01 => { + self.priority.encode(w, version); + } + Version::Draft02 => {} } } fn decode_msg(r: &mut R, version: Version) -> Result { - if version == Version::Draft03 { - let priority = u8::decode(r, version)?; - let ordered = u8::decode(r, version)? != 0; - let max_latency = u64::decode(r, version)?; - let start_group = u64::decode(r, version)?; - let end_group = u64::decode(r, version)?; - - Ok(Self { - priority, - ordered, - max_latency, - start_group, - end_group, - }) - } else { - let priority = if version == Version::Draft01 { - u8::decode(r, version)? - } else { - 0 - }; - - Ok(Self { - priority, + match version { + Version::Draft03 => { + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let max_latency = u64::decode(r, version)?; + let start_group = u64::decode(r, version)?; + let end_group = u64::decode(r, version)?; + + Ok(Self { + priority, + ordered, + max_latency, + start_group, + end_group, + }) + } + Version::Draft01 => Ok(Self { + priority: u8::decode(r, version)?, + ordered: true, + max_latency: 0, + start_group: 0, + end_group: 0, + }), + Version::Draft02 => Ok(Self { + priority: 0, ordered: true, max_latency: 0, start_group: 0, end_group: 0, - }) + }), } } } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 871296dba..925e7d61d 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -91,19 +91,26 @@ impl Subscriber { let mut producers = HashMap::new(); - let msg: lite::AnnounceInit = stream.reader.decode().await?; - for path in msg.suffixes { - self.start_announce(path, &mut producers)?; + match self.version { + Version::Draft01 | Version::Draft02 => { + let msg: lite::AnnounceInit = stream.reader.decode().await?; + for path in msg.suffixes { + self.start_announce(path, &mut producers)?; + } + } + Version::Draft03 => { + // Draft03: no AnnounceInit, initial state comes via Announce messages. + } } let _ = init.send(()); while let Some(announce) = stream.reader.decode_maybe::().await? { match announce { - lite::Announce::Active { suffix: path } => { + lite::Announce::Active { suffix: path, .. } => { self.start_announce(path, &mut producers)?; } - lite::Announce::Ended { suffix: path } => { + lite::Announce::Ended { suffix: path, .. } => { tracing::debug!(broadcast = %self.log_path(&path), "unannounced"); // Close the producer. From 921fe5e50f4d3e605692d9920742c7faa0c58c82 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 10:07:54 -0800 Subject: [PATCH 4/8] WIP --- rs/moq-lite/src/client.rs | 6 ++---- rs/moq-lite/src/ietf/session.rs | 2 +- rs/moq-lite/src/lite/publisher.rs | 18 +----------------- rs/moq-lite/src/lite/session.rs | 14 ++------------ rs/moq-lite/src/lite/subscriber.rs | 11 +++-------- rs/moq-lite/src/server.rs | 6 ++---- 6 files changed, 11 insertions(+), 46 deletions(-) diff --git a/rs/moq-lite/src/client.rs b/rs/moq-lite/src/client.rs index 3862c5492..1f22379ff 100644 --- a/rs/moq-lite/src/client.rs +++ b/rs/moq-lite/src/client.rs @@ -101,8 +101,7 @@ impl Client { self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } Version::Ietf(version) => { // Decode the parameters to get the initial request ID. @@ -122,8 +121,7 @@ impl Client { self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } } diff --git a/rs/moq-lite/src/ietf/session.rs b/rs/moq-lite/src/ietf/session.rs index 698db97f9..bf53a27cd 100644 --- a/rs/moq-lite/src/ietf/session.rs +++ b/rs/moq-lite/src/ietf/session.rs @@ -6,7 +6,7 @@ use crate::{ use super::{Publisher, Subscriber}; -pub(crate) async fn start( +pub(crate) fn start( session: S, setup: Stream, request_id_max: RequestId, diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 35b10a057..b1bdb0d59 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -115,23 +115,7 @@ impl Publisher { stream.writer.encode(&announce_init).await?; } Version::Draft03 => { - // Draft03: send individual Announce messages for initial state. - while let Some((path, active)) = origin.try_announced() { - let suffix = path - .strip_prefix(&prefix) - .expect("origin returned invalid path") - .to_owned(); - - if active.is_some() { - tracing::debug!(broadcast = %origin.absolute(&path), "announce"); - let msg = lite::Announce::Active { suffix, hops: 0 }; - stream.writer.encode(&msg).await?; - } else { - tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); - let msg = lite::Announce::Ended { suffix, hops: 0 }; - stream.writer.encode(&msg).await?; - } - } + // No more announce init in Draft03. } } diff --git a/rs/moq-lite/src/lite/session.rs b/rs/moq-lite/src/lite/session.rs index 79226e398..27e8d13b5 100644 --- a/rs/moq-lite/src/lite/session.rs +++ b/rs/moq-lite/src/lite/session.rs @@ -1,5 +1,3 @@ -use tokio::sync::oneshot; - use crate::{ Error, OriginConsumer, OriginProducer, coding::Stream, @@ -8,7 +6,7 @@ use crate::{ use super::{Publisher, Subscriber}; -pub(crate) async fn start( +pub(crate) fn start( session: S, // The stream used to setup the session, after exchanging setup messages. setup: Stream, @@ -22,13 +20,11 @@ pub(crate) async fn start( let publisher = Publisher::new(session.clone(), publish, version); let subscriber = Subscriber::new(session.clone(), subscribe, version); - let init = oneshot::channel(); - web_async::spawn(async move { let res = tokio::select! { res = run_session(setup) => res, res = publisher.run() => res, - res = subscriber.run(init.0) => res, + res = subscriber.run() => res, }; match res { @@ -47,12 +43,6 @@ pub(crate) async fn start( } }); - // Wait until receiving the initial announcements to prevent some race conditions. - // Otherwise, `consume()` might return not found if we don't wait long enough, so just wait. - // If the announce stream fails or is closed, this will return an error instead of hanging. - // TODO return a better error - init.1.await.map_err(|_| Error::Cancel)?; - Ok(()) } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 925e7d61d..3bd992ab5 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -11,7 +11,6 @@ use crate::{ model::BroadcastProducer, }; -use tokio::sync::oneshot; use web_async::Lock; #[derive(Clone)] @@ -35,10 +34,9 @@ impl Subscriber { } } - /// Send a signal when the subscriber is initialized. - pub async fn run(self, init: oneshot::Sender<()>) -> Result<(), Error> { + pub async fn run(self) -> Result<(), Error> { tokio::select! { - Err(err) = self.clone().run_announce(init) => Err(err), + Err(err) = self.clone().run_announce() => Err(err), res = self.run_uni() => res, } } @@ -72,10 +70,9 @@ impl Subscriber { Ok(()) } - async fn run_announce(mut self, init: oneshot::Sender<()>) -> Result<(), Error> { + async fn run_announce(mut self) -> Result<(), Error> { if self.origin.is_none() { // Don't do anything if there's no origin configured. - let _ = init.send(()); return Ok(()); } @@ -103,8 +100,6 @@ impl Subscriber { } } - let _ = init.send(()); - while let Some(announce) = stream.reader.decode_maybe::().await? { match announce { lite::Announce::Active { suffix: path, .. } => { diff --git a/rs/moq-lite/src/server.rs b/rs/moq-lite/src/server.rs index 6789d377d..ab1ada264 100644 --- a/rs/moq-lite/src/server.rs +++ b/rs/moq-lite/src/server.rs @@ -101,8 +101,7 @@ impl Server { self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } Version::Ietf(version) => { // Decode the client's parameters to get their max request ID. @@ -119,8 +118,7 @@ impl Server { self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } }; From 54859a277169ae1713053ce21e56089fa026e9dd Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 10:47:11 -0800 Subject: [PATCH 5/8] REview the AI stuff. --- js/lite/src/connection/connect.ts | 10 +-- js/lite/src/ietf/publish.ts | 7 +- js/lite/src/ietf/publisher.ts | 5 +- js/lite/src/ietf/setup.ts | 13 ++-- js/lite/src/ietf/subscribe.ts | 13 ++-- js/lite/src/ietf/subscriber.ts | 5 +- js/lite/src/lite/announce.ts | 18 ++--- js/lite/src/lite/connection.ts | 9 ++- js/lite/src/lite/publisher.ts | 10 +-- js/lite/src/lite/subscribe.ts | 102 +++++++++++++++-------------- js/lite/src/lite/subscriber.ts | 2 +- rs/moq-lite/src/client.rs | 25 +++++-- rs/moq-lite/src/coding/decode.rs | 16 +++++ rs/moq-lite/src/coding/encode.rs | 16 +++++ rs/moq-lite/src/lite/fetch.rs | 14 ++++ rs/moq-lite/src/lite/group.rs | 14 ++++ rs/moq-lite/src/lite/info.rs | 10 +++ rs/moq-lite/src/lite/probe.rs | 14 ++++ rs/moq-lite/src/lite/publisher.rs | 8 +-- rs/moq-lite/src/lite/session.rs | 15 +++-- rs/moq-lite/src/lite/subscribe.rs | 82 +++++++++++++++-------- rs/moq-lite/src/lite/subscriber.rs | 6 +- rs/moq-lite/src/server.rs | 25 +++++-- rs/moq-lite/src/setup.rs | 28 ++++---- rs/moq-lite/src/version.rs | 2 +- 25 files changed, 302 insertions(+), 167 deletions(-) diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index 6f9bd6f4a..a042143cf 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -70,9 +70,6 @@ export async function connect(url: URL, props?: ConnectProps): Promise { diff --git a/js/lite/src/lite/connection.ts b/js/lite/src/lite/connection.ts index 89efbc047..4fc524e77 100644 --- a/js/lite/src/lite/connection.ts +++ b/js/lite/src/lite/connection.ts @@ -28,7 +28,7 @@ export class Connection implements Established { #quic: WebTransport; // Use to receive/send session messages. - #session: Stream; + #session?: Stream; // Module for contributing tracks. #publisher: Publisher; @@ -47,7 +47,7 @@ export class Connection implements Established { * * @internal */ - constructor(url: URL, quic: WebTransport, session: Stream, version: Version) { + constructor(url: URL, quic: WebTransport, version: Version, session?: Stream) { this.url = url; this.#quic = quic; this.#session = session; @@ -123,6 +123,11 @@ export class Connection implements Established { } async #runSession() { + if (!this.#session) { + // moq-lite draft-03 doesn't use a session stream. + return; + } + try { // Receive messages until the connection is closed. for (;;) { diff --git a/js/lite/src/lite/publisher.ts b/js/lite/src/lite/publisher.ts index 6e8e5cfc3..9f19a97ea 100644 --- a/js/lite/src/lite/publisher.ts +++ b/js/lite/src/lite/publisher.ts @@ -81,7 +81,7 @@ export class Publisher { case Version.DRAFT_03: // Draft03: send individual Announce messages for initial state. for (const suffix of active) { - const wire = new Announce(suffix, true); + const wire = new Announce({ suffix, active: true }); await wire.encode(stream.writer, this.version); } break; @@ -118,14 +118,14 @@ export class Publisher { // Announce any new broadcasts. for (const added of newActive.difference(active)) { console.debug(`announce: broadcast=${added} active=true`); - const wire = new Announce(added, true); + const wire = new Announce({ suffix: added, active: true }); await wire.encode(stream.writer, this.version); } // Announce any removed broadcasts. for (const removed of active.difference(newActive)) { console.debug(`announce: broadcast=${removed} active=false`); - const wire = new Announce(removed, false); + const wire = new Announce({ suffix: removed, active: false }); await wire.encode(stream.writer, this.version); } @@ -154,8 +154,8 @@ export class Publisher { const track = broadcast.subscribe(msg.track, msg.priority); try { - const info = new SubscribeOk({ version: this.version, priority: msg.priority }); - await info.encode(stream.writer); + const info = new SubscribeOk({ priority: msg.priority }); + await info.encode(stream.writer, this.version); console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`); diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index 030fb7256..b09a6fedd 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -1,12 +1,9 @@ import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; import { Version } from "./version.ts"; -function unreachable(v: never): never { - throw new Error(`unsupported version: ${v}`); -} - export class SubscribeUpdate { priority: number; ordered: boolean; @@ -84,27 +81,28 @@ export class Subscribe { priority: number; ordered: boolean; maxLatency: number; - startGroup: number; - endGroup: number; - constructor( - id: bigint, - broadcast: Path.Valid, - track: string, - priority: number, - ordered: boolean = true, - maxLatency: number = 0, - startGroup: number = 0, - endGroup: number = 0, - ) { - this.id = id; - this.broadcast = broadcast; - this.track = track; - this.priority = priority; - this.ordered = ordered; - this.maxLatency = maxLatency; - this.startGroup = startGroup; - this.endGroup = endGroup; + startGroup?: number; + endGroup?: number; + + constructor(props: { + id: bigint; + broadcast: Path.Valid; + track: string; + priority: number; + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + }) { + this.id = props.id; + this.broadcast = props.broadcast; + this.track = props.track; + this.priority = props.priority; + this.ordered = props.ordered ?? false; + this.maxLatency = props.maxLatency ?? 0; + this.startGroup = props.startGroup; + this.endGroup = props.endGroup; } async #encode(w: Writer, version: Version) { @@ -117,8 +115,8 @@ export class Subscribe { case Version.DRAFT_03: await w.bool(this.ordered); await w.u53(this.maxLatency); - await w.u53(this.startGroup); - await w.u53(this.endGroup); + await w.u53(this.startGroup ? this.startGroup + 1 : 0); + await w.u53(this.endGroup ? this.endGroup + 1 : 0); break; case Version.DRAFT_01: case Version.DRAFT_02: @@ -140,11 +138,20 @@ export class Subscribe { const maxLatency = await r.u53(); const startGroup = await r.u53(); const endGroup = await r.u53(); - return new Subscribe(id, broadcast, track, priority, ordered, maxLatency, startGroup, endGroup); + return new Subscribe({ + id, + broadcast, + track, + priority, + ordered, + maxLatency, + startGroup: startGroup > 0 ? startGroup - 1 : undefined, + endGroup: endGroup > 0 ? endGroup - 1 : undefined, + }); } case Version.DRAFT_01: case Version.DRAFT_02: - return new Subscribe(id, broadcast, track, priority); + return new Subscribe({ id, broadcast, track, priority }); default: unreachable(version); } @@ -160,30 +167,25 @@ export class Subscribe { } export class SubscribeOk { - // The version - readonly version: Version; - priority?: number; - ordered?: boolean; - maxLatency?: number; + priority: number; + ordered: boolean; + maxLatency: number; startGroup?: number; endGroup?: number; constructor({ - version, - priority = undefined, - ordered = undefined, - maxLatency = undefined, + priority = 0, + ordered = true, + maxLatency = 0, startGroup = undefined, endGroup = undefined, }: { - version: Version; priority?: number; ordered?: boolean; maxLatency?: number; startGroup?: number; endGroup?: number; }) { - this.version = version; this.priority = priority; this.ordered = ordered; this.maxLatency = maxLatency; @@ -191,14 +193,14 @@ export class SubscribeOk { this.endGroup = endGroup; } - async #encode(w: Writer) { - switch (this.version) { + async #encode(w: Writer, version: Version) { + switch (version) { case Version.DRAFT_03: - await w.u8(this.priority ?? 0); - await w.bool(this.ordered ?? true); - await w.u53(this.maxLatency ?? 0); - await w.u53(this.startGroup ?? 0); - await w.u53(this.endGroup ?? 0); + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup ? this.startGroup + 1 : 0); + await w.u53(this.endGroup ? this.endGroup + 1 : 0); break; case Version.DRAFT_02: // noop @@ -207,7 +209,7 @@ export class SubscribeOk { await w.u8(this.priority ?? 0); break; default: - unreachable(this.version); + unreachable(version); } } @@ -236,11 +238,11 @@ export class SubscribeOk { unreachable(version); } - return new SubscribeOk({ version, priority, ordered, maxLatency, startGroup, endGroup }); + return new SubscribeOk({ priority, ordered, maxLatency, startGroup, endGroup }); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } static async decode(r: Reader, version: Version): Promise { diff --git a/js/lite/src/lite/subscriber.ts b/js/lite/src/lite/subscriber.ts index 73c2c31f1..386921a3a 100644 --- a/js/lite/src/lite/subscriber.ts +++ b/js/lite/src/lite/subscriber.ts @@ -123,7 +123,7 @@ export class Subscriber { console.debug(`subscribe start: id=${id} broadcast=${broadcast} track=${request.track.name}`); - const msg = new Subscribe(id, broadcast, request.track.name, request.priority); + const msg = new Subscribe({ id, broadcast, track: request.track.name, priority: request.priority }); const stream = await Stream.open(this.#quic); await stream.writer.u53(StreamId.Subscribe); diff --git a/rs/moq-lite/src/client.rs b/rs/moq-lite/src/client.rs index 1f22379ff..71a059309 100644 --- a/rs/moq-lite/src/client.rs +++ b/rs/moq-lite/src/client.rs @@ -46,20 +46,33 @@ impl Client { // If ALPN was used to negotiate the version, use the appropriate encoding. // Default to IETF 14 if no ALPN was used and we'll negotiate the version later. let (encoding, supported) = match session.protocol() { - Some(p) if p == ietf::ALPN_16 => ( + Some(ietf::ALPN_16) => ( Version::Ietf(ietf::Version::Draft16), vec![ietf::Version::Draft16.into()], ), - Some(p) if p == ietf::ALPN_15 => ( + Some(ietf::ALPN_15) => ( Version::Ietf(ietf::Version::Draft15), vec![ietf::Version::Draft15.into()], ), - Some(p) if p == ietf::ALPN_14 => ( + Some(ietf::ALPN_14) => ( Version::Ietf(ietf::Version::Draft14), vec![ietf::Version::Draft14.into()], ), - Some(p) if p == lite::ALPN => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), - None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), + Some(lite::ALPN_03) => { + // Starting with draft-03, there's no more SETUP control stream. + lite::start( + session.clone(), + None, + self.publish.clone(), + self.consume.clone(), + lite::Version::Draft03, + )?; + + tracing::debug!(version = ?lite::Version::Draft03, "connected"); + + return Ok(Session::new(session)); + } + Some(lite::ALPN) | None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), Some(p) => return Err(Error::UnknownAlpn(p.to_string())), }; @@ -97,7 +110,7 @@ impl Client { let stream = stream.with_version(version); lite::start( session.clone(), - stream, + Some(stream), self.publish.clone(), self.consume.clone(), version, diff --git a/rs/moq-lite/src/coding/decode.rs b/rs/moq-lite/src/coding/decode.rs index 8467380fa..13d12152e 100644 --- a/rs/moq-lite/src/coding/decode.rs +++ b/rs/moq-lite/src/coding/decode.rs @@ -135,3 +135,19 @@ impl Decode for Cow<'_, str> { Ok(Cow::Owned(s)) } } + +impl Decode for Option { + fn decode(r: &mut R, version: V) -> Result { + match u64::decode(r, version)? { + 0 => Ok(None), + value => Ok(Some(value - 1)), + } + } +} + +impl Decode for std::time::Duration { + fn decode(r: &mut R, version: V) -> Result { + let value = u64::decode(r, version)?; + Ok(Self::from_millis(value)) + } +} diff --git a/rs/moq-lite/src/coding/encode.rs b/rs/moq-lite/src/coding/encode.rs index 3fc4fb69a..e7cf13f9f 100644 --- a/rs/moq-lite/src/coding/encode.rs +++ b/rs/moq-lite/src/coding/encode.rs @@ -94,3 +94,19 @@ impl Encode for Cow<'_, str> { w.put(self.as_bytes()); } } + +impl Encode for Option { + fn encode(&self, w: &mut W, version: V) { + match self { + Some(value) => (value + 1).encode(w, version), + None => 0u64.encode(w, version), + } + } +} + +impl Encode for std::time::Duration { + fn encode(&self, w: &mut W, version: V) { + // TODO Make encoding fallable. + (self.as_millis() as u64).encode(w, version) + } +} diff --git a/rs/moq-lite/src/lite/fetch.rs b/rs/moq-lite/src/lite/fetch.rs index fd0c6efb4..977d60afe 100644 --- a/rs/moq-lite/src/lite/fetch.rs +++ b/rs/moq-lite/src/lite/fetch.rs @@ -20,6 +20,13 @@ pub struct Fetch<'a> { impl Message for Fetch<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("fetch not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + let broadcast = Path::decode(r, version)?; let track = Cow::::decode(r, version)?; let priority = u8::decode(r, version)?; @@ -34,6 +41,13 @@ impl Message for Fetch<'_> { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("fetch not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + self.broadcast.encode(w, version); self.track.encode(w, version); self.priority.encode(w, version); diff --git a/rs/moq-lite/src/lite/group.rs b/rs/moq-lite/src/lite/group.rs index f3b1de0c7..eda1a2d34 100644 --- a/rs/moq-lite/src/lite/group.rs +++ b/rs/moq-lite/src/lite/group.rs @@ -39,6 +39,13 @@ pub struct GroupDrop { impl Message for GroupDrop { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("group drop not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + Ok(Self { sequence: u64::decode(r, version)?, count: u64::decode(r, version)?, @@ -47,6 +54,13 @@ impl Message for GroupDrop { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("group drop not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + self.sequence.encode(w, version); self.count.encode(w, version); self.error.encode(w, version); diff --git a/rs/moq-lite/src/lite/info.rs b/rs/moq-lite/src/lite/info.rs index c004cd01d..07d416870 100644 --- a/rs/moq-lite/src/lite/info.rs +++ b/rs/moq-lite/src/lite/info.rs @@ -10,6 +10,11 @@ pub struct SessionInfo { impl Message for SessionInfo { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("session info not supported for version: {:?}", version), + } + let bitrate = match u64::decode(r, version)? { 0 => None, bitrate => Some(bitrate), @@ -19,6 +24,11 @@ impl Message for SessionInfo { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("session info not supported for version: {:?}", version), + } + self.bitrate.unwrap_or(0).encode(w, version); } } diff --git a/rs/moq-lite/src/lite/probe.rs b/rs/moq-lite/src/lite/probe.rs index 4612df424..e67aff16a 100644 --- a/rs/moq-lite/src/lite/probe.rs +++ b/rs/moq-lite/src/lite/probe.rs @@ -14,12 +14,26 @@ pub struct Probe { impl Message for Probe { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("probe not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + let bitrate = u64::decode(r, version)?; Ok(Self { bitrate }) } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("probe not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + self.bitrate.encode(w, version); } } diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index b1bdb0d59..79d793e25 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -204,10 +204,10 @@ impl Publisher { let info = lite::SubscribeOk { priority: track.info.priority, - ordered: true, - max_latency: 0, - start_group: 0, - end_group: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, }; stream.writer.encode(&info).await?; diff --git a/rs/moq-lite/src/lite/session.rs b/rs/moq-lite/src/lite/session.rs index 27e8d13b5..15523183e 100644 --- a/rs/moq-lite/src/lite/session.rs +++ b/rs/moq-lite/src/lite/session.rs @@ -9,7 +9,8 @@ use super::{Publisher, Subscriber}; pub(crate) fn start( session: S, // The stream used to setup the session, after exchanging setup messages. - setup: Stream, + // NOTE: No longer used in draft-03. + setup: Option>, // We will publish any local broadcasts from this origin. publish: Option, // We will consume any remote broadcasts, inserting them into this origin. @@ -22,7 +23,7 @@ pub(crate) fn start( web_async::spawn(async move { let res = tokio::select! { - res = run_session(setup) => res, + Err(res) = run_session(setup) => Err(res), res = publisher.run() => res, res = subscriber.run() => res, }; @@ -47,7 +48,11 @@ pub(crate) fn start( } // TODO do something useful with this -async fn run_session(mut stream: Stream) -> Result<(), Error> { - while let Some(_info) = stream.reader.decode_maybe::().await? {} - Err(Error::Cancel) +async fn run_session(stream: Option>) -> Result<(), Error> { + if let Some(mut stream) = stream { + while let Some(_info) = stream.reader.decode_maybe::().await? {} + return Err(Error::Cancel); + } + + Ok(()) } diff --git a/rs/moq-lite/src/lite/subscribe.rs b/rs/moq-lite/src/lite/subscribe.rs index 0c1c3c7b5..539b885e7 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -16,9 +16,9 @@ pub struct Subscribe<'a> { pub track: Cow<'a, str>, pub priority: u8, pub ordered: bool, - pub max_latency: u64, - pub start_group: u64, - pub end_group: u64, + pub max_latency: std::time::Duration, + pub start_group: Option, + pub end_group: Option, } impl Message for Subscribe<'_> { @@ -31,12 +31,12 @@ impl Message for Subscribe<'_> { let (ordered, max_latency, start_group, end_group) = match version { Version::Draft03 => { let ordered = u8::decode(r, version)? != 0; - let max_latency = u64::decode(r, version)?; - let start_group = u64::decode(r, version)?; - let end_group = u64::decode(r, version)?; + let max_latency = std::time::Duration::decode(r, version)?; + let start_group = Option::::decode(r, version)?; + let end_group = Option::::decode(r, version)?; (ordered, max_latency, start_group, end_group) } - Version::Draft01 | Version::Draft02 => (true, 0, 0, 0), + Version::Draft01 | Version::Draft02 => (false, std::time::Duration::ZERO, None, None), }; Ok(Self { @@ -73,9 +73,9 @@ impl Message for Subscribe<'_> { pub struct SubscribeOk { pub priority: u8, pub ordered: bool, - pub max_latency: u64, - pub start_group: u64, - pub end_group: u64, + pub max_latency: std::time::Duration, + pub start_group: Option, + pub end_group: Option, } impl Message for SubscribeOk { @@ -100,9 +100,9 @@ impl Message for SubscribeOk { Version::Draft03 => { let priority = u8::decode(r, version)?; let ordered = u8::decode(r, version)? != 0; - let max_latency = u64::decode(r, version)?; - let start_group = u64::decode(r, version)?; - let end_group = u64::decode(r, version)?; + let max_latency = std::time::Duration::decode(r, version)?; + let start_group = Option::::decode(r, version)?; + let end_group = Option::::decode(r, version)?; Ok(Self { priority, @@ -114,17 +114,17 @@ impl Message for SubscribeOk { } Version::Draft01 => Ok(Self { priority: u8::decode(r, version)?, - ordered: true, - max_latency: 0, - start_group: 0, - end_group: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, }), Version::Draft02 => Ok(Self { priority: 0, - ordered: true, - max_latency: 0, - start_group: 0, - end_group: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, }), } } @@ -139,17 +139,30 @@ pub struct SubscribeUpdate { pub priority: u8, pub ordered: bool, pub max_latency: u64, - pub start_group: u64, - pub end_group: u64, + pub start_group: Option, + pub end_group: Option, } impl Message for SubscribeUpdate { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe update not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + let priority = u8::decode(r, version)?; let ordered = u8::decode(r, version)? != 0; let max_latency = u64::decode(r, version)?; - let start_group = u64::decode(r, version)?; - let end_group = u64::decode(r, version)?; + let start_group = match u64::decode(r, version)? { + 0 => None, + group => Some(group - 1), + }; + let end_group = match u64::decode(r, version)? { + 0 => None, + group => Some(group - 1), + }; Ok(Self { priority, @@ -161,10 +174,25 @@ impl Message for SubscribeUpdate { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe update not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + self.priority.encode(w, version); (self.ordered as u8).encode(w, version); self.max_latency.encode(w, version); - self.start_group.encode(w, version); - self.end_group.encode(w, version); + + match self.start_group { + Some(start_group) => (start_group + 1).encode(w, version), + None => 0u64.encode(w, version), + } + + match self.end_group { + Some(end_group) => (end_group + 1).encode(w, version), + None => 0u64.encode(w, version), + } } } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 3bd992ab5..bb1d22c6f 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -180,9 +180,9 @@ impl Subscriber { track: (&track.info.name).into(), priority: track.info.priority, ordered: true, - max_latency: 0, - start_group: 0, - end_group: 0, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, }; tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe started"); diff --git a/rs/moq-lite/src/server.rs b/rs/moq-lite/src/server.rs index ab1ada264..def979da9 100644 --- a/rs/moq-lite/src/server.rs +++ b/rs/moq-lite/src/server.rs @@ -44,20 +44,33 @@ impl Server { } let (encoding, supported) = match session.protocol() { - Some(p) if p == ietf::ALPN_16 => ( + Some(ietf::ALPN_16) => ( Version::Ietf(ietf::Version::Draft16), vec![ietf::Version::Draft16.into()], ), - Some(p) if p == ietf::ALPN_15 => ( + Some(ietf::ALPN_15) => ( Version::Ietf(ietf::Version::Draft15), vec![ietf::Version::Draft15.into()], ), - Some(p) if p == ietf::ALPN_14 => ( + Some(ietf::ALPN_14) => ( Version::Ietf(ietf::Version::Draft14), vec![ietf::Version::Draft14.into()], ), - Some(p) if p == lite::ALPN => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), - None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), + Some(lite::ALPN_03) => { + // Starting with draft-03, there's no more SETUP control stream. + lite::start( + session.clone(), + None, + self.publish.clone(), + self.consume.clone(), + lite::Version::Draft03, + )?; + + tracing::debug!(version = ?lite::Version::Draft03, "connected"); + + return Ok(Session::new(session)); + } + Some(lite::ALPN) | None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), Some(p) => return Err(Error::UnknownAlpn(p.to_string())), }; @@ -97,7 +110,7 @@ impl Server { let stream = stream.with_version(version); lite::start( session.clone(), - stream, + Some(stream), self.publish.clone(), self.consume.clone(), version, diff --git a/rs/moq-lite/src/setup.rs b/rs/moq-lite/src/setup.rs index 5e5e5d1bf..e845449ab 100644 --- a/rs/moq-lite/src/setup.rs +++ b/rs/moq-lite/src/setup.rs @@ -26,9 +26,9 @@ impl Client { // Draft15+: no versions list, parameters only. } Version::Ietf(ietf::Version::Draft14) - | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => self.versions.encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; w.put_slice(&self.parameters); } @@ -46,9 +46,8 @@ impl Decode for Client { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::decode(r, v)? as usize } - Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { - u64::decode(r, v)? as usize - } + Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; if r.remaining() < size { @@ -63,9 +62,9 @@ impl Decode for Client { coding::Versions::from([v.into()]) } Version::Ietf(ietf::Version::Draft14) - | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Versions::decode(&mut msg, v)?, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; Ok(Self { @@ -88,9 +87,8 @@ impl Encode for Client { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::try_from(size).expect("message too large for u16").encode(w, v) } - Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { - (size as u64).encode(w, v) - } + Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), } self.encode_inner(w, v); } @@ -113,9 +111,9 @@ impl Server { // Draft15+: No version field, parameters only. } Version::Ietf(ietf::Version::Draft14) - | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => self.version.encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; w.put_slice(&self.parameters); } @@ -133,9 +131,8 @@ impl Encode for Server { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::try_from(size).expect("message too large for u16").encode(w, v) } - Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { - (size as u64).encode(w, v) - } + Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), } self.encode_inner(w, v); @@ -153,9 +150,8 @@ impl Decode for Server { Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::decode(r, v)? as usize } - Version::Lite(lite::Version::Draft03 | lite::Version::Draft02 | lite::Version::Draft01) => { - u64::decode(r, v)? as usize - } + Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; if r.remaining() < size { @@ -166,9 +162,9 @@ impl Decode for Server { let version = match v { Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft16) => v.into(), Version::Ietf(ietf::Version::Draft14) - | Version::Lite(lite::Version::Draft03) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Version::decode(&mut msg, v)?, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; Ok(Self { diff --git a/rs/moq-lite/src/version.rs b/rs/moq-lite/src/version.rs index 8a6083136..91a137321 100644 --- a/rs/moq-lite/src/version.rs +++ b/rs/moq-lite/src/version.rs @@ -10,7 +10,7 @@ pub(crate) const NEGOTIATED: [Version; 3] = [ ]; /// ALPN strings for supported versions. -pub const ALPNS: &[&str] = &[lite::ALPN_03, lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; +pub const ALPNS: &[&str] = &[lite::ALPN_03, lite::ALPN, ietf::ALPN_16, ietf::ALPN_15, ietf::ALPN_14]; // A combination of ietf::Version and lite::Version. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] From 155bdabcea79f0e6412d544fdba9d41e055a922c Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 11:02:51 -0800 Subject: [PATCH 6/8] AI review. --- js/lite/src/connection/connect.ts | 2 +- js/lite/src/lite/announce.ts | 18 ++++++++++++++++-- js/lite/src/lite/connection.ts | 2 +- js/lite/src/lite/fetch.ts | 20 ++++++++++++++++++-- js/lite/src/lite/group.ts | 23 ++++++++++++++++++++--- js/lite/src/lite/probe.ts | 20 ++++++++++++++++++-- js/lite/src/lite/publisher.ts | 2 +- js/lite/src/lite/session.ts | 23 ++++++++++++++++++++--- js/lite/src/lite/subscribe.ts | 16 +++++++++++----- js/lite/src/lite/subscriber.ts | 2 +- rs/moq-lite/src/lite/announce.rs | 10 ++++++++++ rs/moq-lite/src/lite/subscribe.rs | 4 ++-- 12 files changed, 119 insertions(+), 23 deletions(-) diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index a042143cf..655c87c42 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -155,7 +155,7 @@ async function connectWebTransport( allowPooling: false, congestionControl: "low-latency", // @ts-expect-error - TODO: add protocols to WebTransportOptions - protocols: [Lite.ALPN, Ietf.ALPN.DRAFT_16, Ietf.ALPN.DRAFT_15], + protocols: [Lite.ALPN_03, Lite.ALPN, Ietf.ALPN.DRAFT_16, Ietf.ALPN.DRAFT_15], ...options, }; diff --git a/js/lite/src/lite/announce.ts b/js/lite/src/lite/announce.ts index f1bf91d34..32da48ba4 100644 --- a/js/lite/src/lite/announce.ts +++ b/js/lite/src/lite/announce.ts @@ -98,6 +98,18 @@ export class AnnounceInit { this.suffixes = paths; } + static #guard(version: Version) { + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + case Version.DRAFT_03: + throw new Error("announce init not supported for Draft03"); + default: + unreachable(version); + } + } + async #encode(w: Writer) { await w.u53(this.suffixes.length); for (const path of this.suffixes) { @@ -114,11 +126,13 @@ export class AnnounceInit { return new AnnounceInit(suffixes); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + AnnounceInit.#guard(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + AnnounceInit.#guard(version); return Message.decode(r, AnnounceInit.#decode); } } diff --git a/js/lite/src/lite/connection.ts b/js/lite/src/lite/connection.ts index 4fc524e77..43f5fbbf4 100644 --- a/js/lite/src/lite/connection.ts +++ b/js/lite/src/lite/connection.ts @@ -131,7 +131,7 @@ export class Connection implements Established { try { // Receive messages until the connection is closed. for (;;) { - const msg = await SessionInfo.decodeMaybe(this.#session.reader); + const msg = await SessionInfo.decodeMaybe(this.#session.reader, this.version); if (!msg) break; // TODO use the session info } diff --git a/js/lite/src/lite/fetch.ts b/js/lite/src/lite/fetch.ts index 071eba687..0035d018d 100644 --- a/js/lite/src/lite/fetch.ts +++ b/js/lite/src/lite/fetch.ts @@ -1,6 +1,20 @@ import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; +import { Version } from "./version.ts"; + +function guardDraft03(version: Version) { + switch (version) { + case Version.DRAFT_03: + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + throw new Error("fetch not supported for this version"); + default: + unreachable(version); + } +} export class Fetch { broadcast: Path.Valid; @@ -30,11 +44,13 @@ export class Fetch { return new Fetch(broadcast, track, priority, group); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + guardDraft03(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + guardDraft03(version); return Message.decode(r, Fetch.#decode); } } diff --git a/js/lite/src/lite/group.ts b/js/lite/src/lite/group.ts index 9cbc8d358..778e46165 100644 --- a/js/lite/src/lite/group.ts +++ b/js/lite/src/lite/group.ts @@ -1,5 +1,7 @@ import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; +import { Version } from "./version.ts"; export class Group { subscribe: bigint; @@ -43,6 +45,18 @@ export class GroupDrop { this.error = error; } + static #guard(version: Version) { + switch (version) { + case Version.DRAFT_03: + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + throw new Error("group drop not supported for this version"); + default: + unreachable(version); + } + } + async #encode(w: Writer) { await w.u53(this.sequence); await w.u53(this.count); @@ -53,15 +67,18 @@ export class GroupDrop { return new GroupDrop(await r.u53(), await r.u53(), await r.u53()); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + GroupDrop.#guard(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + GroupDrop.#guard(version); return Message.decode(r, GroupDrop.#decode); } - static async decodeMaybe(r: Reader): Promise { + static async decodeMaybe(r: Reader, version: Version): Promise { + GroupDrop.#guard(version); return Message.decodeMaybe(r, GroupDrop.#decode); } } diff --git a/js/lite/src/lite/probe.ts b/js/lite/src/lite/probe.ts index 7b848b9ff..365d4b1cd 100644 --- a/js/lite/src/lite/probe.ts +++ b/js/lite/src/lite/probe.ts @@ -1,5 +1,19 @@ import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; +import { Version } from "./version.ts"; + +function guardDraft03(version: Version) { + switch (version) { + case Version.DRAFT_03: + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + throw new Error("probe not supported for this version"); + default: + unreachable(version); + } +} export class Probe { bitrate: number; @@ -16,11 +30,13 @@ export class Probe { return new Probe(await r.u53()); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + guardDraft03(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + guardDraft03(version); return Message.decode(r, Probe.#decode); } } diff --git a/js/lite/src/lite/publisher.ts b/js/lite/src/lite/publisher.ts index 9f19a97ea..26c5ab547 100644 --- a/js/lite/src/lite/publisher.ts +++ b/js/lite/src/lite/publisher.ts @@ -88,7 +88,7 @@ export class Publisher { case Version.DRAFT_01: case Version.DRAFT_02: { const init = new AnnounceInit([...active]); - await init.encode(stream.writer); + await init.encode(stream.writer, this.version); break; } } diff --git a/js/lite/src/lite/session.ts b/js/lite/src/lite/session.ts index 55c7367ad..63893faab 100644 --- a/js/lite/src/lite/session.ts +++ b/js/lite/src/lite/session.ts @@ -1,5 +1,7 @@ import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; +import { Version } from "./version.ts"; export class Extensions { entries: Map; @@ -125,6 +127,18 @@ export class SessionInfo { this.bitrate = bitrate; } + static #guard(version: Version) { + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + case Version.DRAFT_03: + throw new Error("session info not supported for Draft03"); + default: + unreachable(version); + } + } + async #encode(w: Writer) { await w.u53(this.bitrate); } @@ -134,15 +148,18 @@ export class SessionInfo { return new SessionInfo(bitrate); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + SessionInfo.#guard(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + SessionInfo.#guard(version); return Message.decode(r, SessionInfo.#decode); } - static async decodeMaybe(r: Reader): Promise { + static async decodeMaybe(r: Reader, version: Version): Promise { + SessionInfo.#guard(version); return Message.decodeMaybe(r, SessionInfo.#decode); } } diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index b09a6fedd..afb3b3c72 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -115,8 +115,8 @@ export class Subscribe { case Version.DRAFT_03: await w.bool(this.ordered); await w.u53(this.maxLatency); - await w.u53(this.startGroup ? this.startGroup + 1 : 0); - await w.u53(this.endGroup ? this.endGroup + 1 : 0); + await w.u53(this.startGroup !== undefined ? this.startGroup + 1 : 0); + await w.u53(this.endGroup !== undefined ? this.endGroup + 1 : 0); break; case Version.DRAFT_01: case Version.DRAFT_02: @@ -199,8 +199,8 @@ export class SubscribeOk { await w.u8(this.priority); await w.bool(this.ordered); await w.u53(this.maxLatency); - await w.u53(this.startGroup ? this.startGroup + 1 : 0); - await w.u53(this.endGroup ? this.endGroup + 1 : 0); + await w.u53(this.startGroup !== undefined ? this.startGroup + 1 : 0); + await w.u53(this.endGroup !== undefined ? this.endGroup + 1 : 0); break; case Version.DRAFT_02: // noop @@ -238,7 +238,13 @@ export class SubscribeOk { unreachable(version); } - return new SubscribeOk({ priority, ordered, maxLatency, startGroup, endGroup }); + return new SubscribeOk({ + priority, + ordered, + maxLatency, + startGroup: startGroup !== undefined && startGroup > 0 ? startGroup - 1 : undefined, + endGroup: endGroup !== undefined && endGroup > 0 ? endGroup - 1 : undefined, + }); } async encode(w: Writer, version: Version): Promise { diff --git a/js/lite/src/lite/subscriber.ts b/js/lite/src/lite/subscriber.ts index 386921a3a..7cc9fad68 100644 --- a/js/lite/src/lite/subscriber.ts +++ b/js/lite/src/lite/subscriber.ts @@ -59,7 +59,7 @@ export class Subscriber { case Version.DRAFT_01: case Version.DRAFT_02: { // Receive ANNOUNCE_INIT first - const init = await AnnounceInit.decode(stream.reader); + const init = await AnnounceInit.decode(stream.reader, this.version); // Process initial announcements for (const suffix of init.suffixes) { diff --git a/rs/moq-lite/src/lite/announce.rs b/rs/moq-lite/src/lite/announce.rs index 70bfe0933..f6fccd951 100644 --- a/rs/moq-lite/src/lite/announce.rs +++ b/rs/moq-lite/src/lite/announce.rs @@ -112,6 +112,11 @@ pub struct AnnounceInit<'a> { impl Message for AnnounceInit<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("announce init not supported for version: {:?}", version), + } + let count = u64::decode(r, version)?; // Don't allocate more than 1024 elements upfront @@ -125,6 +130,11 @@ impl Message for AnnounceInit<'_> { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("announce init not supported for version: {:?}", version), + } + (self.suffixes.len() as u64).encode(w, version); for path in &self.suffixes { path.encode(w, version); diff --git a/rs/moq-lite/src/lite/subscribe.rs b/rs/moq-lite/src/lite/subscribe.rs index 539b885e7..9f9366465 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -138,7 +138,7 @@ impl Message for SubscribeOk { pub struct SubscribeUpdate { pub priority: u8, pub ordered: bool, - pub max_latency: u64, + pub max_latency: std::time::Duration, pub start_group: Option, pub end_group: Option, } @@ -154,7 +154,7 @@ impl Message for SubscribeUpdate { let priority = u8::decode(r, version)?; let ordered = u8::decode(r, version)? != 0; - let max_latency = u64::decode(r, version)?; + let max_latency = std::time::Duration::decode(r, version)?; let start_group = match u64::decode(r, version)? { 0 => None, group => Some(group - 1), From 6bd6619ad24c49c7562cc96cc6cbba4dce89f3c3 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 11:37:11 -0800 Subject: [PATCH 7/8] Draft changes to handle SUBSCRIBE_OK. TODO actually review. --- js/lite/src/lite/group.ts | 51 -------------- js/lite/src/lite/publisher.ts | 4 +- js/lite/src/lite/subscribe.ts | 89 ++++++++++++++++++++++++ js/lite/src/lite/subscriber.ts | 8 ++- rs/moq-lite/src/lite/group.rs | 41 ----------- rs/moq-lite/src/lite/publisher.rs | 2 +- rs/moq-lite/src/lite/subscribe.rs | 106 ++++++++++++++++++++++++++++- rs/moq-lite/src/lite/subscriber.rs | 9 ++- 8 files changed, 209 insertions(+), 101 deletions(-) diff --git a/js/lite/src/lite/group.ts b/js/lite/src/lite/group.ts index 778e46165..8ad714505 100644 --- a/js/lite/src/lite/group.ts +++ b/js/lite/src/lite/group.ts @@ -1,7 +1,5 @@ import type { Reader, Writer } from "../stream.ts"; -import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; -import { Version } from "./version.ts"; export class Group { subscribe: bigint; @@ -34,55 +32,6 @@ export class Group { } } -export class GroupDrop { - sequence: number; - count: number; - error: number; - - constructor(sequence: number, count: number, error: number) { - this.sequence = sequence; - this.count = count; - this.error = error; - } - - static #guard(version: Version) { - switch (version) { - case Version.DRAFT_03: - break; - case Version.DRAFT_01: - case Version.DRAFT_02: - throw new Error("group drop not supported for this version"); - default: - unreachable(version); - } - } - - async #encode(w: Writer) { - await w.u53(this.sequence); - await w.u53(this.count); - await w.u53(this.error); - } - - static async #decode(r: Reader): Promise { - return new GroupDrop(await r.u53(), await r.u53(), await r.u53()); - } - - async encode(w: Writer, version: Version): Promise { - GroupDrop.#guard(version); - return Message.encode(w, this.#encode.bind(this)); - } - - static async decode(r: Reader, version: Version): Promise { - GroupDrop.#guard(version); - return Message.decode(r, GroupDrop.#decode); - } - - static async decodeMaybe(r: Reader, version: Version): Promise { - GroupDrop.#guard(version); - return Message.decodeMaybe(r, GroupDrop.#decode); - } -} - export class Frame { payload: Uint8Array; diff --git a/js/lite/src/lite/publisher.ts b/js/lite/src/lite/publisher.ts index 26c5ab547..35dfd1696 100644 --- a/js/lite/src/lite/publisher.ts +++ b/js/lite/src/lite/publisher.ts @@ -7,7 +7,7 @@ import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import { Announce, AnnounceInit, type AnnounceInterest } from "./announce.ts"; import { Group as GroupMessage } from "./group.ts"; -import { type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; +import { encodeSubscribeResponse, type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; import { Version } from "./version.ts"; /** @@ -155,7 +155,7 @@ export class Publisher { try { const info = new SubscribeOk({ priority: msg.priority }); - await info.encode(stream.writer, this.version); + await encodeSubscribeResponse(stream.writer, { ok: info }, this.version); console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`); diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index afb3b3c72..3307817cf 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -255,3 +255,92 @@ export class SubscribeOk { return Message.decode(r, SubscribeOk.#decode.bind(SubscribeOk, version)); } } + +/// Indicates that one or more groups have been dropped. +/// +/// Draft03 only. +export class SubscribeDrop { + sequence: number; + count: number; + error: number; + + constructor(sequence: number, count: number, error: number) { + this.sequence = sequence; + this.count = count; + this.error = error; + } + + async #encode(w: Writer) { + await w.u53(this.sequence); + await w.u53(this.count); + await w.u53(this.error); + } + + static async #decode(r: Reader): Promise { + return new SubscribeDrop(await r.u53(), await r.u53(), await r.u53()); + } + + async encode(w: Writer): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader): Promise { + return Message.decode(r, SubscribeDrop.#decode); + } +} + +/** + * A response message on the subscribe stream. + * + * In Draft03, each response is prefixed with a type discriminator: + * - 0x0 for SUBSCRIBE_OK + * - 0x1 for SUBSCRIBE_DROP + * + * SUBSCRIBE_OK must be the first message on the response stream. + */ +export type SubscribeResponse = { ok: SubscribeOk } | { drop: SubscribeDrop }; + +export async function encodeSubscribeResponse(w: Writer, resp: SubscribeResponse, version: Version): Promise { + switch (version) { + case Version.DRAFT_03: + if ("ok" in resp) { + await w.u53(0x0); + await resp.ok.encode(w, version); + } else { + await w.u53(0x1); + await resp.drop.encode(w); + } + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + if ("ok" in resp) { + await resp.ok.encode(w, version); + } else { + throw new Error("subscribe drop not supported for this version"); + } + break; + default: + unreachable(version); + } +} + +export async function decodeSubscribeResponse(r: Reader, version: Version): Promise { + switch (version) { + case Version.DRAFT_03: { + const typ = await r.u53(); + switch (typ) { + case 0x0: + return { ok: await SubscribeOk.decode(r, version) }; + case 0x1: + return { drop: await SubscribeDrop.decode(r) }; + default: + throw new Error(`unknown subscribe response type: ${typ}`); + } + } + case Version.DRAFT_01: + case Version.DRAFT_02: + return { ok: await SubscribeOk.decode(r, version) }; + default: + unreachable(version); + } +} diff --git a/js/lite/src/lite/subscriber.ts b/js/lite/src/lite/subscriber.ts index 7cc9fad68..7f0fc0728 100644 --- a/js/lite/src/lite/subscriber.ts +++ b/js/lite/src/lite/subscriber.ts @@ -8,7 +8,7 @@ import { error } from "../util/error.ts"; import { Announce, AnnounceInit, AnnounceInterest } from "./announce.ts"; import type { Group as GroupMessage } from "./group.ts"; import { StreamId } from "./stream.ts"; -import { Subscribe, SubscribeOk } from "./subscribe.ts"; +import { decodeSubscribeResponse, Subscribe } from "./subscribe.ts"; import { Version } from "./version.ts"; /** @@ -130,7 +130,11 @@ export class Subscriber { await msg.encode(stream.writer, this.version); try { - await SubscribeOk.decode(stream.reader, this.version); + // The first response MUST be a SUBSCRIBE_OK. + const resp = await decodeSubscribeResponse(stream.reader, this.version); + if (!("ok" in resp)) { + throw new Error("first subscribe response must be SUBSCRIBE_OK"); + } console.debug(`subscribe ok: id=${id} broadcast=${broadcast} track=${request.track.name}`); await Promise.race([stream.reader.closed, request.track.closed]); diff --git a/rs/moq-lite/src/lite/group.rs b/rs/moq-lite/src/lite/group.rs index eda1a2d34..3e22a8a43 100644 --- a/rs/moq-lite/src/lite/group.rs +++ b/rs/moq-lite/src/lite/group.rs @@ -25,44 +25,3 @@ impl Message for Group { self.sequence.encode(w, version); } } - -/// Indicates that one or more groups have been dropped. -/// -/// Draft03 only. -#[allow(dead_code)] -#[derive(Clone, Debug)] -pub struct GroupDrop { - pub sequence: u64, - pub count: u64, - pub error: u64, -} - -impl Message for GroupDrop { - fn decode_msg(r: &mut R, version: Version) -> Result { - match version { - Version::Draft01 | Version::Draft02 => { - unreachable!("group drop not supported for version: {:?}", version); - } - Version::Draft03 => {} - } - - Ok(Self { - sequence: u64::decode(r, version)?, - count: u64::decode(r, version)?, - error: u64::decode(r, version)?, - }) - } - - fn encode_msg(&self, w: &mut W, version: Version) { - match version { - Version::Draft01 | Version::Draft02 => { - unreachable!("group drop not supported for version: {:?}", version); - } - Version::Draft03 => {} - } - - self.sequence.encode(w, version); - self.count.encode(w, version); - self.error.encode(w, version); - } -} diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 79d793e25..4abb5725b 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -210,7 +210,7 @@ impl Publisher { end_group: None, }; - stream.writer.encode(&info).await?; + stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; tokio::select! { res = Self::run_track(session, track, subscribe, priority, version) => res?, diff --git a/rs/moq-lite/src/lite/subscribe.rs b/rs/moq-lite/src/lite/subscribe.rs index 9f9366465..b789a81cb 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use crate::{ Path, - coding::{Decode, DecodeError, Encode}, + coding::{Decode, DecodeError, Encode, Sizer}, lite::{Message, Version}, }; @@ -196,3 +196,107 @@ impl Message for SubscribeUpdate { } } } + +/// Indicates that one or more groups have been dropped. +/// +/// Draft03 only. +#[derive(Clone, Debug)] +pub struct SubscribeDrop { + pub sequence: u64, + pub count: u64, + pub error: u64, +} + +impl Message for SubscribeDrop { + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe drop not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + Ok(Self { + sequence: u64::decode(r, version)?, + count: u64::decode(r, version)?, + error: u64::decode(r, version)?, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe drop not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + self.sequence.encode(w, version); + self.count.encode(w, version); + self.error.encode(w, version); + } +} + +/// A response message on the subscribe stream. +/// +/// In Draft03, each response is prefixed with a type discriminator: +/// - 0x0 for SUBSCRIBE_OK +/// - 0x1 for SUBSCRIBE_DROP +/// +/// SUBSCRIBE_OK must be the first message on the response stream. +#[derive(Clone, Debug)] +pub enum SubscribeResponse { + Ok(SubscribeOk), + Drop(SubscribeDrop), +} + +impl Encode for SubscribeResponse { + fn encode(&self, w: &mut W, version: Version) { + match version { + Version::Draft03 => match self { + Self::Ok(ok) => { + 0u64.encode(w, version); + // Write size-prefixed body using Message trait + let mut sizer = Sizer::default(); + Message::encode_msg(ok, &mut sizer, version); + sizer.size.encode(w, version); + Message::encode_msg(ok, w, version); + } + Self::Drop(drop) => { + 1u64.encode(w, version); + let mut sizer = Sizer::default(); + Message::encode_msg(drop, &mut sizer, version); + sizer.size.encode(w, version); + Message::encode_msg(drop, w, version); + } + }, + Version::Draft01 | Version::Draft02 => match self { + Self::Ok(ok) => { + let mut sizer = Sizer::default(); + Message::encode_msg(ok, &mut sizer, version); + sizer.size.encode(w, version); + Message::encode_msg(ok, w, version); + } + Self::Drop(_) => { + unreachable!("subscribe drop not supported for version: {:?}", version); + } + }, + } + } +} + +impl Decode for SubscribeResponse { + fn decode(buf: &mut B, version: Version) -> Result { + match version { + Version::Draft03 => { + let typ = u64::decode(buf, version)?; + match typ { + 0 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), + 1 => Ok(Self::Drop(SubscribeDrop::decode(buf, version)?)), + _ => Err(DecodeError::InvalidMessage(typ)), + } + } + Version::Draft01 | Version::Draft02 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), + } + } +} diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index bb1d22c6f..76e151e20 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -228,10 +228,13 @@ impl Subscriber { ) -> Result<(), Error> { stream.writer.encode(&msg).await?; - // TODO use the response correctly populate the track info - let _info: lite::SubscribeOk = stream.reader.decode().await?; + // The first response MUST be a SUBSCRIBE_OK. + let resp: lite::SubscribeResponse = stream.reader.decode().await?; + let lite::SubscribeResponse::Ok(_info) = resp else { + return Err(Error::ProtocolViolation); + }; - // Wait until the stream is closed + // TODO handle additional SUBSCRIBE_OK and SUBSCRIBE_DROP messages. stream.reader.closed().await?; Ok(()) From 15c9afbd1f83d5678bc1d3dbb3b85e69e18296ce Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 24 Feb 2026 14:39:34 -0800 Subject: [PATCH 8/8] Remove group count. --- js/lite/src/lite/subscribe.ts | 44 ++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index 3307817cf..5b8edcca7 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -8,21 +8,21 @@ export class SubscribeUpdate { priority: number; ordered: boolean; maxLatency: number; - startGroup: number; - endGroup: number; - - constructor( - priority: number, - ordered: boolean = true, - maxLatency: number = 0, - startGroup: number = 0, - endGroup: number = 0, - ) { - this.priority = priority; - this.ordered = ordered; - this.maxLatency = maxLatency; - this.startGroup = startGroup; - this.endGroup = endGroup; + startGroup?: number; + endGroup?: number; + + constructor(props: { + priority: number; + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + }) { + this.priority = props.priority; + this.ordered = props.ordered ?? true; + this.maxLatency = props.maxLatency ?? 0; + this.startGroup = props.startGroup; + this.endGroup = props.endGroup; } async #encode(w: Writer, version: Version) { @@ -31,8 +31,8 @@ export class SubscribeUpdate { await w.u8(this.priority); await w.bool(this.ordered); await w.u53(this.maxLatency); - await w.u53(this.startGroup); - await w.u53(this.endGroup); + await w.u53(this.startGroup !== undefined ? this.startGroup + 1 : 0); + await w.u53(this.endGroup !== undefined ? this.endGroup + 1 : 0); break; case Version.DRAFT_01: case Version.DRAFT_02: @@ -51,11 +51,17 @@ export class SubscribeUpdate { const maxLatency = await r.u53(); const startGroup = await r.u53(); const endGroup = await r.u53(); - return new SubscribeUpdate(priority, ordered, maxLatency, startGroup, endGroup); + return new SubscribeUpdate({ + priority, + ordered, + maxLatency, + startGroup: startGroup > 0 ? startGroup - 1 : undefined, + endGroup: endGroup > 0 ? endGroup - 1 : undefined, + }); } case Version.DRAFT_01: case Version.DRAFT_02: - return new SubscribeUpdate(await r.u8()); + return new SubscribeUpdate({ priority: await r.u8() }); default: unreachable(version); }