Conversation
WalkthroughThis pull request updates project dependencies and refactors the media handling architecture. Dependency versions are bumped for hang, moq-lite, and moq-native, with a new moq-mux dependency introduced. The sink implementation migrates from hang::import::Fmp4 to moq_mux::import::Fmp4 and restructures the connection workflow to use an Origin/Broadcast pipeline with catalog publication. The source implementation shifts to an explicit origin-consumer pattern, updates catalog and track type paths, replaces track.read_frame() calls with track.read(), and introduces OrderedConsumer for track handling. TLS configuration is standardized across both modules. 🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/sink/imp.rs (1)
188-195:⚠️ Potential issue | 🟠 MajorAvoid
expect()insetup()- propagate error instead.The
setup()method returnsanyhow::Result<()>, butexpect("failed to connect")on line 189 will panic on connection failure instead of returning a proper error. This breaks error handling consistency and could crash the application.Proposed fix to propagate the error
RUNTIME.block_on(async { - let _session = client.connect(url).await.expect("failed to connect"); + let _session = client.connect(url).await.context("failed to connect")?; let media = moq_mux::import::Fmp4::new(broadcast, catalog, Default::default()); let mut state = self.state.lock().unwrap(); state.media = Some(media); + + Ok::<_, anyhow::Error>(()) -}); +})?;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sink/imp.rs` around lines 188 - 195, The code currently panics on connection failure because setup() uses RUNTIME.block_on(...client.connect(...).await.expect(...)); change this to propagate the error through anyhow::Result by having the async block return anyhow::Result<()> and mapping the connect error (e.g. client.connect(url).await.map_err(|e| anyhow::anyhow!("failed to connect: {}", e))?) so the block returns Ok(()) on success, capture the block's Result (let res: anyhow::Result<()> = RUNTIME.block_on(...)) and then return res? from setup(); update references inside the block (session, media, state.media) to match the new flow so no expect() remains.
🧹 Nitpick comments (3)
src/source/imp.rs (3)
395-397: Task cancellation not implemented.The spawned tasks at lines 235 and 345 have no cancellation mechanism. When
cleanup()is called duringPausedToReadytransition, the tasks continue running, potentially causing resource leaks or errors when they try to push to removed pads.Consider using
tokio_util::sync::CancellationTokenor storingJoinHandles to abort tasks on cleanup.Would you like me to generate an implementation for proper task cancellation, or open an issue to track this?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/source/imp.rs` around lines 395 - 397, The cleanup() method currently does nothing while the background tasks spawned at the locations referenced (around the task spawns at lines 235 and 345) continue running; add a cancellation mechanism by storing either a shared tokio_util::sync::CancellationToken or the JoinHandle(s) for those spawned tasks as new fields on the same struct that defines cleanup(), ensure the tasks check the token (or are abortable) and/or are awaited/aborted on cleanup(), and update the task creation sites to clone/pass the token (or push handles into the struct) so cleanup() can cancel/abort and optionally join them before returning.
187-387: Consider extracting common track setup logic.The video (lines 187-280) and audio (lines 283-387) track handling share nearly identical structure:
- Iterate renditions
- Create track consumer and OrderedConsumer
- Build caps (codec-specific)
- Create pad, push events
- Spawn reader task
Extracting a helper function parameterized by track type and caps builder could reduce duplication.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/source/imp.rs` around lines 187 - 387, Extract the duplicated track setup into a helper (e.g., setup_renditions or add_renditions) that takes the rendition map, a caps-builder closure, the pad template name string ("video_%u" or "audio_%u"), and any codec-specific params; inside the helper reuse moq_lite::Track::new, broadcast.subscribe_track, hang::container::OrderedConsumer::new, create caps via the provided closure, create a srcpad from self.obj().element_class().pad_template(...), push StreamStart/Caps/Segment events, call self.obj().add_pad(...), and spawn the tokio::spawn loop that reads from OrderedConsumer and pushes gst::Buffer while preserving the existing timestamp/flags logic; update the existing video and audio blocks to call this helper with the appropriate caps-builder closures and pad template names so OrderedConsumer, subscribe_track, pad_template, add_pad, and the tokio::spawn reader logic are all centralized.
6-7: Consider using consistent lazy initialization.The code mixes
once_cell::sync::Lazy(line 6) withstd::sync::LazyLock(line 7).LazyLockis available in std since Rust 1.80, so you could use it consistently throughout or continue withonce_cellfor broader compatibility.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/source/imp.rs` around lines 6 - 7, The file imports both once_cell::sync::Lazy and std::sync::LazyLock which mixes two lazy-init implementations; pick one and make imports/usages consistent: if the project targets Rust >=1.80 replace once_cell::sync::Lazy with std::sync::LazyLock (remove the once_cell import and update any Lazy usages to LazyLock), otherwise stick with once_cell::sync::Lazy (remove the std::sync::LazyLock import and rename any LazyLock usages to Lazy) so all lazy static/initializations (references to Lazy and LazyLock) use the same type.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@Cargo.toml`:
- Around line 22-25: The Cargo.toml currently pins external registry versions
for the hang and moq-native dependencies; change them to local path dependencies
per guidelines by replacing the version entries for the hang and moq-native
crates with path = "<relative-path-to-local-hang>" and path =
"<relative-path-to-local-moq-native>" (adjust the relative paths to your repo
layout) so Cargo uses the local packages instead of the registry versions.
In `@src/source/imp.rs`:
- Line 175: The local session returned by client.connect(url).await? (currently
assigned to _session) and the local broadcast/origin variables are being dropped
when setup() returns while spawned tasks (the long-lived playback tasks
referenced around the spawn calls) still depend on them; move the session handle
and the broadcast/origin resources into the imp struct as owned fields (e.g.,
add session: ClientSession and broadcast: BroadcastHandle or similar) and assign
them in setup() instead of using local vars so they live for the lifetime of the
playback tasks, and implement cleanup() to cancel or join those spawned tasks
and close the session/broadcast gracefully (use stored task handles or a
shutdown channel) so subscriptions stop cleanly.
---
Outside diff comments:
In `@src/sink/imp.rs`:
- Around line 188-195: The code currently panics on connection failure because
setup() uses RUNTIME.block_on(...client.connect(...).await.expect(...)); change
this to propagate the error through anyhow::Result by having the async block
return anyhow::Result<()> and mapping the connect error (e.g.
client.connect(url).await.map_err(|e| anyhow::anyhow!("failed to connect: {}",
e))?) so the block returns Ok(()) on success, capture the block's Result (let
res: anyhow::Result<()> = RUNTIME.block_on(...)) and then return res? from
setup(); update references inside the block (session, media, state.media) to
match the new flow so no expect() remains.
---
Nitpick comments:
In `@src/source/imp.rs`:
- Around line 395-397: The cleanup() method currently does nothing while the
background tasks spawned at the locations referenced (around the task spawns at
lines 235 and 345) continue running; add a cancellation mechanism by storing
either a shared tokio_util::sync::CancellationToken or the JoinHandle(s) for
those spawned tasks as new fields on the same struct that defines cleanup(),
ensure the tasks check the token (or are abortable) and/or are awaited/aborted
on cleanup(), and update the task creation sites to clone/pass the token (or
push handles into the struct) so cleanup() can cancel/abort and optionally join
them before returning.
- Around line 187-387: Extract the duplicated track setup into a helper (e.g.,
setup_renditions or add_renditions) that takes the rendition map, a caps-builder
closure, the pad template name string ("video_%u" or "audio_%u"), and any
codec-specific params; inside the helper reuse moq_lite::Track::new,
broadcast.subscribe_track, hang::container::OrderedConsumer::new, create caps
via the provided closure, create a srcpad from
self.obj().element_class().pad_template(...), push StreamStart/Caps/Segment
events, call self.obj().add_pad(...), and spawn the tokio::spawn loop that reads
from OrderedConsumer and pushes gst::Buffer while preserving the existing
timestamp/flags logic; update the existing video and audio blocks to call this
helper with the appropriate caps-builder closures and pad template names so
OrderedConsumer, subscribe_track, pad_template, add_pad, and the tokio::spawn
reader logic are all centralized.
- Around line 6-7: The file imports both once_cell::sync::Lazy and
std::sync::LazyLock which mixes two lazy-init implementations; pick one and make
imports/usages consistent: if the project targets Rust >=1.80 replace
once_cell::sync::Lazy with std::sync::LazyLock (remove the once_cell import and
update any Lazy usages to LazyLock), otherwise stick with once_cell::sync::Lazy
(remove the std::sync::LazyLock import and rename any LazyLock usages to Lazy)
so all lazy static/initializations (references to Lazy and LazyLock) use the
same type.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
Cargo.tomlsrc/sink/imp.rssrc/source/imp.rs
| hang = "0.14.0" | ||
| moq-mux = "0.2.1" | ||
| moq-lite = "0.14.0" | ||
| moq-native = "0.13.1" |
There was a problem hiding this comment.
Coding guideline violation: Use local path dependencies for hang and moq-native.
The project guidelines specify that hang and moq-native should use local path dependencies instead of external repositories. The current configuration uses registry versions.
As per coding guidelines: "Use local path dependencies for hang and moq-native packages instead of external repositories"
Proposed fix to use local path dependencies
-hang = "0.14.0"
+hang = { path = "../hang" }
moq-mux = "0.2.1"
moq-lite = "0.14.0"
-moq-native = "0.13.1"
+moq-native = { path = "../moq-native" }Note: Adjust paths based on your actual local directory structure.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| hang = "0.14.0" | |
| moq-mux = "0.2.1" | |
| moq-lite = "0.14.0" | |
| moq-native = "0.13.1" | |
| hang = { path = "../hang" } | |
| moq-mux = "0.2.1" | |
| moq-lite = "0.14.0" | |
| moq-native = { path = "../moq-native" } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@Cargo.toml` around lines 22 - 25, The Cargo.toml currently pins external
registry versions for the hang and moq-native dependencies; change them to local
path dependencies per guidelines by replacing the version entries for the hang
and moq-native crates with path = "<relative-path-to-local-hang>" and path =
"<relative-path-to-local-moq-native>" (adjust the relative paths to your repo
layout) so Cargo uses the local packages instead of the registry versions.
src/source/imp.rs
Outdated
| let session = client.connect(url).await?; | ||
| let origin = moq_lite::Origin::produce(); | ||
| let _session = moq_lite::Session::connect(session, None, origin.producer).await?; | ||
| let _session = client.connect(url).await?; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's explore the repository structure and find the relevant code
find . -type f -name "*.rs" | head -20Repository: moq-dev/gstreamer
Length of output: 160
🏁 Script executed:
#!/bin/bash
# Search for the setup function and session usage
rg -n "let _session = client.connect" --type rust -B 5 -A 20Repository: moq-dev/gstreamer
Length of output: 2260
🏁 Script executed:
#!/bin/bash
# Check what the connect() function returns and if Session implements Drop
rg -nA 10 "pub async fn connect" --type rustRepository: moq-dev/gstreamer
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Look for Session struct definition and Drop implementation
rg -n "struct Session|impl Drop" --type rust -B 2 -A 5Repository: moq-dev/gstreamer
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Check if the broadcast is stored in the struct (self)
rg -n "self\.(broadcast|session|connection)" src/source/imp.rsRepository: moq-dev/gstreamer
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Read the setup() function in source/imp.rs to understand the full context
cat -n src/source/imp.rs | head -400Repository: moq-dev/gstreamer
Length of output: 14605
🏁 Script executed:
#!/bin/bash
# Look for where setup() is called and what happens to the broadcast/origin_consumer
grep -n "setup\|origin_consumer\|broadcast" src/source/imp.rs | head -40Repository: moq-dev/gstreamer
Length of output: 1014
🏁 Script executed:
#!/bin/bash
# Check the struct definition to see what fields are stored
rg -n "struct State|pub struct|pub client" src/source/imp.rs -A 8 | head -50Repository: moq-dev/gstreamer
Length of output: 291
Keep session handle alive for the duration of playback.
_session is dropped when setup() returns, but the spawned tasks at lines 235 and 345 continue running indefinitely and depend on the underlying connection. Similarly, broadcast is a local variable that goes out of scope. If either the session or broadcast need to remain alive to serve track subscriptions, the spawned tasks will fail with read errors.
Store both _session and the broadcast/origin resources in the struct (or ensure their lifetimes are otherwise managed) so they persist while the playback tasks are active. Additionally, implement the cleanup() function to properly terminate spawned tasks.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/source/imp.rs` at line 175, The local session returned by
client.connect(url).await? (currently assigned to _session) and the local
broadcast/origin variables are being dropped when setup() returns while spawned
tasks (the long-lived playback tasks referenced around the spawn calls) still
depend on them; move the session handle and the broadcast/origin resources into
the imp struct as owned fields (e.g., add session: ClientSession and broadcast:
BroadcastHandle or similar) and assign them in setup() instead of using local
vars so they live for the lifetime of the playback tasks, and implement
cleanup() to cancel or join those spawned tasks and close the session/broadcast
gracefully (use stored task handles or a shutdown channel) so subscriptions stop
cleanly.
No description provided.