diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index 6f9bd6f4a..655c87c42 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -70,9 +70,6 @@ export async function connect(url: URL, props?: ConnectProps): Promise { + static async #decode(r: Reader, version: Version): Promise { const active = await r.bool(); const suffix = Path.from(await r.string()); - return new Announce(suffix, active); + + let hops = 0; + switch (version) { + case Version.DRAFT_03: + hops = await r.u53(); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: + unreachable(version); + } + + return new Announce({ suffix, active, hops }); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, Announce.#decode); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => Announce.#decode(r, version)); } - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, Announce.#decode); + static async decodeMaybe(r: Reader, version: Version): Promise { + return Message.decodeMaybe(r, (r) => Announce.#decode(r, version)); } } @@ -60,6 +88,9 @@ export class AnnounceInterest { } } +/// Sent after setup to communicate the initially announced paths. +/// +/// Used by Draft01/Draft02 only. Draft03 uses individual Announce messages instead. export class AnnounceInit { suffixes: Path.Valid[]; @@ -67,6 +98,18 @@ export class AnnounceInit { this.suffixes = paths; } + static #guard(version: Version) { + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + case Version.DRAFT_03: + throw new Error("announce init not supported for Draft03"); + default: + unreachable(version); + } + } + async #encode(w: Writer) { await w.u53(this.suffixes.length); for (const path of this.suffixes) { @@ -83,11 +126,13 @@ export class AnnounceInit { return new AnnounceInit(suffixes); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + AnnounceInit.#guard(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + AnnounceInit.#guard(version); return Message.decode(r, AnnounceInit.#decode); } } diff --git a/js/lite/src/lite/connection.ts b/js/lite/src/lite/connection.ts index c5bb3f07d..43f5fbbf4 100644 --- a/js/lite/src/lite/connection.ts +++ b/js/lite/src/lite/connection.ts @@ -28,7 +28,7 @@ export class Connection implements Established { #quic: WebTransport; // Use to receive/send session messages. - #session: Stream; + #session?: Stream; // Module for contributing tracks. #publisher: Publisher; @@ -47,7 +47,7 @@ export class Connection implements Established { * * @internal */ - constructor(url: URL, quic: WebTransport, session: Stream, version: Version) { + constructor(url: URL, quic: WebTransport, version: Version, session?: Stream) { this.url = url; this.#quic = quic; this.#session = session; @@ -123,10 +123,15 @@ export class Connection implements Established { } async #runSession() { + if (!this.#session) { + // moq-lite draft-03 doesn't use a session stream. + return; + } + try { // Receive messages until the connection is closed. for (;;) { - const msg = await SessionInfo.decodeMaybe(this.#session.reader); + const msg = await SessionInfo.decodeMaybe(this.#session.reader, this.version); if (!msg) break; // TODO use the session info } @@ -162,7 +167,7 @@ export class Connection implements Established { await this.#publisher.runAnnounce(msg, stream); return; } else if (typ === StreamId.Subscribe) { - const msg = await Subscribe.decode(stream.reader); + const msg = await Subscribe.decode(stream.reader, this.version); await this.#publisher.runSubscribe(msg, stream); return; } else { diff --git a/js/lite/src/lite/fetch.ts b/js/lite/src/lite/fetch.ts new file mode 100644 index 000000000..0035d018d --- /dev/null +++ b/js/lite/src/lite/fetch.ts @@ -0,0 +1,56 @@ +import * as Path from "../path.ts"; +import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; +import * as Message from "./message.ts"; +import { Version } from "./version.ts"; + +function guardDraft03(version: Version) { + switch (version) { + case Version.DRAFT_03: + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + throw new Error("fetch not supported for this version"); + default: + unreachable(version); + } +} + +export class Fetch { + broadcast: Path.Valid; + track: string; + priority: number; + group: number; + + constructor(broadcast: Path.Valid, track: string, priority: number, group: number) { + this.broadcast = broadcast; + this.track = track; + this.priority = priority; + this.group = group; + } + + async #encode(w: Writer) { + await w.string(this.broadcast); + await w.string(this.track); + await w.u8(this.priority); + await w.u53(this.group); + } + + static async #decode(r: Reader): Promise { + const broadcast = Path.from(await r.string()); + const track = await r.string(); + const priority = await r.u8(); + const group = await r.u53(); + return new Fetch(broadcast, track, priority, group); + } + + async encode(w: Writer, version: Version): Promise { + guardDraft03(version); + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, version: Version): Promise { + guardDraft03(version); + return Message.decode(r, Fetch.#decode); + } +} diff --git a/js/lite/src/lite/group.ts b/js/lite/src/lite/group.ts index 9cbc8d358..8ad714505 100644 --- a/js/lite/src/lite/group.ts +++ b/js/lite/src/lite/group.ts @@ -32,40 +32,6 @@ export class Group { } } -export class GroupDrop { - sequence: number; - count: number; - error: number; - - constructor(sequence: number, count: number, error: number) { - this.sequence = sequence; - this.count = count; - this.error = error; - } - - async #encode(w: Writer) { - await w.u53(this.sequence); - await w.u53(this.count); - await w.u53(this.error); - } - - static async #decode(r: Reader): Promise { - return new GroupDrop(await r.u53(), await r.u53(), await r.u53()); - } - - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); - } - - static async decode(r: Reader): Promise { - return Message.decode(r, GroupDrop.#decode); - } - - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, GroupDrop.#decode); - } -} - export class Frame { payload: Uint8Array; diff --git a/js/lite/src/lite/index.ts b/js/lite/src/lite/index.ts index a28f40a3d..9314476d6 100644 --- a/js/lite/src/lite/index.ts +++ b/js/lite/src/lite/index.ts @@ -1,6 +1,8 @@ export * from "./announce.ts"; export * from "./connection.ts"; +export * from "./fetch.ts"; export * from "./group.ts"; +export * from "./probe.ts"; export * from "./session.ts"; export * from "./stream.ts"; export * from "./subscribe.ts"; diff --git a/js/lite/src/lite/probe.ts b/js/lite/src/lite/probe.ts new file mode 100644 index 000000000..365d4b1cd --- /dev/null +++ b/js/lite/src/lite/probe.ts @@ -0,0 +1,42 @@ +import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; +import * as Message from "./message.ts"; +import { Version } from "./version.ts"; + +function guardDraft03(version: Version) { + switch (version) { + case Version.DRAFT_03: + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + throw new Error("probe not supported for this version"); + default: + unreachable(version); + } +} + +export class Probe { + bitrate: number; + + constructor(bitrate: number) { + this.bitrate = bitrate; + } + + async #encode(w: Writer) { + await w.u53(this.bitrate); + } + + static async #decode(r: Reader): Promise { + return new Probe(await r.u53()); + } + + async encode(w: Writer, version: Version): Promise { + guardDraft03(version); + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, version: Version): Promise { + guardDraft03(version); + return Message.decode(r, Probe.#decode); + } +} diff --git a/js/lite/src/lite/publisher.ts b/js/lite/src/lite/publisher.ts index 95ff646fe..35dfd1696 100644 --- a/js/lite/src/lite/publisher.ts +++ b/js/lite/src/lite/publisher.ts @@ -7,8 +7,8 @@ import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import { Announce, AnnounceInit, type AnnounceInterest } from "./announce.ts"; import { Group as GroupMessage } from "./group.ts"; -import { type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; -import type { Version } from "./version.ts"; +import { encodeSubscribeResponse, type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; +import { Version } from "./version.ts"; /** * Handles publishing broadcasts and managing their lifecycle. @@ -64,7 +64,7 @@ export class Publisher { async runAnnounce(msg: AnnounceInterest, stream: Stream) { console.debug(`announce: prefix=${msg.prefix}`); - // Send ANNOUNCE_INIT as the first message with all currently active paths + // Send initial announcements let active = new Set(); const broadcasts = this.#broadcasts.peek(); @@ -77,8 +77,21 @@ export class Publisher { active.add(suffix); } - const init = new AnnounceInit([...active]); - await init.encode(stream.writer); + switch (this.version) { + case Version.DRAFT_03: + // Draft03: send individual Announce messages for initial state. + for (const suffix of active) { + const wire = new Announce({ suffix, active: true }); + await wire.encode(stream.writer, this.version); + } + break; + case Version.DRAFT_01: + case Version.DRAFT_02: { + const init = new AnnounceInit([...active]); + await init.encode(stream.writer, this.version); + break; + } + } // Wait for updates to the broadcasts. for (;;) { @@ -105,15 +118,15 @@ export class Publisher { // Announce any new broadcasts. for (const added of newActive.difference(active)) { console.debug(`announce: broadcast=${added} active=true`); - const wire = new Announce(added, true); - await wire.encode(stream.writer); + const wire = new Announce({ suffix: added, active: true }); + await wire.encode(stream.writer, this.version); } // Announce any removed broadcasts. for (const removed of active.difference(newActive)) { console.debug(`announce: broadcast=${removed} active=false`); - const wire = new Announce(removed, false); - await wire.encode(stream.writer); + const wire = new Announce({ suffix: removed, active: false }); + await wire.encode(stream.writer, this.version); } // NOTE: This is kind of a hack that won't work with a rapid UNANNOUNCE/ANNOUNCE cycle. @@ -141,15 +154,15 @@ export class Publisher { const track = broadcast.subscribe(msg.track, msg.priority); try { - const info = new SubscribeOk({ version: this.version, priority: msg.priority }); - await info.encode(stream.writer); + const info = new SubscribeOk({ priority: msg.priority }); + await encodeSubscribeResponse(stream.writer, { ok: info }, this.version); console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`); const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer); for (;;) { - const decode = SubscribeUpdate.decodeMaybe(stream.reader); + const decode = SubscribeUpdate.decodeMaybe(stream.reader, this.version); const result = await Promise.any([serving, decode]); if (!result) break; diff --git a/js/lite/src/lite/session.ts b/js/lite/src/lite/session.ts index 55c7367ad..63893faab 100644 --- a/js/lite/src/lite/session.ts +++ b/js/lite/src/lite/session.ts @@ -1,5 +1,7 @@ import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; +import { Version } from "./version.ts"; export class Extensions { entries: Map; @@ -125,6 +127,18 @@ export class SessionInfo { this.bitrate = bitrate; } + static #guard(version: Version) { + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + case Version.DRAFT_03: + throw new Error("session info not supported for Draft03"); + default: + unreachable(version); + } + } + async #encode(w: Writer) { await w.u53(this.bitrate); } @@ -134,15 +148,18 @@ export class SessionInfo { return new SessionInfo(bitrate); } - async encode(w: Writer): Promise { + async encode(w: Writer, version: Version): Promise { + SessionInfo.#guard(version); return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: Version): Promise { + SessionInfo.#guard(version); return Message.decode(r, SessionInfo.#decode); } - static async decodeMaybe(r: Reader): Promise { + static async decodeMaybe(r: Reader, version: Version): Promise { + SessionInfo.#guard(version); return Message.decodeMaybe(r, SessionInfo.#decode); } } diff --git a/js/lite/src/lite/stream.ts b/js/lite/src/lite/stream.ts index 10cdd7265..632771585 100644 --- a/js/lite/src/lite/stream.ts +++ b/js/lite/src/lite/stream.ts @@ -10,6 +10,8 @@ export const StreamId = { Session: 0, Announce: 1, Subscribe: 2, + Fetch: 3, + Probe: 4, ClientCompat: 0x20, ServerCompat: 0x21, } as const; diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index 7f2c0a756..5b8edcca7 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -1,34 +1,82 @@ import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import { unreachable } from "../util/error.ts"; import * as Message from "./message.ts"; import { Version } from "./version.ts"; export class SubscribeUpdate { priority: number; + ordered: boolean; + maxLatency: number; + startGroup?: number; + endGroup?: number; - constructor(priority: number) { - this.priority = priority; + constructor(props: { + priority: number; + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + }) { + this.priority = props.priority; + this.ordered = props.ordered ?? true; + this.maxLatency = props.maxLatency ?? 0; + this.startGroup = props.startGroup; + this.endGroup = props.endGroup; } - async #encode(w: Writer) { - await w.u8(this.priority); + async #encode(w: Writer, version: Version) { + switch (version) { + case Version.DRAFT_03: + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup !== undefined ? this.startGroup + 1 : 0); + await w.u53(this.endGroup !== undefined ? this.endGroup + 1 : 0); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + await w.u8(this.priority); + break; + default: + unreachable(version); + } } - static async #decode(r: Reader): Promise { - const priority = await r.u8(); - return new SubscribeUpdate(priority); + static async #decode(r: Reader, version: Version): Promise { + switch (version) { + case Version.DRAFT_03: { + const priority = await r.u8(); + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const startGroup = await r.u53(); + const endGroup = await r.u53(); + return new SubscribeUpdate({ + priority, + ordered, + maxLatency, + startGroup: startGroup > 0 ? startGroup - 1 : undefined, + endGroup: endGroup > 0 ? endGroup - 1 : undefined, + }); + } + case Version.DRAFT_01: + case Version.DRAFT_02: + return new SubscribeUpdate({ priority: await r.u8() }); + default: + unreachable(version); + } } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, SubscribeUpdate.#decode); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => SubscribeUpdate.#decode(r, version)); } - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, SubscribeUpdate.#decode); + static async decodeMaybe(r: Reader, version: Version): Promise { + return Message.decodeMaybe(r, (r) => SubscribeUpdate.#decode(r, version)); } } @@ -37,78 +85,268 @@ export class Subscribe { broadcast: Path.Valid; track: string; priority: number; + ordered: boolean; + maxLatency: number; - constructor(id: bigint, broadcast: Path.Valid, track: string, priority: number) { - this.id = id; - this.broadcast = broadcast; - this.track = track; - this.priority = priority; + startGroup?: number; + endGroup?: number; + + constructor(props: { + id: bigint; + broadcast: Path.Valid; + track: string; + priority: number; + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + }) { + this.id = props.id; + this.broadcast = props.broadcast; + this.track = props.track; + this.priority = props.priority; + this.ordered = props.ordered ?? false; + this.maxLatency = props.maxLatency ?? 0; + this.startGroup = props.startGroup; + this.endGroup = props.endGroup; } - async #encode(w: Writer) { + async #encode(w: Writer, version: Version) { await w.u62(this.id); await w.string(this.broadcast); await w.string(this.track); await w.u8(this.priority); + + switch (version) { + case Version.DRAFT_03: + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup !== undefined ? this.startGroup + 1 : 0); + await w.u53(this.endGroup !== undefined ? this.endGroup + 1 : 0); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: + unreachable(version); + } } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: Version): Promise { const id = await r.u62(); const broadcast = Path.from(await r.string()); const track = await r.string(); const priority = await r.u8(); - return new Subscribe(id, broadcast, track, priority); + + switch (version) { + case Version.DRAFT_03: { + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const startGroup = await r.u53(); + const endGroup = await r.u53(); + return new Subscribe({ + id, + broadcast, + track, + priority, + ordered, + maxLatency, + startGroup: startGroup > 0 ? startGroup - 1 : undefined, + endGroup: endGroup > 0 ? endGroup - 1 : undefined, + }); + } + case Version.DRAFT_01: + case Version.DRAFT_02: + return new Subscribe({ id, broadcast, track, priority }); + default: + unreachable(version); + } } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, Subscribe.#decode); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => Subscribe.#decode(r, version)); } } export class SubscribeOk { - // The version - readonly version: Version; - priority?: number; + priority: number; + ordered: boolean; + maxLatency: number; + startGroup?: number; + endGroup?: number; - constructor({ version, priority = undefined }: { version: Version; priority?: number }) { - this.version = version; + constructor({ + priority = 0, + ordered = true, + maxLatency = 0, + startGroup = undefined, + endGroup = undefined, + }: { + priority?: number; + ordered?: boolean; + maxLatency?: number; + startGroup?: number; + endGroup?: number; + }) { this.priority = priority; + this.ordered = ordered; + this.maxLatency = maxLatency; + this.startGroup = startGroup; + this.endGroup = endGroup; } - async #encode(w: Writer) { - if (this.version === Version.DRAFT_02) { - // noop - } else if (this.version === Version.DRAFT_01) { - await w.u8(this.priority ?? 0); - } else { - const version: never = this.version; - throw new Error(`unsupported version: ${version}`); + async #encode(w: Writer, version: Version) { + switch (version) { + case Version.DRAFT_03: + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.startGroup !== undefined ? this.startGroup + 1 : 0); + await w.u53(this.endGroup !== undefined ? this.endGroup + 1 : 0); + break; + case Version.DRAFT_02: + // noop + break; + case Version.DRAFT_01: + await w.u8(this.priority ?? 0); + break; + default: + unreachable(version); } } static async #decode(version: Version, r: Reader): Promise { let priority: number | undefined; - if (version === Version.DRAFT_02) { - // noop - } else if (version === Version.DRAFT_01) { - priority = await r.u8(); - } else { - const v: never = version; - throw new Error(`unsupported version: ${v}`); + let ordered: boolean | undefined; + let maxLatency: number | undefined; + let startGroup: number | undefined; + let endGroup: number | undefined; + + switch (version) { + case Version.DRAFT_03: + priority = await r.u8(); + ordered = await r.bool(); + maxLatency = await r.u53(); + startGroup = await r.u53(); + endGroup = await r.u53(); + break; + case Version.DRAFT_02: + // noop + break; + case Version.DRAFT_01: + priority = await r.u8(); + break; + default: + unreachable(version); } - return new SubscribeOk({ version, priority }); + return new SubscribeOk({ + priority, + ordered, + maxLatency, + startGroup: startGroup !== undefined && startGroup > 0 ? startGroup - 1 : undefined, + endGroup: endGroup !== undefined && endGroup > 0 ? endGroup - 1 : undefined, + }); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } static async decode(r: Reader, version: Version): Promise { return Message.decode(r, SubscribeOk.#decode.bind(SubscribeOk, version)); } } + +/// Indicates that one or more groups have been dropped. +/// +/// Draft03 only. +export class SubscribeDrop { + sequence: number; + count: number; + error: number; + + constructor(sequence: number, count: number, error: number) { + this.sequence = sequence; + this.count = count; + this.error = error; + } + + async #encode(w: Writer) { + await w.u53(this.sequence); + await w.u53(this.count); + await w.u53(this.error); + } + + static async #decode(r: Reader): Promise { + return new SubscribeDrop(await r.u53(), await r.u53(), await r.u53()); + } + + async encode(w: Writer): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader): Promise { + return Message.decode(r, SubscribeDrop.#decode); + } +} + +/** + * A response message on the subscribe stream. + * + * In Draft03, each response is prefixed with a type discriminator: + * - 0x0 for SUBSCRIBE_OK + * - 0x1 for SUBSCRIBE_DROP + * + * SUBSCRIBE_OK must be the first message on the response stream. + */ +export type SubscribeResponse = { ok: SubscribeOk } | { drop: SubscribeDrop }; + +export async function encodeSubscribeResponse(w: Writer, resp: SubscribeResponse, version: Version): Promise { + switch (version) { + case Version.DRAFT_03: + if ("ok" in resp) { + await w.u53(0x0); + await resp.ok.encode(w, version); + } else { + await w.u53(0x1); + await resp.drop.encode(w); + } + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + if ("ok" in resp) { + await resp.ok.encode(w, version); + } else { + throw new Error("subscribe drop not supported for this version"); + } + break; + default: + unreachable(version); + } +} + +export async function decodeSubscribeResponse(r: Reader, version: Version): Promise { + switch (version) { + case Version.DRAFT_03: { + const typ = await r.u53(); + switch (typ) { + case 0x0: + return { ok: await SubscribeOk.decode(r, version) }; + case 0x1: + return { drop: await SubscribeDrop.decode(r) }; + default: + throw new Error(`unknown subscribe response type: ${typ}`); + } + } + case Version.DRAFT_01: + case Version.DRAFT_02: + return { ok: await SubscribeOk.decode(r, version) }; + default: + unreachable(version); + } +} diff --git a/js/lite/src/lite/subscriber.ts b/js/lite/src/lite/subscriber.ts index 4fe755dad..7f0fc0728 100644 --- a/js/lite/src/lite/subscriber.ts +++ b/js/lite/src/lite/subscriber.ts @@ -8,8 +8,8 @@ import { error } from "../util/error.ts"; import { Announce, AnnounceInit, AnnounceInterest } from "./announce.ts"; import type { Group as GroupMessage } from "./group.ts"; import { StreamId } from "./stream.ts"; -import { Subscribe, SubscribeOk } from "./subscribe.ts"; -import type { Version } from "./version.ts"; +import { decodeSubscribeResponse, Subscribe } from "./subscribe.ts"; +import { Version } from "./version.ts"; /** * Handles subscribing to broadcasts and managing their lifecycle. @@ -55,19 +55,31 @@ export class Subscriber { await stream.writer.u53(StreamId.Announce); await msg.encode(stream.writer); - // First, receive ANNOUNCE_INIT - const init = await AnnounceInit.decode(stream.reader); - - // Process initial announcements - for (const suffix of init.suffixes) { - const path = Path.join(prefix, suffix); - console.debug(`announced: broadcast=${path} active=true`); - announced.append({ path, active: true }); + switch (this.version) { + case Version.DRAFT_01: + case Version.DRAFT_02: { + // Receive ANNOUNCE_INIT first + const init = await AnnounceInit.decode(stream.reader, this.version); + + // Process initial announcements + for (const suffix of init.suffixes) { + const path = Path.join(prefix, suffix); + console.debug(`announced: broadcast=${path} active=true`); + announced.append({ path, active: true }); + } + break; + } + case Version.DRAFT_03: + // Draft03: no AnnounceInit, initial state comes via Announce messages. + break; } - // Then receive updates + // Receive announce updates (for Draft03, this includes initial state) for (;;) { - const announce = await Promise.race([Announce.decodeMaybe(stream.reader), announced.closed]); + const announce = await Promise.race([ + Announce.decodeMaybe(stream.reader, this.version), + announced.closed, + ]); if (!announce) break; if (announce instanceof Error) throw announce; @@ -111,14 +123,18 @@ export class Subscriber { console.debug(`subscribe start: id=${id} broadcast=${broadcast} track=${request.track.name}`); - const msg = new Subscribe(id, broadcast, request.track.name, request.priority); + const msg = new Subscribe({ id, broadcast, track: request.track.name, priority: request.priority }); const stream = await Stream.open(this.#quic); await stream.writer.u53(StreamId.Subscribe); - await msg.encode(stream.writer); + await msg.encode(stream.writer, this.version); try { - await SubscribeOk.decode(stream.reader, this.version); + // The first response MUST be a SUBSCRIBE_OK. + const resp = await decodeSubscribeResponse(stream.reader, this.version); + if (!("ok" in resp)) { + throw new Error("first subscribe response must be SUBSCRIBE_OK"); + } console.debug(`subscribe ok: id=${id} broadcast=${broadcast} track=${request.track.name}`); await Promise.race([stream.reader.closed, request.track.closed]); diff --git a/js/lite/src/lite/version.ts b/js/lite/src/lite/version.ts index 06d64bfe5..152892310 100644 --- a/js/lite/src/lite/version.ts +++ b/js/lite/src/lite/version.ts @@ -1,6 +1,7 @@ export const Version = { DRAFT_01: 0xff0dad01, DRAFT_02: 0xff0dad02, + DRAFT_03: 0xff0dad03, } as const; export type Version = (typeof Version)[keyof typeof Version]; @@ -8,3 +9,6 @@ export type Version = (typeof Version)[keyof typeof Version]; /// The WebTransport subprotocol identifier for moq-lite. /// Version negotiation still happens via SETUP when this is used. export const ALPN = "moql"; + +/// The ALPN string for Draft03, which uses ALPN-based version negotiation. +export const ALPN_03 = "moq-lite-03"; diff --git a/rs/moq-lite/src/client.rs b/rs/moq-lite/src/client.rs index 3862c5492..71a059309 100644 --- a/rs/moq-lite/src/client.rs +++ b/rs/moq-lite/src/client.rs @@ -46,20 +46,33 @@ impl Client { // If ALPN was used to negotiate the version, use the appropriate encoding. // Default to IETF 14 if no ALPN was used and we'll negotiate the version later. let (encoding, supported) = match session.protocol() { - Some(p) if p == ietf::ALPN_16 => ( + Some(ietf::ALPN_16) => ( Version::Ietf(ietf::Version::Draft16), vec![ietf::Version::Draft16.into()], ), - Some(p) if p == ietf::ALPN_15 => ( + Some(ietf::ALPN_15) => ( Version::Ietf(ietf::Version::Draft15), vec![ietf::Version::Draft15.into()], ), - Some(p) if p == ietf::ALPN_14 => ( + Some(ietf::ALPN_14) => ( Version::Ietf(ietf::Version::Draft14), vec![ietf::Version::Draft14.into()], ), - Some(p) if p == lite::ALPN => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), - None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), + Some(lite::ALPN_03) => { + // Starting with draft-03, there's no more SETUP control stream. + lite::start( + session.clone(), + None, + self.publish.clone(), + self.consume.clone(), + lite::Version::Draft03, + )?; + + tracing::debug!(version = ?lite::Version::Draft03, "connected"); + + return Ok(Session::new(session)); + } + Some(lite::ALPN) | None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), Some(p) => return Err(Error::UnknownAlpn(p.to_string())), }; @@ -97,12 +110,11 @@ impl Client { let stream = stream.with_version(version); lite::start( session.clone(), - stream, + Some(stream), self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } Version::Ietf(version) => { // Decode the parameters to get the initial request ID. @@ -122,8 +134,7 @@ impl Client { self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } } diff --git a/rs/moq-lite/src/coding/decode.rs b/rs/moq-lite/src/coding/decode.rs index 8467380fa..13d12152e 100644 --- a/rs/moq-lite/src/coding/decode.rs +++ b/rs/moq-lite/src/coding/decode.rs @@ -135,3 +135,19 @@ impl Decode for Cow<'_, str> { Ok(Cow::Owned(s)) } } + +impl Decode for Option { + fn decode(r: &mut R, version: V) -> Result { + match u64::decode(r, version)? { + 0 => Ok(None), + value => Ok(Some(value - 1)), + } + } +} + +impl Decode for std::time::Duration { + fn decode(r: &mut R, version: V) -> Result { + let value = u64::decode(r, version)?; + Ok(Self::from_millis(value)) + } +} diff --git a/rs/moq-lite/src/coding/encode.rs b/rs/moq-lite/src/coding/encode.rs index 3fc4fb69a..e7cf13f9f 100644 --- a/rs/moq-lite/src/coding/encode.rs +++ b/rs/moq-lite/src/coding/encode.rs @@ -94,3 +94,19 @@ impl Encode for Cow<'_, str> { w.put(self.as_bytes()); } } + +impl Encode for Option { + fn encode(&self, w: &mut W, version: V) { + match self { + Some(value) => (value + 1).encode(w, version), + None => 0u64.encode(w, version), + } + } +} + +impl Encode for std::time::Duration { + fn encode(&self, w: &mut W, version: V) { + // TODO Make encoding fallable. + (self.as_millis() as u64).encode(w, version) + } +} diff --git a/rs/moq-lite/src/ietf/session.rs b/rs/moq-lite/src/ietf/session.rs index 698db97f9..bf53a27cd 100644 --- a/rs/moq-lite/src/ietf/session.rs +++ b/rs/moq-lite/src/ietf/session.rs @@ -6,7 +6,7 @@ use crate::{ use super::{Publisher, Subscriber}; -pub(crate) async fn start( +pub(crate) fn start( session: S, setup: Stream, request_id_max: RequestId, diff --git a/rs/moq-lite/src/lite/announce.rs b/rs/moq-lite/src/lite/announce.rs index 8f0e55ca5..f6fccd951 100644 --- a/rs/moq-lite/src/lite/announce.rs +++ b/rs/moq-lite/src/lite/announce.rs @@ -14,34 +14,47 @@ pub enum Announce<'a> { Active { #[cfg_attr(feature = "serde", serde(borrow))] suffix: Path<'a>, + hops: u64, }, Ended { #[cfg_attr(feature = "serde", serde(borrow))] suffix: Path<'a>, + hops: u64, }, } impl Message for Announce<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { - Ok(match AnnounceStatus::decode(r, version)? { - AnnounceStatus::Active => Self::Active { - suffix: Path::decode(r, version)?, - }, - AnnounceStatus::Ended => Self::Ended { - suffix: Path::decode(r, version)?, - }, + let status = AnnounceStatus::decode(r, version)?; + let suffix = Path::decode(r, version)?; + let hops = match version { + Version::Draft03 => u64::decode(r, version)?, + Version::Draft01 | Version::Draft02 => 0, + }; + + Ok(match status { + AnnounceStatus::Active => Self::Active { suffix, hops }, + AnnounceStatus::Ended => Self::Ended { suffix, hops }, }) } fn encode_msg(&self, w: &mut W, version: Version) { match self { - Self::Active { suffix } => { + Self::Active { suffix, hops } => { AnnounceStatus::Active.encode(w, version); suffix.encode(w, version); + match version { + Version::Draft03 => hops.encode(w, version), + Version::Draft01 | Version::Draft02 => {} + } } - Self::Ended { suffix } => { + Self::Ended { suffix, hops } => { AnnounceStatus::Ended.encode(w, version); suffix.encode(w, version); + match version { + Version::Draft03 => hops.encode(w, version), + Version::Draft01 | Version::Draft02 => {} + } } } } @@ -87,6 +100,8 @@ impl Encode for AnnounceStatus { } /// Sent after setup to communicate the initially announced paths. +/// +/// Used by Draft01/Draft02 only. Draft03 uses individual Announce messages instead. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct AnnounceInit<'a> { @@ -97,6 +112,11 @@ pub struct AnnounceInit<'a> { impl Message for AnnounceInit<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("announce init not supported for version: {:?}", version), + } + let count = u64::decode(r, version)?; // Don't allocate more than 1024 elements upfront @@ -110,6 +130,11 @@ impl Message for AnnounceInit<'_> { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("announce init not supported for version: {:?}", version), + } + (self.suffixes.len() as u64).encode(w, version); for path in &self.suffixes { path.encode(w, version); diff --git a/rs/moq-lite/src/lite/fetch.rs b/rs/moq-lite/src/lite/fetch.rs new file mode 100644 index 000000000..977d60afe --- /dev/null +++ b/rs/moq-lite/src/lite/fetch.rs @@ -0,0 +1,56 @@ +use std::borrow::Cow; + +use crate::{ + Path, + coding::{Decode, DecodeError, Encode}, + lite::{Message, Version}, +}; + +/// Sent by the subscriber to fetch a specific group from a track. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct Fetch<'a> { + pub broadcast: Path<'a>, + pub track: Cow<'a, str>, + pub priority: u8, + pub group: u64, +} + +impl Message for Fetch<'_> { + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("fetch not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + let broadcast = Path::decode(r, version)?; + let track = Cow::::decode(r, version)?; + let priority = u8::decode(r, version)?; + let group = u64::decode(r, version)?; + + Ok(Self { + broadcast, + track, + priority, + group, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("fetch not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + self.broadcast.encode(w, version); + self.track.encode(w, version); + self.priority.encode(w, version); + self.group.encode(w, version); + } +} diff --git a/rs/moq-lite/src/lite/info.rs b/rs/moq-lite/src/lite/info.rs index c004cd01d..07d416870 100644 --- a/rs/moq-lite/src/lite/info.rs +++ b/rs/moq-lite/src/lite/info.rs @@ -10,6 +10,11 @@ pub struct SessionInfo { impl Message for SessionInfo { fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("session info not supported for version: {:?}", version), + } + let bitrate = match u64::decode(r, version)? { 0 => None, bitrate => Some(bitrate), @@ -19,6 +24,11 @@ impl Message for SessionInfo { } fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => {} + Version::Draft03 => unreachable!("session info not supported for version: {:?}", version), + } + self.bitrate.unwrap_or(0).encode(w, version); } } diff --git a/rs/moq-lite/src/lite/mod.rs b/rs/moq-lite/src/lite/mod.rs index 37db57de1..454caf84f 100644 --- a/rs/moq-lite/src/lite/mod.rs +++ b/rs/moq-lite/src/lite/mod.rs @@ -5,11 +5,13 @@ //! Specification: [] mod announce; +mod fetch; mod group; mod info; mod message; mod parameters; mod priority; +mod probe; mod publisher; mod session; mod stream; @@ -18,10 +20,14 @@ mod subscriber; mod version; pub use announce::*; +#[allow(unused_imports)] +pub use fetch::*; pub use group::*; pub use info::*; pub use message::*; pub use parameters::*; +#[allow(unused_imports)] +pub use probe::*; use publisher::*; pub(super) use session::*; pub use stream::*; diff --git a/rs/moq-lite/src/lite/probe.rs b/rs/moq-lite/src/lite/probe.rs new file mode 100644 index 000000000..e67aff16a --- /dev/null +++ b/rs/moq-lite/src/lite/probe.rs @@ -0,0 +1,39 @@ +use crate::{ + coding::*, + lite::{Message, Version}, +}; + +/// Sent to probe the available bitrate. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct Probe { + pub bitrate: u64, +} + +impl Message for Probe { + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("probe not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + let bitrate = u64::decode(r, version)?; + + Ok(Self { bitrate }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("probe not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + self.bitrate.encode(w, version); + } +} diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 893122989..4abb5725b 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -60,8 +60,9 @@ impl Publisher { .consume_only(&[prefix.as_path()]) .ok_or(Error::Unauthorized)?; + let version = self.version; web_async::spawn(async move { - if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix).await { + if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix, version).await { match &err { Error::Cancel => { tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); @@ -87,29 +88,38 @@ impl Publisher { stream: &mut Stream, origin: &mut OriginConsumer, prefix: impl AsPath, + version: Version, ) -> Result<(), Error> { let prefix = prefix.as_path(); - let mut init = Vec::new(); - // Send ANNOUNCE_INIT as the first message with all currently active paths - // We use `try_next()` to synchronously get the initial updates. - while let Some((path, active)) = origin.try_announced() { - let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); + match version { + Version::Draft01 | Version::Draft02 => { + let mut init = Vec::new(); + + // Send ANNOUNCE_INIT as the first message with all currently active paths + // We use `try_next()` to synchronously get the initial updates. + while let Some((path, active)) = origin.try_announced() { + let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); + + if active.is_some() { + tracing::debug!(broadcast = %origin.absolute(&path), "announce"); + init.push(suffix.to_owned()); + } else { + // A potential race. + tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); + init.retain(|path| path != &suffix); + } + } - if active.is_some() { - tracing::debug!(broadcast = %origin.absolute(&path), "announce"); - init.push(suffix.to_owned()); - } else { - // A potential race. - tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); - init.retain(|path| path != &suffix); + let announce_init = lite::AnnounceInit { suffixes: init }; + stream.writer.encode(&announce_init).await?; + } + Version::Draft03 => { + // No more announce init in Draft03. } } - let announce_init = lite::AnnounceInit { suffixes: init }; - stream.writer.encode(&announce_init).await?; - - // Flush any synchronously announced paths + // Send updates as they arrive. loop { tokio::select! { biased; @@ -121,11 +131,11 @@ impl Publisher { if active.is_some() { tracing::debug!(broadcast = %origin.absolute(&path), "announce"); - let msg = lite::Announce::Active { suffix }; + let msg = lite::Announce::Active { suffix, hops: 0 }; stream.writer.encode(&msg).await?; } else { tracing::debug!(broadcast = %origin.absolute(&path), "unannounce"); - let msg = lite::Announce::Ended { suffix }; + let msg = lite::Announce::Ended { suffix, hops: 0 }; stream.writer.encode(&msg).await?; } }, @@ -194,9 +204,13 @@ impl Publisher { let info = lite::SubscribeOk { priority: track.info.priority, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, }; - stream.writer.encode(&info).await?; + stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; tokio::select! { res = Self::run_track(session, track, subscribe, priority, version) => res?, diff --git a/rs/moq-lite/src/lite/session.rs b/rs/moq-lite/src/lite/session.rs index 79226e398..15523183e 100644 --- a/rs/moq-lite/src/lite/session.rs +++ b/rs/moq-lite/src/lite/session.rs @@ -1,5 +1,3 @@ -use tokio::sync::oneshot; - use crate::{ Error, OriginConsumer, OriginProducer, coding::Stream, @@ -8,10 +6,11 @@ use crate::{ use super::{Publisher, Subscriber}; -pub(crate) async fn start( +pub(crate) fn start( session: S, // The stream used to setup the session, after exchanging setup messages. - setup: Stream, + // NOTE: No longer used in draft-03. + setup: Option>, // We will publish any local broadcasts from this origin. publish: Option, // We will consume any remote broadcasts, inserting them into this origin. @@ -22,13 +21,11 @@ pub(crate) async fn start( let publisher = Publisher::new(session.clone(), publish, version); let subscriber = Subscriber::new(session.clone(), subscribe, version); - let init = oneshot::channel(); - web_async::spawn(async move { let res = tokio::select! { - res = run_session(setup) => res, + Err(res) = run_session(setup) => Err(res), res = publisher.run() => res, - res = subscriber.run(init.0) => res, + res = subscriber.run() => res, }; match res { @@ -47,17 +44,15 @@ pub(crate) async fn start( } }); - // Wait until receiving the initial announcements to prevent some race conditions. - // Otherwise, `consume()` might return not found if we don't wait long enough, so just wait. - // If the announce stream fails or is closed, this will return an error instead of hanging. - // TODO return a better error - init.1.await.map_err(|_| Error::Cancel)?; - Ok(()) } // TODO do something useful with this -async fn run_session(mut stream: Stream) -> Result<(), Error> { - while let Some(_info) = stream.reader.decode_maybe::().await? {} - Err(Error::Cancel) +async fn run_session(stream: Option>) -> Result<(), Error> { + if let Some(mut stream) = stream { + while let Some(_info) = stream.reader.decode_maybe::().await? {} + return Err(Error::Cancel); + } + + Ok(()) } diff --git a/rs/moq-lite/src/lite/stream.rs b/rs/moq-lite/src/lite/stream.rs index 281ac20b3..4528c5cae 100644 --- a/rs/moq-lite/src/lite/stream.rs +++ b/rs/moq-lite/src/lite/stream.rs @@ -8,6 +8,8 @@ pub enum ControlType { Session = 0, Announce = 1, Subscribe = 2, + Fetch = 3, + Probe = 4, } impl Decode for ControlType { diff --git a/rs/moq-lite/src/lite/subscribe.rs b/rs/moq-lite/src/lite/subscribe.rs index 36052f2b8..b789a81cb 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use crate::{ Path, - coding::{Decode, DecodeError, Encode}, + coding::{Decode, DecodeError, Encode, Sizer}, lite::{Message, Version}, }; @@ -15,6 +15,10 @@ pub struct Subscribe<'a> { pub broadcast: Path<'a>, pub track: Cow<'a, str>, pub priority: u8, + pub ordered: bool, + pub max_latency: std::time::Duration, + pub start_group: Option, + pub end_group: Option, } impl Message for Subscribe<'_> { @@ -24,11 +28,26 @@ impl Message for Subscribe<'_> { let track = Cow::::decode(r, version)?; let priority = u8::decode(r, version)?; + let (ordered, max_latency, start_group, end_group) = match version { + Version::Draft03 => { + let ordered = u8::decode(r, version)? != 0; + let max_latency = std::time::Duration::decode(r, version)?; + let start_group = Option::::decode(r, version)?; + let end_group = Option::::decode(r, version)?; + (ordered, max_latency, start_group, end_group) + } + Version::Draft01 | Version::Draft02 => (false, std::time::Duration::ZERO, None, None), + }; + Ok(Self { id, broadcast, track, priority, + ordered, + max_latency, + start_group, + end_group, }) } @@ -37,28 +56,247 @@ impl Message for Subscribe<'_> { self.broadcast.encode(w, version); self.track.encode(w, version); self.priority.encode(w, version); + + match version { + Version::Draft03 => { + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); + } + Version::Draft01 | Version::Draft02 => {} + } } } #[derive(Clone, Debug)] pub struct SubscribeOk { pub priority: u8, + pub ordered: bool, + pub max_latency: std::time::Duration, + pub start_group: Option, + pub end_group: Option, } impl Message for SubscribeOk { fn encode_msg(&self, w: &mut W, version: Version) { - if version == Version::Draft01 { - self.priority.encode(w, version); + match version { + Version::Draft03 => { + self.priority.encode(w, version); + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + self.start_group.encode(w, version); + self.end_group.encode(w, version); + } + Version::Draft01 => { + self.priority.encode(w, version); + } + Version::Draft02 => {} + } + } + + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft03 => { + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let max_latency = std::time::Duration::decode(r, version)?; + let start_group = Option::::decode(r, version)?; + let end_group = Option::::decode(r, version)?; + + Ok(Self { + priority, + ordered, + max_latency, + start_group, + end_group, + }) + } + Version::Draft01 => Ok(Self { + priority: u8::decode(r, version)?, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, + }), + Version::Draft02 => Ok(Self { + priority: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, + }), } } +} + +/// Sent by the subscriber to update subscription parameters. +/// +/// Draft03 only. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub struct SubscribeUpdate { + pub priority: u8, + pub ordered: bool, + pub max_latency: std::time::Duration, + pub start_group: Option, + pub end_group: Option, +} +impl Message for SubscribeUpdate { fn decode_msg(r: &mut R, version: Version) -> Result { - let priority = if version == Version::Draft01 { - u8::decode(r, version)? - } else { - 0 + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe update not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let max_latency = std::time::Duration::decode(r, version)?; + let start_group = match u64::decode(r, version)? { + 0 => None, + group => Some(group - 1), }; + let end_group = match u64::decode(r, version)? { + 0 => None, + group => Some(group - 1), + }; + + Ok(Self { + priority, + ordered, + max_latency, + start_group, + end_group, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe update not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + self.priority.encode(w, version); + (self.ordered as u8).encode(w, version); + self.max_latency.encode(w, version); + + match self.start_group { + Some(start_group) => (start_group + 1).encode(w, version), + None => 0u64.encode(w, version), + } - Ok(Self { priority }) + match self.end_group { + Some(end_group) => (end_group + 1).encode(w, version), + None => 0u64.encode(w, version), + } + } +} + +/// Indicates that one or more groups have been dropped. +/// +/// Draft03 only. +#[derive(Clone, Debug)] +pub struct SubscribeDrop { + pub sequence: u64, + pub count: u64, + pub error: u64, +} + +impl Message for SubscribeDrop { + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe drop not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + Ok(Self { + sequence: u64::decode(r, version)?, + count: u64::decode(r, version)?, + error: u64::decode(r, version)?, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) { + match version { + Version::Draft01 | Version::Draft02 => { + unreachable!("subscribe drop not supported for version: {:?}", version); + } + Version::Draft03 => {} + } + + self.sequence.encode(w, version); + self.count.encode(w, version); + self.error.encode(w, version); + } +} + +/// A response message on the subscribe stream. +/// +/// In Draft03, each response is prefixed with a type discriminator: +/// - 0x0 for SUBSCRIBE_OK +/// - 0x1 for SUBSCRIBE_DROP +/// +/// SUBSCRIBE_OK must be the first message on the response stream. +#[derive(Clone, Debug)] +pub enum SubscribeResponse { + Ok(SubscribeOk), + Drop(SubscribeDrop), +} + +impl Encode for SubscribeResponse { + fn encode(&self, w: &mut W, version: Version) { + match version { + Version::Draft03 => match self { + Self::Ok(ok) => { + 0u64.encode(w, version); + // Write size-prefixed body using Message trait + let mut sizer = Sizer::default(); + Message::encode_msg(ok, &mut sizer, version); + sizer.size.encode(w, version); + Message::encode_msg(ok, w, version); + } + Self::Drop(drop) => { + 1u64.encode(w, version); + let mut sizer = Sizer::default(); + Message::encode_msg(drop, &mut sizer, version); + sizer.size.encode(w, version); + Message::encode_msg(drop, w, version); + } + }, + Version::Draft01 | Version::Draft02 => match self { + Self::Ok(ok) => { + let mut sizer = Sizer::default(); + Message::encode_msg(ok, &mut sizer, version); + sizer.size.encode(w, version); + Message::encode_msg(ok, w, version); + } + Self::Drop(_) => { + unreachable!("subscribe drop not supported for version: {:?}", version); + } + }, + } + } +} + +impl Decode for SubscribeResponse { + fn decode(buf: &mut B, version: Version) -> Result { + match version { + Version::Draft03 => { + let typ = u64::decode(buf, version)?; + match typ { + 0 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), + 1 => Ok(Self::Drop(SubscribeDrop::decode(buf, version)?)), + _ => Err(DecodeError::InvalidMessage(typ)), + } + } + Version::Draft01 | Version::Draft02 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), + } } } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9ab5cf41b..76e151e20 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -11,7 +11,6 @@ use crate::{ model::BroadcastProducer, }; -use tokio::sync::oneshot; use web_async::Lock; #[derive(Clone)] @@ -35,10 +34,9 @@ impl Subscriber { } } - /// Send a signal when the subscriber is initialized. - pub async fn run(self, init: oneshot::Sender<()>) -> Result<(), Error> { + pub async fn run(self) -> Result<(), Error> { tokio::select! { - Err(err) = self.clone().run_announce(init) => Err(err), + Err(err) = self.clone().run_announce() => Err(err), res = self.run_uni() => res, } } @@ -72,10 +70,9 @@ impl Subscriber { Ok(()) } - async fn run_announce(mut self, init: oneshot::Sender<()>) -> Result<(), Error> { + async fn run_announce(mut self) -> Result<(), Error> { if self.origin.is_none() { // Don't do anything if there's no origin configured. - let _ = init.send(()); return Ok(()); } @@ -91,19 +88,24 @@ impl Subscriber { let mut producers = HashMap::new(); - let msg: lite::AnnounceInit = stream.reader.decode().await?; - for path in msg.suffixes { - self.start_announce(path, &mut producers)?; + match self.version { + Version::Draft01 | Version::Draft02 => { + let msg: lite::AnnounceInit = stream.reader.decode().await?; + for path in msg.suffixes { + self.start_announce(path, &mut producers)?; + } + } + Version::Draft03 => { + // Draft03: no AnnounceInit, initial state comes via Announce messages. + } } - let _ = init.send(()); - while let Some(announce) = stream.reader.decode_maybe::().await? { match announce { - lite::Announce::Active { suffix: path } => { + lite::Announce::Active { suffix: path, .. } => { self.start_announce(path, &mut producers)?; } - lite::Announce::Ended { suffix: path } => { + lite::Announce::Ended { suffix: path, .. } => { tracing::debug!(broadcast = %self.log_path(&path), "unannounced"); // Close the producer. @@ -177,6 +179,10 @@ impl Subscriber { broadcast: broadcast.to_owned(), track: (&track.info.name).into(), priority: track.info.priority, + ordered: true, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, }; tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe started"); @@ -222,10 +228,13 @@ impl Subscriber { ) -> Result<(), Error> { stream.writer.encode(&msg).await?; - // TODO use the response correctly populate the track info - let _info: lite::SubscribeOk = stream.reader.decode().await?; + // The first response MUST be a SUBSCRIBE_OK. + let resp: lite::SubscribeResponse = stream.reader.decode().await?; + let lite::SubscribeResponse::Ok(_info) = resp else { + return Err(Error::ProtocolViolation); + }; - // Wait until the stream is closed + // TODO handle additional SUBSCRIBE_OK and SUBSCRIBE_DROP messages. stream.reader.closed().await?; Ok(()) diff --git a/rs/moq-lite/src/lite/version.rs b/rs/moq-lite/src/lite/version.rs index 62e462f8d..a30e1f1de 100644 --- a/rs/moq-lite/src/lite/version.rs +++ b/rs/moq-lite/src/lite/version.rs @@ -6,11 +6,15 @@ use crate::coding; /// In the future we'll use ALPN instead. pub const ALPN: &str = "moql"; +/// The ALPN string for Draft03, which uses ALPN-based version negotiation. +pub const ALPN_03: &str = "moq-lite-03"; + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[repr(u64)] pub enum Version { Draft01 = 0xff0dad01, Draft02 = 0xff0dad02, + Draft03 = 0xff0dad03, } impl TryFrom for Version { @@ -21,6 +25,8 @@ impl TryFrom for Version { Ok(Self::Draft01) } else if value == Self::Draft02.coding() { Ok(Self::Draft02) + } else if value == Self::Draft03.coding() { + Ok(Self::Draft03) } else { Err(()) } diff --git a/rs/moq-lite/src/server.rs b/rs/moq-lite/src/server.rs index 6789d377d..def979da9 100644 --- a/rs/moq-lite/src/server.rs +++ b/rs/moq-lite/src/server.rs @@ -44,20 +44,33 @@ impl Server { } let (encoding, supported) = match session.protocol() { - Some(p) if p == ietf::ALPN_16 => ( + Some(ietf::ALPN_16) => ( Version::Ietf(ietf::Version::Draft16), vec![ietf::Version::Draft16.into()], ), - Some(p) if p == ietf::ALPN_15 => ( + Some(ietf::ALPN_15) => ( Version::Ietf(ietf::Version::Draft15), vec![ietf::Version::Draft15.into()], ), - Some(p) if p == ietf::ALPN_14 => ( + Some(ietf::ALPN_14) => ( Version::Ietf(ietf::Version::Draft14), vec![ietf::Version::Draft14.into()], ), - Some(p) if p == lite::ALPN => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), - None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), + Some(lite::ALPN_03) => { + // Starting with draft-03, there's no more SETUP control stream. + lite::start( + session.clone(), + None, + self.publish.clone(), + self.consume.clone(), + lite::Version::Draft03, + )?; + + tracing::debug!(version = ?lite::Version::Draft03, "connected"); + + return Ok(Session::new(session)); + } + Some(lite::ALPN) | None => (Version::Ietf(ietf::Version::Draft14), NEGOTIATED.to_vec()), Some(p) => return Err(Error::UnknownAlpn(p.to_string())), }; @@ -97,12 +110,11 @@ impl Server { let stream = stream.with_version(version); lite::start( session.clone(), - stream, + Some(stream), self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } Version::Ietf(version) => { // Decode the client's parameters to get their max request ID. @@ -119,8 +131,7 @@ impl Server { self.publish.clone(), self.consume.clone(), version, - ) - .await?; + )?; } }; diff --git a/rs/moq-lite/src/setup.rs b/rs/moq-lite/src/setup.rs index a0f4109d8..e845449ab 100644 --- a/rs/moq-lite/src/setup.rs +++ b/rs/moq-lite/src/setup.rs @@ -28,6 +28,7 @@ impl Client { Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => self.versions.encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; w.put_slice(&self.parameters); } @@ -46,6 +47,7 @@ impl Decode for Client { u16::decode(r, v)? as usize } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; if r.remaining() < size { @@ -62,6 +64,7 @@ impl Decode for Client { Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Versions::decode(&mut msg, v)?, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; Ok(Self { @@ -85,6 +88,7 @@ impl Encode for Client { u16::try_from(size).expect("message too large for u16").encode(w, v) } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), } self.encode_inner(w, v); } @@ -109,6 +113,7 @@ impl Server { Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => self.version.encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; w.put_slice(&self.parameters); } @@ -127,6 +132,7 @@ impl Encode for Server { u16::try_from(size).expect("message too large for u16").encode(w, v) } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), } self.encode_inner(w, v); @@ -145,6 +151,7 @@ impl Decode for Server { u16::decode(r, v)? as usize } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; if r.remaining() < size { @@ -157,6 +164,7 @@ impl Decode for Server { Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Version::decode(&mut msg, v)?, + Version::Lite(lite::Version::Draft03) => unreachable!("draft-03 uses no setup message"), }; Ok(Self { diff --git a/rs/moq-lite/src/version.rs b/rs/moq-lite/src/version.rs index aefc36034..91a137321 100644 --- a/rs/moq-lite/src/version.rs +++ b/rs/moq-lite/src/version.rs @@ -10,7 +10,7 @@ pub(crate) const NEGOTIATED: [Version; 3] = [ ]; /// ALPN strings for supported versions. -pub const ALPNS: &[&str] = &[lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; +pub const ALPNS: &[&str] = &[lite::ALPN_03, lite::ALPN, ietf::ALPN_16, ietf::ALPN_15, ietf::ALPN_14]; // A combination of ietf::Version and lite::Version. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]