[RHIDP-11647] Add interrupted user query to conversation #1217
Jdubrick wants to merge 1 commit into lightspeed-core:main
Conversation
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
Walkthrough

Introduced per-stream interrupt callbacks and improved persistence of interrupted streaming requests. Modified streaming query interrupt handling to register callbacks that persist user queries and interrupted responses when streams are cancelled, with new constants and comprehensive test coverage.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant StreamQuery as Streaming<br/>Query Endpoint
    participant Registry as StreamInterrupt<br/>Registry
    participant Callback as Interrupt<br/>Callback
    participant Persistence as Persistence<br/>Layer

    Client->>StreamQuery: start_streaming_request()
    StreamQuery->>Registry: register_stream(on_interrupt=callback)
    Registry->>Registry: store ActiveStream with callback
    Client-->>Client: cancel_request()
    Client->>Registry: cancel_stream(request_id)
    Registry->>Registry: retrieve on_interrupt callback
    Registry->>Callback: schedule callback as async task
    Registry-->>Client: return CANCELLED
    Callback->>StreamQuery: _persist_interrupted_turn()
    StreamQuery->>Persistence: append_turn_to_conversation()
    StreamQuery->>Persistence: store_query_results(INTERRUPTED)
    Persistence-->>StreamQuery: success
    StreamQuery->>StreamQuery: yield interrupted_event
    StreamQuery->>Registry: deregister_stream()
```
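The register/cancel flow in the diagram can be sketched as a minimal registry. This is a hypothetical illustration, not the actual `StreamInterruptRegistry` from `src/utils/stream_interrupts.py`; the class names and method signatures mirror the diagram, and the callback body stands in for `_persist_interrupted_turn`:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class ActiveStream:
    task: asyncio.Task
    on_interrupt: Optional[Callable[[], Awaitable[None]]] = None


class StreamInterruptRegistry:
    """Minimal sketch of a per-stream interrupt registry."""

    def __init__(self) -> None:
        self._streams: dict[str, ActiveStream] = {}

    def register_stream(self, request_id, task, on_interrupt=None):
        self._streams[request_id] = ActiveStream(task, on_interrupt)

    def deregister_stream(self, request_id):
        self._streams.pop(request_id, None)

    def cancel_stream(self, request_id) -> str:
        stream = self._streams.get(request_id)
        if stream is None:
            return "NOT_FOUND"
        stream.task.cancel()
        if stream.on_interrupt is not None:
            # Persistence runs out-of-band so cancel_stream returns fast.
            asyncio.get_running_loop().create_task(stream.on_interrupt())
        return "CANCELLED"


async def main():
    persisted = []

    async def on_interrupt():
        # Stand-in for _persist_interrupted_turn().
        persisted.append("interrupted turn")

    async def long_stream():
        await asyncio.sleep(10)

    registry = StreamInterruptRegistry()
    task = asyncio.create_task(long_stream())
    registry.register_stream("req-1", task, on_interrupt)
    await asyncio.sleep(0)  # let the stream task start
    status = registry.cancel_stream("req-1")
    await asyncio.sleep(0)  # let the scheduled interrupt callback run
    registry.deregister_stream("req-1")
    print(status, persisted)


asyncio.run(main())
```

The key design point shown here is that `cancel_stream` schedules the callback as a fire-and-forget task rather than awaiting it, so the cancelling client gets its `CANCELLED` response without waiting on persistence.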
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 3 passed
🧹 Nitpick comments (4)
tests/unit/app/endpoints/test_streaming_query.py (2)
1558-1592: Test uses the real Singleton `StreamInterruptRegistry` despite class-level mock fixture.

This test directly instantiates `StreamInterruptRegistry()` (line 1563), which returns the real Singleton — bypassing the `isolate_stream_interrupt_registry` autouse fixture that mocks `get_stream_interrupt_registry`. This is intentional since the test is exercising the registry's actual `cancel_stream` + callback behavior, but a comment clarifying this would prevent confusion when someone sees it coexisting with the mock fixture.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1558 - 1592, The test test_cancel_stream_callback_persists_when_error_hits_outside_generator intentionally instantiates the real StreamInterruptRegistry() (bypassing the isolate_stream_interrupt_registry autouse fixture that mocks get_stream_interrupt_registry) to exercise real cancel_stream + on_interrupt behavior; add a short clarifying comment above the StreamInterruptRegistry() instantiation explaining that this is deliberate and that the autouse fixture exists but is intentionally not used here so future readers won't mistake it for an oversight.
1492-1556: Real task cancellation test may be flaky on slow CI runners.

The `asyncio.sleep(0.05)` delays at lines 1547 and 1549 assume the event loop will schedule and propagate the cancellation within 50 ms. On heavily loaded CI machines, this can occasionally fail. Consider a more deterministic synchronization approach, such as an `asyncio.Event` set by the generator after it yields the first item, so the test knows the task is blocked before cancelling.
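A minimal sketch of that Event-based handshake (a hypothetical test shape — `slow_generator` and the event names mirror the review comment, not the actual test code):

```python
import asyncio


async def main():
    started = asyncio.Event()  # set once the generator is past its first yield
    release = asyncio.Event()  # would gate the second token (never set here)

    async def slow_generator():
        yield "first token"
        started.set()            # deterministic signal: first token consumed
        await release.wait()     # block until the test decides to release us
        yield "second token"

    async def consume():
        tokens = []
        async for token in slow_generator():
            tokens.append(token)
        return tokens

    task = asyncio.create_task(consume())
    await started.wait()  # no fixed sleep: wait until the task is blocked
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


print(asyncio.run(main()))
```

Because `started.wait()` only returns after the generator has suspended on `release.wait()`, the cancellation always lands while the task is blocked, regardless of scheduler load.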
Verify each finding against the current code and only fix it if needed. In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1492 - 1556, The test test_generate_response_task_cancel_persists_results is flaky because it uses fixed asyncio.sleep delays; replace those sleeps with a deterministic Event-based handshake: add an asyncio.Event called started_event, set started_event.set() in slow_generator immediately after yielding the first token, then in the test await started_event.wait() before calling task.cancel(); keep cancel_event to control whether the generator would produce the second token (so the generator still blocks until you want it to), and otherwise assert the same conditions (result contains interrupted event, append_turn_to_conversation and store_query_results called, and isolate_stream_interrupt_registry.deregister_stream called with test_request_id).

src/app/endpoints/streaming_query.py (1)
376-425: Guard mechanism is sound; consider documenting the threading model.

The mutable `list[bool]` guard relies on asyncio's cooperative single-threaded execution to avoid data races between the `_on_interrupt` task and the in-generator `CancelledError` handler. This is correct but subtle. A brief inline comment noting why a plain list is safe (single-threaded event loop — no lock needed) would help future maintainers.
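The guard pattern under discussion can be illustrated in isolation. This is a sketch of the idea, not the actual `streaming_query.py` code: a shared one-element list ensures that of the two cooperating coroutines (standing in for the `_on_interrupt` task and the `CancelledError` handler), only the first persists the turn.

```python
import asyncio


async def main():
    persisted = []
    # One-element mutable list shared by both paths. Safe without a lock
    # because both coroutines run cooperatively on the same single-threaded
    # event loop, and neither awaits between the check and the write.
    already_persisted = [False]

    async def persist_once(source: str):
        if already_persisted[0]:
            return
        already_persisted[0] = True
        persisted.append(source)

    # Both paths "race" to persist; only the first one wins.
    await asyncio.gather(
        persist_once("on_interrupt task"),
        persist_once("CancelledError handler"),
    )
    return persisted


print(asyncio.run(main()))  # only the first caller persists
```

Note the caveat in the comment: the pattern is only race-free as long as there is no `await` between reading and setting the flag, which is worth stating wherever the real guard lives.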
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/app/endpoints/streaming_query.py` around lines 376 - 425, The guard list used in _register_interrupt_callback (the variable guard and the nested _on_interrupt coroutine) is safe without locks because asyncio runs handlers cooperatively on a single-threaded event loop; add a short inline comment by the guard definition (or just above _on_interrupt) explaining that the mutable one-element list is intentionally used as a shared guard and is safe from data races due to the single-threaded cooperative execution model (and that the CancelledError handler and _on_interrupt will not run concurrently on multiple OS threads). Keep the comment concise and reference guard, _on_interrupt, and the in-generator CancelledError handler so future maintainers understand the threading assumption.

src/utils/stream_interrupts.py (1)
108-109: Fire-and-forget task may silently swallow unexpected errors.

`create_task` returns a `Task` that is immediately discarded. If `on_interrupt` raises an unhandled exception, Python will emit "Task exception was never retrieved" to stderr. While the current `_persist_interrupted_turn` implementation catches broadly, a future refactor could break that invariant. Consider naming the task and adding a done-callback for defensive logging:
♻️ Suggested improvement

```diff
 if on_interrupt is not None:
-    asyncio.get_running_loop().create_task(on_interrupt())
+    task = asyncio.get_running_loop().create_task(
+        on_interrupt(), name=f"on_interrupt-{request_id}"
+    )
+    task.add_done_callback(
+        lambda t: logger.error(
+            "on_interrupt callback failed: %s", t.exception()
+        )
+        if t.exception()
+        else None
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/stream_interrupts.py` around lines 108 - 109, The fire-and-forget asyncio.create_task call discards the Task and can lead to "Task exception was never retrieved" if on_interrupt raises; change the call that invokes on_interrupt() (where create_task is used) to capture the Task, set a descriptive name (via create_task(..., name="persist_interrupted_turn") or task.set_name()) and attach a done-callback that checks task.exception() and logs any exception (use the module/process logger), and reference the current _persist_interrupted_turn and on_interrupt symbols so the task name and callback clearly indicate this operation.
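A self-contained sketch of the named-task-plus-done-callback pattern the suggestion describes. The `logger` setup, task name, and failing `on_interrupt` body are stand-ins for illustration, not the module's actual code:

```python
import asyncio
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("stream_interrupts")


def _log_task_failure(task: asyncio.Task) -> None:
    # Retrieving the exception here also prevents asyncio's
    # "Task exception was never retrieved" warning at garbage collection.
    if not task.cancelled() and task.exception() is not None:
        logger.error("on_interrupt callback failed: %s", task.exception())


async def main():
    async def on_interrupt():
        raise RuntimeError("persistence unavailable")

    # Naming the task makes it identifiable in debug output and tracebacks.
    task = asyncio.create_task(on_interrupt(), name="on_interrupt-req-1")
    task.add_done_callback(_log_task_failure)
    await asyncio.sleep(0)  # let the task run (and fail)
    return task.get_name(), type(task.exception()).__name__


print(asyncio.run(main()))
```

Checking `task.cancelled()` before `task.exception()` matters: calling `exception()` on a cancelled task re-raises `CancelledError` inside the done-callback.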
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)

- src/app/endpoints/streaming_query.py
- src/constants.py
- src/utils/stream_interrupts.py
- tests/unit/app/endpoints/test_streaming_query.py
Description
In #1176 a new endpoint was added for interrupting in-flight streaming queries. When a query was interrupted, however, the user's message was not being added to the conversation. This change adds the user query to the conversation along with a generic "you interrupted this" response on the LLM side, so that users displaying the conversation in a chatbot window don't see missing data.
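The described behavior boils down to something like the following sketch. All names here are hypothetical stand-ins: the real `append_turn_to_conversation`, the new INTERRUPTED constant in `src/constants.py`, and the actual canned response text may differ.

```python
from dataclasses import dataclass, field

# Stand-ins for the real constants added by this PR.
INTERRUPTED = "interrupted"
INTERRUPTED_RESPONSE = "You interrupted this response before it completed."


@dataclass
class Conversation:
    turns: list[tuple[str, str]] = field(default_factory=list)


def append_turn_to_conversation(conv: Conversation, user_query: str, response: str):
    conv.turns.append((user_query, response))


def persist_interrupted_turn(conv: Conversation, user_query: str) -> str:
    """On cancellation, record the user's query plus a canned interrupted reply."""
    append_turn_to_conversation(conv, user_query, INTERRUPTED_RESPONSE)
    return INTERRUPTED  # status stored alongside the query results


conv = Conversation()
status = persist_interrupted_turn(conv, "Summarize my cluster health")
print(status, conv.turns)
```

The point is that both halves of the turn are written: the user's query survives the cancellation, and the canned response fills the slot the LLM never completed, so the rendered chat history has no gap.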
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing