Audio group duration + Frame/Producer API cleanup + decoder warm-up#1007
Audio group duration + Frame/Producer API cleanup + decoder warm-up#1007
Conversation
- Remove `keyframe: bool` from `Frame` — group boundaries are now an explicit `OrderedProducer` API, not a frame field. - Add `OrderedProducer::keyframe()` for explicit group boundaries (video). - Add `OrderedProducer::with_max_group_duration()` for automatic ~100ms audio groups instead of one QUIC stream per frame. - Replace `OrderedConsumer::read() -> Frame` with `-> OrderedFrame` that inlines timestamp/payload and adds group sequence + frame index. - Update all importers: opus/aac use auto-grouping, avc3/hev1/av01 use explicit `keyframe()`, fmp4 keeps inline group management. - Rename JS `maxLatency` → `groupDuration` in publish audio encoder. - Add decoder warm-up in JS watch: first audio group primes the decoder without emitting samples. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review infoConfiguration used: Organization UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
WalkthroughRenames Encoder's 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
js/publish/src/audio/encoder.ts (1)
17-29: Good rename frommaxLatencytogroupDurationfor semantic clarity.The new name better reflects the purpose: controlling the maximum duration of audio groups rather than a latency setting.
Minor: As per coding guidelines, consider using
interfaceinstead oftypefor defining object shapes in TypeScript:-export type EncoderProps = { +export interface EncoderProps { enabled?: boolean | Signal<boolean>; source?: Source | Signal<Source | undefined>; // ... -}; +},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/publish/src/audio/encoder.ts` around lines 17 - 29, Replace the exported type alias with an interface: change the declaration of EncoderProps from a "type" to an "interface" while keeping the same fields (enabled, source, muted, volume, groupDuration, container) and their exact types so code consuming EncoderProps (e.g., any references to EncoderProps in encoder.ts) remains compatible with the new interface form.rs/moq-mux/src/import/aac.rs (1)
113-117: Consider extracting the shared audio group duration constant.Both
opus.rsandaac.rsdefine identicalMAX_GROUP_DURATIONconstants (100ms). Consider extracting this to a shared location to ensure consistency and make it easier to tune audio grouping behavior across all audio codecs.// In a shared module, e.g., rs/moq-mux/src/import/mod.rs or a constants module: pub const AUDIO_MAX_GROUP_DURATION: hang::container::Timestamp = hang::container::Timestamp::from_millis_unchecked(100);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/import/aac.rs` around lines 113 - 117, Both aac.rs and opus.rs define an identical MAX_GROUP_DURATION constant; extract it into a shared constant (e.g., AUDIO_MAX_GROUP_DURATION) in a common module (for example rs/moq-mux/src/import/mod.rs or a new constants module) and replace the local MAX_GROUP_DURATION definitions with references to that shared constant; update uses in functions like the OrderedProducer::with_max_group_duration call (and any other occurrences of MAX_GROUP_DURATION) to use the new AUDIO_MAX_GROUP_DURATION symbol so all codecs share the same 100ms value.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@js/publish/src/audio/encoder.ts`:
- Around line 17-29: Replace the exported type alias with an interface: change
the declaration of EncoderProps from a "type" to an "interface" while keeping
the same fields (enabled, source, muted, volume, groupDuration, container) and
their exact types so code consuming EncoderProps (e.g., any references to
EncoderProps in encoder.ts) remains compatible with the new interface form.
In `@rs/moq-mux/src/import/aac.rs`:
- Around line 113-117: Both aac.rs and opus.rs define an identical
MAX_GROUP_DURATION constant; extract it into a shared constant (e.g.,
AUDIO_MAX_GROUP_DURATION) in a common module (for example
rs/moq-mux/src/import/mod.rs or a new constants module) and replace the local
MAX_GROUP_DURATION definitions with references to that shared constant; update
uses in functions like the OrderedProducer::with_max_group_duration call (and
any other occurrences of MAX_GROUP_DURATION) to use the new
AUDIO_MAX_GROUP_DURATION symbol so all codecs share the same 100ms value.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
js/publish/src/audio/encoder.tsjs/watch/src/audio/decoder.tsrs/hang/examples/subscribe.rsrs/hang/examples/video.rsrs/hang/src/container/consumer.rsrs/hang/src/container/frame.rsrs/hang/src/container/producer.rsrs/libmoq/src/consume.rsrs/moq-mux/src/import/aac.rsrs/moq-mux/src/import/av01.rsrs/moq-mux/src/import/avc3.rsrs/moq-mux/src/import/fmp4.rsrs/moq-mux/src/import/hev1.rsrs/moq-mux/src/import/opus.rs
💤 Files with no reviewable changes (2)
- rs/hang/src/container/frame.rs
- rs/moq-mux/src/import/fmp4.rs
- Remove redundant `pending_keyframe` flag — `keyframe()` already sets `self.group` to None which triggers new group creation in `write()`. - Move auto-close check to after writing so the group closes immediately once the next frame is estimated to exceed the limit, with a safety check at the start in case the estimate was wrong. - Replace `prev_timestamp` with `group_frames` counter to estimate average frame duration, which handles B-frames correctly. - Convert `with_max_group_duration` to a builder-style setter on `new()` for easier future extension. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/hang/src/container/producer.rs`:
- Around line 72-118: The OrderedProducer can leave an active group unfinished
when dropped; implement Drop for OrderedProducer to check self.group (and
self.group.is_some()) and call finish() on the taken group to finalize it;
specifically, in Drop for OrderedProducer, if let Some(mut group) =
self.group.take() { group.finish().ok(); } (ensure you reference the
OrderedProducer struct, its group field, and call GroupProducer::finish or the
group's finish() method) so the final group is always closed during teardown.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rs/hang/src/container/producer.rsrs/moq-mux/src/import/aac.rsrs/moq-mux/src/import/opus.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rs/moq-mux/src/import/aac.rs
| pub fn write(&mut self, frame: Frame) -> Result<(), Error> { | ||
| tracing::trace!(?frame, "write frame"); | ||
|
|
||
| if frame.keyframe { | ||
| if let Some(mut group) = self.group.take() { | ||
| group.finish()?; | ||
| } | ||
|
|
||
| // Make sure this frame's timestamp doesn't go backwards relative to the last keyframe. | ||
| // We can't really enforce this for frames generally because b-frames suck. | ||
| if let Some(keyframe) = self.keyframe | ||
| && frame.timestamp < keyframe | ||
| { | ||
| return Err(Error::TimestampBackwards); | ||
| } | ||
|
|
||
| self.keyframe = Some(frame.timestamp); | ||
| // Safety check: close the group if this frame already exceeds the max duration. | ||
| if let (Some(max_duration), Some(group_start)) = (self.max_group_duration, self.group_start) | ||
| && self.group.is_some() | ||
| && frame.timestamp.checked_sub(group_start).unwrap_or(Timestamp::ZERO) >= max_duration | ||
| && let Some(mut group) = self.group.take() | ||
| { | ||
| group.finish()?; | ||
| } | ||
|
|
||
| let mut group = match self.group.take() { | ||
| Some(group) => group, | ||
| None if frame.keyframe => self.track.append_group()?, | ||
| // The first frame must be a keyframe. | ||
| None => return Err(Error::MissingKeyframe), | ||
| }; | ||
| // Start a new group if needed (first frame, after keyframe(), or after auto-close). | ||
| if self.group.is_none() { | ||
| let group = self.track.append_group()?; | ||
| self.group = Some(group); | ||
| self.group_start = Some(frame.timestamp); | ||
| self.group_frames = 0; | ||
| } | ||
|
|
||
| // Encode the frame. | ||
| let mut group = self.group.take().expect("group should exist"); | ||
| frame.encode(&mut group)?; | ||
|
|
||
| self.group.replace(group); | ||
|
|
||
| Ok(()) | ||
| } | ||
| self.group_frames += 1; | ||
|
|
||
| // Estimate the next frame's timestamp and close the group now if it would exceed the limit. | ||
| // avg_frame_duration = elapsed / group_frames | ||
| // estimated_next_elapsed = elapsed + avg_frame_duration | ||
| // Rearranged to avoid division: elapsed * (frames + 1) >= max_duration * frames | ||
| if let (Some(max_duration), Some(group_start)) = (self.max_group_duration, self.group_start) { | ||
| let elapsed = frame | ||
| .timestamp | ||
| .checked_sub(group_start) | ||
| .unwrap_or(Timestamp::ZERO) | ||
| .as_micros(); | ||
| let max = max_duration.as_micros(); | ||
|
|
||
| if elapsed * (self.group_frames as u128 + 1) >= max * self.group_frames as u128 | ||
| && let Some(mut group) = self.group.take() | ||
| { | ||
| group.finish()?; | ||
| } | ||
| } | ||
|
|
||
| /// An explicit way to end the current group. | ||
| /// | ||
| /// This is useful to flush when you know the next frame will be a keyframe. | ||
| pub fn flush(&mut self) -> Result<(), Error> { | ||
| self.group.take().ok_or(Error::MissingKeyframe)?.finish()?; | ||
| Ok(()) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find moq_lite::GroupProducer definition and Drop implementation
echo "=== Checking moq_lite::GroupProducer structure and Drop impl ==="
fd -e rs -p 'moq.lite' | head -20
rg -A 10 'pub struct GroupProducer' --type rs 2>/dev/null || echo "Pattern not found in search"
# Check for Drop impl on GroupProducer
rg -B 2 -A 10 'impl Drop for GroupProducer' --type rs 2>/dev/null || echo "No Drop impl found for GroupProducer"
# Check finish() method signature
rg -B 2 -A 5 'fn finish\(' --type rs 2>/dev/null | head -50Repository: moq-dev/moq
Length of output: 813
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify current OrderedProducer teardown/finalization paths
echo "=== Checking OrderedProducer definition and Drop impl ==="
rg -B 5 -A 30 'pub struct OrderedProducer' rs/hang/src/container/producer.rs
echo ""
echo "=== Checking for existing Drop impl on OrderedProducer ==="
rg 'impl Drop for OrderedProducer' rs/hang/src/container/producer.rs || echo "No Drop impl found"
echo ""
echo "=== Checking for keyframe() and finish() calls in OrderedProducer ==="
rg -n 'fn keyframe|group\.finish' rs/hang/src/container/producer.rsRepository: moq-dev/moq
Length of output: 1703
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check importers/callers that may rely on implicit teardown
echo "=== Checking OrderedProducer usage in importers and callers ==="
find rs -name '*.rs' -type f | xargs rg -l 'OrderedProducer' | head -10
echo ""
echo "=== Sample usages of OrderedProducer ==="
rg -B 3 -A 5 'OrderedProducer::new\(|OrderedProducer {' rs --type rs | head -80Repository: moq-dev/moq
Length of output: 530
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for GroupProducer definition to understand lifecycle
echo "=== GroupProducer definition ==="
rg -B 3 -A 20 'pub struct GroupProducer' --type rs
echo ""
echo "=== Check if GroupProducer implements Drop or has finish() ==="
rg 'pub fn finish|impl.*Drop' --type rs | grep -i groupRepository: moq-dev/moq
Length of output: 131
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check actual usage patterns of OrderedProducer in callers
echo "=== OrderedProducer usage in opus.rs ==="
rg -B 2 -A 10 'OrderedProducer' rs/moq-mux/src/import/opus.rs
echo ""
echo "=== Check if any caller explicitly finalizes ==="
rg -B 5 'OrderedProducer' rs/moq-mux/src/import/aac.rs | head -40Repository: moq-dev/moq
Length of output: 1517
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if finish() is a required method for GroupProducer
echo "=== GroupProducer finish method ==="
rg -B 5 -A 8 'fn finish' rs/moq-lite/src --type rs
echo ""
echo "=== Check moq_lite module structure ==="
fd -e rs moq-lite | xargs grep -l 'pub struct GroupProducer'Repository: moq-dev/moq
Length of output: 134
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# List moq-lite structure
echo "=== moq-lite source files ==="
fd . rs/moq-lite/src -type f -name '*.rs' | head -20
# Find GroupProducer definition with simpler search
echo ""
echo "=== Find GroupProducer ==="
rg 'struct GroupProducer' rs/Repository: moq-dev/moq
Length of output: 272
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Look at actual moq-lite track module
echo "=== Check moq-lite track-related files ==="
find rs/moq-lite/src -name '*.rs' -exec grep -l 'GroupProducer' {} \;
echo ""
echo "=== Look for GroupProducer in track/group files ==="
find rs/moq-lite/src -name '*track*' -o -name '*group*' | xargs cat 2>/dev/null | head -100Repository: moq-dev/moq
Length of output: 2867
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check how OrderedProducer is actually dropped/used in examples and tests
echo "=== Check video.rs example for OrderedProducer usage ==="
rg -B 3 -A 10 'OrderedProducer' rs/hang/examples/video.rs
echo ""
echo "=== Check if OrderedProducer lifetime is managed at stream level ==="
rg -B 5 'struct Opus|struct Aac' rs/moq-mux/src/import/opus.rs | head -30Repository: moq-dev/moq
Length of output: 1057
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for GroupProducer definition directly
echo "=== Finding GroupProducer definition ==="
find rs -name '*.rs' -exec grep -l 'pub struct GroupProducer' {} \;
echo ""
echo "=== Look at track.rs for GroupProducer ==="
head -200 rs/moq-lite/src/model/track.rs | tail -100Repository: moq-dev/moq
Length of output: 2813
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check full track.rs file size and content
echo "=== track.rs file size and GroupProducer ==="
wc -l rs/moq-lite/src/model/track.rs
echo ""
echo "=== Search for pub struct in track.rs ==="
grep -n 'pub struct' rs/moq-lite/src/model/track.rs
echo ""
echo "=== Extract GroupProducer struct ==="
sed -n '/pub struct GroupProducer/,/^}/p' rs/moq-lite/src/model/track.rs | head -50Repository: moq-dev/moq
Length of output: 298
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if GroupProducer has finish() method and Drop impl
echo "=== Search for finish() in track.rs ==="
grep -n 'fn finish' rs/moq-lite/src/model/track.rs
echo ""
echo "=== Search for Drop impl in track.rs ==="
grep -n 'impl Drop\|impl.*Drop' rs/moq-lite/src/model/track.rs
echo ""
echo "=== Full content of track.rs to understand GroupProducer ==="
wc -l rs/moq-lite/src/model/track.rs
cat rs/moq-lite/src/model/track.rs | head -300Repository: moq-dev/moq
Length of output: 182
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check GroupProducer definition in group.rs
echo "=== GroupProducer in group.rs ==="
wc -l rs/moq-lite/src/model/group.rs
echo ""
echo "=== Full group.rs content (small file) ==="
cat rs/moq-lite/src/model/group.rsRepository: moq-dev/moq
Length of output: 6260
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify if finish() is mandatory for GroupProducer
echo "=== Check GroupProducer::finish() signature and behavior ==="
grep -n 'pub fn finish\|fn finish' rs/moq-lite/src/model/group.rs
echo ""
echo "=== Check for Drop impl on GroupProducer ==="
grep -n 'impl Drop\|Drop for\|impl.*Drop' rs/moq-lite/src/model/group.rs
echo ""
echo "=== Check what finish() does ==="
sed -n '/pub fn finish/,/^[[:space:]]*}/p' rs/moq-lite/src/model/group.rsRepository: moq-dev/moq
Length of output: 207
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if there are any tests showing what happens with unfinished groups
echo "=== Search for tests or examples with OrderedProducer and unfinished groups ==="
rg -B 3 -A 5 'OrderedProducer|group.*finish' rs/hang/examples rs/hang/src --type rust | head -100
echo ""
echo "=== Check if video.rs example explicitly calls finish() ==="
cat rs/hang/examples/video.rs | grep -A 20 'OrderedProducer'Repository: moq-dev/moq
Length of output: 5714
Add Drop impl to explicitly finalize active groups on teardown.
OrderedProducer.group can remain open after write() returns, especially with duration-based grouping. If the producer is dropped before the next group boundary, the final group remains unfinished. GroupProducer has no Drop impl and relies on explicit finish() calls. Without finalization, the last group's frames become inaccessible to consumers—causing tail-frame loss and potential consumer hangs. The video.rs example and production decoders (opus.rs, aac.rs) all drop OrderedProducer implicitly, exposing this gap.
Proposed fix
+impl Drop for OrderedProducer {
+ fn drop(&mut self) {
+ if let Some(mut group) = self.group.take() {
+ if let Err(err) = group.finish() {
+ tracing::warn!(?err, "failed to finish active group on drop");
+ }
+ }
+ }
+}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/hang/src/container/producer.rs` around lines 72 - 118, The OrderedProducer
can leave an active group unfinished when dropped; implement Drop for
OrderedProducer to check self.group (and self.group.is_some()) and call finish()
on the taken group to finalize it; specifically, in Drop for OrderedProducer, if
let Some(mut group) = self.group.take() { group.finish().ok(); } (ensure you
reference the OrderedProducer struct, its group field, and call
GroupProducer::finish or the group's finish() method) so the final group is
always closed during teardown.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
keyframefromFrame— group boundaries are now an explicitOrderedProducerAPI instead of a frame field that was ignored byFrame::encode().OrderedProducer::keyframe()for explicit group boundaries (video importers call this before writing IDR/keyframes).OrderedProducer::with_max_group_duration()for automatic ~100ms audio groups — opus/aac importers no longer create one QUIC stream per frame (~50 streams/sec → ~10 streams/sec).OrderedFramereturn type fromOrderedConsumer::read()with inlined timestamp/payload plus group sequence number and frame index (0 = keyframe).maxLatency→groupDurationin the publish audio encoder for clarity.Test plan
just checkpasses (all Rust + JS/TS linting and type checks)🤖 Generated with Claude Code