Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions js/lite/src/connection/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
websocketWon.add(url.toString());
}

// moq-rs currently requires the ROLE extension to be set.
const stream = await Stream.open(session);

// @ts-expect-error - TODO: add protocol to WebTransport
const protocol: string | undefined = session instanceof WebTransport ? session.protocol : undefined;
console.debug(url.toString(), "negotiated ALPN:", protocol ?? "(none)");
Expand All @@ -83,6 +80,10 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === Lite.ALPN_03) {
// moq-lite draft-03 doesn't use a session stream, so we return immediately.
console.debug(url.toString(), "moq-lite draft-03 session established");
return new Lite.Connection(url, session, Lite.Version.DRAFT_03, undefined);
} else if (protocol === Lite.ALPN || protocol === "" || protocol === undefined) {
// moq-lite ALPN (or no protocol) uses Draft14 encoding for SETUP,
// then negotiates the actual version via the SETUP message.
Expand All @@ -92,6 +93,7 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
}

// We're encoding 0x20 so it's backwards compatible with moq-transport-10+
const stream = await Stream.open(session);
await stream.writer.u53(Lite.StreamId.ClientCompat);

const encoder = new TextEncoder();
Expand Down Expand Up @@ -126,7 +128,7 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish

if (Object.values(Lite.Version).includes(server.version as Lite.Version)) {
console.debug(url.toString(), "moq-lite session established");
return new Lite.Connection(url, session, stream, server.version as Lite.Version);
return new Lite.Connection(url, session, server.version as Lite.Version, stream);
} else if (Object.values(Ietf.Version).includes(server.version as Ietf.Version)) {
const maxRequestId = server.parameters.getVarint(Ietf.Parameter.MaxRequestId) ?? 0n;
console.debug(url.toString(), "moq-ietf session established, version:", server.version.toString(16));
Expand All @@ -153,7 +155,7 @@ async function connectWebTransport(
allowPooling: false,
congestionControl: "low-latency",
// @ts-expect-error - TODO: add protocols to WebTransportOptions
protocols: [Lite.ALPN, Ietf.ALPN.DRAFT_16, Ietf.ALPN.DRAFT_15],
protocols: [Lite.ALPN_03, Lite.ALPN, Ietf.ALPN.DRAFT_16, Ietf.ALPN.DRAFT_15],
...options,
};

Expand Down
7 changes: 3 additions & 4 deletions js/lite/src/ietf/publish.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import * as Namespace from "./namespace.ts";
import { MessageParameters, Parameters } from "./parameters.ts";
Expand Down Expand Up @@ -76,8 +77,7 @@ export class Publish {
await w.bool(this.forward);
await w.u53(0); // size of parameters
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand Down Expand Up @@ -127,8 +127,7 @@ export class Publish {
forward,
});
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions js/lite/src/ietf/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Group } from "../group.ts";
import * as Path from "../path.ts";
import { type Stream, Writer } from "../stream.ts";
import type { Track } from "../track.ts";
import { error } from "../util/error.ts";
import { error, unreachable } from "../util/error.ts";
import type * as Control from "./control.ts";
import { Frame, Group as GroupMessage } from "./object.ts";
import { PublishDone } from "./publish.ts";
Expand Down Expand Up @@ -113,8 +113,7 @@ export class Publisher {
});
await this.#control.write(errorMsg);
} else {
const version: never = this.#control.version;
throw new Error(`unsupported version: ${version}`);
unreachable(this.#control.version);
}

return;
Expand Down
13 changes: 5 additions & 8 deletions js/lite/src/ietf/setup.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import { Parameters } from "./parameters.ts";
import { type IetfVersion, Version } from "./version.ts";
Expand Down Expand Up @@ -27,8 +28,7 @@ export class ClientSetup {
}
await this.parameters.encode(w, version);
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand Down Expand Up @@ -59,8 +59,7 @@ export class ClientSetup {

return new ClientSetup({ versions: supportedVersions, parameters });
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand Down Expand Up @@ -88,8 +87,7 @@ export class ServerSetup {
await w.u53(this.version);
await this.parameters.encode(w, version);
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand All @@ -107,8 +105,7 @@ export class ServerSetup {
const parameters = await Parameters.decode(r, version);
return new ServerSetup({ version: selectedVersion, parameters });
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand Down
13 changes: 5 additions & 8 deletions js/lite/src/ietf/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import * as Namespace from "./namespace.ts";
import { MessageParameters, Parameters } from "./parameters.ts";
Expand Down Expand Up @@ -53,8 +54,7 @@ export class Subscribe {
await w.u53(0x2); // filter type = LargestObject
await w.u53(0); // no parameters
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand Down Expand Up @@ -119,8 +119,7 @@ export class Subscribe {

return new Subscribe({ requestId, trackNamespace, trackName, subscriberPriority });
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}
}
Expand Down Expand Up @@ -151,8 +150,7 @@ export class SubscribeOk {
await w.bool(false); // content exists = false
await w.u53(0); // no parameters
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}
}

Expand Down Expand Up @@ -188,8 +186,7 @@ export class SubscribeOk {

await Parameters.decode(r, version); // ignore parameters
} else {
const _: never = version;
throw new Error(`unsupported version: ${_}`);
unreachable(version);
}

return new SubscribeOk({ requestId, trackAlias });
Expand Down
5 changes: 2 additions & 3 deletions js/lite/src/ietf/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Group } from "../group.ts";
import * as Path from "../path.ts";
import { type Reader, Stream } from "../stream.ts";
import type { Track } from "../track.ts";
import { error } from "../util/error.ts";
import { error, unreachable } from "../util/error.ts";
import type * as Control from "./control.ts";
import { Frame, type Group as GroupMessage } from "./object.ts";
import { type Publish, type PublishDone, PublishError } from "./publish.ts";
Expand Down Expand Up @@ -347,8 +347,7 @@ export class Subscriber {
});
await this.#control.write(err);
} else {
const version: never = this.#control.version;
throw new Error(`unsupported version: ${version}`);
unreachable(this.#control.version);
}
}

