Conversation
Important: Review skipped (draft detected). Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: Path: .coderabbit.yaml | Review profile: CHILL | Plan: Pro
📝 Walkthrough
This PR introduces a stream-based inference architecture for real-time agent interactions. Core changes include a generic
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Audio as Audio Input
    participant STT as STT
    participant TD as Turn Detector
    participant LLM as LLM
    participant TTS as TTS
    participant AudioOut as Audio Output
    Audio->>STT: process_audio(pcm, participant)
    activate STT
    STT->>STT: accumulate & detect speech
    STT-->>TD: TurnStarted event
    TD-->>TD: queue event
    Note over STT: Partial & final<br/>transcripts emitted
    STT-->>STT: emit Transcript(mode="delta")
    STT-->>STT: emit Transcript(mode="final")
    STT->>TD: process_audio(pcm)
    activate TD
    TD-->>TD: emit TurnEnded event
    deactivate TD
    deactivate STT
    Note over LLM: LLMTurn orchestration
    rect rgba(100, 150, 200, 0.5)
        LLM->>LLM: create & start LLMTurn
        LLM->>LLM: await llm.simple_response(transcript)
        LLM-->>LLM: emit LLMResponseDelta items
        LLM-->>LLM: emit LLMResponseFinal
        LLM->>LLM: confirm & finalize turn
    end
    LLM->>TTS: send LLM text deltas
    activate TTS
    TTS->>TTS: accumulate text via tokenizer
    TTS->>TTS: emit at sentence boundaries
    TTS-->>TTS: emit TTSOutputChunk(data, final)
    deactivate TTS
    TTS->>AudioOut: send TTSOutputChunk
    activate AudioOut
    AudioOut->>AudioOut: rechunk to 20ms frames
    AudioOut->>AudioOut: carry forward partial chunks
    AudioOut-->>Audio: emit AudioOutputChunk
    deactivate AudioOut
```
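The diagram's last stage (rechunk to 20 ms frames, carry forward partial chunks) can be sketched as follows. This is a minimal standalone illustration, not the PR's actual `AudioOutputStream`; the helper name and the frame math (16 kHz mono 16-bit PCM, so 640 bytes per 20 ms) are assumptions.

```python
# Hypothetical sketch of 20 ms rechunking with carry.
# Assumes 16 kHz mono s16 PCM: 16000 samples/s * 0.020 s * 2 bytes = 640 bytes.
FRAME_BYTES = 640

def rechunk_20ms(chunks: list[bytes], carry: bytes = b"") -> tuple[list[bytes], bytes]:
    """Yield fixed 640-byte frames; leftover bytes become the new carry."""
    buf = carry
    frames: list[bytes] = []
    for chunk in chunks:
        buf += chunk
        while len(buf) >= FRAME_BYTES:
            frames.append(buf[:FRAME_BYTES])
            buf = buf[FRAME_BYTES:]
    return frames, buf
```

On a `final` chunk, an implementation like the one reviewed here would pad or flush the carry and reset it to empty before the next synthesis.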
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 10
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
plugins/kokoro/vision_agents/plugins/kokoro/tts.py (1)
1-1: ⚠️ Potential issue | 🟡 Minor
Remove the future annotations import.

This import violates the coding guidelines, which explicitly state: "Never use `from __future__ import annotations` in Python files."
🔧 Proposed fix
```diff
-from __future__ import annotations
-
 import asyncio
```
If any forward references exist in type annotations, replace them with string literals or refactor to avoid the forward reference.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/kokoro/vision_agents/plugins/kokoro/tts.py` at line 1, Remove the line "from __future__ import annotations" from the module (plugins/kokoro/vision_agents/plugins/kokoro/tts.py) and update any forward-referenced type annotations in functions/classes within this file (e.g., references used in TTS-related functions or classes) by converting them to string-literal annotations or refactoring the declarations so forward references are unnecessary; ensure all annotations remain valid without the future import.
plugins/vogent/tests/test_vogent.py (1)
42-56: ⚠️ Potential issue | 🟡 Minor
Disjunctive assertion weakens the test.

`assert kinds == [...] or kinds == [...]` papers over nondeterminism in what the detector emits for this fixture. Either:
- the extra `start`/`stop` pair is a real property of the input (silence boundary), in which case assert structural invariants (e.g., `kinds[0] == "start"`, `kinds[-1] == "stop"`, pairs alternate, count is even), or
- it's a flake, in which case the source should be fixed.

Also worth filtering out `None` kinds explicitly so a surprise item type doesn't just silently fail equality with a confusing diff.
♻️ Suggested invariant-style assertion
```diff
-    items = await vogent_turn_detection.output.collect(timeout=1.0)
-    kinds = [
-        "start"
-        if isinstance(item, TurnStarted)
-        else "stop"
-        if isinstance(item, TurnEnded)
-        else None
-        for item in items
-    ]
-    assert kinds == ["start", "stop"] or kinds == [
-        "start",
-        "stop",
-        "start",
-        "stop",
-    ]
+    items = await vogent_turn_detection.output.collect(timeout=1.0)
+    kinds = [
+        "start" if isinstance(item, TurnStarted) else "stop"
+        for item in items
+        if isinstance(item, (TurnStarted, TurnEnded))
+    ]
+    assert kinds, "expected at least one turn event"
+    assert len(kinds) % 2 == 0, f"unbalanced turn events: {kinds}"
+    assert all(
+        a == "start" and b == "stop"
+        for a, b in zip(kinds[::2], kinds[1::2])
+    ), f"turn events not strictly start/stop paired: {kinds}"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/vogent/tests/test_vogent.py` around lines 42 - 56, The current disjunctive assertion on kinds hides nondeterminism and can miss unexpected item types; modify the test around vogent_turn_detection.output.collect(...) so you first map items to kinds but filter out any None (i.e., explicitly handle unexpected types), then replace the OR assertion with invariant-style checks: assert kinds[0] == "start", kinds[-1] == "stop", len(kinds) % 2 == 0 (even count), and that kinds alternates between "start" and "stop" (e.g., for i, k in enumerate(kinds): assert k == ("start" if i % 2 == 0 else "stop")). This keeps the test robust and surfaces unexpected item types instead of silently passing.
plugins/fish/vision_agents/plugins/fish/stt.py (1)
138-157: ⚠️ Potential issue | 🟠 Major
`clear()` flushes transcripts into the output stream and then immediately wipes them.

The override in `Fish.clear()` (lines 151-157) calls `await self._send_buffer(key)` — whose terminal step is `self.output.send_nowait(Transcript(...))` here at lines 138-145 — and then calls `await super().clear()`, which in the base runs `self._output.clear()` synchronously. There is no `await` between the `send_nowait` inside `_send_buffer` and the base class's `clear()`, so any transcript enqueued by the flush is evicted before a consumer task can dequeue it. Delivery then becomes a scheduling race: it "works" only when the consumer happens to be woken during the `await asyncio.to_thread(self.client.asr, ...)` window inside `_send_buffer`.

Decide the contract for `clear()`:
- If it means discard pending audio/transcripts (e.g. barge-in), do not flush — just drop the buffers and call `super().clear()`.
- If it means finalize pending audio first, flush and then do not clear the output stream (don't call `super().clear()`, or have the base not wipe the queue in this path).

Given the call sites elsewhere (interrupt on turn start), option 1 is most likely the intended semantics.
🐛 Suggested fix (discard-on-clear)
```diff
 async def clear(self) -> None:
-    for key in list(self._buffers):
-        if self._buffers[key].duration_ms >= MIN_DURATION_MS:
-            await self._send_buffer(key)
-        else:
-            self._buffers[key].clear()
+    for key in list(self._buffers):
+        self._buffers[key].clear()
     await super().clear()
```
Verify each finding against the current code and only fix it if needed. In `@plugins/fish/vision_agents/plugins/fish/stt.py` around lines 138 - 157, The current Fish.clear() flushes buffers by awaiting self._send_buffer(key) which enqueues transcripts via self.output.send_nowait but then immediately calls super().clear(), racing with consumers; change clear() to discard pending audio/transcripts instead: remove the await self._send_buffer(key) loop and instead iterate the keys in self._buffers and call self._buffers[key].clear() (honoring MIN_DURATION_MS is unnecessary for discard), then call await super().clear(); update the method in the Fish class (clear) and ensure no calls to _send_buffer or send_nowait remain in that path so pending transcripts are not enqueued before the output is cleared.
docs/ai/instructions/ai-stt.md (1)
26-56: ⚠️ Potential issue | 🟡 Minor
Example leaks a loop-scoped `response` into the final emit.

`response` is constructed inside `for part in parts:` and then referenced on line 53 for the final `Transcript`. When `parts` is empty, `response` is undefined and this snippet raises `UnboundLocalError`. Since this is copy/pasted by plugin authors (and the AI instructions pipeline), it's worth either hoisting the `TranscriptResponse` construction out of the loop or guarding the final emit.
📝 Proposed fix
```diff
 parts = self.client.stt(pcm_data, stream=True)
 full_text = ""
+response = TranscriptResponse(
+    confidence=0.9,
+    language='en',
+    processing_time_ms=300,
+    audio_duration_ms=2000,
+    other={}
+)
 for part in parts:
-    response = TranscriptResponse(
-        confidence=0.9,
-        language='en',
-        processing_time_ms=300,
-        audio_duration_ms=2000,
-        other={}
-    )
     # partials — mode depends on provider semantics:
```
Verify each finding against the current code and only fix it if needed. In `@docs/ai/instructions/ai-stt.md` around lines 26 - 56, The example leaks the loop-scoped variable response (constructed inside the for part in parts: loop) into the final Transcript emit causing UnboundLocalError when parts is empty; fix by hoisting the TranscriptResponse construction (the response object used in Transcript) out of the loop (create response before iterating over parts) or alternatively guard the final self.output.send_nowait(...) so it only emits when response is defined (e.g., only emit final Transcript if parts produced any content or if full_text is non-empty); update references to TranscriptResponse, Transcript, self.output.send_nowait, response, full_text, and parts accordingly.
agents-core/vision_agents/core/agents/transcript/buffer.py (1)
86-89: ⚠️ Potential issue | 🟠 Major
`reset()` leaves `_final` stuck at `True`.

After a `"final"` update sets `self._final = True`, calling `reset()` clears segments and the pending flag but not `_final`. The next consumer sees `buffer.final == True` on an empty buffer, which contradicts the semantics of a reset buffer and will mislead the new `TranscriptStore`/flow code that relies on this property.
Proposed fix
```diff
 def reset(self) -> None:
     """Clear all accumulated segments."""
     self._segments.clear()
     self._has_pending_partial = False
+    self._final = False
```
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/transcript/buffer.py` around lines 86 - 89, The reset() method clears _segments and _has_pending_partial but fails to reset the _final flag, leaving buffer.final True after reset; update reset() to also set self._final = False so the buffer's final state is cleared along with segments and pending partials (reference symbols: reset(), self._segments, self._has_pending_partial, self._final, and the buffer.final property).
plugins/mistral/vision_agents/plugins/mistral/stt.py (1)
234-268: ⚠️ Potential issue | 🟡 Minor
`close()` no longer marks the STT closed before doing cleanup.

Previously `await super().close()` ran first and flipped `self.closed = True`, so the `if self.closed` guard in `process_audio` (line 214) would short-circuit any concurrent calls during shutdown. With `super().close()` moved to the end, a `process_audio` call that arrives mid-close can still reach `await self._connection.send_audio(...)` after `end_audio()` has been signalled, racing with the connection teardown.

Consider calling `await super().close()` first (to set the closed flag) and then performing the Mistral-specific drain/close, or otherwise guarding sends during close.
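The ordering suggested here (set the closed flag before provider-specific teardown) can be sketched generically. This is a simplified illustration with invented class names, not the actual Mistral plugin code:

```python
import asyncio

class BaseSTT:
    """Minimal stand-in for the STT base class."""
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True  # flag first, so in-flight calls short-circuit

class MistralLikeSTT(BaseSTT):
    def __init__(self) -> None:
        super().__init__()
        self.sent: list[bytes] = []

    async def process_audio(self, pcm: bytes) -> None:
        if self.closed:          # guard: no sends once close() has started
            return
        self.sent.append(pcm)    # stands in for connection.send_audio(...)

    async def close(self) -> None:
        await super().close()    # sets self.closed = True BEFORE teardown
        # ...then the provider-specific drain would run here:
        # end_audio(), wait for done, cancel receive task, close connection
```

With the flag set first, a `process_audio` call arriving mid-close returns immediately instead of racing the connection teardown.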
Verify each finding against the current code and only fix it if needed. In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 234 - 268, The close() implementation must set the STT closed flag before performing Mistral-specific teardown to prevent process_audio from racing into sending after end_audio() is signalled; call await super().close() early (at the start of close()) so self.closed is true, then proceed with end_audio(), waiting on self._done_received, cancel_and_wait(self._receive_task), and closing self._connection, or alternately add a send guard in process_audio() that checks self.closed and self._connection_ready/_connection before calling self._connection.send_audio(); update close(), process_audio(), and references to self._connection.end_audio/self._connection.send_audio/_receive_task/_done_received accordingly.
agents-core/vision_agents/core/agents/transcript/store.py (1)
53-70: ⚠️ Potential issue | 🟡 Minor
`drop=True` is ignored when the buffer ends up empty.

If `buffer` is empty after `buffer.update(...)` (e.g., an empty-text `"final"` on a freshly-created entry, or any update that leaves no segments), the method returns at line 67 before the `drop` branch runs, so the entry remains in `self._users`. Callers asking for "drop" to clear state won't get that guarantee on no-op updates. Consider moving the `drop` cleanup before the `not buffer` short-circuit, or dropping unconditionally when `drop=True`.

The same pattern exists in `update_agent_transcript` (lines 115–120).
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/transcript/store.py` around lines 53 - 70, The method updates the user's transcript but returns early when the buffer is empty, so a requested drop (drop=True) is never executed; change the logic so that when drop is True you remove the entry from self._users regardless of whether buffer is empty—either move the drop handling to before the "if not buffer: return None" short-circuit or perform the pop unconditionally after buffer.update when drop is True; apply the same fix to update_agent_transcript (the matching agent-transcript update function) and ensure you reference the same tuple unpack (msg_id, uid, buffer) and self._users.pop(participant_id, None) call.
🟡 Minor comments (14)
agents-core/vision_agents/core/agents/inference/base.py-7-14 (1)
7-14: ⚠️ Potential issue | 🟡 Minor
Add explicit `-> None` return types to lifecycle methods.

These abstract methods are part of the public flow interface and should match the typed `simple_response()`/`say()` signatures.
Proposed fix
```diff
 @abc.abstractmethod
-async def start(self): ...
+async def start(self) -> None: ...

 @abc.abstractmethod
-async def stop(self): ...
+async def stop(self) -> None: ...

 @abc.abstractmethod
-async def interrupt(self): ...
+async def interrupt(self) -> None: ...
```
As per coding guidelines, use type annotations everywhere with modern syntax.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/inference/base.py` around lines 7 - 14, The abstract lifecycle methods start, stop, and interrupt in the class currently lack explicit return annotations; update their signatures to include explicit "-> None" return types so they match the project's typed public flow interface (e.g., change "async def start(self): ..." to "async def start(self) -> None: ...", and do the same for stop and interrupt), ensuring type consistency with simple_response()/say and modern typing guidelines.
agents-core/vision_agents/core/utils/stream.py-83-91 (1)
83-91: ⚠️ Potential issue | 🟡 Minor
Catch `asyncio.CancelledError` explicitly instead of using bare `except:`.

These await blocks only raise `asyncio.CancelledError` when futures are cancelled (no `set_exception()` calls exist on these futures). Bare `except:` also catches `KeyboardInterrupt`, `SystemExit`, and other base exceptions that should never be silently caught and re-raised.
Proposed fix
```diff
-except:
+except asyncio.CancelledError:
     sender.cancel()
     with contextlib.suppress(ValueError):
         self._senders.remove(sender)
     if not self.full() and not sender.cancelled():
         self._wakeup_next_sender()
     raise
```

```diff
-except:
+except asyncio.CancelledError:
     getter.cancel()  # Just in case getter is not done yet.
     with contextlib.suppress(ValueError):
         # Clean self._getters from canceled getters.
         self._getters.remove(getter)
     if not self.empty() and not getter.cancelled():
         self._wakeup_next_getter()
     raise
```
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/utils/stream.py` around lines 83 - 91, The bare except in the await sender block should be changed to catch asyncio.CancelledError explicitly: replace "except:" with "except asyncio.CancelledError:" (and add "import asyncio" if missing) so only cancellation is handled by calling sender.cancel(), removing from self._senders, calling self._wakeup_next_sender() when appropriate, and re-raising the CancelledError; leave other exceptions unhandled so they propagate normally. Use the existing symbols sender, self._senders, full(), and _wakeup_next_sender() to locate the code to update.
plugins/deepgram/vision_agents/plugins/deepgram/tts.py-37-38 (1)
37-38: ⚠️ Potential issue | 🟡 Minor
Typo in comment: "detlas" → "deltas".
✏️ Fix
```diff
-# This implementation accepts partial text detlas
+# This implementation accepts partial text deltas
 streaming = True
```
Verify each finding against the current code and only fix it if needed. In `@plugins/deepgram/vision_agents/plugins/deepgram/tts.py` around lines 37 - 38, The comment above the streaming flag contains a typo: change "detlas" to "deltas" in the comment that currently reads "# This implementation accepts partial text detlas" (near the streaming = True line) so it correctly reads "# This implementation accepts partial text deltas".
tests/test_utils/test_stream.py-274-288 (1)
274-288: ⚠️ Potential issue | 🟡 Minor
Assertion doesn't match the test's intent.

The name says `served_in_order`, but `sorted(results) == [10, 20, 30]` only checks set membership, not order. If FIFO fairness across waiters is part of the `Stream` contract (as the sibling `test_multiple_senders_served_in_order` tests for senders), assert against the unsorted list so a regression in getter fairness is actually caught.
🔧 Suggested tightening
```diff
-    await asyncio.gather(*tasks)
-    assert sorted(results) == [10, 20, 30]
+    await asyncio.gather(*tasks)
+    assert results == [10, 20, 30]
```
Verify each finding against the current code and only fix it if needed. In `@tests/test_utils/test_stream.py` around lines 274 - 288, The assertion in test_multiple_getters_served_in_order is wrong: it uses sorted(results) which ignores ordering; change the final assertion to check the actual delivery order by asserting results == [10, 20, 30] so the test verifies FIFO fairness for getters (update the assertion in the test function test_multiple_getters_served_in_order that collects values via getter which awaits stream.get() after stream.send(10/20/30) and joins tasks).
agents-core/vision_agents/core/agents/inference/llm_turn.py-55-77 (1)
55-77: ⚠️ Potential issue | 🟡 Minor
`started` flips back to `False` after `cancel()`.

`started` is defined as `bool(self._llm_response_task)`, but `cancel()` sets `self._llm_response_task = None` after awaiting. A turn that was started and then cancelled will report `started is False` and `cancelled is True` simultaneously, and a subsequent `start()` call won't raise "already running". If that's intentional, consider documenting; otherwise track start as a dedicated flag and keep the task handle until destruction.
🔧 Suggested fix
```diff
-    _llm_response_task: asyncio.Task | None = field(default=None, init=False)
-    _finalize_task: asyncio.Task | None = field(default=None, init=False)
-    _cancelled: bool = field(default=False, init=False)
-    _confirmed: bool = field(default=False, init=False)
+    _llm_response_task: asyncio.Task | None = field(default=None, init=False)
+    _finalize_task: asyncio.Task | None = field(default=None, init=False)
+    _started: bool = field(default=False, init=False)
+    _cancelled: bool = field(default=False, init=False)
+    _confirmed: bool = field(default=False, init=False)
@@
     @property
     def started(self) -> bool:
-        return bool(self._llm_response_task)
+        return self._started
@@
     def start(self, llm: LLM) -> None:
-        if self._llm_response_task is not None:
+        if self._started:
             raise RuntimeError("LLM response task is already running")
+        self._started = True
         self._llm_response_task = asyncio.create_task(self._do_llm_response(llm))
```
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/inference/llm_turn.py` around lines 55 - 77, The started property should not be derived solely from _llm_response_task because cancel() clears that handle; add a dedicated boolean flag (e.g., self._started) used by the started property, set self._started = True in start(self, llm), and set self._started = False only when the turn truly finishes (e.g., at the end of _do_llm_response or when finalized) rather than immediately in cancel(); update start(), cancel(), and the completion/finalization path to maintain this flag so start() will correctly detect an active turn via the new _started flag while still clearing/awaiting the task handles (_llm_response_task and _finalize_task) as before.
agents-core/vision_agents/core/tts/tts.py-344-388 (1)
344-388: ⚠️ Potential issue | 🟡 Minor
Document the `TTSOutputChunk` output contract in the `send_iter` docstring.

The fast-path (single `PcmData` response) yields one chunk where `data is not None` and `final=True`. The iterator-path yields N chunks with `final=False`, then a terminal `data=None, final=True` sentinel. While the inline comment on `TTSOutputChunk` mentions data "can be None if it's a final chunk", the `send_iter` docstring should explicitly document this contract so downstream consumers understand when to expect the sentinel and when `data` may be `None`.
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/tts/tts.py` around lines 344 - 388, Update the send_iter docstring to explicitly describe the TTSOutputChunk output contract: when response is a single PcmData (fast-path) send_iter yields one TTSOutputChunk with data not None and final=True (index 0, includes synthesis_id and text); when response is an async iterator send_iter yields N chunks with data set to each PcmData and final=False (index 0..N-1), then emits a terminal sentinel TTSOutputChunk with data=None and final=True (index N) to signal completion; also mention that consumers should not expect audio bytes on the final sentinel and that index, synthesis_id and text are provided on all chunks for correlation.
tests/test_agents/test_transcript/test_buffer.py-236-246 (1)
236-246: ⚠️ Potential issue | 🟡 Minor
`test_replacement_to_final` doesn't actually re-test replacement.

The test name and the comment on line 244 both claim the third update re-enters `replacement` mode, but line 245 passes `mode="delta"`. As written, this is effectively a duplicate of `test_delta_to_final`'s tail and leaves the final → replacement → non-final transition uncovered.
🧾 Proposed fix
```diff
 # another "replacement" makes it non-final again
-buffer.update("test2", mode="delta")
+buffer.update("test2", mode="replacement")
 assert not buffer.final
```
Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_transcript/test_buffer.py` around lines 236 - 246, The test test_replacement_to_final is supposed to verify that after marking the buffer final, a subsequent "replacement" update makes it non-final, but the third update currently uses mode="delta"; update the third call to buffer.update to use mode="replacement" (the same test function test_replacement_to_final and the buffer.update(...) invocation) and assert buffer.final is False afterward so the final→replacement→non-final transition is covered.
plugins/wizper/tests/test_wizper_stt.py-14-16 (1)
14-16: ⚠️ Potential issue | 🟡 Minor
`Participant({}, ...)` diverges from convention in peer tests — use `original=None` instead.

Peer tests construct participants as `Participant(user_id=..., id=..., original=None)` (see `plugins/openai/tests/test_tts.py` line 24). Here `{}` is passed positionally and reads as inconsistent. While `original` is typed as `Any` and technically accepts any value, aligning with peer tests improves consistency and readability.
🧵 Proposed fix
```diff
-    return Participant({}, user_id="test-user", id="test-user")
+    return Participant(original=None, user_id="test-user", id="test-user")
```
Verify each finding against the current code and only fix it if needed. In `@plugins/wizper/tests/test_wizper_stt.py` around lines 14 - 16, The fixture participant constructs Participant using a positional empty dict which diverges from other tests; update the pytest fixture named participant to call Participant with explicit keyword original=None and the same user_id/id (i.e., Participant(user_id="test-user", id="test-user", original=None)) so it matches peer tests and improves readability, locating the change in the participant fixture definition where Participant(...) is returned.
plugins/pocket/tests/test_tts.py-7-19 (1)
7-19: ⚠️ Potential issue | 🟡 Minor
Close warmed TTS fixtures during teardown.

Both fixtures warm up a `pocket.TTS` instance but never close it, which can leak model/runtime resources across integration tests. Use yield fixtures and close in `finally`.
Proposed fixture cleanup
```diff
+from collections.abc import AsyncIterator
+
 import pytest

 from vision_agents.plugins import pocket
@@
 @pytest.fixture
-async def tts(self) -> pocket.TTS:
+async def tts(self) -> AsyncIterator[pocket.TTS]:
     tts_instance = pocket.TTS()
-    await tts_instance.warmup()
-    return tts_instance
+    try:
+        await tts_instance.warmup()
+        yield tts_instance
+    finally:
+        await tts_instance.close()

 @pytest.fixture
-async def tts_custom_voice(self) -> pocket.TTS:
+async def tts_custom_voice(self) -> AsyncIterator[pocket.TTS]:
     tts_instance = pocket.TTS(
         voice="hf://kyutai/tts-voices/alba-mackenna/casual.wav"
     )
-    await tts_instance.warmup()
-    return tts_instance
+    try:
+        await tts_instance.warmup()
+        yield tts_instance
+    finally:
+        await tts_instance.close()
```
Verify each finding against the current code and only fix it if needed. In `@plugins/pocket/tests/test_tts.py` around lines 7 - 19, The fixtures tts and tts_custom_voice create and warm a pocket.TTS instance but never close it; convert each pytest.fixture to an async yield fixture that creates tts_instance, awaits tts_instance.warmup(), yields the instance, and in a finally block await tts_instance.close() (or the appropriate shutdown method on pocket.TTS) to ensure model/runtime resources are released after tests; apply the same pattern for the custom-voice fixture that constructs pocket.TTS(voice=...).
agents-core/vision_agents/core/agents/transcript/buffer.py-19-21 (1)
19-21: ⚠️ Potential issue | 🟡 Minor
Docstring indentation for the `"final"` mode is off.

The `"final"` header is indented deeper than `"replacement"` and `"delta"` above it, which breaks the rendered docstring block structure.
Proposed fix
```diff
-        ``"final"``
-            Finalizes the current segment. If text is provided it replaces the
-            segment first. An empty-text final just marks the segment as done.
+    ``"final"``
+        Finalizes the current segment. If text is provided it replaces the
+        segment first. An empty-text final just marks the segment as done.
```
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/transcript/buffer.py` around lines 19 - 21, Fix the docstring indentation for the `"final"` mode in agents-core/vision_agents/core/agents/transcript/buffer.py so that the `"final"` header aligns with the `"replacement"` and `"delta"` headers above it; adjust the leading whitespace on the lines describing `"final"` (the triple-backtick header and its subsequent description) to match the same indentation level as the other mode sections so the rendered docstring block structure is correct.
plugins/deepgram/tests/test_stt.py-33-39 (1)
33-39: ⚠️ Potential issue | 🟡 Minor
Minor: `transcripts[0]` can `IndexError` on a timeout or empty-result run.

If `output.collect(timeout=10.0)` returns without any `Transcript` items (e.g., upstream hiccup, or an empty window for the `eager` parameterization), `transcripts[0].participant` raises `IndexError` and obscures the real failure. A tiny length guard makes the failure message point at the right thing.
🪶 A small bell-jar of a guard, lest the list stay empty and the error bloom elsewhere
```diff
 transcripts = [i for i in items if isinstance(i, Transcript)]
+assert transcripts, "no transcripts received from Deepgram within timeout"
 finals = [t for t in transcripts if t.final]
 full_transcript = " ".join(t.text for t in finals)
 assert "forgotten treasures" in full_transcript.lower()
 assert transcripts[0].participant == participant
```
Verify each finding against the current code and only fix it if needed. In `@plugins/deepgram/tests/test_stt.py` around lines 33 - 39, The test can IndexError when no Transcript items are produced; add a guard before accessing transcripts[0] by asserting the transcripts list is non-empty (e.g., assert transcripts and/or assert len(transcripts) > 0 with a helpful message) after building transcripts from items returned by output.collect(timeout=10.0), then proceed to check transcripts[0].participant and other assertions; reference the variables/types: items, Transcript, transcripts, participant, and output.collect(timeout=10.0).
agents-core/vision_agents/core/agents/inference/transcribing_flow.py-441-442 (1)
441-442: ⚠️ Potential issue | 🟡 Minor
Minor: misleading `log_exceptions` message in `process_llm_output`.

Copy-paste from `process_audio_input` — the context manager in the LLM-output loop logs "Error while processing audio input", which will send future debuggers looking in the wrong pipeline stage.
🪶 A label slipped off its jar — rename so the right ghost answers
```diff
 async for item in llm_output:
-    with log_exceptions(logger, "Error while processing audio input"):
+    with log_exceptions(logger, "Error while processing LLM output"):
```
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/inference/transcribing_flow.py` around lines 441 - 442, In process_llm_output, the log_exceptions context around the async for over llm_output uses the wrong message ("Error while processing audio input"); update that call to use an accurate message like "Error while processing LLM output" (or similar) so debugging points to the LLM-output stage; locate the async for loop iterating over llm_output and change the log_exceptions(logger, ...) invocation accordingly.
tests/test_agents/test_inference/test_audio.py-96-100 (1)
96-100: ⚠️ Potential issue | 🟡 Minor
Minor: `test_carry_is_reset_after_final` does not actually prove carry reset.

With 5 ms of carry + a 10 ms follow-up, the total is 15 ms — still under the 20 ms emit threshold, so `stream.empty()` holds whether the `final=True` reset the carry or not. To make the test actually distinguish the two states, send ≥15 ms after the final so emission only happens if carry was not cleared, and drop the `stream.clear()` that masks the post-final state.
🪶 A sharper edge, so the carry's ghost cannot hide under the threshold
```diff
 async def test_carry_is_reset_after_final(self, stream: AudioOutputStream):
     stream.send_nowait(AudioOutputChunk(data=make_pcm(5), final=True))
-    stream.clear()
-    stream.send_nowait(AudioOutputChunk(data=make_pcm(10)))
+    # Drain the pad+terminal emitted by the final=True above.
+    _ = stream.peek()
+    # 15ms would emit a 20ms chunk only if 5ms of carry survived final.
+    stream.send_nowait(AudioOutputChunk(data=make_pcm(15)))
     assert stream.empty()
```
Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_inference/test_audio.py` around lines 96 - 100, The test test_carry_is_reset_after_final currently sends 5ms then 10ms and calls stream.clear(), which never distinguishes carry reset because 5+10 < 20ms; to fix, remove the stream.clear() and send at least 15ms (e.g., make_pcm(15) or any >=15ms) in the second AudioOutputChunk after the first final=True chunk so the total crosses the 20ms threshold only if carry was NOT cleared; update the asserts to check emission behavior using stream.empty()/not empty accordingly and keep references to AudioOutputChunk, make_pcm, stream.send_nowait, and stream.empty to locate the change.
tests/test_agents/test_inference/stubs.py-123-125 (1)
123-125: ⚠️ Potential issue | 🟡 Minor

`RealtimeStub.simple_response` signature doesn't match the base class contract.

The method is declared as a sync `def` returning `AsyncIterator[LLMResponseDelta | LLMResponseFinal]`, but the `Realtime` base class inherits from `OmniLLM`, which expects `async def simple_response(...) -> LLMResponseEvent[Any]`. The stub's return type annotation and async/sync mismatch make it incompatible with the interface. Consider aligning it with `LLMStub` (which correctly uses `async def`) or implementing the proper return type for consistency.
Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_inference/stubs.py` around lines 123 - 125, The RealtimeStub.simple_response signature is incompatible with the OmniLLM/Realtime contract: change the sync def simple_response to an async def and adjust its return annotation to match the base interface (LLMResponseEvent[Any]) or mirror the correct LLMStub implementation; ensure the method body yields/returns the same event types expected by OmniLLM (use AsyncIterator/LLMResponseEvent[Any] as in LLMStub) so RealtimeStub implements the same async contract as Realtime/OmniLLM.
🧹 Nitpick comments (15)
agents-core/vision_agents/core/tts/manual_test.py (1)
15-15: Track or remove this deletion TODO.
`# TODO delete` is ambiguous after this cleanup; either remove it if the helper is still intended, or track the deletion explicitly.

Would you like me to open an issue for deleting or relocating this manual helper?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/tts/manual_test.py` at line 15, The stray comment "# TODO delete" in manual_test.py is ambiguous; either remove the line if the manual helper should remain, or replace it with a clear actionable note referencing an issue or RFC (e.g., "TODO: remove this helper when ISSUE-1234 is resolved") so the intent is tracked; locate the comment in agents-core/vision_agents/core/tts/manual_test.py and either delete the TODO or update it to a specific, trackable instruction that names the helper and links to an issue or PR.agents-core/vision_agents/core/turn_detection/turn_detection.py (1)
27-33: Missing docstring on `TurnEnded`.

`TurnStarted` documents the event; `TurnEnded` doesn't. Add a short Google-style docstring for symmetry and discoverability.

✍️ Proposed tweak

 @dataclass
 class TurnEnded:
+    """Event emitted when a speaker ends their turn."""
+
     participant: Participant
     confidence: float
     eager: bool = False
     trailing_silence_ms: Optional[float] = None
     duration_ms: Optional[float] = None
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/turn_detection/turn_detection.py` around lines 27 - 33, Add a short Google-style docstring to the TurnEnded dataclass to match TurnStarted: describe that the class represents an event signaling a participant's turn has ended and briefly document fields participant (Participant who ended the turn), confidence (float confidence in the detector), eager (bool for eager end detection), trailing_silence_ms (Optional[float] silence after end), and duration_ms (Optional[float] duration of the turn). Place the docstring directly under the TurnEnded class declaration so it mirrors the documentation style and structure used by TurnStarted.plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py (1)
312-335: Stream emission looks correct.
`await self.output.send(...)` for both `TurnStarted` and `TurnEnded` is consistent with the new `Stream[T]` API, and the inlined payloads carry the VAD `speech_probability` / Vogent `confidence` as expected.

Minor consistency note: other plugins (e.g. `elevenlabs/stt.py`) use `send_nowait` for this same event class. Not a bug — `send` vs `send_nowait` depend on whether the producer wants backpressure. Just worth deciding on a single house style in a follow-up so pipeline semantics are uniform across plugins.
Verify each finding against the current code and only fix it if needed. In `@plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py` around lines 312 - 335, The code currently uses await self.output.send(...) when emitting TurnStarted and TurnEnded; to match the house style used elsewhere (e.g., elevenlabs/stt.py) replace these with self.output.send_nowait(...) (remove await) for the TurnStarted and TurnEnded sends in vogent_turn_detection.py (the calls inside the branches referencing self._active_segment and when is_speech is true/false) so pipeline emission semantics are consistent across plugins.plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py (1)
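The backpressure distinction above can be illustrated with a bounded `asyncio.Queue`, which stream pipelines of this shape typically resemble (a generic sketch of the semantics, not the repo's actual `Stream[T]` implementation):

```python
import asyncio


async def main() -> tuple[bool, int]:
    # A bounded queue models a stream with room for one pending item.
    q: asyncio.Queue[str] = asyncio.Queue(maxsize=1)

    # put_nowait (like send_nowait) never suspends: it either enqueues
    # immediately or raises QueueFull, so the producer keeps its own pace.
    q.put_nowait("TurnStarted")
    try:
        q.put_nowait("TurnEnded")
        overflowed = False
    except asyncio.QueueFull:
        overflowed = True

    async def consume() -> None:
        await asyncio.sleep(0.01)
        q.get_nowait()

    # await put (like send) applies backpressure: the producer suspends
    # here until the consumer frees a slot.
    asyncio.get_running_loop().create_task(consume())
    await q.put("TurnEnded")
    return overflowed, q.qsize()


overflowed, pending = asyncio.run(main())
print(overflowed, pending)  # True 1
```

In this model, `send_nowait` suits fire-and-forget event emission, while awaiting `send` lets a slow consumer pace the producer; either is defensible, which is why it is a house-style decision rather than a bug.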
277-361: Lift the hardcoded `confidence=0.5` into a named sentinel.

Four emission sites (Line 278, 324, 361, and the analogous path) all use `confidence=0.5` as a placeholder because ElevenLabs' VAD doesn't surface a turn-boundary probability. That's fine, but downstream consumers may gate on `confidence` thresholds (eager turn handling, TranscribingInferenceFlow), and a magic 0.5 can silently cross those thresholds.

Introduce a named constant so the intent — "no meaningful probability available" — is explicit, and consider a value that can't accidentally satisfy common thresholds (or `NaN`, which any `>` comparison will reject).

♻️ Proposed cleanup

 logger = logging.getLogger(__name__)
+
+# ElevenLabs' realtime API does not expose a turn-boundary probability;
+# emit a neutral sentinel so downstream threshold checks are explicit.
+_UNKNOWN_TURN_CONFIDENCE = 0.5

and at each emission site:

-        self.output.send_nowait(
-            TurnStarted(participant=participant, confidence=0.5)
-        )
+        self.output.send_nowait(
+            TurnStarted(participant=participant, confidence=_UNKNOWN_TURN_CONFIDENCE)
+        )

(apply the same substitution to the three `TurnEnded(..., confidence=0.5)` sites).
Verify each finding against the current code and only fix it if needed. In `@plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py` around lines 277 - 361, Replace the hardcoded 0.5 confidence literal used when emitting turn events and transcripts with a named sentinel constant that signals "no meaningful probability available" (e.g., NO_CONFIDENCE or NO_TURN_CONFIDENCE defined near the top of this module), then update all emit sites referenced in _on_committed_transcript and where TurnStarted/TurnEnded/Transcript are sent (symbols: _on_committed_transcript, TurnStarted, TurnEnded, Transcript) to use that constant instead of 0.5; choose a sentinel that will not accidentally satisfy numeric thresholds (NaN or a clearly out-of-band value) and document it in a short comment next to the constant.plugins/inworld/vision_agents/plugins/inworld/tts.py (1)
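As a quick sanity check on the `NaN` suggestion (plain Python, independent of the plugin code): every ordered comparison against `NaN` evaluates to `False`, so a NaN sentinel can never satisfy a `confidence > threshold` gate, while `math.isnan` still lets consumers detect it explicitly.

```python
import math

# Hypothetical sentinel meaning "no meaningful probability available".
UNKNOWN_TURN_CONFIDENCE = math.nan

threshold = 0.4
passes_gate = UNKNOWN_TURN_CONFIDENCE > threshold  # False: NaN fails every ordered comparison
below_gate = UNKNOWN_TURN_CONFIDENCE < threshold   # also False
detectable = math.isnan(UNKNOWN_TURN_CONFIDENCE)   # True: still explicitly detectable

print(passes_gate, below_gate, detectable)  # False False True
```

The trade-off is that NaN does not survive strict JSON serialization, so if confidence values ever cross a wire, a named out-of-band constant may be the safer choice.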
189-192: Prefer the implicit `super().close()`.

`super(TTS, self).close()` works but the two-argument form is only needed when the lexical class isn't the one you're delegating from. Inside `inworld.TTS.close`, plain `super()` is clearer and immune to future class-name renames.

♻️ Nit

-        await super(TTS, self).close()
+        await super().close()
Verify each finding against the current code and only fix it if needed. In `@plugins/inworld/vision_agents/plugins/inworld/tts.py` around lines 189 - 192, The call to super uses the explicit two-argument form inside TTS.close; update the method to call super().close() instead (in the async def close method of class TTS) so the delegation is clearer and resilient to renames—replace super(TTS, self).close() with super().close() and keep the existing await semantics and client aclose behavior.agents-core/vision_agents/core/agents/inference/audio.py (1)
76-91: Replace hardcoded `50` with `self._chunk_frac`.

Line 78 hardcodes `50` — the same constant you already derive as `self._chunk_frac = 1000 // self._chunk_size_ms`. If `_chunk_size_ms` is ever tuned, this branch will silently produce mis-sized tail chunks.

♻️ Suggested fix

-        if self._carry is not None and len(self._carry.samples) > 0:
-            chunk_size = self._carry.sample_rate // 50
-            padded = next(self._carry.chunks(chunk_size, pad_last=True))
+        if self._carry is not None and len(self._carry.samples) > 0:
+            chunk_size = self._carry.sample_rate // self._chunk_frac
+            padded = next(self._carry.chunks(chunk_size, pad_last=True))
             super().send_nowait(AudioOutputChunk(data=padded))
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/inference/audio.py` around lines 76 - 91, The code in the finalization branch uses a hardcoded 50 to compute chunk_size which can drift from the configured chunk size; update the chunk_size calculation in the block that checks item.final and self._carry to use self._chunk_frac (i.e. chunk_size = self._carry.sample_rate // self._chunk_frac) instead of 50, keeping the remainder of the logic that pads and sends the last chunk via super().send_nowait(AudioOutputChunk(...)) and then clears self._carry and sends the final PcmData chunk; ensure integer division is used and refer to symbols _carry, _chunk_frac, send_nowait, AudioOutputChunk, and PcmData when making the change.agents-core/vision_agents/core/tts/tts.py (1)
298-299: Dead `try/except Exception: raise`.

Both `send()` (Line 298–299) and `send_iter()` (Line 412–413) end with `except Exception: raise`, which is a no-op. The PR description also mentions dropping the error-event emission. Either remove the try/except entirely or restore a meaningful handler (e.g., emit `TTSErrorEvent`).

♻️ Suggested cleanup for `send_iter`

-        try:
-            # Synthesize audio in provider-native format
-            response = await self.stream_audio(text, *args, **kwargs)
+        # Synthesize audio in provider-native format
+        response = await self.stream_audio(text, *args, **kwargs)
         ...
-            self.events.send(
-                TTSSynthesisCompleteEvent(
+        self.events.send(
+            TTSSynthesisCompleteEvent(
         ...
-            )
-            )
-        except Exception:
-            raise
+            )
+        )

Same applies to the identical block in `send()`.
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/tts/tts.py` around lines 298 - 299, The try/except blocks at the end of send() and send_iter() are no-ops (they just "except Exception: raise"); remove these empty handlers or replace them with a meaningful handler: catch Exception as e, emit a TTSErrorEvent (or the module's error event) with the exception details and context (e.g., input/request id), then re-raise the exception; update both send() and send_iter() to either delete the try/except entirely or implement the emit-and-raise pattern so errors are logged/emitted before being propagated.plugins/kokoro/tests/test_tts.py (1)
8-11: Narrow the import-probe exception.An
import kokorofailure manifests asImportError(orModuleNotFoundError); catching bareExceptionwill also swallow unrelated failures (e.g., bugs raised at kokoro’s import-time side effects) and silently skip the test rather than surfacing them.As per coding guidelines: "Never write
except Exception as e. Catch specific exceptions instead".♻️ Proposed refactor
try: import kokoro # noqa: F401 - except Exception: + except ImportError: pytest.skip("kokoro package not installed; skipping manual playback test.")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/kokoro/tests/test_tts.py` around lines 8 - 11, The try/except around the import kokoro line is too broad and hides unrelated errors; narrow it to only catch import failures by replacing the broad except Exception with except (ModuleNotFoundError, ImportError) so the test is skipped only when the kokoro package is absent and other import-time errors still surface; update the import probe in the test_tts.py block that contains the `import kokoro` statement accordingly.plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py (1)
173-173: Prefer the plain `super().start()` form.

`super(STT, self).start()` is equivalent here but redundant — inside the `STT` class method, `super()` already resolves to the same MRO entry. The verbose form can also be accidentally wrong if the class is ever renamed.

Proposed fix

-        await super(STT, self).start()
+        await super().start()
Verify each finding against the current code and only fix it if needed. In `@plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py` at line 173, Replace the verbose super call in the STT.start implementation with the plain form: locate the STT class's start method where it calls super(STT, self).start(), and change it to use super().start() to avoid redundancy and potential rename bugs; ensure you only modify that call and keep the await and surrounding logic unchanged.agents-core/vision_agents/core/agents/transcript/buffer.py (1)
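A minimal standalone illustration of that equivalence (hypothetical `Base`/`STT` names, not the plugin's real classes): inside a method, zero-argument `super()` resolves using the lexically enclosing class, so both forms hit the same MRO entry, but only the verbose form breaks silently if the class is renamed without updating the call.

```python
class Base:
    def start(self) -> str:
        return "base-start"


class STT(Base):
    def start_verbose(self) -> str:
        # Explicit two-argument form: repeats the literal class name.
        return super(STT, self).start()

    def start_plain(self) -> str:
        # Zero-argument form: the compiler supplies the enclosing class.
        return super().start()


s = STT()
print(s.start_verbose(), s.start_plain())  # base-start base-start
```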
96-99: Add a short Google-style docstring to the new `final` property.

As per coding guidelines ("Use Google-style docstrings and keep them short"), the new public property deserves a one-liner explaining its meaning (e.g., "True once a final update has been applied and no partial is pending").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/transcript/buffer.py` around lines 96 - 99, Add a one-line Google-style docstring to the public property final on the Transcript buffer class to explain its meaning (e.g., "True once a final update has been applied and no partial is pending"). Place the docstring directly under the `@property` decorator and before the def final(self) -> bool: signature, and ensure it references the semantics driven by self._final and self._has_pending_partial without changing their logic.agents-core/vision_agents/core/agents/transcript/store.py (1)
44-53: Document the new `drop` parameter.

As per coding guidelines ("Use Google-style docstrings and keep them short"), the public `update_user_transcript` / `update_agent_transcript` docstrings should describe the new `drop` flag and its effect on buffered state. It materially changes the return/lifecycle contract.
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/transcript/store.py` around lines 44 - 53, Update the Google-style docstrings for update_user_transcript and update_agent_transcript to document the new drop: bool parameter, describing that when drop is True the method discards buffered transcript state for the given participant/user instead of appending text, and how this alters the return behavior (may return None or a TranscriptUpdate indicating a dropped/cleared buffer). Use clear short sentences, include types (drop: bool) and the effect on buffered state and lifecycle/return contract, and mention the mode: TranscriptMode parameter context.plugins/assemblyai/tests/test_assemblyai_stt.py (1)
31-36: Minor: guard `transcripts[0]` before indexing.

If no finals arrive, line 35 fails with a helpful message but line 36 would then never run — however if a non-final transcript is produced without any finals, `transcripts` could be non-empty while `finals` is empty, so the ordering is fine. Consider asserting `transcripts` is non-empty explicitly to make the failure mode obvious.
Verify each finding against the current code and only fix it if needed. In `@plugins/assemblyai/tests/test_assemblyai_stt.py` around lines 31 - 36, The test currently assumes transcripts has at least one element before checking transcripts[0].participant; add an explicit guard to assert that the transcripts list is non-empty (after collecting items and building transcripts) so failures are clearer — e.g., after constructing transcripts (from stt.output.collect and variable transcripts) assert transcripts to ensure indexing is safe before referencing transcripts[0].participant and keep the existing finals/fulls assertions as-is.plugins/mistral/vision_agents/plugins/mistral/stt.py (1)
114-115: Catch specific exceptions in the receive loop.

As per coding guidelines ("Never write `except Exception as e`. Catch specific exceptions instead"), the broad `except Exception:` here swallows everything from SDK and network layers. Narrow it to the concrete exceptions raised by `mistralai` (e.g., `RealtimeTranscriptionError`-adjacent SDK errors / `ConnectionClosed` / `asyncio.IncompleteReadError`) so programming errors still surface.
Verify each finding against the current code and only fix it if needed. In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 114 - 115, The receive loop in plugins/mistral/vision_agents/plugins/mistral/stt.py currently uses a broad "except Exception:" which hides programming errors; change that catch to explicitly handle only the concrete runtime/network/SDK errors (for example: the SDK's realtime transcription error class like RealtimeTranscriptionError or the Mistral SDK error type, websocket/connection exceptions such as websockets.exceptions.ConnectionClosed or any ConnectionClosed from the transport, and asyncio.IncompleteReadError), import those exception classes at the top, log them with logger.exception("Error in Mistral receive loop: %s", exc) and re-raise any other unexpected exceptions so they surface instead of being swallowed; keep the rest of the receive loop logic unchanged and ensure the catch references the actual exception types used by the mistralai/websocket/asyncio stack.agents-core/vision_agents/core/agents/inference/transcribing_flow.py (2)
285-291: Nit: docstring typos ("receivng", "avaialble") and dangling sentence.
`process_stt_output`'s docstring has two typos and a sentence that ends mid-thought ("This task reads" in the outer docstring on `process_turn_detection` too — line 257). Quick polish.

🪶 A Plath-clean sweep of the margins

-        - The LLM turns are confirmed immediately after receivng TurnEnded
+        - The LLM turns are confirmed immediately after receiving TurnEnded
           regardless of their transcript.
-          They may be received long before the final transcript is avaialble.
+          They may be received long before the final transcript is available.
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/inference/transcribing_flow.py` around lines 285 - 291, Fix typos and incomplete sentences in the docstrings: in process_stt_output correct "receivng" -> "receiving" and "avaialble" -> "available", and edit the dangling sentence in process_turn_detection (the "This task reads" line) to be a complete, clear sentence explaining what the task reads and returns; update any nearby wording (e.g., "LLM turns are confirmed immediately after receiving TurnEnded") for clarity while keeping the original meaning.
443-457: Minor: empty/whitespace deltas still emit `TTSInput` and dirty the conversation.

When `item.delta` is empty (or sanitizes to `""`), the code still calls `conversation.upsert_message(content="")` and `tts_input.send(TTSInput(text="", delta=True))`. Streaming TTS tokenizers tolerate this, but the conversation store is being upserted with empty content per empty delta, and any downstream consumer counting TTS inputs will see noise. A cheap skip keeps the pipeline tidy.

🪶 Empty deltas — small white rooms — best left unopened

             if isinstance(item, LLMResponseDelta):
                 # Process the delta response
                 text = sanitize_text(item.delta or "")
+                if not text:
+                    continue
                 logger.debug(f"🤖 [LLM response delta]: {text}")
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/inference/transcribing_flow.py` around lines 443 - 457, The LLMResponseDelta handling currently upserts empty deltas and sends empty TTS inputs; modify the block that processes LLMResponseDelta (use sanitize_text(item.delta or "")) to skip further work when the sanitized text is empty or only whitespace: do not call self._conversation.upsert_message(...) nor tts_input.send(TTSInput(...)) if text is empty. Keep references to LLMResponseDelta, sanitize_text, self._conversation.upsert_message, tts_input.send, and TTSInput so the check is added immediately after computing text and before any upsert/send calls.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 0b3a179f-9ab3-43e0-86c1-10f2aad2d41e
📒 Files selected for processing (67)
- agents-core/vision_agents/core/agents/inference/__init__.py
- agents-core/vision_agents/core/agents/inference/audio.py
- agents-core/vision_agents/core/agents/inference/base.py
- agents-core/vision_agents/core/agents/inference/llm_turn.py
- agents-core/vision_agents/core/agents/inference/transcribing_flow.py
- agents-core/vision_agents/core/agents/transcript/buffer.py
- agents-core/vision_agents/core/agents/transcript/store.py
- agents-core/vision_agents/core/stt/__init__.py
- agents-core/vision_agents/core/stt/stt.py
- agents-core/vision_agents/core/tts/manual_test.py
- agents-core/vision_agents/core/tts/testing.py
- agents-core/vision_agents/core/tts/tts.py
- agents-core/vision_agents/core/turn_detection/__init__.py
- agents-core/vision_agents/core/turn_detection/turn_detection.py
- agents-core/vision_agents/core/utils/exceptions.py
- agents-core/vision_agents/core/utils/stream.py
- agents-core/vision_agents/core/utils/text.py
- agents-core/vision_agents/core/utils/tokenizer.py
- conftest.py
- docs/ai/instructions/ai-stt.md
- plugins/assemblyai/tests/test_assemblyai_stt.py
- plugins/assemblyai/vision_agents/plugins/assemblyai/stt.py
- plugins/aws/tests/test_tts.py
- plugins/cartesia/tests/test_tts.py
- plugins/cartesia/vision_agents/plugins/cartesia/tts.py
- plugins/deepgram/tests/test_deepgram_stt.py
- plugins/deepgram/tests/test_deepgram_stt_close.py
- plugins/deepgram/tests/test_stt.py
- plugins/deepgram/tests/test_tts.py
- plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py
- plugins/deepgram/vision_agents/plugins/deepgram/tts.py
- plugins/elevenlabs/tests/test_elevenlabs_stt.py
- plugins/elevenlabs/tests/test_tts.py
- plugins/elevenlabs/tests/test_tts_close.py
- plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py
- plugins/fast_whisper/tests/test_fast_whisper_stt.py
- plugins/fast_whisper/vision_agents/plugins/fast_whisper/stt.py
- plugins/fish/tests/test_fish_stt.py
- plugins/fish/tests/test_fish_tts.py
- plugins/fish/tests/test_stt.py
- plugins/fish/tests/test_tts.py
- plugins/fish/vision_agents/plugins/fish/stt.py
- plugins/inworld/tests/test_tts.py
- plugins/inworld/vision_agents/plugins/inworld/tts.py
- plugins/kokoro/tests/test_tts.py
- plugins/kokoro/vision_agents/plugins/kokoro/tts.py
- plugins/mistral/tests/test_mistral_stt.py
- plugins/mistral/vision_agents/plugins/mistral/stt.py
- plugins/openai/tests/test_tts.py
- plugins/openai/tests/test_tts_openai.py
- plugins/pocket/tests/test_tts.py
- plugins/smart_turn/tests/test_smart_turn.py
- plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py
- plugins/vogent/tests/test_vogent.py
- plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py
- plugins/wizper/tests/test_wizper_stt.py
- plugins/wizper/vision_agents/plugins/wizper/stt.py
- tests/test_agents/test_inference/__init__.py
- tests/test_agents/test_inference/stubs.py
- tests/test_agents/test_inference/test_audio.py
- tests/test_agents/test_inference/test_llm_turn.py
- tests/test_agents/test_inference/test_transcribing_flow.py
- tests/test_agents/test_transcript/test_buffer.py
- tests/test_agents/test_transcript/test_store.py
- tests/test_tts_base.py
- tests/test_utils/test_stream.py
- tests/test_utils/test_tokenizer.py
💤 Files with no reviewable changes (8)
- plugins/deepgram/tests/test_deepgram_stt_close.py
- plugins/elevenlabs/tests/test_tts_close.py
- plugins/openai/tests/test_tts_openai.py
- plugins/deepgram/tests/test_deepgram_stt.py
- conftest.py
- plugins/fish/tests/test_fish_tts.py
- plugins/fish/tests/test_fish_stt.py
- agents-core/vision_agents/core/tts/testing.py
@property
def buffered(self) -> float:
    """Return the amount of seconds of audio pending in the buffer."""
    seconds = len(self._items) * self._chunk_size_ms / 1000
    if self._carry is not None:
        seconds += self._carry.duration
    return seconds
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -nP -C3 '\b_items\b' --type=py -g '!**/tests/**'

Repository: GetStream/Vision-Agents
Length of output: 4199
🏁 Script executed:
cat -n agents-core/vision_agents/core/agents/inference/audio.py | head -100
Length of output: 4043
🏁 Script executed:
rg -n "class AudioOutputChunk|class AudioOutputFlush|def send_nowait" agents-core/vision_agents/core/agents/inference/audio.py -A 5
Length of output: 674
🏁 Script executed:
rg -n "AudioOutputFlush|AudioOutputChunk\(data=None" agents-core/vision_agents/core/agents/inference/ -B 2 -A 2
Length of output: 2733
buffered miscounts control messages and empty chunks as audio duration.
self._items receives AudioOutputFlush (line 59, 99) and AudioOutputChunk(data=None) (line 56-60) control messages that carry no audio, yet each inflates the reported buffer by 20ms. This causes pacing and back-pressure logic to drift whenever flushes or sentinel chunks are queued.
Additionally, empty PcmData chunks created on final utterance (lines 82-91) are counted despite containing zero samples.
🔧 Proposed fix
 @property
 def buffered(self) -> float:
     """Return the amount of seconds of audio pending in the buffer."""
-    seconds = len(self._items) * self._chunk_size_ms / 1000
+    audio_items = sum(
+        1
+        for item in self._items
+        if isinstance(item, AudioOutputChunk) and item.data is not None
+    )
+    seconds = audio_items * self._chunk_size_ms / 1000
     if self._carry is not None:
         seconds += self._carry.duration
     return seconds

Note: This filters control messages but may still count empty PcmData chunks with final=True. Consider adding a check for non-empty sample data if precision is required.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/inference/audio.py` around lines 39 -
45, The buffered property currently multiplies len(self._items) by
_chunk_size_ms which counts control messages and empty chunks as audio; change
buffered to iterate self._items and only count items that are actual audio
frames (e.g., instances of PcmData or AudioOutputChunk whose .data is not None
and has non-zero samples/length) and sum their durations (use _chunk_size_ms per
non-empty chunk), and also only add self._carry.duration if _carry is an audio
object with positive duration; ignore AudioOutputFlush and
AudioOutputChunk(data=None) sentinel/control messages when computing seconds.
async def _do_llm_response(self, llm: LLM):
    with tracer.start_as_current_span("simple_response") as span:
        async for item in llm.simple_response(self.transcript, self.participant):
            await self.stream.send(item)
            if isinstance(item, LLMResponseFinal):
                self.stream.close()

        span.set_attribute("text", self.transcript)

async def _do_finalize(self, output: Stream[LLMResponseDelta | LLMResponseFinal]):
    """
    The LLM turn is confirmed and final, we can now read the LLM response chunks
    and send them to the output.

    Args:
        output: output stream for LLM response chunks

    """
    async for item in self.stream:
        await output.send(item)
Finalizer can hang forever — stream isn't guaranteed to close.
_do_llm_response only calls self.stream.close() inside the if isinstance(item, LLMResponseFinal) branch. Two realistic paths skip it entirely:
- The LLM iterator finishes without ever yielding an `LLMResponseFinal` (early return, empty response, connection-level end-of-stream, etc.).
- `llm.simple_response(...)` raises — the exception aborts the coroutine, the stream stays open, and the caller's `_do_finalize` `async for` iterates on an open-and-empty stream forever.
Additionally, the loop does not break after emitting LLMResponseFinal, so if the LLM ever yields anything after the final, await self.stream.send(item) will raise StreamClosed into the task.
Wrap the producer in try/finally and break on final:
🔧 Proposed fix
 async def _do_llm_response(self, llm: LLM):
     with tracer.start_as_current_span("simple_response") as span:
-        async for item in llm.simple_response(self.transcript, self.participant):
-            await self.stream.send(item)
-            if isinstance(item, LLMResponseFinal):
-                self.stream.close()
-
-        span.set_attribute("text", self.transcript)
+        span.set_attribute("text", self.transcript)
+        try:
+            async for item in llm.simple_response(self.transcript, self.participant):
+                await self.stream.send(item)
+                if isinstance(item, LLMResponseFinal):
+                    break
+        finally:
+            self.stream.close()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/inference/llm_turn.py` around lines 100
- 119, _do_llm_response can leave self.stream open (causing _do_finalize to
hang) and can raise StreamClosed if items are yielded after an LLMResponseFinal;
wrap the producer loop in a try/finally so self.stream.close() is always called,
and break the loop immediately after sending an LLMResponseFinal to avoid
sending further items; specifically modify _do_llm_response (which iterates over
llm.simple_response(self.transcript, self.participant) and calls
self.stream.send/close) to ensure close() in the finally block and to break when
encountering LLMResponseFinal so _do_finalize (which async for's self.stream)
cannot hang or receive items after final.
@dataclass
class TranscriptResponse:
    confidence: float | None = None
    language: str | None = None
    processing_time_ms: float | None = None
    audio_duration_ms: float | None = None
    model_name: str | None = None
    other: dict | None = None


@dataclass
class Transcript:
    """Event emitted when a complete transcript is available."""

    participant: Participant
    mode: TranscriptMode
    text: str = ""
    confidence: float | None = None
    language: str | None = None
    processing_time_ms: float | None = None
    audio_duration_ms: float | None = None
    model_name: str | None = None
    response: TranscriptResponse = field(default_factory=TranscriptResponse)

    def __post_init__(self):
        if not self.text:
            raise ValueError("Transcript text cannot be empty")

    @property
    def final(self) -> bool:
        return self.mode == "final"
Transcript duplicates metadata fields with TranscriptResponse — pick one home.
Transcript declares confidence, language, processing_time_ms, audio_duration_ms, model_name at the top level, and then embeds a TranscriptResponse that has the same five fields. Across the plugin changes in this PR (fish, elevenlabs, etc.) only response=TranscriptResponse(...) is populated; the top-level fields are left at None. Consumers now have two places to look and the two can legitimately disagree — a recipe for subtle bugs downstream in TranscribingInferenceFlow and the agent layer.
Decide which surface is canonical and drop the other (or make the top-level fields computed @property delegates to response).
Separately: text: str = "" is accepted as a default by the dataclass but then rejected by __post_init__, so the default is unusable. Either drop the default or loosen the validation to only reject when mode == "final" (partial/replacement transcripts with empty text can be legitimate in some providers).
♻️ Proposed consolidation (keep `TranscriptResponse` as the single source of truth)
 @dataclass
 class Transcript:
     """Event emitted when a complete transcript is available."""

     participant: Participant
     mode: TranscriptMode
-    text: str = ""
-    confidence: float | None = None
-    language: str | None = None
-    processing_time_ms: float | None = None
-    audio_duration_ms: float | None = None
-    model_name: str | None = None
+    text: str
     response: TranscriptResponse = field(default_factory=TranscriptResponse)

     def __post_init__(self):
-        if not self.text:
+        if self.mode == "final" and not self.text:
             raise ValueError("Transcript text cannot be empty")
+
+    @property
+    def confidence(self) -> float | None:
+        return self.response.confidence
+
+    # … similar delegates for language/processing_time_ms/audio_duration_ms/model_name
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/stt/stt.py` around lines 17 - 48, The
Transcript dataclass duplicates metadata fields that already exist on
TranscriptResponse; consolidate by making TranscriptResponse the single source
of truth: remove confidence, language, processing_time_ms, audio_duration_ms,
and model_name from Transcript and update any consumers to read these values
from transcript.response; alternatively implement property delegates on
Transcript (e.g., def confidence(self): return self.response.confidence) if
backward compatibility is required. Also fix the unusable text default by either
removing the default from Transcript.text or change __post_init__ to only raise
ValueError when mode == "final" and text is empty so partial transcripts are
allowed; update references to Transcript (and TranscribingInferenceFlow
consumers) to read metadata from response or the new properties.
```python
    except BaseException as exc:
        logger.exception(message)
        # Always propagate non-targeted BaseExceptions
        # (includes CancelledError, KeyboardInterrupt, SystemExit, etc.)
        if not isinstance(exc, exceptions) or reraise:
```
🧩 Analysis chain

🏁 Script executed:

```shell
cat -n agents-core/vision_agents/core/utils/exceptions.py
```

Repository: GetStream/Vision-Agents
Length of output: 1274
Don't log non-targeted BaseExceptions.
Catching BaseException and logging all exceptions before filtering causes normal task cancellations (e.g., CancelledError, KeyboardInterrupt) to be logged with tracebacks, making them appear as application failures when they should pass through silently.
Proposed fix
```diff
-    except BaseException as exc:
+    except exceptions:
         logger.exception(message)
-        # Always propagate non-targeted BaseExceptions
-        # (includes CancelledError, KeyboardInterrupt, SystemExit, etc.)
-        if not isinstance(exc, exceptions) or reraise:
+        if reraise:
             raise
```

This aligns with the guideline to catch specific exceptions instead of broad exception types.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/utils/exceptions.py` around lines 26 - 30, The
current except BaseException block logs every exception before checking whether
it’s a targeted exception, causing task cancellations (e.g., CancelledError,
KeyboardInterrupt) to be logged; update the handler so you first check the
exception using the existing isinstance(exc, exceptions) or reraise condition
and immediately re-raise non-targeted exceptions, and only call
logger.exception(message) (or otherwise log) for exceptions that are in the
targeted exceptions set (i.e., when isinstance(exc, exceptions) and not
reraise); reference the except BaseException as exc clause, the
logger.exception(message) call, and the exceptions / reraise variables when
making this change.
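The narrow-catch shape the fix describes can be demonstrated with a small context manager. `log_targeted` is a hypothetical name for illustration, not the helper in `exceptions.py`:

```python
import contextlib
import logging

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def log_targeted(message: str, exceptions: tuple, reraise: bool = False):
    """Log only the targeted exception types; anything else (including
    CancelledError, KeyboardInterrupt, SystemExit) propagates unlogged."""
    try:
        yield
    except exceptions:
        logger.exception(message)
        if reraise:
            raise


# Targeted: logged, and swallowed because reraise=False.
with log_targeted("step failed", (ValueError,)):
    raise ValueError("boom")

# Non-targeted: never reaches the logger, propagates to the caller.
try:
    with log_targeted("step failed", (ValueError,)):
        raise KeyError("boom")
except KeyError:
    print("KeyError propagated")  # → KeyError propagated
```

Because the `except` clause only matches the targeted tuple, cancellations never acquire a spurious traceback in the logs.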
```python
    def clear(self) -> None:
        """
        Empty the stream but allow the running iterators to keep going.
        """
        self._items.clear()
        self._wakeup_next_sender()
```
Wake enough blocked senders after clear().
For bounded streams with maxsize > 1, clearing a full buffer frees multiple slots but only wakes one blocked send(). The remaining senders can stay suspended despite available capacity.
Proposed fix
```diff
     def clear(self) -> None:
         """
         Empty the stream but allow the running iterators to keep going.
         """
         self._items.clear()
-        self._wakeup_next_sender()
+        wakeups = self._maxsize if self._maxsize > 0 else len(self._senders)
+        for _ in range(wakeups):
+            self._wakeup_next_sender()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/utils/stream.py` around lines 54 - 59, The
clear() method only calls _wakeup_next_sender(), which wakes a single blocked
sender; update clear() (in stream.py) to wake as many blocked senders as there
are newly freed slots: after self._items.clear(), loop while self.maxsize == 0
or len(self._items) < self.maxsize and there are entries in self._send_waiters,
calling _wakeup_next_sender() for each iteration (or otherwise drain/wake send
waiters until capacity is filled or no waiters remain) so all eligible send()
waiters resume; use the existing _send_waiters deque/list and preserve existing
locking semantics around _wakeup_next_sender().
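A minimal bounded-buffer sketch (not the PR's `Stream`, which has more machinery) shows why `clear()` needs one wakeup per freed slot rather than a single wakeup:

```python
import asyncio
from collections import deque


class BoundedStream:
    def __init__(self, maxsize: int) -> None:
        self._maxsize = maxsize
        self._items: deque = deque()
        self._senders: deque = deque()  # futures of blocked send() calls

    async def send(self, item) -> None:
        while len(self._items) >= self._maxsize:
            fut = asyncio.get_running_loop().create_future()
            self._senders.append(fut)
            await fut  # suspend until a slot frees up
        self._items.append(item)

    def _wakeup_next_sender(self) -> None:
        while self._senders:
            fut = self._senders.popleft()
            if not fut.done():
                fut.set_result(None)
                return

    def clear(self) -> None:
        self._items.clear()
        # One wakeup per freed slot, so every sender that can now make
        # progress actually resumes.
        for _ in range(self._maxsize):
            self._wakeup_next_sender()


async def main():
    s = BoundedStream(maxsize=2)
    await s.send(1)
    await s.send(2)
    t1 = asyncio.create_task(s.send(3))
    t2 = asyncio.create_task(s.send(4))
    await asyncio.sleep(0)        # both senders are now blocked on full buffer
    s.clear()
    await asyncio.gather(t1, t2)  # both complete after a single clear()
    return list(s._items)


print(asyncio.run(main()))
```

With only one `_wakeup_next_sender()` call in `clear()`, `t2` would stay suspended and `gather` would hang despite the buffer having room.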
```python
    def flush(self) -> str:
        """
        Empty the buffer and return the accumulated text.
        """
        text = self._buffer.strip()
        self._buffer = ""
        return text
```
Sanitize the final flushed TTS chunk too.
flush() emits the last buffered text to TTS, so markdown in an unterminated final response currently bypasses sanitize_text() and can be spoken literally.
Proposed fix
```diff
     def flush(self) -> str:
         """
         Empty the buffer and return the accumulated text.
         """
-        text = self._buffer.strip()
+        text = sanitize_text(self._buffer.strip())
         self._buffer = ""
         return text
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/utils/tokenizer.py` around lines 42 - 48, The
flush() method returns the final buffered string without sanitization, allowing
markdown/unsafe content to be emitted; update flush in tokenizer.py to call
sanitize_text on the accumulated text (e.g., text =
sanitize_text(self._buffer.strip())) before clearing self._buffer and returning
it, referencing the existing sanitize_text function and the flush method/_buffer
field to locate the change.
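The invariant the comment asks for, identical sanitization on the sentence-boundary path and the flush path, can be sketched with stand-ins. Both `sanitize_text` and the boundary regex below are illustrative, not the project's actual tokenizer:

```python
import re


def sanitize_text(text: str) -> str:
    # Stand-in sanitizer: strip a few markdown markers so TTS
    # doesn't read them aloud.
    return re.sub(r"[*_`#]", "", text)


class SentenceBuffer:
    def __init__(self) -> None:
        self._buffer = ""

    def push(self, delta: str) -> list:
        """Accumulate a delta; emit sanitized complete sentences."""
        self._buffer += delta
        sentences = []
        while (m := re.search(r"[.!?]\s", self._buffer)):
            sentence, self._buffer = (
                self._buffer[: m.end()].strip(),
                self._buffer[m.end():],
            )
            sentences.append(sanitize_text(sentence))
        return sentences

    def flush(self) -> str:
        # Same sanitization as the sentence path, so an unterminated
        # final response can't leak markdown to the TTS engine.
        text = sanitize_text(self._buffer.strip())
        self._buffer = ""
        return text


buf = SentenceBuffer()
print(buf.push("Hello **world**. And then"))  # → ['Hello world.']
print(buf.flush())                            # → And then
```

Routing both exits through the same `sanitize_text` call removes the asymmetry the review flags.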
`Stream` is a kind of queue with `__aiter__` that can be cleared or closed. Clearing the `Stream` drops the queued data but keeps the running iterators alive; closing it signals the running iterators to stop.
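Those semantics can be sketched with a plain `asyncio.Queue`; the PR's actual `Stream` (bounded sends, sender wakeups) is richer than this:

```python
import asyncio

_CLOSED = object()  # sentinel that tells iterators to stop


class Stream:
    def __init__(self) -> None:
        self._q: asyncio.Queue = asyncio.Queue()

    async def send(self, item) -> None:
        await self._q.put(item)

    def clear(self) -> None:
        # Drop queued data; iterators stay alive and see later items.
        while not self._q.empty():
            self._q.get_nowait()

    def close(self) -> None:
        self._q.put_nowait(_CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._q.get()
        if item is _CLOSED:
            raise StopAsyncIteration
        return item


async def main():
    s = Stream()
    await s.send("dropped")
    s.clear()              # queued item is gone, iterator keeps working
    await s.send("kept")
    s.close()              # signals the iterator to stop
    return [item async for item in s]


print(asyncio.run(main()))  # → ['kept']
```

The sentinel approach lets `close()` terminate iteration cleanly without cancelling the consuming task.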
…ripts without deleting them
TBD