Expand Down
73 changes: 59 additions & 14 deletions js/lite/src/lite/announce.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,65 @@
import * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import { Version } from "./version.ts";

export class Announce {
suffix: Path.Valid;
active: boolean;
hops: number;

constructor(suffix: Path.Valid, active: boolean) {
this.suffix = suffix;
this.active = active;
constructor(props: { suffix: Path.Valid; active: boolean; hops?: number }) {
this.suffix = props.suffix;
this.active = props.active;
this.hops = props.hops ?? 0;
}
Comment on lines 7 to 16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Document Announce/hops semantics.

hops is a new public, draft-specific field; add a short doc comment to clarify meaning and default.

✍️ Suggested documentation
+/**
+ * Announces availability of a suffix.
+ * `hops` is Draft03-only and defaults to 0.
+ */
 export class Announce {
 	suffix: Path.Valid;
 	active: boolean;
-	hops: number;
+	/** Relay hop count (Draft03 only). */
+	hops: number;
As per coding guidelines, Document public APIs with clear docstrings or comments.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export class Announce {
suffix: Path.Valid;
active: boolean;
hops: number;
constructor(suffix: Path.Valid, active: boolean) {
this.suffix = suffix;
this.active = active;
constructor(props: { suffix: Path.Valid; active: boolean; hops?: number }) {
this.suffix = props.suffix;
this.active = props.active;
this.hops = props.hops ?? 0;
}
/**
* Announces availability of a suffix.
* `hops` is Draft03-only and defaults to 0.
*/
export class Announce {
suffix: Path.Valid;
active: boolean;
/** Relay hop count (Draft03 only). */
hops: number;
constructor(props: { suffix: Path.Valid; active: boolean; hops?: number }) {
this.suffix = props.suffix;
this.active = props.active;
this.hops = props.hops ?? 0;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/lite/src/lite/announce.ts` around lines 7 - 16, Add a short doc comment on
the public Announce class and its hops property explaining that hops is a
draft-only integer representing the number of relay hops (or TTL) for this
announcement, that it defaults to 0 when omitted, and how consumers should
interpret/modify it; update the Announce class declaration and the hops field
(and optionally the constructor signature) to include this documentation so the
public API intent is clear (referencing Announce, hops, and the constructor that
sets hops).


async #encode(w: Writer) {
async #encode(w: Writer, version: Version) {
await w.bool(this.active);
await w.string(this.suffix);

switch (version) {
case Version.DRAFT_03:
await w.u53(this.hops);
break;
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
default:
unreachable(version);
}
}

static async #decode(r: Reader): Promise<Announce> {
static async #decode(r: Reader, version: Version): Promise<Announce> {
const active = await r.bool();
const suffix = Path.from(await r.string());
return new Announce(suffix, active);

let hops = 0;
switch (version) {
case Version.DRAFT_03:
hops = await r.u53();
break;
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
default:
unreachable(version);
}

return new Announce({ suffix, active, hops });
}

async encode(w: Writer): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
async encode(w: Writer, version: Version): Promise<void> {
return Message.encode(w, (w) => this.#encode(w, version));
}

static async decode(r: Reader): Promise<Announce> {
return Message.decode(r, Announce.#decode);
static async decode(r: Reader, version: Version): Promise<Announce> {
return Message.decode(r, (r) => Announce.#decode(r, version));
}

static async decodeMaybe(r: Reader): Promise<Announce | undefined> {
return Message.decodeMaybe(r, Announce.#decode);
static async decodeMaybe(r: Reader, version: Version): Promise<Announce | undefined> {
return Message.decodeMaybe(r, (r) => Announce.#decode(r, version));
}
}

Expand Down Expand Up @@ -60,13 +88,28 @@ export class AnnounceInterest {
}
}

/// Sent after setup to communicate the initially announced paths.
///
/// Used by Draft01/Draft02 only. Draft03 uses individual Announce messages instead.
export class AnnounceInit {
suffixes: Path.Valid[];

constructor(paths: Path.Valid[]) {
this.suffixes = paths;
}

static #guard(version: Version) {
switch (version) {
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
case Version.DRAFT_03:
throw new Error("announce init not supported for Draft03");
default:
unreachable(version);
}
}

async #encode(w: Writer) {
await w.u53(this.suffixes.length);
for (const path of this.suffixes) {
Expand All @@ -83,11 +126,13 @@ export class AnnounceInit {
return new AnnounceInit(suffixes);
}

async encode(w: Writer): Promise<void> {
async encode(w: Writer, version: Version): Promise<void> {
AnnounceInit.#guard(version);
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader): Promise<AnnounceInit> {
static async decode(r: Reader, version: Version): Promise<AnnounceInit> {
AnnounceInit.#guard(version);
return Message.decode(r, AnnounceInit.#decode);
}
}
13 changes: 9 additions & 4 deletions js/lite/src/lite/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class Connection implements Established {
#quic: WebTransport;

// Use to receive/send session messages.
#session: Stream;
#session?: Stream;

// Module for contributing tracks.
#publisher: Publisher;
Expand All @@ -47,7 +47,7 @@ export class Connection implements Established {
*
* @internal
*/
constructor(url: URL, quic: WebTransport, session: Stream, version: Version) {
constructor(url: URL, quic: WebTransport, version: Version, session?: Stream) {
this.url = url;
this.#quic = quic;
this.#session = session;
Expand Down Expand Up @@ -123,10 +123,15 @@ export class Connection implements Established {
}

async #runSession() {
if (!this.#session) {
// moq-lite draft-03 doesn't use a session stream.
return;
}

try {
// Receive messages until the connection is closed.
for (;;) {
const msg = await SessionInfo.decodeMaybe(this.#session.reader);
const msg = await SessionInfo.decodeMaybe(this.#session.reader, this.version);
if (!msg) break;
// TODO use the session info
}
Expand Down Expand Up @@ -162,7 +167,7 @@ export class Connection implements Established {
await this.#publisher.runAnnounce(msg, stream);
return;
} else if (typ === StreamId.Subscribe) {
const msg = await Subscribe.decode(stream.reader);
const msg = await Subscribe.decode(stream.reader, this.version);
await this.#publisher.runSubscribe(msg, stream);
return;
} else {
Expand Down
Loading
Loading